ThreadPoolExecutor 线程池

解决了两个问题:

1:通过减少任务间的调度开销 (主要是通过线程池中的线程被重复使用的方式),来提高大量任务时的执行性能; 2:提供了一种方式来管理线程和消费,维护基本数据统计等工作

整体架构

2020227152611

运行状态图

2020227153022

Wroker

在线程池中,最小的执行单位就是 Worker

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
{


        // 运行任务的线程
        final Thread thread;
        // 任务代码块
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;

        Worker(Runnable firstTask) {
            // 把自己作为一个代码块穿给线程
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            // 线程是通过线程工程创建的
            this.thread = getThreadFactory().newThread(this);
        }

        public void run() {
            // 这里就将任务的执行交给线程池了
            runWorker(this);
        }
        ...
}

任务提交

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    // 如果工作线程数小于coreSize
    if (workerCountOf(c) < corePoolSize) {
        // 则直接创建worker执行
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 如果线程池状态正常,并且工作队列还能入队
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 线程池状态异常,尝试从队列移除任务
        if (! isRunning(recheck) && remove(command))
            // 移除成功就拒绝任务
            reject(command);
        // 工作线程为0
        else if (workerCountOf(recheck) == 0)
            // 直接创建worker执行
            addWorker(null, false);
    }
    // 队列满了,如果线程数超过maxSize,拒绝任务
    else if (!addWorker(command, false))
        reject(command);
}

addWorker 方法首先是执行了一堆校验,然后使用 new Worker (firstTask) 新建了 Worker,最后使用 t.start () 执行 Worker,所以 t.start () 会执行到 Worker 的 run 方法上,到runWorker 方法里

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    // 校验各种状态
    for (int c = ctl.get();;) {
        // Check if queue empty only if necessary.
        if (runStateAtLeast(c, SHUTDOWN)
            && (runStateAtLeast(c, STOP)
                || firstTask != null
                || workQueue.isEmpty()))
            return false;
        for (;;) {
            if (workerCountOf(c)
                >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateAtLeast(c, SHUTDOWN))
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 把任务交给worker,此时要执行的任务就已经传入给worker里面的thread了
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int c = ctl.get();
                // 检查线程池状态
                if (isRunning(c) ||
                    (runStateLessThan(c, STOP) && firstTask == null)) {
                    if (t.getState() != Thread.State.NEW)
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    workerAdded = true;
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // 启动线程,执行worker
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // 所以说在这里,在不断地取任务执行
        // 如果要执行的task为空,则会去取一个task,取不到就阻塞
        while (task != null || (task = getTask()) != null) {
            // 锁住worker,防止一个任务多个线程执行
            w.lock();
            // 线程池stop了,让线程中断
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // 执行前钩子函数
                beforeExecute(wt, task);
                try {
                    // 执行真正的任务
                    // 而执行这个任务的线程就是worker里面的thread
                    task.run();
                    // 执行后钩子函数
                    afterExecute(task, null);
                } catch (Throwable ex) {
                    // 异常钩子函数
                    afterExecute(task, ex);
                    throw ex;
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}
private Runnable getTask() {
    // 如果设置了线程超时时间,超过一定时间没有任务,超出coreSize部分的线程会被回收
    boolean timedOut = false; // Did the last poll() time out?
    for (;;) {
        int c = ctl.get();
        // 检查线程池状态
        if (runStateAtLeast(c, SHUTDOWN)
            && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        int wc = workerCountOf(c);
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        // 如果线程数大于maxSize但是存活时间还没超过keepalive,则跳过后面取任务的部分
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        try {
            // 超过keepAliveTime时间取不到数据就返回,此时线程不再运行,结束了,JVM会回收掉
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

应用场景

coreSize == maxSize

让线程一下子增加到 maxSize,并且不要回收线程,防止线程回收,避免不断增加回收的损耗

maxSize 无界 + SynchronousQueue

当任务被消费时,才会返回,这样请求就能够知道当前请求是已经在被消费了,如果是其他的队列的话,我们只知道任务已经被提交成功了,但无法知道当前任务是在被消费中,还是正在队列中堆积

比较消耗资源,大量请求到来时,我们会新建大量的线程来处理请求

maxSize 有界 + Queue 无界

对实时性要求不大,但流量忽高忽低的场景下,可以使用这种方式

当流量高峰时,大量的请求被阻塞在队列中,对于请求的实时性难以保证

maxSize 有界 + Queue 有界

把队列从无界修改成有界,只要排队的任务在要求的时间内,能够完成任务即可

keepAliveTime 设置无穷大

想要空闲的线程不被回收,我们可以设置 keepAliveTime 为无穷大值

线程池的公用和独立

查询和写入不公用线程池,如果公用的话,当查询量很大时,写入的请求可能会到队列中去排队,无法及时被处理

原则上来说,每个写入业务场景都独自使用自己的线程池,绝不共用,这样在业务治理、限流、熔断方面都比较容易 多个查询业务场景是可以公用线程池的

问题

对线程池的理解

线程池结合了锁、线程、队列等元素,在请求量较大的环境下,可以多线程的处理请求,充分的利用了系统的资源,提高了处理请求的速度

队列在线程池中起的作用

请求数大于 coreSize 时,可以让任务在队列中排队,让线程池中的线程慢慢的消费请求 当线程消费完所有的线程后,会阻塞的从队列中拿数据,通过队列阻塞的功能,使线程不消亡

results matching " "

No results matching " "

results matching " "

No results matching " "