概述做后端的应该都知道线程池,即使你没亲自使用过,那也一定听过或者了解过。有时候也会去深入理解,结果往往是当时觉得自己理解了,过一段时间就忘了。因为在日常的开发中,我们都不需要用到线程池,很多都是使用
概述
做后端的应该都知道线程池,即使你没亲自使用过,那也一定听过或者了解过。有时候也会去深入理解,结果往往是当时觉得自己理解了,过一段时间就忘了。因为在日常的开发中,我们都不需要用到线程池,很多都是使用的工具和框架写好,我们直接调接口就完事了。
很多东西没有亲自实践和深入的思考过的,单单看文章和书籍是不可能真正的理解的。以前我看了好多次线程池相关的文章,然后过个半年忘得差不多,又重新看,结果还是没有真正的理解。所以,我就打算动手实践一番,既然平时开发的时候用不到,那我就自己做一个项目来用上。
说做就做,我就选了一个Redis分布式锁作为练手项目,里面有一个定时续期的功能,就是使用线程池定时的运行提交的任务,将key续期,详细的不说了,想了解可以看这个 Redis分布式锁的实现-Redisson。
在实现Redis续期功能的时候,一边看别人定时任务怎么实现的,一边看线程池的源码。这时候我仿佛打开了新世界的大门,彻底理解了线程池运行逻辑,也了解了一些线程池设计的艺术。
接下来我想以一个设计者的角度,带领大家从零去设计和实现一个线程池,一步一个脚印,彻底的理解线程池的实现以及一些设计的艺术。
线程池出现的目的和意义
我们要明白,任何技术都是为了解决问题而出现的。那么线程池的出现解决了什么问题呢,答案是:解决了线程资源复用的问题
。
如果没有线程池,我们处理一个任务就要新开一个线程去执行,当任务完成时,该线程就停止了。如果说这个任务是重复的,总不能来一个就新建一个线程吧,多浪费,而且线程是稀缺资源,重复的创建销毁很耗时间。
有了线程池,我们就可以建立几个固定线程。有任务来了唤醒闲置的线程去处理,任务处理完成后继续处理后续的任务,如果暂时没有任务,可以将线程休眠,有新任务时再唤醒。这样一来就可以更高效的利用线程资源,提高系统并发效率。
任务:抽象的工作单元
在程序中,都是围绕着任务执行
来构造的,任务通常是一些抽象的工作单元。比如可以把一个 http请求 当做是一个任务,把一次与数据库的交互当做任务等等。在线程池中,我们把要处理的东西抽象成一个任务单元
, 这样可以简化线程池的结构,以此更好的构建线程池的模型。
线程:抽象的工作者
在线程池中,我们可以把每一个线程当做是一个worker
,即"工人"的意思。它会不断的尝试获得任务来执行,如果没有任务,则休眠或者做其他处理。
线程池的功能设计
那么,线程池通常要具备和提供什么功能呢,这里把核心的功能需求给罗列一下:
线程池的开启和关闭
线程池作为一个工具,需要有自己的生命周期,可以抽象成三个:
- 开启状态
- 运行状态
- 结束状态
其中结束状态下线程池的处理和考虑的东西要多一些,执行完线程池的关闭接口后:
- 正在运行的任务怎么处理?
- 在任务队列的任务要怎么处理?
- 此时线程池是否还能继续添加任务?
这些东西都是要考虑的并且去处理的。在Java的ExecutorService
提供了两个关闭接口
shutdown
: 有序的关闭,已提交的任务会被逐一处理,但不会接受任何新任务shutdownNow
: 尝试停止所有正在执行的任务,放弃在队列中等待的任务,并返回正在等待执行的任务列表
线程的构建和管理
线程池里线程该怎么构建,构建完后怎么管理,是固定的几个还是动态的构建。这里给出几个模式:
固定的线程数量
:在线程池启动时就构建固定数量的线程池,且不会关闭任何线程动态构建线程
:启动时不新建任何线程,当有任务来临时才会去创建线程。如果任务比较少,则不会继续新建线程,如果任务比较多,则继续构建线程数,直到数量达到最大值。有闲置期限的线程
:线程在构建时会有一个闲置的期限,当闲置的时间超过期限时,该线程就会进行回收处理。这个在数据库连接池比较常用到单个线程
:只有一个线程,任务按提交的时间顺序执行。
任务管理
在线程池中,会建立一个任务队列,当没有空闲线程时,新来的任务会放到队列中,等待线程执行。
线程池要提供任务执行的接口。
另外,很多任务都会将处理结果作为返回值的,这时任务要有一个完成后的处理机制,在任务完成时做某些操作。(这里就要涉及到FutureTask
相关概念了)
任务相关的功能如下:
- 任务的提交
- 任务处理结果
- 任务的取消和中断
线程池模型的构建
梳理了线程池的一些基本功能和要考虑的点,那么线程池的执行过程是怎样,要怎么设计呢。废话不说,直接上图:
当有新任务时查看是否有空闲线程,如果有,直接处理,如果没有则放到任务队列中,等待线程处理。
其实梳理一下线程池,可以发现它的逻辑并不复杂,复杂的是各种情况的处理,比如线程怎么管理,任务取消怎么处理,线程中断如何处理等等,还有各种并发操作的处理。
使用代码实现简易的线程池
接下来实现一个固定数量的线程池,实现的功能如下:
线程池要提供的接口
- 任务的提交
线程池内部要实现的功能
- 任务队列的实现
- 线程管理
咱们暂时将线程池的核心功能简单的实现,了解线程池的执行逻辑,其他的之后慢慢添加。
创建任务单元
首先将任务单元给实现了,直接实现Runnable
接口即可。
当然,可以不实现 Runnable
接口,随便写一个类,给一个执行接口,但是呢这样线程池就不够通用了,还是直接实现Runnable
接口,往后任意实现该接口的任务都可以交给线程池执行。
static class Task implements Runnable{ private int tag; public Task(int tag){ this.tag = tag; } @Override public void run() { System.out.printf("任务 %d 开始执行 /n",tag); System.out.printf("任务 %d 执行中 /n",tag); System.out.printf("任务 %d 执行结束/n",tag); }}
线程池的实现
详细的说明在注释中,看注释就可以了
package steap1;import java.util.concurrent.BlockingQueue;import java.util.concurrent.locks.ReentrantLock;public class ThreadPoolExecutor { //工作线程数组 private Worker[] workers; //任务阻塞队列,是线程安全的,里面每个操作都会加锁处理 private BlockingQueue<Task> queue; // 当前工作线程的数量 private int workerSize = 0; //线程池最大的工作线程数量 private int poolSize; public ThreadPoolExecutor(int poolSize, BlockingQueue<Task> queue) { this.poolSize = poolSize; this.workers = new Worker[poolSize]; this.queue = queue; } //任务添加接口 public void execute(Task task) { //如果线程池的线程数量小于最大值,则添加线程 //否则将任务放入队列中 if (workerSize < poolSize) { addWorker(task); } else { this.queue.add(task); } } //添加worker工作线程,并立即执行 private synchronized void addWorker(Task task) { //这里做个双重判定,判定线程数量是否小于最大值 if (workerSize >= poolSize) { this.queue.add(task); return; } //构建worker,并启动线程 workers[workerSize] = new Worker(task); workers[workerSize].t.start(); workerSize++; } //实际运行的代码 void runWorker(Worker worker){ Task task =(Task) worker.task; try { while (true){ //线程在这个循环中不断的获取任务来执行 // queue.task() 方法是一个线程安全的阻塞方法 //如果队列没有任务,那么所有工作线程都会在这里阻塞,等待获取可用的任务 if(task == null){ task = this.queue.take(); } task.run(); task = null; } }catch (InterruptedException e){ e.printStackTrace(); } } //工作线程包装类 private class Worker implements Runnable { private Runnable task; final Thread t; public Worker(Runnable task) { this.task = task; this.t = new Thread(this); } @Override public void run() { runWorker(this); } } //任务类 static class Task implements Runnable { private int tag; public Task(int tag) { this.tag = tag; } @Override public void run() { System.out.printf("任务 %d 开始执行 /n", tag); System.out.printf("任务 %d 执行中 /n", tag); System.out.printf("任务 %d 执行结束/n", tag); } }}
简单的使用
public static void main(String[] args){ ThreadPoolExecutor executor = new ThreadPoolExecutor(8,new LinkedBlockingQueue<>()); for(int i=0;i<1000;i++){ executor.execute(new ThreadPoolExecutor.Task(i)); } }
执行结果
任务 923 开始执行 任务 923 执行中 任务 923 执行结束任务 912 开始执行 任务 912 执行中 任务 912 执行结束
总结
至此,一个简单的线程池就编写完毕,线程池主要的功能都实现了,整个执行过程也进行了详细的描述。
其实这里还有很多东西没写上,线程的生命周期管理,任务的取消和线程的中断等等,这些东西在下一篇章完善吧。
结尾附上项目的源代码,本章的内容在step1
中。

- 0