CompletionService-及时获取任务返回值

ExecutorService

定义如下的 TaskCallable 类,返回值的延迟输出时间根据传入值决定:

1
2
3
4
5
6
7
public record TaskCallable(String taskID, Double value, Double rate) implements Callable<String> {
@Override
public String call() throws Exception {
TimeUnit.MILLISECONDS.sleep(value.longValue());
return String.format("taskID={ %s } result={ %s*%s -> %s }", taskID, value, rate, value * rate);
}
}

向线程池添加三个任务,执行时间有所区别,最后依次调用 get() 方法输出结果。

查看完整示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
ExecutorService executorService = Executors.newFixedThreadPool(5);

var futureList = Arrays.asList(
executorService.submit(new TaskCallable("1aa8c994-281e-4fbb-a09b-cdf389eedf3b", 1000.0, 0.5)),
executorService.submit(new TaskCallable("321996c2-73c0-411a-8e66-fcfa04d94ae1", 5000.0, 1.0)),
executorService.submit(new TaskCallable("3df9d22e-17d9-4b48-a821-867974681d6e", 600.0, 1.2))
);

for (int index = 0; index < futureList.size(); index++) {
var result = futureList.get(index).get();
System.out.println(result);
}

executorService.shutdown();

输出结果:

1
2
3
4
taskID={ 1aa8c994-281e-4fbb-a09b-cdf389eedf3b } result={ 1000.0*0.5 -> 500.0 }
taskID={ 321996c2-73c0-411a-8e66-fcfa04d94ae1 } result={ 5000.0*1.0 -> 5000.0 }
taskID={ 3df9d22e-17d9-4b48-a821-867974681d6e } result={ 600.0*1.2 -> 720.0 }
计时器{ b10bbd47-63c4-4545-b236-15303f40cc1f }停止,耗时={ 5004ms }

由于 get() 方法是阻塞的,因此如果某个 Future 执行时间太长,整个遍历过程将会阻塞,无法及时从已完成的 Future 拿到返回值。上述例子中,即使 600ms 的任务,也只能等到前两个长时间任务都完成后才能输出。

CompletionService

CompletionSerive 接口的实现类 ExecutorCompletionService 优化了获取异步操作结果。ExecutorCompletionService 中内置了存放 Future 的队列 completionQueue,在任务调用完成后,将要返回的 future 放入到该队列。客户端通过 take() 方法得到 future,再调用 get() 方法从而获取任务执行结果。

1
2
3
4
5
6
7
8
9
10
11
private static class QueueingFuture<V> extends FutureTask<Void> {
QueueingFuture(RunnableFuture<V> task,
BlockingQueue<Future<V>> completionQueue) {
super(task, null);
this.task = task;
this.completionQueue = completionQueue;
}
private final Future<V> task;
private final BlockingQueue<Future<V>> completionQueue;
protected void done() { completionQueue.add(task); }
}

向 CompletionService 提交任务的方式与 ExecutorService 一样。两者的区别在于取结果的方式。有了 CompletionService,可以不再需要 Future 集合。如果要得到最早的执行结果,调用 completionService.take().get() 即可:

查看完整示例代码

1
2
3
4
5
6
7
8
9
10
11
12
ExecutorService executorService = Executors.newFixedThreadPool(5);
CompletionService<String> completionService = new ExecutorCompletionService<>(executorService);

completionService.submit(new TaskCallable("1aa8c994-281e-4fbb-a09b-cdf389eedf3b", 1000.0, 0.5));
completionService.submit(new TaskCallable("321996c2-73c0-411a-8e66-fcfa04d94ae1", 5000.0, 1.0));
completionService.submit(new TaskCallable("3df9d22e-17d9-4b48-a821-867974681d6e", 600.0, 1.2));

for (int index = 0; index < 3; index++) {
System.out.println(completionService.take().get());
}

executorService.shutdown();

输出结果

1
2
3
4
taskID={ 3df9d22e-17d9-4b48-a821-867974681d6e } result={ 600.0*1.2 -> 720.0 }
taskID={ 1aa8c994-281e-4fbb-a09b-cdf389eedf3b } result={ 1000.0*0.5 -> 500.0 }
taskID={ 321996c2-73c0-411a-8e66-fcfa04d94ae1 } result={ 5000.0*1.0 -> 5000.0 }
计时器{ 7223efb6-ec07-4294-8f23-8b73b536c9b4 }停止,耗时={ 5002ms }

CompletionService 结合异步实现多线程处理任务

定义一个任务队列,异步执行「入队」和「出队」,最终通过CompletionService获取全部任务的返回结果。

查看完整示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
// 定义队列
LinkedBlockingDeque<TaskCallable> linkedBlockingDeque = new LinkedBlockingDeque<>();
// 模拟任务数量
final int taskCount = 5;
// 获取CPU核心数
int coreNum = Runtime.getRuntime().availableProcessors();
// 定义线程池
ThreadPoolExecutor customPool = new ThreadPoolExecutor(coreNum, coreNum, coreNum, TimeUnit.MINUTES, new LinkedBlockingDeque<>(),
new ThreadFactory() {
private final AtomicInteger customPoolCurrent = new AtomicInteger(1);

@Override
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable, String.format("customPool-thread-%s", customPoolCurrent.getAndIncrement()));
thread.setPriority(Thread.MAX_PRIORITY);
return thread;
}
}, new ThreadPoolExecutor.CallerRunsPolicy());

CompletionService<String> completionService = new ExecutorCompletionService<>(customPool);

var pushElement = CompletableFuture.runAsync(() -> {
for (int index = 0; index < taskCount; index++) {
var value = Math.random() * 1000;
var taskCallable = new TaskCallable(UUID.randomUUID().toString(), value, Math.random());
linkedBlockingDeque.push(taskCallable);
System.out.println(String.format("线程 %s 完成 %s 入队", Thread.currentThread().getName(), taskCallable));
try {
TimeUnit.MILLISECONDS.sleep(300);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});

var submitTask = CompletableFuture.supplyAsync(() -> {
while (true) {
try {
boolean flagTask = pushElement.isDone();
boolean flagPeek = Optional.ofNullable(linkedBlockingDeque.peek()).isPresent();
// 当【入队任务完成】且【队列没有元素可取】时结束
if (flagTask == true && flagPeek == false) {
break;
}
if (flagPeek == false) {
// 仅【队列没有元素可取】时,跳过
continue;
}
// 【队列有元素可取】时,添加任务到线程池
TimeUnit.MILLISECONDS.sleep(500);
var taskCallable = linkedBlockingDeque.take();
completionService.submit(taskCallable);
System.out.println(String.format("线程 %s 将 %s 提交到线程池", Thread.currentThread().getName(), taskCallable));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
return false;
});

boolean submit = submitTask.get();
if (!submit) {
for (int index = 0; index < taskCount; index++) {
System.out.println(completionService.take().get());
}
}

customPool.shutdown();
customPool.awaitTermination(3, TimeUnit.SECONDS);

输出结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
线程 ForkJoinPool.commonPool-worker-1 完成 TaskCallable[taskID=29ad9f36-2e25-457a-8a50-40b2619ccd85, value=297.9894322218015, rate=0.39525736986090654] 入队
线程 ForkJoinPool.commonPool-worker-1 完成 TaskCallable[taskID=72434730-e52a-4819-a249-25b1e8450c81, value=861.735810794833, rate=0.0508273434713048] 入队
线程 ForkJoinPool.commonPool-worker-2 将 TaskCallable[taskID=72434730-e52a-4819-a249-25b1e8450c81, value=861.735810794833, rate=0.0508273434713048] 提交到线程池
线程 ForkJoinPool.commonPool-worker-1 完成 TaskCallable[taskID=6f25d9e6-9f38-4d84-ac59-259324f800f1, value=931.5744113116666, rate=0.8237427385465251] 入队
线程 ForkJoinPool.commonPool-worker-1 完成 TaskCallable[taskID=6e3de271-2169-49c7-a425-9973304379a9, value=754.2015420496596, rate=0.480198992823775] 入队
线程 ForkJoinPool.commonPool-worker-2 将 TaskCallable[taskID=6e3de271-2169-49c7-a425-9973304379a9, value=754.2015420496596, rate=0.480198992823775] 提交到线程池
线程 ForkJoinPool.commonPool-worker-1 完成 TaskCallable[taskID=ffcb4a5e-603b-46dd-b4d0-02dc2bc7204c, value=259.60270090020333, rate=0.404730459043419] 入队
线程 ForkJoinPool.commonPool-worker-2 将 TaskCallable[taskID=ffcb4a5e-603b-46dd-b4d0-02dc2bc7204c, value=259.60270090020333, rate=0.404730459043419] 提交到线程池
线程 ForkJoinPool.commonPool-worker-2 将 TaskCallable[taskID=6f25d9e6-9f38-4d84-ac59-259324f800f1, value=931.5744113116666, rate=0.8237427385465251] 提交到线程池
线程 ForkJoinPool.commonPool-worker-2 将 TaskCallable[taskID=29ad9f36-2e25-457a-8a50-40b2619ccd85, value=297.9894322218015, rate=0.39525736986090654] 提交到线程池
taskID={ 72434730-e52a-4819-a249-25b1e8450c81 } result={ 861.735810794833*0.0508273434713048 -> 43.7997420367923 }
taskID={ 6e3de271-2169-49c7-a425-9973304379a9 } result={ 754.2015420496596*0.480198992823775 -> 362.1668208783845 }
taskID={ ffcb4a5e-603b-46dd-b4d0-02dc2bc7204c } result={ 259.60270090020333*0.404730459043419 -> 105.06912030425069 }
taskID={ 29ad9f36-2e25-457a-8a50-40b2619ccd85 } result={ 297.9894322218015*0.39525736986090654 -> 117.78251922633414 }
taskID={ 6f25d9e6-9f38-4d84-ac59-259324f800f1 } result={ 931.5744113116666*0.8237427385465251 -> 767.3776567337393 }