1 2 3 4 5 6 7 阻塞队列为空 shutdown() 线程池工作线程数为0 +--------------> SHUTDOWN ----------------+ | | terminated() RUNNING ----+ +----> TIDYING ------------> TERMINATED | shutdownNow() 线程池工作线程数为0 | +--------------> STOP ----------------+
构造方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0 ) throw new IllegalArgumentException (); if (workQueue == null || threadFactory == null || handler == null ) throw new NullPointerException (); this .corePoolSize = corePoolSize; this .maximumPoolSize = maximumPoolSize; this .workQueue = workQueue; this .keepAliveTime = unit.toNanos(keepAliveTime); this .threadFactory = threadFactory; this .handler = handler; }
参数解析
corePoolSize:核心线程池的大小。创建了线程池后,默认情况下线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务。当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到 corePoolSize 后,就会把到达的任务放到缓存队列当中;
maximumPoolSize:线程池最大线程数;
keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止;
unit:参数 keepAliveTime 的时间单位(DAYS、HOURS、MINUTES、SECONDS 等)
workQueue:阻塞队列,用来存储等待执行的任务
ArrayBlockingQueue,有界队列
LinkedBlockingQueue,无界队列
SynchronousQueue
threadFactory:线程工厂,主要用来创建线程
handler:拒绝处理任务的策略
AbortPolicy:丢弃任务并抛出 RejectedExecutionException 异常,默认策略
DiscardPolicy:丢弃任务,但不抛出异常
DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务,重复此过程
CallerRunsPolicy:由调用线程处理该任务
预定义线程池 FixedThreadPool
1 2 3 4 5 public static ExecutorService newFixedThreadPool (int nThreads) { return new ThreadPoolExecutor (nThreads, nThreads, 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue <Runnable>()); }
corePoolSize 与 maximumPoolSize 相等,即其线程全为核心线程,是一个固定大小的线程池,是其优势;
keepAliveTime = 0 该参数默认对核心线程无效,而 FixedThreadPool 全部为核心线程;
workQueue 为 LinkedBlockingQueue(无界阻塞队列),队列最大值为 Integer.MAX_VALUE。如果任务提交速度持续大余任务处理速度,会造成队列大量阻塞。因为队列很大,很有可能在拒绝策略前,内存溢出。是其劣势;
FixedThreadPool 的任务执行是无序的;
适用场景:可用于 Web 服务瞬时削峰,但需注意长时间持续高峰情况造成的队列阻塞。
CachedThreadPool
1 2 3 4 5 public static ExecutorService newCachedThreadPool () { return new ThreadPoolExecutor (0 , Integer.MAX_VALUE, 60L , TimeUnit.SECONDS, new SynchronousQueue <Runnable>()); }
corePoolSize = 0,maximumPoolSize = Integer.MAX_VALUE,即线程数量几乎无限制;
keepAliveTime = 60s,线程空闲 60s 后自动结束。
workQueue 为 SynchronousQueue 同步队列,这个队列类似于一个接力棒,入队出队必须同时传递,因为 CachedThreadPool 线程创建无限制,不会有队列等待,所以使用 SynchronousQueue;
适用场景:快速处理大量耗时较短的任务,如 Netty 的 NIO 接受请求时,可使用 CachedThreadPool。
SingleThreadExecutor
1 2 3 4 5 6 public static ExecutorService newSingleThreadExecutor () { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor (1 , 1 , 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue <Runnable>())); }
这里多了一层 FinalizableDelegatedExecutorService 包装,这一层有什么用呢,写个 dome 来解释一下:
1 2 3 4 5 6 7 8 9 10 public static void main (String[] args) { ExecutorService fixedExecutorService = Executors.newFixedThreadPool(1 ); ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) fixedExecutorService; System.out.println(threadPoolExecutor.getMaximumPoolSize()); threadPoolExecutor.setCorePoolSize(8 ); ExecutorService singleExecutorService = Executors.newSingleThreadExecutor(); }
对比可以看出,FixedThreadPool 可以向下转型为 ThreadPoolExecutor,并对其线程池进行配置,而 SingleThreadExecutor 被包装后,无法成功向下转型。因此,SingleThreadExecutor 被定以后,无法修改,做到了真正的 Single。
ScheduledThreadPool
1 2 3 public static ScheduledExecutorService newScheduledThreadPool (int corePoolSize) { return new ScheduledThreadPoolExecutor (corePoolSize); }
newScheduledThreadPool 调用的是 ScheduledThreadPoolExecutor 的构造方法,而 ScheduledThreadPoolExecutor 继承了 ThreadPoolExecutor,构造是还是调用了其父类的构造方法。
1 2 3 4 public ScheduledThreadPoolExecutor (int corePoolSize) { super (corePoolSize, Integer.MAX_VALUE, 0 , NANOSECONDS, new DelayedWorkQueue ()); }
对于 ScheduledThreadPool 本文不做描述,其特性请关注后续篇章。
自定义线程池 以下是自定义线程池,使用了有界队列,自定义 ThreadFactory 和拒绝策略的 demo
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 public class ThreadTest { public static void main (String[] args) throws InterruptedException, IOException { int corePoolSize = 2 ; int maximumPoolSize = 4 ; long keepAliveTime = 10 ; TimeUnit unit = TimeUnit.SECONDS; BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue <>(2 ); ThreadFactory threadFactory = new NameTreadFactory (); RejectedExecutionHandler handler = new MyIgnorePolicy (); ThreadPoolExecutor executor = new ThreadPoolExecutor (corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); executor.prestartAllCoreThreads(); for (int i = 1 ; i <= 10 ; i++) { MyTask task = new MyTask (String.valueOf(i)); executor.execute(task); } System.in.read(); } static class NameTreadFactory implements ThreadFactory { private final AtomicInteger mThreadNum = new AtomicInteger (1 ); @Override public Thread newThread (Runnable r) { Thread t = new Thread (r, "my-thread-" + mThreadNum.getAndIncrement()); System.out.println(t.getName() + " has been created" ); return t; } } public static class MyIgnorePolicy implements RejectedExecutionHandler { public void rejectedExecution (Runnable r, ThreadPoolExecutor e) { doLog(r, e); } private void doLog (Runnable r, ThreadPoolExecutor e) { System.err.println( r.toString() + " rejected" ); } } static class MyTask implements Runnable { private String name; public MyTask (String name) { this .name = name; } @Override public void run () { try { System.out.println(this .toString() + " is running!" ); Thread.sleep(3000 ); } catch (InterruptedException e) { e.printStackTrace(); } } public String getName () { return name; } @Override public String toString () { return "MyTask [name=" + name + "]" ; } } }
输出结果如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 my-thread-1 has been created my-thread-2 has been created my-thread-3 has been created MyTask[name=1]is running! MyTask[name=2]is running! my-thread-4 has been created MyTask[name=3]is running! MyTask[name=6]is running! MyTask[name=7]is rejected MyTask[name=8]is rejected MyTask[name=9]is rejected MyTask[name=10]is rejected MyTask[name=4]is running! MyTask[name=5]is running!
1,由于线程预启动,首先创建了 1,2 号线程,然后 task1,task2 被执行; 2,但任务提交没有结束,此时任务 task3,task6 到达发现核心线程已经满了,进入等待队列; 3,等待队列满后创建任务线程 3,4 执行任务 task3,task6,同时 task4,task5 进入队列; 4,此时创建线程数(4)等于最大线程数,且队列已满,所以 7,8,9,10 任务被拒绝; 5,任务执行完毕后回头来执行 task4,task5,队列清空。