在线客服
扫描二维码
下载博学谷APP扫描二维码
关注博学谷微信公众号
线程池作为存放线程的池子,能存放很多可以复用的线程。线程池的优点主要是可以降低系统资源消耗,提高响应速度以及提高线程的可管理性。本文将附上源码为大家详解线程池的实现原理。内容主要包括提交任务、创建线程、工作线程的实现原理和线程复用机制。
一、提交任务
线程池框架提供了两种方式提交任务,submit()和execute(),通过submit()方法提交的任务可以返回任务执行的结果,通过execute()方法提交的任务不能获取任务执行的结果。
1、submit()的实现源码
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
2、execute()的实现源码
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//获取线程池控制状态
int c = ctl.get();
// (1)
//worker数量小于corePoolSize
if (workerCountOf(c) < corePoolSize) {
//创建worker,addWorker方法boolean参数用来判断是否创建核心线程
if (addWorker(command, true))
//成功则返回
return;
//失败则再次获取线程池控制状态
c = ctl.get();
}
//(2)
//线程池处于RUNNING状态,将任务加入workQueue任务缓存队列
if (isRunning(c) && workQueue.offer(command)) {
// 再次检查,获取线程池控制状态,防止在任务入队的过程中线程池关闭了或者线程池中没有线程了
int recheck = ctl.get();
//线程池不处于RUNNING状态,且将任务从workQueue移除成功
if (! isRunning(recheck) && remove(command))
//采取任务拒绝策略
reject(command);
//worker数量等于0
else if (workerCountOf(recheck) == 0)
//创建worker
addWorker(null, false);
}
//(3)
else if (!addWorker(command, false)) //创建worker
reject(command); //如果创建worker失败,采取任务拒绝策略
}
总结:若线程池工作线程数量小于corePoolSize,则创建新线程来执行任务;若工作线程数量大于或等于core PoolSize,则将任务加入BlockingQueue;若无法将任务加入BlockingQueue(BlockingQueue已满),且工作线程数量小于maximumPoolSize,则创建新的线程来执行任务;若工作线程数量达到maximumPoolSize,则创建线程失败,采取任务拒绝策略。
二、创建线程
1、实现代码:
//addWorker有两个参数:Runnable类型的firstTask,用于指定新增的线程执行的第一个任务;boolean类型的core,表示是否创建核心线程
//该方法的返回值代表是否成功新增一个线程
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// (1)
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
//线程数超标,不能再创建线程,直接返回
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//CAS操作递增workCount
//如果成功,那么创建线程前的所有条件校验都满足了,准备创建线程执行任务,退出retry循环
//如果失败,说明有其他线程也在尝试往线程池中创建线程(往线程池提交任务可以是并发的),则继续往下执行
if (compareAndIncrementWorkerCount(c))
break retry;
//重新获取线程池控制状态
c = ctl.get();
// 如果线程池的状态发生了变更,如有其他线程关闭了这个线程池,那么需要回到外层的for循环
if (runStateOf(c) != rs)
continue retry;
//如果只是CAS操作失败的话,进入内层的for循环就可以了
}
}
//到这里,创建线程前的所有条件校验都满足了,可以开始创建线程来执行任务
//worker是否已经启动
boolean workerStarted = false;
//是否已将这个worker添加到workers这个HashSet中
boolean workerAdded = false;
Worker w = null;
try {
//创建一个worker,从这里可以看出对线程的包装
w = new Worker(firstTask);
//取出worker中的线程对象,Worker的构造方法会调用ThreadFactory来创建一个新的线程
final Thread t = w.thread;
if (t != null) {
//获取全局锁, 并发的访问线程池workers对象必须加锁,持有锁的期间线程池也不会被关闭
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//重新获取线程池的运行状态
int rs = runStateOf(ctl.get());
//小于SHUTTDOWN即RUNNING
//等于SHUTDOWN并且firstTask为null,不接受新的任务,但是会继续执行等待队列中的任务
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
//worker里面的thread不能是已启动的
if (t.isAlive())
throw new IllegalThreadStateException();
//将新创建的线程加入到线程池中
workers.add(w);
int s = workers.size();
// 更新largestPoolSize
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
//线程添加线程池成功,则启动新创建的线程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
//若线程启动失败,做一些清理工作,例如从workers中移除新添加的worker并递减wokerCount
if (! workerStarted)
addWorkerFailed(w);
}
//返回线程是否启动成功
return workerStarted;
}
2、总结:
addWorker()方法完成了一些任务。比如原子性的增加workerCount;将用户给定的任务封装成为一个worker,并将此worker添加进workers集合中;启动worker对应的线程;若线程启动失败,回滚worker的创建动作,即从workers中移除新添加的worker,并原子性的减少workerCount。
三、工作线程的实现原理
1、Worker类
Worker类继承自AQS类,具有锁的功能;实现了Runable接口,可以将自身作为一个任务在线程中执行。
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
2、Worker的主要字段:
//用来封装worker的线程,线程池中真正运行的线程,通过线程工厂创建而来
final Thread thread;
//worker所对应的第一个任务,可能为空
Runnable firstTask;
//记录当前线程完成的任务数
volatile long completedTasks;
3、Worker的构造函数:
Worker(Runnable firstTask) {
//设置AQS的state为-1,在执行runWorker()方法之前阻止线程中断
setState(-1);
//初始化第一个任务
this.firstTask = firstTask;
//利用指定的线程工厂创建一个线程,注意,参数是Worker实例本身this
//也就是当执行start方法启动线程thread时,真正执行的是Worker类的run方法
this.thread = getThreadFactory().newThread(this);
}
四、线程复用机制
worker中的线程start 后,执行的是worker的run()方法,而run()方法最终会调用ThreadPoolExecutor类的runWorker()方法,runWorker()方法实现了线程池中的线程复用机制。下面我们来看一下runWorker()方法的实现。
final void runWorker(Worker w) {
//获取当前线程
Thread wt = Thread.currentThread();
//获取w的firstTask
Runnable task = w.firstTask;
//设置w的firstTask为null
w.firstTask = null;
// 释放锁,设置AQS的state为0,允许中断
w.unlock();
//用于标识线程是否异常终止,finally中processWorkerExit()方法会有不同逻辑
boolean completedAbruptly = true;
try {
//循环调用getTask()获取任务,不断从任务缓存队列获取任务并执行
while (task != null || (task = getTask()) != null) {
//进入循环内部,代表已经获取到可执行的任务,则对worker对象加锁,保证线程在执行任务过程中不会被中断
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) || //若线程池状态大于等于STOP,那么意味着该线程要中断
(Thread.interrupted() && //线程被中断
runStateAtLeast(ctl.get(), STOP))) && //且是因为线程池内部状态变化而被中断
!wt.isInterrupted()) //确保该线程未被中断
//发出中断请求
wt.interrupt();
try {
//开始执行任务前的Hook方法
beforeExecute(wt, task);
Throwable thrown = null;
try {
//到这里正式开始执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//执行任务后的Hook方法
afterExecute(task, thrown);
}
} finally {
//置空task,准备通过getTask()获取下一个任务
task = null;
//completedTasks递增
w.completedTasks++;
//释放掉worker持有的独占锁
w.unlock();
}
}
completedAbruptly = false;
} finally {
//到这里,线程执行结束,需要执行结束线程的一些清理工作
//线程执行结束可能有两种情况:
//1.getTask()返回null,也就是说,这个worker的使命结束了,线程执行结束
//2.任务执行过程中发生了异常
//第一种情况,getTask()返回null,那么getTask()中会将workerCount递减
//第二种情况,workerCount没有进行处理,这个递减操作会在processWorkerExit()中处理
processWorkerExit(w, completedAbruptly);
}
}
五、关闭线程池
1、shutdown()方法的实现原理
shutdown()方法将线程池运行状态设置为SHUTDOWN,此时线程池不会接受新的任务,但会处理阻塞队列中的任务。
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
//获取全局锁
mainLock.lock();
try {
//检查shutdown权限
checkShutdownAccess();
//设置线程池运行状态为SHUTDOWN
advanceRunState(SHUTDOWN);
//中断所有空闲worker
interruptIdleWorkers();
//用onShutdown()钩子方法
onShutdown();
} finally {
//释放锁
mainLock.unlock();
}
//尝试终止线程池
tryTerminate();
}
shutdown()方法首先会检查是否具有shutdown的权限,然后设置线程池的运行状态为SHUTDOWN,之后中断所有空闲的worker,再调用onShutdown()钩子方法,最后尝试终止线程池。
2、shutdownNow()的实现原理
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
//获取全局锁
mainLock.lock();
try {
//遍历workers集合
for (Worker w : workers)
//调用Worker类的interruptIfStarted()方法中断线程
w.interruptIfStarted();
} finally {
//释放锁
mainLock.unlock();
}
}
看到这里,相信大家对于线程池的实现原理已经有了一个大致的认识。如果想要学习更加深入的知识,提高自身的Java技能,不妨在博学谷多看看海量的视频学习资源,课程除了视频教学还配有相应的源码和学习资料,大家还在等什么,现在一起来体验看看吧!
— 申请免费试学名额 —
在职想转行提升,担心学不会?根据个人情况规划学习路线,闯关式自适应学习模式保证学习效果
讲师一对一辅导,在线答疑解惑,指导就业!
相关推荐 更多
Java免费视频教程学习来博学谷
Java免费视频教程学习,小编整理了博学谷在线IT培训平台关于Java开发的免费课程,大家可以在线视频学习Java基础的专业知识及Java发展趋势及职业规划等内容,Java免费视频教程主要介绍:Java秒杀系统实战、2小时看清Java未来规划、Java基础语法、JavaEE与人工智能、Springmvc+Mybatis课程。
7339
2019-08-01 16:08:02
如何自学Java编程?零基础小白入门须知
如何自学Java编程?相信这个问题在刚开始自学时就困扰着每一个Java小白。确实对于零基础的初学者来讲,Java并不是一门简单的编程语言,要想从入门到精通Java,必须要有详细的学习规划和方法,最好还要有具备自身Java开发经验老师的指导。不然只是头脑一热就决定学习Java,多半在半途就容易放弃。下面小编来和大家好好谈谈零基础小白入门Java要做到哪些准备,大家如果对Java学习又兴趣不妨看一看。
6294
2019-10-16 18:52:00
线上教育平台好吗?Java在线学习效果如何?
当今人们为了节约时间,越来越多的人选择进行线上教育平台进行学习和提高,而Java行业作为一门热门高薪行业,也吸引了一大批爱好者的线上学习。但是在鱼龙混杂的平台中,Java行业是否适合在线上学习吗?从当今整个大环境来说,线上教育可能才是最好最安全的办法,而且线上教育也不必线下差,没有巨大的差异,时间更加灵活,Java线上学习也能学习到和线下一样的知识。
3960
2020-06-17 17:24:23
南京哪些Java培训班靠谱?怎么选?
java培训机构很多可以选几家机构综合对比一下。想先了解一下可以去观看一下Java培训机构相关视频,了解课程内容适不适合自己,学不学的会,择Java培训前一定要慎重。
3500
2021-04-02 16:35:52
JavaEE就业班课程怎么样?有哪些优势?
Java就业班是博学⾕重磅推出的⾼端课程产品,在保证学习效果的前提下通过个性化的私⼈订制学习路径,制定⾼效的学习计划,提升学习效率,缩短学习周期以学习效果为保障。
2226
2022-09-29 16:42:05