线程池

作用:

  • 线程复用 控制最大并发数
  • 实现任务缓存策略以及拒绝策略
  • 定期执行 周期执行
  • 隔离不同业务的线程执行环境

Executor框架

public interface Executor {
  void execute(Runnable command);
}

ExecutorService继承了Executor,增加了一些方法

public interface ExecutorService extends Executor {
    // 平缓关闭
    void shutdown();

    // 粗暴关闭
    List<Runnable> shutdownNow();

    boolean isShutdown();

    boolean isTerminated();

    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;

    <T> Future<T> submit(Callable<T> task);

    <T> Future<T> submit(Runnable task, T result);

    Future<?> submit(Runnable task);

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;

    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;
}

Callable

  • 拥有返回值

Future

用来执行一些较长时间的计算,通过get来获取结果(阻塞或者超时)

用于异步获取执行结果或取消执行任务的场景

FutureTask<Integer> futureTask = new FutureTask<>(() -> {
    int result = 0;
    for (int i = 0; i < 100; i++) {
        Thread.sleep(10);
        result += i;
    }
    return result;
});
new Thread(futureTask).start();
System.out.println(futureTask.get());
Future<String> future = pool.submit(() -> {
    Thread.sleep(3000);
    return "java";
});
String s = future.get();

Future模式

public class Main {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        MyFuture myFuture = new MyFuture();
        // 在这里 main thread 可以做其他事情
        // 下一行代码将阻塞直到结果可用
        System.out.println(myFuture.getData());
    }
}

class MyFuture{
    private volatile boolean FLAG = false;
    private String data;

    public MyFuture() {
        new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("future 任务开始 睡眠 3000ms");
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("future 任务结束");
                setData("jntm");
            }
        }).start();
    }

    private synchronized void setData(String data){
        if (FLAG){
            return;
        }
        this.data = data;
        FLAG = true;
        notify();
    }

    public synchronized String getData(){
        while (!FLAG){
            try {
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        return data;
    }
}

线程池分类

  • ThreadPollExecutor
  • ForkJoinPool
    • 分解汇总的任务
    • 用很少的线程可以执行很多的任务(子任务) TPE做不到先执行子任务
    • CPU密集型

ThreadPollExecutor

Executors:线程池工厂( 不推荐使用

  • newCachedThreadPool 创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。这个线程池的最大线程数能达到整数的最大值
  • newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
  • newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。同样 线程最大数也是整数最大值
  • newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行
  • newWorkSealingPool jdk8引入 使用多个队列来减少竞争

这个线程工厂大部分都使用了无界队列 如果瞬间请求量大 很有可能造成oom

原理

批注 2020-07-07 131242

自定义

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
  2/*实际运行线程数*/,
  3/*最多可创建的线程数*/,
  0L /* 让线程存活的时间 0为永久 */,
  TimeUnit.SECONDS,
  new ArrayBlockingQueue<>(4)/*  线程池的内部队列 */,
  Executors.defaultThreadFactory()/* 产生线程的方式 */,
  new ThreadPoolExecutor.DiscardOldestPolicy() /* 线程池满时的拒绝策略 */
  );

参数:

  1. corePoolSize 如果等于0 则任务执行结束后就会销毁所有线程 如果大于0 任务执行后这些线程不会被销毁
  2. maximumPoolSize 能最大同时容纳的线程数 如果任务数量大于这个数 那么剩下的任务就要被缓存在一个阻塞队列中
  3. keepAliveTime 表示线程池中的线程空闲时间 多于corePoolSize数量的部分线程会被销毁
  4. 时间单位
  5. workQUeue 缓存队列
  6. threadFactory 定义线程池线程的产生方式
  7. handler 任务拒绝策略

如何配置:

  • CPU密集型
  • IO密集型

自定义线程工厂 为线程指定有意义的名称和相应的序列号 方便出错排查

定义好拒绝策略 宁愿抛出异常 也不要使用DiscardPolicy 这个策略会静悄悄的抛弃任务

线程池的大小

N cpu = CPU数量 U cpu = 预期CPU使用率 W/C = 等待时间/计算时间

最优大小等于 N cpu U cpu (1 + W/C)

ForkJoinPool

results matching " "

No results matching " "

results matching " "

No results matching " "