Java并发之线程池详解

news/2024/4/17 7:21:22

(/≧▽≦)/~┴┴ 嗨~我叫小奥 ✨✨✨
👀👀👀 个人博客:小奥的博客
👍👍👍:个人CSDN
⭐️⭐️⭐️:传送门
🍹 本人24应届生一枚,技术和水平有限,如果文章中有不正确的内容,欢迎多多指正!
📜 欢迎点赞收藏关注哟! ❤️

文章目录

  • Java并发之线程池详解
    • 一、线程池概述
      • 1.1 什么是线程池
      • 1.2 线程池的执行过程
    • 二、 自定义线程池
      • 2.1 阻塞队列
      • 2.2 自定义线程池
      • 2.3 拒绝策略实现
      • 2.4 测试
    • 三、ThreadPoolExecutor
      • 3.1 线程状态
      • (2) 构造方法
        • ① 核心参数解释
        • ② 拒绝策略源码
      • (3) 使用
        • ① 创建线程池
        • ② 提交任务
        • ③ 关闭线程池
      • (4) 源码分析
        • ① 成员变量
        • ② 内部类Worker
        • ③ submit()
        • ④ execute()
        • ⑤ 添加线程方法
        • ⑥ 运行方法
          • runWorker()
        • ⑦ 停止方法
          • shutdown()
          • shutdownNow()
      • (5) 合理配置线程池
      • (6) 线程池的监控

Java并发之线程池详解

一、线程池概述

1.1 什么是线程池

程序运行的本质就是占用系统的资源,优化资源的使用从而衍生出来的技术就是池化技术。

线程池其实就是一种多线程的处理方式,处理过程中可以将任务添加到队列中,然后创建线程处理这些任务。

线程池的优点:

  • 降低资源消耗。线程池通过复用利用已经创建的线程来降低线程的创建和销毁造成的消耗。
  • 提高响应速度。当有新的任务到达时,如果线程池中有空闲状态的线程,任务可以不需要等待线程的创建就可以立即执行。
  • 提高线程的可管理性。线程是一种稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以对线程进行统一的分配和管理。

1.2 线程池的执行过程

线程池的处理流程如下:

  • 线程池判断核心线程池里的线程是否都在执行任务。
    • 如果不是,则创建一个新的工作线程来执行任务。
    • 如果核心线程池里的线程都在执行任务,则进入下一个流程。
  • 线程池判断工作队列是否已满。
    • 如果工作队列没有满,则将新提交的任务存储在这个工作队列里面。
    • 如果工作队列满了,则进入下个流程。
  • 线程池判断线程池的线程是否都处于工作状态。
    • 如果没有,则创建一个新的工作线程来执行任务。
    • 如果已经满了,则交饱和策略(拒绝策略)来处理这个任务。

在这里插入图片描述

二、 自定义线程池

2.1 阻塞队列

// 阻塞队列
public class BlockingQueue<T> {private Deque<T> queue = new ArrayDeque<>(); // 任务队列private ReentrantLock lock = new ReentrantLock(); // 锁private Condition fullWaitSet = lock.newCondition(); // 生产者条件变量private Condition emptyWaitSet = lock.newCondition(); // 消费者条件变量private int cap; // 容量public BlockingQueue(int cap) {this.cap = cap;}// 阻塞获取public T take() {lock.lock();try {while(queue.isEmpty()) {try {emptyWaitSet.await();} catch (InterruptedException e) {e.printStackTrace();}}T t = queue.removeFirst();fullWaitSet.signal();return t;} finally {lock.unlock();}}// 带超时时间的阻塞获取public T poll(long timeout, TimeUnit timeUnit) {lock.lock();try {long l = timeUnit.toMillis(timeout); // 将timeout统一转化为纳秒while(queue.isEmpty()) {try {// 返回的是剩余的时间if (l <= 0) return null;l = emptyWaitSet.awaitNanos(l);} catch (InterruptedException e) {e.printStackTrace();}}T t = queue.removeFirst();fullWaitSet.signal();return t;} finally {lock.unlock();}}// 阻塞添加public void put(T ele) {lock.lock();try {while(queue.size() == cap) {try {fullWaitSet.await();} catch (InterruptedException e) {e.printStackTrace();}}queue.addLast(ele);emptyWaitSet.signal();} finally {lock.unlock();}}// 带超时时间的阻塞添加public boolean offer(T ele, long time, TimeUnit unit) {lock.lock();try {long l = unit.toNanos(time);while(queue.size() == cap) {try {if (l <= 0) {return false;}l = fullWaitSet.awaitNanos(l);} catch (InterruptedException e) {e.printStackTrace();}}queue.addLast(ele);emptyWaitSet.signal();return true;} finally {lock.unlock();}}// 获取大小public int size() {lock.lock();try {return queue.size();} finally {lock.unlock();}}public void tryPut(RejectPolicy<T> rejectPolicy, T task) {lock.lock();try {// 判断队列是否已满if (queue.size() == cap) {// 队列已满rejectPolicy.reject(this, task); // 拒绝策略决定} else {// 队列还有空闲queue.addLast(task);emptyWaitSet.signal();}} finally {lock.unlock();}}
}

2.2 自定义线程池

public class ThreadPool {private BlockingQueue<Runnable> taskQueue; // 任务队列private HashSet<Worker> workers = new HashSet(); // 线程集合private int coreSize; // 核心线程数private long timeout;  // 获取任务的超时时间private TimeUnit unit; // 时间单位private RejectPolicy<Runnable> rejectPolicy; // 拒绝策略public ThreadPool(int coreSize, long timeout, TimeUnit unit, int queueCap, RejectPolicy<Runnable> rejectPolicy) {this.coreSize = coreSize;this.timeout = timeout;this.unit = unit;this.taskQueue = new BlockingQueue<>(queueCap);this.rejectPolicy = rejectPolicy;}public void execute(Runnable task) {// 当任务数量没有超过核心线程数的时候,直接交给Worker对象处理// 如果任务超过核心线程数,需要加入任务队列暂存synchronized (workers) {if (workers.size() < coreSize) {Worker worker = new Worker(task);System.out.println("新增worker对象:" + worker + ", 任务对象: " + task);workers.add(worker);worker.start();} else {System.out.println("加入任务队列:" + task);taskQueue.put(task);// 调用拒绝策略taskQueue.tryPut(rejectPolicy, task);}}}class Worker extends Thread {private Runnable task;public Worker(Runnable task) {this.task = task;}@Overridepublic void run() {// 1、当task不为空,执行任务// 2、当task执行完毕,接着从任务队列获取任务并执行while (task != null || (task = taskQueue.poll(timeout, unit)) != null) {try {System.out.println("正在执行:" + task);task.run();} catch (Exception e) {e.printStackTrace();} finally {task = null;}}synchronized (workers) {System.out.println("worker被移除:" + this);workers.remove(this);}}}
}

2.3 拒绝策略实现

使用策略模式:

// 拒绝策略
@FunctionalInterface
public interface RejectPolicy<T> {void reject(BlockingQueue<T> queue, T task);
}

2.4 测试

public class Main {public static void main(String[] args) {// (1) 阻塞等待 BlockingQueue::put// (2) 带超时等待 (queue, task) -> { queue.offer(task, 500, TimeUnit.MILLISECONDS); }// (3) 让调用者自己放弃任务执行 (queue, task) -> { System.out.println("放弃task任务"); }// (4) 让调用者自己抛出异常 (queue, task) -> { throw new RuntimeException("任务执行失败" + task); }// (5) 让调用者自己执行任务 (queue, task) -> { task.run(); }ThreadPool threadPool = new ThreadPool(1, 1000,TimeUnit.MILLISECONDS, 1, (queue, task) -> {task.run();});for (int i = 0; i < 3; i++) {int j = i;threadPool.execute(() -> {try {Thread.sleep(1000L);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName() + " - " + j);});}}
}

三、ThreadPoolExecutor

3.1 线程状态

ThreadPoolExecutor使用int的高3位来表示线程池状态,低29位表示线程数量。

	// runState存储在高阶位中private static final int RUNNING    = -1 << COUNT_BITS;private static final int SHUTDOWN   =  0 << COUNT_BITS;private static final int STOP       =  1 << COUNT_BITS;private static final int TIDYING    =  2 << COUNT_BITS;private static final int TERMINATED =  3 << COUNT_BITS;
状态名高3位接收新任务处理阻塞队列任务说明
RUNNING111YY
SHUTDOWN000NY不会接收新任务,但会处理阻塞队列中剩余任务
STOP001NN会中断正在执行的任务,并抛弃阻塞队列任务
TIDYING010--任务全执行完毕,活动线程为0即将进入终结
TERMINATED011--终结状态

注意,从数字上比较,TERMINATED>TIDYING>STOP>SHUTDOWN>RUNNING

为什么不使用两个整数来表示线程池状态和线程数量呢?

**因为目的是将线程池状态和线程个数合二为一,就可以使用一次CAS原子操作进行赋值了。**这些信息存储在原子变量ctl中。

    // 部分代码示例 // c为旧值,cltOf返回结果为新值ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))  // rs为高3位代表线程池状态,wc为低29位代表线程个数,ctl是合并它们private static int ctlOf(int rs, int wc) { return rs | wc; }

(2) 构造方法

    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) 

七大参数:

  • corePoolSize:核心线程数。
  • maximumPoolSize:最大线程数。
  • keepAliveTime:当线程数大于核心时,这是多余的空闲线程将在终止前等待新任务的最大时间。
  • unit:时间单位。
  • workQueue:在执行任务之前用来保存任务的队列(阻塞队列)。
  • threadFactory:线程工厂。
  • RejectedExecutionHandler:拒绝策略。
① 核心参数解释

corePoolSize线程池核心线程数大小

线程池中会维护一个最小的线程数量,即使这些线程处理空闲状态,他们也不会被销毁,除非设置了allowCoreThreadTimeOut。这里的最小线程数量即是corePoolSize。任务提交到线程池后,首先会检查当前线程数是否达到了corePoolSize,如果没有达到的话,则会创建一个新线程来处理这个任务。

maximumPoolSize 线程池最大线程数量

当前线程数达到corePoolSize后,如果继续有任务被提交到线程池,会将任务缓存到工作队列中。如果队列也已满,则会去创建一个新线程来出来处理。线程池不会无限制的去创建新线程,它会有一个最大线程数量的限制,这个数量即由maximunPoolSize指定。

keepAliveTime 空闲线程存活时间

一个线程如果处于空闲状态,并且当前的线程数量大于corePoolSize,那么在指定时间后,这个空闲线程会被销毁,这里的指定时间由keepAliveTime来设定。

unit 空闲线程存活时间单位

unit 空闲线程存活时间单位。

workQueue 工作队列

新任务被提交后,会先进入到此工作队列中,任务调度时再从队列中取出任务。

几种阻塞队列:

  • ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先 进先出)原则对元素进行排序。
  • LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按 FIFO 排序元 素,吞吐量通常要高于 ArrayBlockingQueue。静态工厂方法 Executors.newFixedThreadPool()使用了这个队列。
  • SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个 线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于 Linked-BlockingQueue,静态工厂方法 Executors.newCachedThreadPool 使用了这 个队列。
  • PriorityBlockingQueue:一个具有优先级的无限阻塞队列。

threadFactory 线程工厂

创建一个新线程时使用的工厂,可以用来设定线程名、是否为daemon线程等等。

handler 拒绝策略

当工作队列中的任务已到达最大限制,并且线程池中的线程数量也达到最大限制,这时如果有新任务提交进来,这时就会采用拒绝策略的方式来处理这些任务。

② 拒绝策略源码

四种拒绝策略:

  • AbortPolicy:中止策略。默认的拒绝策略,抛出未检查的异常RejectedExecutionException。调用者可以通过捕获这个异常,来实现自己的处理逻辑。
  • DiscardPolicy:丢弃策略。当新提交的任务无法保存到队列中等待执行时,该策略会丢弃该任务。
  • DiscardOldestPolicy:丢弃最旧策略。当新提交的任务无法保存到队列中执行时,则会丢弃最先进入队列的任务,然后尝试提交新的任务。
  • CallerRunsPolicy:调用者运行策略。该策略实现了一种调节机制,既不会抛弃任务,也不会抛出异常,而是将任务回退到调用者(调用线程池执行任务的主线程)。
	public static class AbortPolicy implements RejectedExecutionHandler {// 创建一个中止策略,丢弃任务并抛出RejectedExecutionException异常public AbortPolicy() { }// 始终引发RejectedExecutionException异常public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {throw new RejectedExecutionException("Task " + r.toString() +" rejected from " +e.toString());}}
	public static class DiscardPolicy implements RejectedExecutionHandler {// 创建一个丢弃策略public DiscardPolicy() { }// 什么也不做,这会导致丢弃任务public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {}}	
	public static class DiscardOldestPolicy implements RejectedExecutionHandler {// 创建一个丢弃最旧策略public DiscardOldestPolicy() { }// 丢弃队列最早的未处理任务,然后重新尝试执行任务。public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {e.getQueue().poll();e.execute(r);}}}
	public static class CallerRunsPolicy implements RejectedExecutionHandler {// 创建调用者运行策略public CallerRunsPolicy() { }// 在调用方线程中执行任务。除非执行器已经关闭,在这种情况下,任务将直接被丢弃。public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {r.run();}}}

(3) 使用

① 创建线程池

通过ThreadPoolExecutor构造方法创建一个线程池:

new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, milliseconds,runnableTaskQueue, handler);

参数在上面已经介绍过了。

② 提交任务

可以使用两个方法向线程池提交任务,分别为execute()submit()方法。

execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功。

		executor.execute(new Runnable() {@Overridepublic void run() {}});

submit()方法用于提交需要返回值的任务。线程池会返回一个future类型的对象,通过这个future对象可以判断任务是否执行成功,并且可以通过 future 的 get()方法来获取 返回值,get()方法会阻塞当前线程直到任务完成,而使用 get(long timeout,TimeUnit unit)方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。

	Future<Object> future = executor.submit(task);try {Object o = future.get();} catch (InterruptedException e) {// 处理中断异常 } catch (ExecutionException e) {// 处理无法执行任务异常 } finally {// 关闭线程池 executor.shutdown();}
③ 关闭线程池

可以通过调用线程池的 shutdownshutdownNow 方法来关闭线程池。它们的原理是遍历线程池中的工作线程,然后逐个调用线程的 interrupt 方法来中断线程,所以无法 响应中断的任务可能永远无法终止

shutdownNow 首先将线程池的状态设置成 STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表,而 shutdown 只是将线程池的状态设置成 SHUTDOWN 状态,然后中断所有没有正在执行任务的线程。

只要调用了这两个关闭方法中的任意一个,isShutdown 方法就会返回 true。当所有的任务都已关闭后,才表示线程池关闭成功,这时调用 isTerminaed 方法会返回 true。至于应该调用哪一种方法来关闭线程池,应该由提交到线程池的任务特性决定,通常调用 shutdown 方法来关闭线程池,如果任务不一定要执行完,则可以调用 shutdownNow 方 法。

(4) 源码分析

① 成员变量
    // 线程池中存放worker的容器private final HashSet<Worker> workers = new HashSet<Worker>(); // 全局锁,增加或者减少worker、修改线程池运行状态时需要持有mainLockprivate final ReentrantLock mainLock = new ReentrantLock();// 可重入锁的条件变量private final Condition termination = mainLock.newCondition();// 线程池参数 private volatile int corePoolSize;private volatile int maximumPoolSize;private volatile long keepAliveTime;private final BlockingQueue<Runnable> workQueue;private volatile ThreadFactory threadFactory;private volatile RejectedExecutionHandler handler;// 线程池相关属性 private int largestPoolSize; // 记录线程生命周期内线程数的最大值 private long completedTaskCount; // 记录线程池完成任务总数// 控制核心线程数量内的线程是否可以被回收 private volatile boolean allowCoreThreadTimeOut; // false(默认)不可以被回收
② 内部类Worker
 private final class Workerextends AbstractQueuedSynchronizerimplements Runnable{private static final long serialVersionUID = 6138294804551838833L;final Thread thread; // 内部的工作线程 Runnable firstTask; // 第一个执行的任务volatile long completedTasks; // 记录当前worker完成的任务数量 Worker(Runnable firstTask) {setState(-1); // AQS独占模式为初始化状态,不能被抢占 // firstTask不为空时,当worker启动后,内部线程会优先执行firstTask,执行完后会到queue中去获取下个任务this.firstTask = firstTask;// 使用线程工厂创建一个线程,并且【将当前worker指定为Runnable】,所以thread启动时会调用 worker.run()this.thread = getThreadFactory().newThread(this);}/** Delegates main run loop to outer runWorker  */public void run() {runWorker(this);}// Lock methods 不可重入锁 //// The value 0 represents the unlocked state.// The value 1 represents the locked state.protected boolean isHeldExclusively() {return getState() != 0;}protected boolean tryAcquire(int unused) {if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}protected boolean tryRelease(int unused) {setExclusiveOwnerThread(null);setState(0);return true;}public void lock()        { acquire(1); }public boolean tryLock()  { return tryAcquire(1); }public void unlock()      { release(1); }public boolean isLocked() { return isHeldExclusively(); }void interruptIfStarted() {Thread t;if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {try {t.interrupt();} catch (SecurityException ignore) {}}}}
	// DefaultThreadFactory#newThread()public Thread newThread(Runnable r) {// 将当前worker指定为thread执行方法,线程调用start会调用r.run()Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet());try {if (t.isDaemon() != daemon) {t.setDaemon(daemon);}if (t.getPriority() != priority) {t.setPriority(priority);}} catch (Exception ignored) {// Doesn't matter even if failed to set.}return t;}
③ submit()

AbstractExecutorService#submit():提交任务,把 Runnable 或 Callable 任务封装成 FutureTask 执行,可以通过方法返回的任务对象,调用 get 阻塞获取任务执行的结果或者异常。

	public Future<?> submit(Runnable task) {if (task == null) throw new NullPointerException(); // 将Runnable封装成RunnableFuture任务,执行结果是null,也可以通过FutureTask#get 返回数据RunnableFuture<Void> ftask = newTaskFor(task, null);execute(ftask); // 执行任务 return ftask;}public <T> Future<T> submit(Runnable task, T result) {if (task == null) throw new NullPointerException();RunnableFuture<T> ftask = newTaskFor(task, result);execute(ftask);return ftask; // 返回RunnableFuture对象,用来获取返回值}
	protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {// Runnable 封装成 FutureTask,【指定返回值】return new FutureTask<T>(runnable, value);}protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {return new FutureTask<T>(callable);}
④ execute()

execute():执行任务,但是没有返回值,没办法获取任务执行结果,出现异常会直接抛出任务执行时的异常。根据线程池中的线程数,选择添加任务时的处理方式。

ThreadPoolExecutor执行execute方法分下面四种情况:

  • 如果当前运行的线程少于corePoolSize,则创建新的线程来执行任务(注意,执行这一步骤需要获取全局锁)。
  • 如果运行的线程数大于等于corePoolSize,则将任务加入BlockingQueue。
  • 如果无法将任务加入BlockingQueue(队列已满),则创建新的线程来处理任务(注意,执行这一步骤需要获取全局锁)。
  • 如果创建新线程使当前运行的线程超过maximumPoolSize,任务将会被拒绝,并调用RejectedExecutionHandler.rejectedExecution()方法。

在这里插入图片描述

public void execute(Runnable command) {// command 可以是普通的 Runnable 实现类,也可以是 FutureTask,不能是 Callableif (command == null)throw new NullPointerException();//获取 ctl 最新值赋值给 c,ctl 高 3 位表示线程池状态,低位29位 表示当前线程池线程数量。int c = ctl.get(); //【1】当前线程数小于核心线程数,此次提交任务直接创建一个新的worker,线程池中多了一个新线程if (workerCountOf(c) < corePoolSize) {// addWorker创建并添加线程的过程// 会创建 worker 对象并且将 command 作为 firstTask,优先执行if (addWorker(command, true))return;// 无需添加新的线程,但是存在并发现象或者线程池状态被改变// 重新获取线程池状态 c = ctl.get();}// 【2】线程数已经达到核心线程数或者无需添加新的线程// 判断当前线程池是否处于running状态并尝试将task添加到workQueue中if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();// 线程池状态可能被外部线程修改了,可能是执行了shutdown()方法,该状态不能接收新提交的任务// 如果不是running状态,则需要把刚提交的任务删除,删除成功说明提交之后的线程池中的线程还未处理该任务 if (! isRunning(recheck) && remove(command))reject(command); // 拒绝任务,走拒绝策略 else if (workerCountOf(recheck) == 0)// 说明线程池是running状态,判断线程池中线程数量是否为0// 【担保机制】确保线程池在running任务下,最起码有一个线程在工作addWorker(null, false);}// 【3】阻塞队列已满,创建一个非核心线程执行任务,添加失败则拒绝任务 else if (!addWorker(command, false)) reject(command); // 直接走拒绝策略}
⑤ 添加线程方法

prestartAllCoreThreads():提前预热,创建所有的核心线程。

	// 启动所有的核心线程,让它们空闲地等待工作。public int prestartAllCoreThreads() {int n = 0;while (addWorker(null, true))++n;return n;}

addWorker():添加线程到线程池,返回 true 表示创建 Worker 成功,且线程启动。首先判断线程池是否允许添加线程,允许就让线程数量 + 1,然后去创建 Worker 加入线程池。

注意:SHUTDOWN 状态也能添加线程,但是要求新加的 Woker 没有 firstTask,而且当前 queue 不为空,所以创建一个线程来帮助线程池执行队列中的任务。

	// core表示是否是核心线程 private boolean addWorker(Runnable firstTask, boolean core) {retry:// 自旋判断当前线程池状态是否允许创建线程,允许就增加线程数 for (;;) {int c = ctl.get(); // 获取ctl的值 int rs = runStateOf(c); // 获取线程池的运行状态 // 判断当前线程池状态是否允许添加线程 // 如果线程池的状态小于 SHUTDOWN 或者等于 SHUTDOWN // 但是队列中任务不为空或者新任务不为空,则返回 false。if (rs >= SHUTDOWN &&! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c); // 获取线程池中线程数量 if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))// CAPACITY是(1 << 29) - 1// 根据core判断使用哪个大小限制线程数量,超过了返回faslereturn false;if (compareAndIncrementWorkerCount(c))// 记录线程数量加1,如果失败说明其他线程修改了数量 break retry; c = ctl.get();  // 再次获取线程池状态 if (runStateOf(c) != rs)// 如果线程池状态被其他线程改变了,返回外层循环判断是否能添加线程 continue retry;// else CAS failed due to workerCount change; retry inner loop}}// 开始创建线程 boolean workerStarted = false; // 运行标记,表示创建的worker是否已经启动boolean workerAdded = false; // 添加标记,表示创建的worker是否已经添加到线程池中Worker w = null;try {// 创建worker,底层通过线程工厂 newThread 方法创建执行线程,指定了首先执行的任务w = new Worker(firstTask); final Thread t = w.thread;// 这里的判断为了防止 程序员自定义的 ThreadFactory 实现类有 bug,创造不出线程if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock(); // 添加线程需要获取互斥锁try { int rs = runStateOf(ctl.get()); // 获取最新的线程池运行状态 // 判断线程池是否为RUNNING状态if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive())// 当线程start之后,isAlive()会返回true// 这里还没有启动线程,如果被启动了,就会报错throw new IllegalThreadStateException();workers.add(w); // 将新建的worker添加到线程池中int s = workers.size();if (s > largestPoolSize)largestPoolSize = s; // 更新largestPoolSizeworkerAdded = true; // 添加标记更新为true}} finally {mainLock.unlock(); // 解除互斥锁}if (workerAdded) {// 添加成功就启动线程执行任务 t.start();workerStarted = true; // 运行标记更新为true}}} finally {if (! workerStarted)addWorkerFailed(w); // 如果启动线程失败,清理工作 } return workerStarted; // 返回新创建的线程是否启动}

addWorkerFailed():清理任务

 	private void addWorkerFailed(Worker w) {final ReentrantLock mainLock = this.mainLock;mainLock.lock(); // 操作线程需要获取互斥锁 try {if (w != null)// 将worker从线程池中移除 workers.remove(w);decrementWorkerCount(); // 线程池计数减一tryTerminate(); // 尝试停止线程池 } finally {mainLock.unlock(); // 解除互斥锁}}
⑥ 运行方法
runWorker()

Worker#run:Worker 实现了 Runnable 接口,当线程启动时,会调用 Worker 的 run() 方法。

		public void run() {runWorker(this); //ThreadPoolExecutor#runWorker() }

runWorker():线程启动就要执行任务,会一直 while 循环获取任务并执行。

	final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask; // 获取worker的第一个任务 w.firstTask = null; // 引用置空,防止复用该线程时重复执行任务 // 初始化worker时设置state=-1,表示不允许抢占锁 // 这里设置state=0和exclusiveOwnerThread=null,开始独占模式抢锁 w.unlock(); // 允许中断 boolean completedAbruptly = true; // 是否发生中断 try {// 如果task不为空就直接运行,否则去队列中获取任务// getTask()如果是阻塞获取任务,就会一直阻塞在take方法,直到获取任务 while (task != null || (task = getTask()) != null) {w.lock(); // worker加锁if ((runStateAtLeast(ctl.get(), STOP) ||// 说明线程处于RUNNING或者SHUTDOWN状态,清除打断标记 (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt(); // 中断线程,设置线程中断标志为truetry {beforeExecute(wt, task); // 任务执行的前置处理 Throwable thrown = null;try {task.run(); // 执行任务 } catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown); // 任务执行的后置处理 }} finally {task = null; // 将局部变量task置为null,代表任务执行完成w.completedTasks++; // 更新worker完成任务数量w.unlock(); // 解锁 }}// 表示queue为空并且线程空闲超过存活时间 completedAbruptly = false;} finally {// 正常退出 completedAbruptly = false// 异常退出 completedAbruptly = true,【从 task.run() 内部抛出异常】时,跳到这一行processWorkerExit(w, completedAbruptly);}}

getTask():获取任务,线程空闲时间超过 keepAliveTime 就会被回收,判断的依据是当前线程阻塞获取任务超过保活时间,方法返回 null 就代表当前线程要被回收了,返回到 runWorker 执行线程退出逻辑。线程池具有担保机制,对于 RUNNING 状态下的超时回收,要保证线程池中最少有一个线程运行,或者任务阻塞队列已经是空。

	private Runnable getTask() {boolean timedOut = false; // 超时标记,表示当前线程获取任务是否超时for (;;) {int c = ctl.get(); int rs = runStateOf(c); // 获取当前线程池运行状态 // 如果线程池状态为STOP或者线程池状态为SHUTDOWN并且队列已经为空if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount(); // CAS自旋方式让ctl值-1return null;}int wc = workerCountOf(c); // 获取线程池中的线程数量,没有区分是否是核心线程// timed=fasle表示当前线程获取task不支持超时机制,当前线程会使用queue.take() 阻塞获取// timed=true表示当前线程获取task支持超时机制,使用queue.poll()超时获取boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;if ((wc > maximumPoolSize || (timed && timedOut))// 如果线程数超过最大线程数,直接回收// 如果当前线程允许超时回收并且已经超时了,就应该被回收了// 由于【担保机制】还要做判断:// wc > 1说明线程池还有其他线程,当前线程可以回收// workQueue.isEmpty()前置条件为wc=1,如果当前任务队列为空,最后一个线程也可以回收&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))// CAS机制将ctl的值-1,成功返回nullreturn null;continue;}try {// 根据当前线程是否需要超时回收,选择从队列获取任务的方法 Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)// 获取到任务就返回return r;timedOut = true; // 获取任务为null说明超时,设置超时标记为true} catch (InterruptedException retry) {timedOut = false; // 阻塞线程被打断后重置超时标记为false,需要继续获取}}}

processWorkerExit():线程退出线程池,也有担保机制,保证队列中的任务被执行

	// 正常退出completedAbruptly为falseprivate void processWorkerExit(Worker w, boolean completedAbruptly) {if (completedAbruptly) // 表示当前worker是发生异常退出的,即task任务执行过程中向上抛出异常 decrementWorkerCount(); // 抛出异常后需要ctl-1final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// 将当前 worker 完成的 task 数量,汇总到线程池的 completedTaskCountcompletedTaskCount += w.completedTasks;workers.remove(w); // 从线程池中移除worker} finally {mainLock.unlock();}tryTerminate(); // 尝试停止线程池,唤醒下一个线程int c = ctl.get(); if (runStateLessThan(c, STOP)) {// 如果线程池不是停止状态就应该有线程运行,即担保机制if (!completedAbruptly) {// 正常退出的逻辑,对空闲线程回收int min = allowCoreThreadTimeOut ? 0 : corePoolSize;// 最小值为0,但是如果queue不为空,需要一个线程来完成任务担保机制if (min == 0 && ! workQueue.isEmpty())min = 1;if (workerCountOf(c) >= min)return; // 线程池中线程数量大于最小值可以直接返回 }// 执行 task 时发生异常,有个线程因为异常终止了,需要添加一个线程 // 或者线程池中的数量小于最小值,这里要创建一个新 worker 加进线程池addWorker(null, false);}}
⑦ 停止方法
shutdown()

shutdown():停止线程池

	public void shutdown() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess();// 设置线程池状态为SHUTDOWN,如果线程池状态大于SHUTDOWN,就直接返回advanceRunState(SHUTDOWN); interruptIdleWorkers(); // 中断空闲线程 onShutdown(); // hook for ScheduledThreadPoolExecutor 空方法,可扩展 } finally {mainLock.unlock();}tryTerminate();}

interruptIdleWorkers():shutdown 方法会中断所有空闲线程,根据是否可以获取 AQS 独占锁判断是否处于工作状态。线程之所以空闲是因为阻塞队列没有任务,不会中断正在运行的线程,所以 shutdown 方法会让所有的任务执行完毕。

	private void interruptIdleWorkers() {interruptIdleWorkers(false); // false代表中断所有线程}private void interruptIdleWorkers(boolean onlyOne) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {for (Worker w : workers) {Thread t = w.thread;if (!t.isInterrupted() && w.tryLock()) {// !t.isInterrupted() 说明当前线程尚未中断 // w.tryLock()说明当前worker处于空闲状态,阻塞在take或者poll,因为worker执行task时需要加锁 try {// 中断线程,处于queue阻塞的线程会被唤醒,// 进入下一次自旋,返回null,执行退出逻辑t.interrupt();} catch (SecurityException ignore) {} finally {w.unlock();}}if (onlyOne) break; // false代表中断所有线程 }} finally {mainLock.unlock();}}
shutdownNow()

shutdownNow():直接关闭线程池,不会等待任务执行完成。

	public List<Runnable> shutdownNow() {List<Runnable> tasks;final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess();advanceRunState(STOP); // 设置线程池状态为STOPinterruptWorkers(); // 中断所有线程 tasks = drainQueue(); // 从阻塞队列中获取未处理的task} finally {mainLock.unlock();}tryTerminate(); // 尝试停止线程池 return tasks;  // 返回当前任务队列中未处理的任务 }

tryTerminate():设置为 TERMINATED 状态 if either (SHUTDOWN and pool and queue empty) or (STOP and pool empty)。

	final void tryTerminate() {for (;;) {int c = ctl.get(); // 线程池正常运行,或者有其他线程执行了状态转换的方法,直接返回 if (isRunning(c) || runStateAtLeast(c, TIDYING) ||// 线程池是SHUTDOWN并且任务队列不为空,需要去处理队列中的任务,直接返回(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))return;// 执行到这里说明 线程池状态为STOP 或者 线程池砖头盖为SHUTDOWN并且队列为空// 判断线程池中线程数量 if (workerCountOf(c) != 0) { // 中断一个线程 // 在 queue.take() | queue.poll() 阻塞空闲// 唤醒后的线程会在getTask()方法返回null// 执行 processWorkerExit 退出逻辑时会再次调用 tryTerminate() 唤醒下一个空闲线程interruptIdleWorkers(ONLY_ONE);return;}// 如果线程池中线程数量为0final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// 尝试设置线程池状态为TIDYINGif (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {try {terminated(); // 结束线程池} finally {ctl.set(ctlOf(TERMINATED, 0)); // 设置线程池状态为TERMINATED状态。termination.signalAll(); // 唤醒所有调用awaitTermination()方法的线程}return;}} finally {mainLock.unlock();}// else retry on failed CAS}}

(5) 合理配置线程池

要想合理地配置线程池,就必须首先分析任务特性,可以从以下几个角度来分析。

  • 任务的性质:CPU 密集型任务、IO 密集型任务和混合型任务。
  • 任务的优先级:高、中和低。
  • 任务的执行时间:长、中和短。
  • 任务的依赖性:是否依赖其他系统资源,如数据库连接。

性质不同的任务可以用不同规模的线程池分开处理。CPU 密集型任务应配置尽可能 小的线程,如配置 Ncpu+1 个线程的线程池。由于 IO 密集型任务线程并不是一直在执行 任务,则应配置尽可能多的线程,如 2*Ncpu。混合型的任务,如果可以拆分,将其拆分 成一个 CPU 密集型任务和一个 IO 密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐量将高于串行执行的吞吐量。如果这两个任务执行时间相差 太大,则没必要进行分解。可以通过 Runtime.getRuntime().availableProcessors()方法获得当前设备的 CPU 个数。

优先级不同的任务可以使用优先级队列 PriorityBlockingQueue 来处理。它可以让优 先级高的任务先执行。

执行时间不同的任务可以交给不同规模的线程池来处理,或者可以使用优先级队列,让执行时间短的任务先执行。

依赖数据库连接池的任务,因为线程提交 SQL 后需要等待数据库返回结果,等待的 时间越长,则 CPU 空闲时间就越长,那么线程数应该设置得越大,这样才能更好地利用 CPU。

(6) 线程池的监控

如果在系统中大量使用线程池,则有必要对线程池进行监控,方便在出现问题时, 可以根据线程池的使用状况快速定位问题。

可以通过线程池提供的参数进行监控,在监控线程池的时候可以使用以下属性。

  • taskCount:线程池需要执行的任务数量
  • completedTaskCount:线程池在运行过程中已经完成的任务数量,小于或者等于taskCount。
  • largestPoolSize:线程池曾经创建过的最大线程数量。通过这个数据可以知道线程池是否曾经满过。如该数值等于线程池的最大大小,则表示线程池曾经满过。
  • getPoolSize:线程池的线程数量。如果线程池不销毁的话,线程池里的线程不会自动销毁,所以这个大小只增不减。
  • getActiveCount:获取活动的线程数。

通过扩展线程池进行监控。可以通过继承线程池来自定义线程池,重写线程池的 beforeExecute、afterExecute 和 terminated 方法,也可以在任务执行前、执行后和线程池关闭前执行一些代码来进行监控。例如,监控任务的平均执行时间、最大执行时间和最 小执行时间等。


https://www.xjx100.cn/news/3293534.html

相关文章

网工内推 | Base北京,上市公司高级网工,最高25K,带薪年假

01 北京中天瑞合科技有限公司 招聘岗位&#xff1a;高级网络工程师 职责描述&#xff1a; 1、负责高校、医院、企业等数据中心、骨干网络、园区网等有线、无线网络架构的规划、设计、调整、性能优化&#xff0c;制定大、中型网络相关的安全策略和规范&#xff1b; 2、负责网络…

【机器人最短路径规划问题(栅格地图)】基于蚁群算法求解

基于蚁群算法求解机器人最短路径规划问题的仿真结果 仿真结果 收敛曲线变化趋势 蚁群算法求解最优解的机器人运动路径 各代蚂蚁求解机器人最短路径的运动轨迹

web组态软件

1、强大的画面显示web组态功能 2、良好的开放性。 开放性是指组态软件能与多种通信协议互联&#xff0c;支持多种硬件设备&#xff0c;向上能与管理层通信&#xff0c;实现上位机和下位机的双向通信。 3、丰富的功能模块。 web组态提供丰富的控制功能库&#xff0c;满足用户的测…

解决RabbitMQ管理页面异常/不正确的问题

正确的页面&#xff1a;有Channels、Exchanges等 异常/不正确的页面&#xff1a; 问题原因 我的RabbitMQ是用docker安装的&#xff0c;应该不会是安装的环境有问题。 而且MQ的服务确实是启动了&#xff0c;后端能正常使用&#xff0c;并且管理界面的登录页面也是能正常登录的&…

如何在windows系统部署Lychee网站,并结合内网穿透打造个人云图床

文章目录 1.前言2. Lychee网站搭建2.1. Lychee下载和安装2.2 Lychee网页测试2.3 cpolar的安装和注册 3.本地网页发布3.1 Cpolar云端设置3.2 Cpolar本地设置 4.公网访问测试5.结语 1.前言 图床作为图片集中存放的服务网站&#xff0c;可以看做是云存储的一部分&#xff0c;既可…

2024水科技大会暨技术装备成果展览会——高品质供水和饮用水水源安全保障论坛

供水与饮水安全直接关系到人民群众的生活与健康&#xff0c;切实做好城市供水与饮水安全保障工作&#xff0c;是把以人为本真正落到实处的一项紧迫任务。近年来&#xff0c;中央和地方加大了城乡供水与饮水安全保障工作的力度&#xff0c;对标最优质供水城市建设要求&#xff0…

从0开始python学习-54.python中flask创建MD5和base64加密校验的接口

MD5加密接口 import hashlib from flask import Flask, request, jsonify# 初始化一个flask的对象 app Flask(__name__)# MD5加密校验数据请求 # 定义用户数据 user_data [{"username": "admin1", "password": "E10ADC3949BA59ABBE56E05…

【HarmonyOS】鸿蒙开发之Video组件——第3.7章

Video组件内VideoOptions属性简介 src&#xff1a;设置视频地址。currentProgressRate&#xff1a;设置视频播放倍速&#xff0c;参数说明如下&#xff1a; number|string&#xff1a;只支持 0.75 &#xff0c; 1.0 &#xff0c; 1.25 &#xff0c; 1.75 &#xff0c; 2.0 。P…