在线客服
扫描二维码
下载博学谷APP扫描二维码
关注博学谷微信公众号
线程池作为存放线程的池子,能存放很多可以复用的线程。线程池的优点主要是可以降低系统资源消耗,提高响应速度以及提高线程的可管理性。本文将附上源码为大家详解线程池的实现原理。内容主要包括提交任务、创建线程、工作线程的实现原理和线程复用机制。
一、提交任务
线程池框架提供了两种方式提交任务,submit()和execute(),通过submit()方法提交的任务可以返回任务执行的结果,通过execute()方法提交的任务不能获取任务执行的结果。
1、submit()的实现源码
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
2、execute()的实现源码
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//获取线程池控制状态
int c = ctl.get();
// (1)
//worker数量小于corePoolSize
if (workerCountOf(c) < corePoolSize) {
//创建worker,addWorker方法boolean参数用来判断是否创建核心线程
if (addWorker(command, true))
//成功则返回
return;
//失败则再次获取线程池控制状态
c = ctl.get();
}
//(2)
//线程池处于RUNNING状态,将任务加入workQueue任务缓存队列
if (isRunning(c) && workQueue.offer(command)) {
// 再次检查,获取线程池控制状态,防止在任务入队的过程中线程池关闭了或者线程池中没有线程了
int recheck = ctl.get();
//线程池不处于RUNNING状态,且将任务从workQueue移除成功
if (! isRunning(recheck) && remove(command))
//采取任务拒绝策略
reject(command);
//worker数量等于0
else if (workerCountOf(recheck) == 0)
//创建worker
addWorker(null, false);
}
//(3)
else if (!addWorker(command, false)) //创建worker
reject(command); //如果创建worker失败,采取任务拒绝策略
}
总结:若线程池工作线程数量小于corePoolSize,则创建新线程来执行任务;若工作线程数量大于或等于core PoolSize,则将任务加入BlockingQueue;若无法将任务加入BlockingQueue(BlockingQueue已满),且工作线程数量小于maximumPoolSize,则创建新的线程来执行任务;若工作线程数量达到maximumPoolSize,则创建线程失败,采取任务拒绝策略。
二、创建线程
1、实现代码:
//addWorker有两个参数:Runnable类型的firstTask,用于指定新增的线程执行的第一个任务;boolean类型的core,表示是否创建核心线程
//该方法的返回值代表是否成功新增一个线程
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// (1)
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
//线程数超标,不能再创建线程,直接返回
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//CAS操作递增workCount
//如果成功,那么创建线程前的所有条件校验都满足了,准备创建线程执行任务,退出retry循环
//如果失败,说明有其他线程也在尝试往线程池中创建线程(往线程池提交任务可以是并发的),则继续往下执行
if (compareAndIncrementWorkerCount(c))
break retry;
//重新获取线程池控制状态
c = ctl.get();
// 如果线程池的状态发生了变更,如有其他线程关闭了这个线程池,那么需要回到外层的for循环
if (runStateOf(c) != rs)
continue retry;
//如果只是CAS操作失败的话,进入内层的for循环就可以了
}
}
//到这里,创建线程前的所有条件校验都满足了,可以开始创建线程来执行任务
//worker是否已经启动
boolean workerStarted = false;
//是否已将这个worker添加到workers这个HashSet中
boolean workerAdded = false;
Worker w = null;
try {
//创建一个worker,从这里可以看出对线程的包装
w = new Worker(firstTask);
//取出worker中的线程对象,Worker的构造方法会调用ThreadFactory来创建一个新的线程
final Thread t = w.thread;
if (t != null) {
//获取全局锁, 并发的访问线程池workers对象必须加锁,持有锁的期间线程池也不会被关闭
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//重新获取线程池的运行状态
int rs = runStateOf(ctl.get());
//小于SHUTTDOWN即RUNNING
//等于SHUTDOWN并且firstTask为null,不接受新的任务,但是会继续执行等待队列中的任务
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
//worker里面的thread不能是已启动的
if (t.isAlive())
throw new IllegalThreadStateException();
//将新创建的线程加入到线程池中
workers.add(w);
int s = workers.size();
// 更新largestPoolSize
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
//线程添加线程池成功,则启动新创建的线程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
//若线程启动失败,做一些清理工作,例如从workers中移除新添加的worker并递减wokerCount
if (! workerStarted)
addWorkerFailed(w);
}
//返回线程是否启动成功
return workerStarted;
}
2、总结:
addWorker()方法完成了一些任务。比如原子性的增加workerCount;将用户给定的任务封装成为一个worker,并将此worker添加进workers集合中;启动worker对应的线程;若线程启动失败,回滚worker的创建动作,即从workers中移除新添加的worker,并原子性的减少workerCount。
三、工作线程的实现原理
1、Worker类
Worker类继承自AQS类,具有锁的功能;实现了Runable接口,可以将自身作为一个任务在线程中执行。
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
2、Worker的主要字段:
//用来封装worker的线程,线程池中真正运行的线程,通过线程工厂创建而来
final Thread thread;
//worker所对应的第一个任务,可能为空
Runnable firstTask;
//记录当前线程完成的任务数
volatile long completedTasks;
3、Worker的构造函数:
Worker(Runnable firstTask) {
//设置AQS的state为-1,在执行runWorker()方法之前阻止线程中断
setState(-1);
//初始化第一个任务
this.firstTask = firstTask;
//利用指定的线程工厂创建一个线程,注意,参数是Worker实例本身this
//也就是当执行start方法启动线程thread时,真正执行的是Worker类的run方法
this.thread = getThreadFactory().newThread(this);
}
四、线程复用机制
worker中的线程start 后,执行的是worker的run()方法,而run()方法最终会调用ThreadPoolExecutor类的runWorker()方法,runWorker()方法实现了线程池中的线程复用机制。下面我们来看一下runWorker()方法的实现。
final void runWorker(Worker w) {
//获取当前线程
Thread wt = Thread.currentThread();
//获取w的firstTask
Runnable task = w.firstTask;
//设置w的firstTask为null
w.firstTask = null;
// 释放锁,设置AQS的state为0,允许中断
w.unlock();
//用于标识线程是否异常终止,finally中processWorkerExit()方法会有不同逻辑
boolean completedAbruptly = true;
try {
//循环调用getTask()获取任务,不断从任务缓存队列获取任务并执行
while (task != null || (task = getTask()) != null) {
//进入循环内部,代表已经获取到可执行的任务,则对worker对象加锁,保证线程在执行任务过程中不会被中断
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) || //若线程池状态大于等于STOP,那么意味着该线程要中断
(Thread.interrupted() && //线程被中断
runStateAtLeast(ctl.get(), STOP))) && //且是因为线程池内部状态变化而被中断
!wt.isInterrupted()) //确保该线程未被中断
//发出中断请求
wt.interrupt();
try {
//开始执行任务前的Hook方法
beforeExecute(wt, task);
Throwable thrown = null;
try {
//到这里正式开始执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//执行任务后的Hook方法
afterExecute(task, thrown);
}
} finally {
//置空task,准备通过getTask()获取下一个任务
task = null;
//completedTasks递增
w.completedTasks++;
//释放掉worker持有的独占锁
w.unlock();
}
}
completedAbruptly = false;
} finally {
//到这里,线程执行结束,需要执行结束线程的一些清理工作
//线程执行结束可能有两种情况:
//1.getTask()返回null,也就是说,这个worker的使命结束了,线程执行结束
//2.任务执行过程中发生了异常
//第一种情况,getTask()返回null,那么getTask()中会将workerCount递减
//第二种情况,workerCount没有进行处理,这个递减操作会在processWorkerExit()中处理
processWorkerExit(w, completedAbruptly);
}
}
五、关闭线程池
1、shutdown()方法的实现原理
shutdown()方法将线程池运行状态设置为SHUTDOWN,此时线程池不会接受新的任务,但会处理阻塞队列中的任务。
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
//获取全局锁
mainLock.lock();
try {
//检查shutdown权限
checkShutdownAccess();
//设置线程池运行状态为SHUTDOWN
advanceRunState(SHUTDOWN);
//中断所有空闲worker
interruptIdleWorkers();
//用onShutdown()钩子方法
onShutdown();
} finally {
//释放锁
mainLock.unlock();
}
//尝试终止线程池
tryTerminate();
}
shutdown()方法首先会检查是否具有shutdown的权限,然后设置线程池的运行状态为SHUTDOWN,之后中断所有空闲的worker,再调用onShutdown()钩子方法,最后尝试终止线程池。
2、shutdownNow()的实现原理
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
//获取全局锁
mainLock.lock();
try {
//遍历workers集合
for (Worker w : workers)
//调用Worker类的interruptIfStarted()方法中断线程
w.interruptIfStarted();
} finally {
//释放锁
mainLock.unlock();
}
}
看到这里,相信大家对于线程池的实现原理已经有了一个大致的认识。如果想要学习更加深入的知识,提高自身的Java技能,不妨在博学谷多看看海量的视频学习资源,课程除了视频教学还配有相应的源码和学习资料,大家还在等什么,现在一起来体验看看吧!
— 申请免费试学名额 —
在职想转行提升,担心学不会?根据个人情况规划学习路线,闯关式自适应学习模式保证学习效果
讲师一对一辅导,在线答疑解惑,指导就业!
相关推荐 更多
初中学历零基础能学好Java吗?
在如今的社会,想要进入好的企业获得一份高薪工作,学历可谓是一块敲开企业大门的敲门砖。现在有些学历不够高或者是不够好的小伙伴可能要说了,我的学历不高但是我想改变自己的命运,难道不能通过学Java获得高薪的工作?本篇文章小编就和读者们探讨一下初中学历零基础能学好Java吗?
7526
2019-07-08 15:44:40
Java在线教育学习效果怎么样?有哪些优势?
随着网络技术的发展,IT培训早已不仅仅局限于线下教学了。然而,还是有不少小伙伴对在线教育方式,仍旧抱有怀疑态度。为了打消大家不必要的担忧,本篇文章以博学谷在线课程为例,带大家看看,Java在线教育学习效果怎么样?有哪些优势?
4350
2019-09-11 17:25:24
Java线上培训班怎么选择?
Java线上培训班授课的优势不受时间、空间、教学环境的限制;教学资源较为优质,避免了要去外省或外市上课的时间成本。传统模式是线下授课,新兴起的是在线授课,其实两种授课方式各有所长,适应不同的人,两种学习方式相辅相成。
5372
2019-11-12 17:03:06
Java架构师视频教程学习大纲
对于通往Java架构师之路的朋友来讲,分布式和微服务都是必备的知识和技能。在学习分布式和微服务的时候,为了避免大家陷入“只见树木,不见森林”的学习误区,本文将给大家提供博学谷相关教程的学习大纲,大家可以作为学习的一个参考。当然《Java架构师之分布式和微服务》教程本身的视频内容也是十分优质的,大家可以深入了解一下。
5107
2019-12-13 17:44:47
Java线上网课线下培训怎么选?
Java线上网课线下培训怎么选?Java作为IT行业目前最热门的编程语言,不少想入行IT的朋友都想要报班学习。除了传统去线下机构学习的方式,在网上参与直播授课的朋友也不少。毕竟现在在线教育行业发展迅猛,许多行业培训也纷纷开设线上课程。上课形式丰富通常包括直播授课、社群教学也有老师在线答疑
4268
2020-06-17 14:58:10