ExecutorService
Define the TaskCallable class below. How long it waits before returning its result depends on the value passed in:
    import java.util.concurrent.Callable;
    import java.util.concurrent.TimeUnit;

    public record TaskCallable(String taskID, Double value, Double rate) implements Callable<String> {
        @Override
        public String call() throws Exception {
            // Simulate work: sleep for `value` milliseconds before producing the result.
            TimeUnit.MILLISECONDS.sleep(value.longValue());
            return String.format("taskID={ %s } result={ %s*%s -> %s }", taskID, value, rate, value * rate);
        }
    }
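Submit three tasks with different execution times to the thread pool, then call get() on each Future in submission order and print the results.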
    ExecutorService executorService = Executors.newFixedThreadPool(5);

    var futureList = Arrays.asList(
            executorService.submit(new TaskCallable("1aa8c994-281e-4fbb-a09b-cdf389eedf3b", 1000.0, 0.5)),
            executorService.submit(new TaskCallable("321996c2-73c0-411a-8e66-fcfa04d94ae1", 5000.0, 1.0)),
            executorService.submit(new TaskCallable("3df9d22e-17d9-4b48-a821-867974681d6e", 600.0, 1.2))
    );

    for (int index = 0; index < futureList.size(); index++) {
        var result = futureList.get(index).get();
        System.out.println(result);
    }

    executorService.shutdown();
Output:
    taskID={ 1aa8c994-281e-4fbb-a09b-cdf389eedf3b } result={ 1000.0*0.5 -> 500.0 }
    taskID={ 321996c2-73c0-411a-8e66-fcfa04d94ae1 } result={ 5000.0*1.0 -> 5000.0 }
    taskID={ 3df9d22e-17d9-4b48-a821-867974681d6e } result={ 600.0*1.2 -> 720.0 }
    Timer { b10bbd47-63c4-4545-b236-15303f40cc1f } stopped, elapsed={ 5004ms }
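Because get() blocks, a single slow Future stalls the whole loop, and results from Futures that have already completed cannot be read promptly. In the example above, the 600 ms task finishes first, yet its result is printed only after the two tasks ahead of it in the list (1000 ms and 5000 ms) have completed.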
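To make the stall visible, here is a minimal sketch that records when each get() call returns. The class name BlockingGetDemo, the helper sleepy, and the labels and delays are made up for illustration and do not come from the original example.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; labels and delays are illustrative assumptions.
final class BlockingGetDemo {

    // A task that sleeps for delayMillis and then returns its label.
    static Callable<String> sleepy(String label, long delayMillis) {
        return () -> {
            TimeUnit.MILLISECONDS.sleep(delayMillis);
            return label;
        };
    }

    // Reads the futures in submission order and records, per label,
    // how many milliseconds had passed when get() returned.
    static Map<String, Long> readInSubmissionOrder() {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            long start = System.nanoTime();
            List<Future<String>> futures = List.of(
                    pool.submit(sleepy("medium", 200)),
                    pool.submit(sleepy("slow", 600)),
                    pool.submit(sleepy("fast", 50)));
            Map<String, Long> readAtMillis = new LinkedHashMap<>();
            for (Future<String> future : futures) {
                String label = future.get(); // blocks until this specific task is done
                readAtMillis.put(label, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
            }
            return readAtMillis;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // "fast" finishes first but is read last, only after "slow" has completed.
        readInSubmissionOrder().forEach(
                (label, ms) -> System.out.println(label + " read after about " + ms + " ms"));
    }
}
```

The map keeps insertion order, so it also shows that results are read strictly in submission order regardless of which task finished first.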
CompletionService
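ExecutorCompletionService, an implementation of the CompletionService interface, streamlines retrieving the results of asynchronous work. It keeps an internal queue of Futures, completionQueue, and once a task has finished running, the Future to be returned is placed on that queue. A client calls take() to obtain a Future and then get() on it to read the task's result. The inner class QueueingFuture does the enqueueing: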
    private static class QueueingFuture<V> extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task,
                       BlockingQueue<Future<V>> completionQueue) {
            super(task, null);
            this.task = task;
            this.completionQueue = completionQueue;
        }
        private final Future<V> task;
        private final BlockingQueue<Future<V>> completionQueue;
        protected void done() { completionQueue.add(task); }
    }
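The same idea can be sketched in a few lines: wrap each task in a FutureTask whose done() hook adds the finished future to a blocking queue. MiniCompletionService below is a hypothetical, simplified stand-in written for illustration. It is not the JDK implementation, and the sleepy helper and its delays are assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical, stripped-down stand-in for ExecutorCompletionService (not the JDK class).
final class MiniCompletionService<V> {
    private final Executor executor;
    private final BlockingQueue<Future<V>> completionQueue = new LinkedBlockingQueue<>();

    MiniCompletionService(Executor executor) {
        this.executor = executor;
    }

    Future<V> submit(Callable<V> task) {
        FutureTask<V> future = new FutureTask<V>(task) {
            @Override
            protected void done() {
                // FutureTask invokes done() once the task has finished, so the
                // queue only ever holds futures whose get() returns immediately.
                completionQueue.add(this);
            }
        };
        executor.execute(future);
        return future;
    }

    // Blocks until some submitted task has finished, then returns its future.
    Future<V> take() throws InterruptedException {
        return completionQueue.take();
    }

    // Illustrative task: sleep, then return the label.
    static Callable<String> sleepy(String label, long delayMillis) {
        return () -> {
            TimeUnit.MILLISECONDS.sleep(delayMillis);
            return label;
        };
    }

    // Submits three tasks and returns their labels in completion order.
    static List<String> completionOrder() {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            MiniCompletionService<String> service = new MiniCompletionService<>(pool);
            service.submit(sleepy("medium", 200));
            service.submit(sleepy("slow", 600));
            service.submit(sleepy("fast", 50));
            List<String> order = new ArrayList<>();
            for (int i = 0; i < 3; i++) {
                order.add(service.take().get());
            }
            return order;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(completionOrder()); // prints [fast, medium, slow]
    }
}
```

The sketch enqueues the FutureTask itself, whereas the QueueingFuture excerpt wraps the task and enqueues the wrapped future. Either way, the client only sees futures that are already complete.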
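Tasks are submitted to a CompletionService the same way as to an ExecutorService. The difference lies in how results are retrieved. With a CompletionService there is no need to keep a collection of Futures. To get the result that becomes available first, call completionService.take().get():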
    ExecutorService executorService = Executors.newFixedThreadPool(5);
    CompletionService<String> completionService = new ExecutorCompletionService<>(executorService);

    completionService.submit(new TaskCallable("1aa8c994-281e-4fbb-a09b-cdf389eedf3b", 1000.0, 0.5));
    completionService.submit(new TaskCallable("321996c2-73c0-411a-8e66-fcfa04d94ae1", 5000.0, 1.0));
    completionService.submit(new TaskCallable("3df9d22e-17d9-4b48-a821-867974681d6e", 600.0, 1.2));

    for (int index = 0; index < 3; index++) {
        System.out.println(completionService.take().get());
    }

    executorService.shutdown();
Output:
    taskID={ 3df9d22e-17d9-4b48-a821-867974681d6e } result={ 600.0*1.2 -> 720.0 }
    taskID={ 1aa8c994-281e-4fbb-a09b-cdf389eedf3b } result={ 1000.0*0.5 -> 500.0 }
    taskID={ 321996c2-73c0-411a-8e66-fcfa04d94ae1 } result={ 5000.0*1.0 -> 5000.0 }
    Timer { 7223efb6-ec07-4294-8f23-8b73b536c9b4 } stopped, elapsed={ 5002ms }
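When only the earliest result matters, a single take().get() is enough and the remaining tasks can be abandoned. The following is a rough sketch of that pattern, assuming it is acceptable to interrupt the unfinished tasks with shutdownNow(). The class name FirstResultDemo, the sleepy helper, and the delays are illustrative assumptions.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; labels and delays are illustrative assumptions.
final class FirstResultDemo {

    // Illustrative task: sleep, then return the label.
    static Callable<String> sleepy(String label, long delayMillis) {
        return () -> {
            TimeUnit.MILLISECONDS.sleep(delayMillis);
            return label;
        };
    }

    // Returns the result of whichever task completes first.
    static String firstResult() {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        CompletionService<String> completionService = new ExecutorCompletionService<>(pool);
        try {
            completionService.submit(sleepy("medium", 200));
            completionService.submit(sleepy("slow", 600));
            completionService.submit(sleepy("fast", 50));
            // take() blocks only until the first task is done, whichever one that is.
            return completionService.take().get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            // Assumption: the other results are not needed, so interrupt the tasks still sleeping.
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(firstResult()); // prints fast
    }
}
```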
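Combining CompletionService with asynchronous tasks for multithreaded processing
Define a task queue, run the "enqueue" and "dequeue" steps asynchronously, and finally collect the results of all tasks through the CompletionService.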
    LinkedBlockingDeque<TaskCallable> linkedBlockingDeque = new LinkedBlockingDeque<>();

    final int taskCount = 5;

    int coreNum = Runtime.getRuntime().availableProcessors();

    // Fixed-size pool with named threads.
    ThreadPoolExecutor customPool = new ThreadPoolExecutor(coreNum, coreNum, coreNum, TimeUnit.MINUTES,
            new LinkedBlockingDeque<>(), new ThreadFactory() {
                private final AtomicInteger customPoolCurrent = new AtomicInteger(1);

                @Override
                public Thread newThread(Runnable runnable) {
                    Thread thread = new Thread(runnable,
                            String.format("customPool-thread-%s", customPoolCurrent.getAndIncrement()));
                    thread.setPriority(Thread.MAX_PRIORITY);
                    return thread;
                }
            }, new ThreadPoolExecutor.CallerRunsPolicy());

    CompletionService<String> completionService = new ExecutorCompletionService<>(customPool);

    // Producer: creates taskCount tasks, one every 300 ms.
    var pushElement = CompletableFuture.runAsync(() -> {
        for (int index = 0; index < taskCount; index++) {
            var value = Math.random() * 1000;
            var taskCallable = new TaskCallable(UUID.randomUUID().toString(), value, Math.random());
            // push() inserts at the head of the deque.
            linkedBlockingDeque.push(taskCallable);
            System.out.println(String.format("Thread %s enqueued %s", Thread.currentThread().getName(), taskCallable));
            try {
                TimeUnit.MILLISECONDS.sleep(300);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    });

    // Consumer: moves tasks from the deque to the thread pool until the producer
    // has finished and the deque is empty.
    var submitTask = CompletableFuture.supplyAsync(() -> {
        while (true) {
            try {
                boolean flagTask = pushElement.isDone();
                boolean flagPeek = linkedBlockingDeque.peek() != null;
                if (flagTask && !flagPeek) {
                    break;
                }
                if (!flagPeek) {
                    // Deque is empty but the producer is still running: check again (this spins without sleeping).
                    continue;
                }
                TimeUnit.MILLISECONDS.sleep(500);
                // take() also removes from the head, so the most recently pushed task is submitted first.
                var taskCallable = linkedBlockingDeque.take();
                completionService.submit(taskCallable);
                System.out.println(String.format("Thread %s submitted %s to the thread pool", Thread.currentThread().getName(), taskCallable));
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
        return false;
    });

    // Wait for the consumer to finish, then read the results in completion order.
    boolean submit = submitTask.get();
    if (!submit) {
        for (int index = 0; index < taskCount; index++) {
            System.out.println(completionService.take().get());
        }
    }

    customPool.shutdown();
    customPool.awaitTermination(3, TimeUnit.SECONDS);

Output:
    Thread ForkJoinPool.commonPool-worker-1 enqueued TaskCallable[taskID=29ad9f36-2e25-457a-8a50-40b2619ccd85, value=297.9894322218015, rate=0.39525736986090654]
    Thread ForkJoinPool.commonPool-worker-1 enqueued TaskCallable[taskID=72434730-e52a-4819-a249-25b1e8450c81, value=861.735810794833, rate=0.0508273434713048]
    Thread ForkJoinPool.commonPool-worker-2 submitted TaskCallable[taskID=72434730-e52a-4819-a249-25b1e8450c81, value=861.735810794833, rate=0.0508273434713048] to the thread pool
    Thread ForkJoinPool.commonPool-worker-1 enqueued TaskCallable[taskID=6f25d9e6-9f38-4d84-ac59-259324f800f1, value=931.5744113116666, rate=0.8237427385465251]
    Thread ForkJoinPool.commonPool-worker-1 enqueued TaskCallable[taskID=6e3de271-2169-49c7-a425-9973304379a9, value=754.2015420496596, rate=0.480198992823775]
    Thread ForkJoinPool.commonPool-worker-2 submitted TaskCallable[taskID=6e3de271-2169-49c7-a425-9973304379a9, value=754.2015420496596, rate=0.480198992823775] to the thread pool
    Thread ForkJoinPool.commonPool-worker-1 enqueued TaskCallable[taskID=ffcb4a5e-603b-46dd-b4d0-02dc2bc7204c, value=259.60270090020333, rate=0.404730459043419]
    Thread ForkJoinPool.commonPool-worker-2 submitted TaskCallable[taskID=ffcb4a5e-603b-46dd-b4d0-02dc2bc7204c, value=259.60270090020333, rate=0.404730459043419] to the thread pool
    Thread ForkJoinPool.commonPool-worker-2 submitted TaskCallable[taskID=6f25d9e6-9f38-4d84-ac59-259324f800f1, value=931.5744113116666, rate=0.8237427385465251] to the thread pool
    Thread ForkJoinPool.commonPool-worker-2 submitted TaskCallable[taskID=29ad9f36-2e25-457a-8a50-40b2619ccd85, value=297.9894322218015, rate=0.39525736986090654] to the thread pool
    taskID={ 72434730-e52a-4819-a249-25b1e8450c81 } result={ 861.735810794833*0.0508273434713048 -> 43.7997420367923 }
    taskID={ 6e3de271-2169-49c7-a425-9973304379a9 } result={ 754.2015420496596*0.480198992823775 -> 362.1668208783845 }
    taskID={ ffcb4a5e-603b-46dd-b4d0-02dc2bc7204c } result={ 259.60270090020333*0.404730459043419 -> 105.06912030425069 }
    taskID={ 29ad9f36-2e25-457a-8a50-40b2619ccd85 } result={ 297.9894322218015*0.39525736986090654 -> 117.78251922633414 }
    taskID={ 6f25d9e6-9f38-4d84-ac59-259324f800f1 } result={ 931.5744113116666*0.8237427385465251 -> 767.3776567337393 }
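The submitTask loop above re-checks the deque in a tight loop whenever it is empty. One possible alternative, sketched here under simplified assumptions, is to wait on the deque with a timed poll() so the consumer thread sleeps while there is nothing to do. The class PollingSubmitDemo, its helper methods, the squaring tasks, and the timings are all hypothetical. Unlike the example above, the sketch adds tasks at the tail, so they are submitted in FIFO order.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; task bodies and timings are illustrative assumptions.
final class PollingSubmitDemo {

    static List<Integer> run(int taskCount) {
        BlockingDeque<Callable<Integer>> deque = new LinkedBlockingDeque<>();
        ExecutorService pool = Executors.newFixedThreadPool(4);
        CompletionService<Integer> completionService = new ExecutorCompletionService<>(pool);
        try {
            // Producer: enqueue one task every 20 ms; each task squares its number.
            CompletableFuture<Void> producer = CompletableFuture.runAsync(() -> {
                for (int i = 1; i <= taskCount; i++) {
                    final int n = i;
                    deque.addLast(() -> n * n);
                    pause(20);
                }
            });
            // Consumer: wait up to 50 ms for a task instead of spinning on peek().
            CompletableFuture<Integer> consumer = CompletableFuture.supplyAsync(() -> {
                int submitted = 0;
                while (true) {
                    Callable<Integer> task = pollOrNull(deque, 50);
                    if (task != null) {
                        completionService.submit(task);
                        submitted++;
                    } else if (producer.isDone() && deque.isEmpty()) {
                        return submitted; // nothing left and nothing more will arrive
                    }
                }
            });
            int submitted = consumer.join();
            List<Integer> results = new ArrayList<>();
            for (int i = 0; i < submitted; i++) {
                results.add(completionService.take().get());
            }
            Collections.sort(results); // completion order varies, so sort for a stable result
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    // Timed poll that returns null on timeout and converts interruption to an unchecked exception.
    static <T> T pollOrNull(BlockingDeque<T> deque, long timeoutMillis) {
        try {
            return deque.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    static void pause(long millis) {
        try {
            TimeUnit.MILLISECONDS.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run(5)); // prints [1, 4, 9, 16, 25]
    }
}
```

Checking producer.isDone() before deque.isEmpty() matters: once the producer is done no further tasks can arrive, so an empty deque at that point really means the work is finished.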