oracle JDK8 有关内容的文档:https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html
创建异步任务 runAsync 执行 CompletableFuture 任务,没有返回值
1 2 static CompletableFuture<Void> runAsync (Runnable runnable) static CompletableFuture<Void> runAsync (Runnable runnable, Executor executor)
supplyAsync 执行 CompletableFuture 任务,可有返回值
1 2 static <U> CompletableFuture<U> supplyAsync (Supplier<U> supplier) static <U> CompletableFuture<U> supplyAsync (Supplier<U> supplier, Executor executor)
如果不指定 Executor 实现,则使用 ForkJoinPool.commonPool()
作为执行异步代码的线程池
创建异步任务后,可根据需求进行如下的操作:
方法名称
类型
传参
返回值
thenRun
单任务消费
无传参
无返回值
thenRunAsync
单任务消费
无传参
无返回值
thenApply
单任务消费
要传参
有返回值
thenApplyAsync
单任务消费
要传参
有返回值
thenAccept
单任务消费
要传参
无返回值
thenAcceptAsync
单任务消费
要传参
无返回值
thenCombine
双任务消费(与)
要传参(两个任务的执行结果)
有返回值
thenCombineAsync
双任务消费(与)
要传参(两个任务的执行结果)
有返回值
thenAcceptBoth
双任务消费(与)
要传参(两个任务的执行结果)
无返回值
thenAcceptBothAsync
双任务消费(与)
要传参(两个任务的执行结果)
无返回值
runAfterBoth
双任务消费(与)
无传参
无返回值
runAfterBothAsync
双任务消费(与)
无传参
无返回值
applyToEither
双任务消费(或)
要传参(已完成任务的执行结果)
有返回值
applyToEitherAsync
双任务消费(或)
要传参(已完成任务的执行结果)
有返回值
acceptEither
双任务消费(或)
要传参(已完成任务的执行结果)
无返回值
acceptEitherAsync
双任务消费(或)
要传参(已完成任务的执行结果)
无返回值
runAfterEither
双任务消费(或)
无传参
无返回值
runAfterEitherAsync
双任务消费(或)
无传参
无返回值
whenComplete
单任务消费
要传参(正常返回值和异常)
无返回值
whenCompleteAsync
单任务消费
要传参(正常返回值和异常)
无返回值
handle
单任务消费
要传参(正常返回值和异常)
有返回值
handleAsync
单任务消费
要传参(正常返回值和异常)
有返回值
exceptionally
单任务消费
要传参 (异常)
无返回值
thenCompose
单任务消费
要传参
有返回值
allOf
多任务消费(与)
要传参(任务列表)
无返回值
anyOf
多任务消费(或)
要传参(任务列表)
无返回值
不带 Async 版本由上一个任务的线程继续执行该任务,Async 版本可以指定执行该异步任务的 Executor 实现,如果不指定,默认使用 ForkJoinPool.commonPool()
单任务消费
回调方法
类型
传参
返回值
thenRun
单任务消费
无传参
无返回值
thenRunAsync
单任务消费
无传参
无返回值
thenAccept
单任务消费
要传参
无返回值
thenAcceptAsync
单任务消费
要传参
无返回值
thenApply
单任务消费
要传参
有返回值
thenApplyAsync
单任务消费
要传参
有返回值
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 public static void main (String[] args) throws Exception { var executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); var supplyAsyncTask = CompletableFuture.supplyAsync(() -> { System.out.println("supplyAsyncTask=" + Thread.currentThread().getName()); try { Thread.sleep(1000 ); } catch (InterruptedException e) { throw new RuntimeException (e); } return "" ; }, executor); var thenApplyTask = supplyAsyncTask.thenApply((param) -> { System.out.println("thenApplyTask=" + Thread.currentThread().getName()); return "" ; }); var thenApplyAsyncTask = supplyAsyncTask.thenApplyAsync((param) -> { System.out.println("thenApplyAsyncTask=" + Thread.currentThread().getName()); return "" ; }); var thenApplyAsyncTask2 = supplyAsyncTask.thenApplyAsync((param) -> { System.out.println("thenApplyAsyncTask2=" + Thread.currentThread().getName()); return "" ; }, executor); thenApplyAsyncTask.get(); thenApplyAsyncTask2.get(); executor.shutdown(); }
输出结果:
1 2 3 4 supplyAsyncTask=pool-1-thread-1 thenApplyAsyncTask2=pool-1-thread-2 thenApplyTask=pool-1-thread-2 thenApplyAsyncTask=ForkJoinPool.commonPool-worker-3
双任务消费(与) 将两个 CompletableFuture 组合起来,只有这两个都正常执行完了,才会执行某个任务。
方法名称
类型
传参
返回值
thenCombine
双任务消费(与)
有传参(两个任务的执行结果)
有返回值
thenCombineAsync
双任务消费(与)
有传参(两个任务的执行结果)
有返回值
thenAcceptBoth
双任务消费(与)
有传参(两个任务的执行结果)
无返回值
thenAcceptBothAsync
双任务消费(与)
有传参(两个任务的执行结果)
无返回值
runAfterBoth
双任务消费(与)
无传参
无返回值
runAfterBothAsync
双任务消费(与)
无传参
无返回值
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public static void main (String[] args) throws Exception { var task1 = CompletableFuture.supplyAsync(() -> "task1" ); var task2 = CompletableFuture.supplyAsync(() -> "task2" ); var task3 = CompletableFuture.supplyAsync(() -> "task3" ); task1.thenCombine(task2, (param1, param2) -> { System.out.println(param1 + param2); return param1 + param2; }).thenCombine(task3, (param12, param3) -> { System.out.println(param12 + param3); return param12 + param3; }); task1.thenAcceptBoth(task2, (param1, param2) -> { System.out.println(param1 + param2); }).thenAcceptBoth(task3, (param12, param3) -> { System.out.println(param12 + param3); }); task1.runAfterBoth(task2, () -> { System.out.println("task1 and task2" ); }); }
双任务消费(或) 将两个 CompletableFuture 组合起来,只要其中一个执行完了,就执行回调方法。
方法名称
类型
传参
返回值
applyToEither
双任务消费(或)
有传参(已完成任务的执行结果)
有返回值
applyToEitherAsync
双任务消费(或)
有传参(已完成任务的执行结果)
有返回值
acceptEither
双任务消费(或)
有传参(已完成任务的执行结果)
无返回值
acceptEitherAsync
双任务消费(或)
有传参(已完成任务的执行结果)
无返回值
runAfterEither
双任务消费(或)
无传参
无返回值
runAfterEitherAsync
双任务消费(或)
无传参
无返回值
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 public static void main (String[] args) throws Exception { var task1 = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(5 ); } catch (InterruptedException e) { throw new RuntimeException (e); } return "task1" ; }); var task2 = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(3 ); } catch (InterruptedException e) { throw new RuntimeException (e); } return "task2" ; }); var task3 = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(3 ); } catch (InterruptedException e) { throw new RuntimeException (e); } return "task3" ; }); task1.applyToEither(task2, (param) -> { System.out.println("applyToEither=" + param); return param; }).acceptEither(task3, (param) -> { System.out.println("acceptEither=" + param); }).get(); task1.runAfterEither(task2,()-> System.out.println("task1 or task2" )); }
其他 whenComplete、whenCompleteAsync 某个任务执行完成后,执行的回调方法,无返回值。可以访问 CompletableFuture 的结果和异常作为参数,使用它们并执行想要的操作。此方法并不能转换完成的结果。会内部抛出异常。其正常返回的 CompletableFuture 的结果来自上个任务。
handle、handleAsync 不论正常返回还是出异常都会进入 handle,参数通常为 new BiFunction<T, Throwable, R>();
,其中
T:上一任务传入的对象类型
Throwable:上一任务传入的异常
R:返回的对象类型
handle 和 thenApply 的区别:如果任务出现异常不会进入 thenApply;任务出现异常也会进入 handle,可对异常处理。
handle 和 whenComplete 的区别:handle 可对传入值 T 进行转换,并产生自己的返回结果 R;whenComplete 的返回值和上级任务传入的结果一致,不能转换。
whenComplete、whenCompleteAsync、handle 和 handleAsync 的输入参数一个是正常结果一个是异常结果,而 exceptionally 的输入参数为异常结果。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public static void main (String[] args) throws Exception { var supplyAsyncTask = CompletableFuture.supplyAsync(() -> { return "supplyAsyncTask" ; }); var handle = supplyAsyncTask.handle((s, throwable) -> { if (Optional.ofNullable(throwable).isPresent()) { return throwable.getMessage(); } return new ArrayList () {{ add(s); }}; }); System.out.println(handle.get()); }
exceptionally 某个任务执行抛出异常时执行的回调方法。抛出异常作为参数,传递到回调方法。仅处理异常情况。如果任务成功完成,那么将被跳过。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public static void main (String[] args) throws Exception { var supplyAsyncTask = CompletableFuture.supplyAsync(() -> { double error=1 /0 ; return "ok" ; }, executor).exceptionally((e)->{ System.out.println(e.getMessage()); return "error" ; }); System.out.println(supplyAsyncTask.get()); }
complete 如果尚未完成,则将 get()
和相关方法返回的值设置为给定值。如果此调用导致此 CompletableFuture 转换到完成状态,则返回 true,否则返回 false。文档描述:
1 2 3 4 5 6 If not already completed, sets the value returned by get() and related methods to the given value. Params: value – the result value Returns: true if this invocation caused this CompletableFuture to transition to a completed state, else false
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public static void main (String[] args) throws Exception { var task1 = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(15 ); } catch (InterruptedException e) { throw new RuntimeException (e); } return 10 ; }); System.out.println(task1.complete(5 )); System.out.println(task1.get()); System.out.println(task1.complete(50 )); }
thenCompose 源码定义
1 2 3 public <U> CompletableFuture<U> thenCompose (Function<? super T, ? extends CompletionStage<U>> fn) ;public <U> CompletableFuture<U> thenComposeAsync (Function<? super T, ? extends CompletionStage<U>> fn) ;public <U> CompletableFuture<U> thenComposeAsync (Function<? super T, ? extends CompletionStage<U>> fn, Executor executor) ;
thenCompose 方法会在某个任务执行完成后,将该任务的执行结果作为入参,执行指定的方法。该方法会返回一个新的 CompletableFuture 实例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 public static void main (String[] args) throws Exception { var task1 = CompletableFuture.supplyAsync(() -> 10 ); var task2 = task1.thenCompose(param -> { System.out.println("this is task2 param=" + param); return CompletableFuture.supplyAsync(() -> { System.out.println("this is task2 square" ); return Math.pow(param, 2 ); }); }).thenApply(param -> { System.out.println("thenApply get the square=" + param); return param; }); var task3 = task1.thenCompose(param -> { System.out.println("this is task3 param=" + param); return CompletableFuture.runAsync(() -> { System.out.println("this is task3 square" ); System.out.println(Math.pow(param, 2 )); }); }); System.out.println("task2 get=" + task2.get()); System.out.println("task3 get=" + task3.get()); } 输出: this is task2 param=10 this is task2 squarethenApply get the square=100.0 this is task3 param=10 this is task3 square100.0 task2 get=100.0 task3 get=null
allOf 静态方法,阻塞等待所有给定的 CompletableFuture 执行结束后,返回一个 CompletableFuture<Void>
结果。所有任务都执行完成后,才执行 allOf 的回调方法。如果任意一个任务异常,执行 get 方法时会抛出异常。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public static void main (String[] args) throws Exception { var task1 = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(2 ); } catch (InterruptedException e) { throw new RuntimeException (e); } return "task1" ; }); var task2 = CompletableFuture.runAsync(() -> { try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { throw new RuntimeException (e); } int value = 1 / 0 ; System.out.println("task2 is over" ); }); CompletableFuture.allOf(task1, task2).whenComplete((param, throwable) -> { System.out.println(param); }).exceptionally(throwable -> { System.out.println("task3 allOf throwable=" + throwable.getMessage()); return null ; }).get(); }
anyOf 静态方法,阻塞等待任意一个给定的 CompletableFuture 对象执行结束后,返回一个 CompletableFuture<Void>
结果。任意一个任务执行完,就执行 anyOf 的回调方法。如果执行的任务异常,执行 get 方法时会抛出异常。