public static void test() {
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
String supplyAsyncResult = " "+Thread.currentThread().getName()+" Hello world! ";
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(supplyAsyncResult);
return supplyAsyncResult;
}).thenApplyAsync(r -> { //添加后续任务
String thenApplyResult = Thread.currentThread().getName()+r + " thenApply! ";
System.out.println(thenApplyResult);
return thenApplyResult;
});
try {
System.out.println(completableFuture.get() + " finish!");
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
打印:
ForkJoinPool.commonPool-worker-9 Hello world!
ForkJoinPool.commonPool-worker-9 ForkJoinPool.commonPool-worker-9 Hello world! thenApply!
ForkJoinPool.commonPool-worker-9 ForkJoinPool.commonPool-worker-9 Hello world! thenApply! finish!
public void run() {
CompletableFuture<T> d; Supplier<T> f;
if ((d = dep) != null && (f = fn) != null) {
dep = null; fn = null; //只是为了防止内存泄漏,方便 GC
if (d.result == null) {
try {
d.completeValue(f.get()); //执行 task
} catch (Throwable ex) { //执行 task 期间抛出了异常
d.completeThrowable(ex);
}
}
d.postComplete();
}
}
从源码上来看,supplyAsync 新起了一个线程,等到线程执行完 task,开始执行 d.postComplete(),即开始执行后续 task,然后 postComplete 会执行后续 task 的 completion 对象的 tryFire 方法。
static final class UniApply<T,V> extends UniCompletion<T,V> {
Function<? super T,? extends V> fn;
UniApply(Executor executor, CompletableFuture<V> dep,
CompletableFuture<T> src,
Function<? super T,? extends V> fn) {
super(executor, dep, src); this.fn = fn;
}
final CompletableFuture<V> tryFire(int mode) {
CompletableFuture<V> d; CompletableFuture<T> a;
if ((d = dep) == null ||
!d.uniApply(a = src, fn, mode > 0 ? null : this))//这里会发现前一个 stage 执行完毕,但提供了线程池
return null;
dep = null; src = null; fn = null;
return d.postFire(a, mode);
}
}
final <S> boolean uniApply(CompletableFuture<S> a,
Function<? super S,? extends T> f,
UniApply<S,T> c) {
Object r; Throwable x;
if (a == null || (r = a.result) == null || f == null)
return false;
tryComplete: if (result == null) {
if (r instanceof AltResult) {
if ((x = ((AltResult)r).ex) != null) {
completeThrowable(x, r);
break tryComplete;
}
r = null;
}
try {
if (c != null && !c.claim())//会执行到这里,然后发现 claim 返回 false
return false;
@SuppressWarnings("unchecked") S s = (S) r;
completeValue(f.apply(s));
} catch (Throwable ex) {
completeThrowable(ex);
}
}
return true;
}
final boolean claim() {
Executor e = executor;
if (compareAndSetForkJoinTaskTag((short)0, (short)1)) {
if (e == null)
return true;
executor = null; // disable
e.execute(this); //会执行到这里,然后把 this completion 对象提交给线程池执行,当前线程即将返回
}
return false;
}
我的问题在于,当 worker-9 线程执行完第一个 task 之后,它把第二个 task 提交给了 executor (e.execute(this)
),然后线程就返回了(从 claim 函数一层一层返回,直到返回 postComplete )。那为什么第二个 task 从打印结果来看,还是同一个 worker-9 线程来执行的?
还是说,只是因为我的例子比较简单,所以 executor 没有分配一个新的线程出来,其他情况下,thenApplyAsync 里面在执行e.execute(this)
时,还是有可能新起一个线程的吗?
1
passerbytiny 2020-08-22 15:47:17 +08:00 via Android
supplyAsync 和 thenApplyAsync 虽然都是异步调用,但它们两个之间是串行的,为什么就不能在一个线程(执行器)中被执行。
|
2
amiwrong123 OP @passerbytiny
没说不可以,它们之间肯定是串行的,但不一定是同一个线程吧。从源码上可见,supplyAsync 的线程并不是直接执行下一个 task 的,因为它 e.execute(this)之后就马上返回了。 |
3
zyoo 2020-08-22 16:05:42 +08:00
async 的语义是不一定同一个线程,所以这个只能说是巧合了,你可以多试几把?
|
4
amiwrong123 OP @zyoo
多试几次也一样。我怀疑这跟 ForkJoinPool.commonPool()的线程调度有关系,但我现在还没来得及看它的原理呢。。 |
5
passerbytiny 2020-08-22 17:02:26 +08:00 via Android
@amiwrong123 异步任务都是将任务提交给执行器去执行的,而不是从线程池取出一个线程用来执行任务。选择哪个线程是由执行器自行决定的,任务的提交者很难也不该对线程选择产生影响。
从高层次上看,thenApplyAsync 是在 applyAsync 完成之后执行的,所以最优选择就是两者使用同一个线程(线程唤醒也是有成本的)。从源码看,你要主要看的应该是 Executor 的源码。 |
6
amiwrong123 OP @passerbytiny
好吧,大概理解了。主要之前我以为我这个例子,applyAsync 和 thenApplyAsync 的执行线程肯定是同一个线程,但从源码上看发现 前一个线程只是提交任务给 Executor 而已。 所以,applyAsync 和 thenApplyAsync 的执行线程不一定是同一个呗。只是这个例子里,线程池是这样调度的。 |
7
XuHuan1025 2020-08-22 22:20:55 +08:00
木宝厉害哦
|
8
RedBeanIce 2020-08-24 19:15:47 +08:00
@amiwrong123 #5
@passerbytiny #6 JDK8 求问一下,我也是看到这里将任务提交到 Executor e.execute(new AsyncRun(d, f)); 那么下一步应该看 forkjoinpool ?因为默认是他。 |