ThreadPoolExecutor-线程池

1
2
3
4
5
6
7
                                        阻塞队列为空
shutdown() 线程池工作线程数为0
+--------------> SHUTDOWN ----------------+
| | terminated()
RUNNING ----+ +----> TIDYING ------------> TERMINATED
| shutdownNow() 线程池工作线程数为0 |
+--------------> STOP ----------------+

构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

参数解析

  • corePoolSize:核心线程池的大小。创建了线程池后,默认情况下线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务。当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到 corePoolSize 后,就会把到达的任务放到缓存队列当中;
  • maximumPoolSize:线程池最大线程数;
  • keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止;
  • unit:参数 keepAliveTime 的时间单位(DAYS、HOURS、MINUTES、SECONDS 等)
  • workQueue:阻塞队列,用来存储等待执行的任务
    • ArrayBlockingQueue,有界队列
    • LinkedBlockingQueue,无界队列
    • SynchronousQueue
  • threadFactory:线程工厂,主要用来创建线程
  • handler:拒绝处理任务的策略
    • AbortPolicy:丢弃任务并抛出 RejectedExecutionException 异常,默认策略
    • DiscardPolicy:丢弃任务,但不抛出异常
    • DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务,重复此过程
    • CallerRunsPolicy:由调用线程处理该任务

预定义线程池

FixedThreadPool

1
2
3
4
5
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
  • corePoolSize 与 maximumPoolSize 相等,即其线程全为核心线程,是一个固定大小的线程池,是其优势;
  • keepAliveTime = 0 该参数默认对核心线程无效,而 FixedThreadPool 全部为核心线程;
  • workQueue 为 LinkedBlockingQueue(无界阻塞队列),队列最大值为 Integer.MAX_VALUE。如果任务提交速度持续大余任务处理速度,会造成队列大量阻塞。因为队列很大,很有可能在拒绝策略前,内存溢出。是其劣势;
  • FixedThreadPool 的任务执行是无序的;

适用场景:可用于 Web 服务瞬时削峰,但需注意长时间持续高峰情况造成的队列阻塞。

CachedThreadPool

1
2
3
4
5
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
  • corePoolSize = 0,maximumPoolSize = Integer.MAX_VALUE,即线程数量几乎无限制;
  • keepAliveTime = 60s,线程空闲 60s 后自动结束。
  • workQueue 为 SynchronousQueue 同步队列,这个队列类似于一个接力棒,入队出队必须同时传递,因为 CachedThreadPool 线程创建无限制,不会有队列等待,所以使用 SynchronousQueue;

适用场景:快速处理大量耗时较短的任务,如 Netty 的 NIO 接受请求时,可使用 CachedThreadPool。

SingleThreadExecutor

1
2
3
4
5
6
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

这里多了一层 FinalizableDelegatedExecutorService 包装,这一层有什么用呢,写个 dome 来解释一下:

1
2
3
4
5
6
7
8
9
10
    public static void main(String[] args) {
ExecutorService fixedExecutorService = Executors.newFixedThreadPool(1);
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) fixedExecutorService;
System.out.println(threadPoolExecutor.getMaximumPoolSize());
threadPoolExecutor.setCorePoolSize(8);

ExecutorService singleExecutorService = Executors.newSingleThreadExecutor();
// 运行时异常 java.lang.ClassCastException
// ThreadPoolExecutor threadPoolExecutor2 = (ThreadPoolExecutor) singleExecutorService;
}

对比可以看出,FixedThreadPool 可以向下转型为 ThreadPoolExecutor,并对其线程池进行配置,而 SingleThreadExecutor 被包装后,无法成功向下转型。因此,SingleThreadExecutor 被定以后,无法修改,做到了真正的 Single。

ScheduledThreadPool

1
2
3
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
  • newScheduledThreadPool 调用的是 ScheduledThreadPoolExecutor 的构造方法,而 ScheduledThreadPoolExecutor 继承了 ThreadPoolExecutor,构造是还是调用了其父类的构造方法。
1
2
3
4
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}

对于 ScheduledThreadPool 本文不做描述,其特性请关注后续篇章。

自定义线程池

以下是自定义线程池,使用了有界队列,自定义 ThreadFactory 和拒绝策略的 demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
public class ThreadTest {

public static void main(String[] args) throws InterruptedException, IOException {
int corePoolSize = 2;
int maximumPoolSize = 4;
long keepAliveTime = 10;
TimeUnit unit = TimeUnit.SECONDS;
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(2);
ThreadFactory threadFactory = new NameTreadFactory();
RejectedExecutionHandler handler = new MyIgnorePolicy();
ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit,
workQueue, threadFactory, handler);
executor.prestartAllCoreThreads(); // 预启动所有核心线程

for (int i = 1; i <= 10; i++) {
MyTask task = new MyTask(String.valueOf(i));
executor.execute(task);
}

System.in.read(); //阻塞主线程
}

static class NameTreadFactory implements ThreadFactory {

private final AtomicInteger mThreadNum = new AtomicInteger(1);

@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "my-thread-" + mThreadNum.getAndIncrement());
System.out.println(t.getName() + " has been created");
return t;
}
}

public static class MyIgnorePolicy implements RejectedExecutionHandler {

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
doLog(r, e);
}

private void doLog(Runnable r, ThreadPoolExecutor e) {
// 可做日志记录等
System.err.println( r.toString() + " rejected");
// System.out.println("completedTaskCount: " + e.getCompletedTaskCount());
}
}

static class MyTask implements Runnable {
private String name;

public MyTask(String name) {
this.name = name;
}

@Override
public void run() {
try {
System.out.println(this.toString() + " is running!");
Thread.sleep(3000); //让任务执行慢点
} catch (InterruptedException e) {
e.printStackTrace();
}
}

public String getName() {
return name;
}

@Override
public String toString() {
return "MyTask [name=" + name + "]";
}
}
}

输出结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
my-thread-1 has been created
my-thread-2 has been created
my-thread-3 has been created
MyTask[name=1]is running!
MyTask[name=2]is running!
my-thread-4 has been created
MyTask[name=3]is running!
MyTask[name=6]is running!
MyTask[name=7]is rejected
MyTask[name=8]is rejected
MyTask[name=9]is rejected
MyTask[name=10]is rejected
MyTask[name=4]is running!
MyTask[name=5]is running!

1,由于线程预启动,首先创建了 1,2 号线程,然后 task1,task2 被执行;
2,但任务提交没有结束,此时任务 task3,task6 到达发现核心线程已经满了,进入等待队列;
3,等待队列满后创建任务线程 3,4 执行任务 task3,task6,同时 task4,task5 进入队列;
4,此时创建线程数(4)等于最大线程数,且队列已满,所以 7,8,9,10 任务被拒绝;
5,任务执行完毕后回头来执行 task4,task5,队列清空。