同事碰到一个问题,我写了个 demo 复现,研究了好几天还是没头绪,多线程程场景也没有调试思路,干脆发个帖,想看看有没有大佬可以指点一二。
模拟场景:三个消费组消费异步消费,每组有三个任务,任务之间异步执行,但必须都执行完毕后消费组才算结束。 设计上,消费组线程池给 3 个线程,控制每次只有三个组能消费。 任务线程池给的是大于 3*3,按我的理解是,外层 3 个消费组,每组三个任务,实时任务应该不会超过 9 个。
但程序执行一会儿就发现会有消费组批量涌入,导致里层线程池触发 reject 。
Demo 如下:
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor outter = new ThreadPoolExecutor(3, 3, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<>(197));
ThreadPoolExecutor inner = new ThreadPoolExecutor(9, 12, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1));
for (int i = 0; i < 200; i++) {
int group = i;
outter.execute(() -> {
System.out.println("开始第 " +group+" 组消费");
CountDownLatch countDownLatch = new CountDownLatch(3);
for (int j = 0; j < 3; j++) {
int task = j;
inner.execute(() -> {
System.out.println(group + "--消费数据:" + task);
countDownLatch.countDown();
});
}
try {
countDownLatch.await();
System.out.println(group + "--消费完成");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
晚上从头到尾读了一下ThreadPoolExecutor源码,从注释开始读起,对线程池有了一个更加明确的认识,同时也发现昨天的理解任然有些不对。
昨天提到的 execute 中的 这段:
if (workerCountOf(c) < corePoolSize) { @1
if (addWorker(command, true)) @2
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) { @3
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command)) @4
reject(command);
else if (workerCountOf(recheck) == 0) @5
addWorker(null, false);
}
还有很多细节,并为理解的很透彻,这里结论也写的比较简洁和枯燥,如有错误的还请大佬不吝指正。
1
micean 2020-11-05 02:21:44 +08:00
个人猜测
countDownLatch.countDown() 之后,outter 完成了这次任务并开始下一个了,但是 inner 还没有完成,队列塞不下 把 inner 的队列从 1 调高一些就行了 |
2
season8 OP @micean
countDownLatch 不是最后一个 inner 线程执行完成后唤醒 outter 线程吗?那 outter 线程结束应该就意味着有三个 inner 结束。 而且,尝试过,countDownLatch.await();之后 sleep,也是存在这个问题 |
3
Vedar 2020-11-05 09:27:07 +08:00
你这个 outer 不停的在刷,第五组消费的时候就已经超过 inner 的容纳能力了 肯定会 reject 掉,主要原因还是你 outer 没有阻塞
|
5
micean 2020-11-05 10:31:38 +08:00
@season8
outter 线程结束并不意味着有三个 inner 结束,countDownLatch 释放之后,outter 和 3 个 inner 可没有先后执行顺序 |
6
lancelee01 2020-11-05 10:33:24 +08:00
3L 正解,countDownLatch 没什么用,感觉你的场景需要的是限流器,全局限流即可。同时 LinkedBlockingQueue 这个队列的可能和你想的不太一样,网上的八股文不对,你试试-_-!
|
7
wysnylc 2020-11-05 10:42:27 +08:00
别用 countDownLatch,换成 Completablefuture
|
8
1194129822 2020-11-05 12:00:48 +08:00
建议创建线程池时传 ThreadFactory 参数,打印不要用 System.out.println,请换成 log 可以查看是具体那条线程。inner 线程池最大任务数 = 12 + 1 (建议不要设置非核心线程)。出现这个问题并没有什么高深的原理,仅仅是线程运行的不确定性,第一轮 outter 给 inner 提交了 9 个 task,此时 inner 正常,outter 三个线程被正确的阻塞。当 inner 运行所有 countDown 后,第一轮 inner 执行还没完全结束,outter 三个线程被唤醒,**注意**,此时线程执行没有了先后顺序和逻辑关系,完全靠 os 调度器调度,如果第二轮 outter 线程三个线程先提交任务,此时 inner 线程池最多可以接受 4 个任务,就是说这一轮已经可能出现错误了。而且一旦触发 RejectedExecutionException,try-catch 没有捕获这个异常,则直接杀死 outter 的核心线程,造成 outter 线程池,execute->Rejected->kill thread->create thread->execute 的恶性循环。代码根本没走到 await,所以一旦 Rejected 就不再阻塞了。
|
9
micean 2020-11-05 12:51:15 +08:00
源码里是这样的:
final void runWorker(Worker w) { Runnable task = w.firstTask ... try { while (task != null || (task = getTask()) != null) { //queueSize-1 (返回 null 时 workerSize-1 ) ... task.run(); // 任务跑完了 ... } } finally { processWorkerExit(w, completedAbruptly); // workerSize-1 } } 而 reject 的条件是 [queue 满] 或者 [worker 满] ,你觉得 countDown 结束了,其实只是跑完了 run()而已 |
10
zoharSoul 2020-11-05 13:37:58 +08:00
这直接用 rxjava 多方便啊...
|
11
yexiangyang 2020-11-05 14:16:03 +08:00
@micean 这个源码分析很有道理啊!
|
12
season8 OP @Vedar @1194129822 @lancelee01 @micean 感谢各位的热情解答,我很受启发。再结合朋友给的例子,我仔细读了下源码,已经大致能复盘这个错误了。
**inner 线程池 reject 的原因:** 1. 主要原因:队列太小,这里给的是 1,实际每个 outer 线程要产生 3 个任务 2. 次要原因:outter 线程里面使用 countdownlatch 确实不能起到很好的限流作用, **次要原因分析:** 如 runWorker()源码所示,run 执行完毕并不能代表线程任务执行完毕。这意味着 outter 线程与 inner 线程的空闲线程数可能不是 1:3 的关系。但这里可以通过让 outter 线程 sleep 等待 inner 先执行完成,规避这个因素的影响。规避后,问题还是会存在,说明不是主要原因。 **主要原因分析:** 先来看个案例 ``` static class MyLinkedBlockingQueue<E> extends LinkedBlockingQueue<E> { public MyLinkedBlockingQueue(int capacity) { super(capacity); } @Override public boolean offer(E o) { System.out.println("任务加入,当前队列数:" + this.size()); return super.offer(o); } } public static void main(String[] args) throws InterruptedException { BlockingQueue queue = new MyLinkedBlockingQueue<>(1); // 3 个线程的线程池 ThreadPoolExecutor taskPoolExecutor = new ThreadPoolExecutor(3, 3, 30, TimeUnit.SECONDS, queue); // 先将线程池拉满 for (int i = 0; i < 3; i++) { final int finalI = i; taskPoolExecutor.execute(() -> { logger.info("{}", finalI); }); } // 等待全部任务执行完 Thread.sleep(1000); // 再次执行任务,发现每一个任务都触发加入队列操作。 for (int i = 10; i < 12; i++) { final int finalI = i; // 多线程更容易触发 reject // new Thread(()-> taskPoolExecutor.execute(() -> logger.info("{}", finalI))).start(); taskPoolExecutor.execute(() -> logger.info("{}", finalI)); } } ``` 执行结果: > 23:12:39.988 [pool-1-thread-3] INFO c.r.s.Demo8.lambda$main$0:34 - 2 23:12:39.988 [pool-1-thread-2] INFO c.r.s.Demo8.lambda$main$0:34 - 1 23:12:39.988 [pool-1-thread-1] INFO c.r.s.Demo8.lambda$main$0:34 - 0 任务加入,当前队列数:0 23:12:40.997 [pool-1-thread-3] INFO c.r.s.Demo8.lambda$null$1:46 - 10 任务加入,当前队列数:0 23:12:41.000 [pool-1-thread-2] INFO c.r.s.Demo8.lambda$null$1:46 - 11 跑完这个案例我感觉我根本不懂线程池,我翻了下源码: ``` public void execute(Runnable command) { ... int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } // 线程池满了后,直接不创建核心线程了 // 这里 isRunning 看的我懵逼,明明任务都执行完了,为啥还是 isRunning,先接受,后面再研究 [1] // 然后就触发入队列 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); ``` 我以为的线程池是:只要有空闲线程,任务是直接丢给线程去执行的。 **实际情况是:当核心线程数满,不管已有线程是否空闲,任务是先丢到队列,然后空闲线程从队列里面自取。** 案例中,我给的队列大小是 1,当队列满的时候,会扩容线程池到最大线程池大小到 12,此时如果队列是满的(不管线程是否空闲),继续添加就会 reject 。案例中每组有三个任务,只要线程从队列 take 任务不及时,队列很容易满,从而触发 reject 。 **验证:** 1. countDownLatch.await(); 后面加上 sleep,让 outter 线程等 inner 线程结束,排除最开始说的第二个因素的影响。 2. 将队列改成 3,适当调整线程执行时间(也可以不调),reject 很少触发或不触发。 3. 将队列改成 9,没有触发 reject **总结:** 1. 这个任务表面是多线程嵌套调用,内外线程调度不确定性导致的线程池问题,其实本质是对线程池理解不对导致线程池滥用的问题。 2. 任务是添加到队列,空闲线程调用 take()获取,而不是有空闲线程就直接丢到空闲线程(实际任务也难以主动去找空闲线程,还容易造成等待,让线程自取则是生产消费的模式。) 3. isRunning(c) 这个方法以及相关机制,还要再研究一下。 再次感谢各位,如有不对的地方,还请指出。。 |
13
season8 OP 啊。。评论不支持 md,排版好丑,又有点长,各位见谅。
|