想试用一下阻塞列队 做了个生产者和消费者的 demo 预期结果就是相互交替执行也就是 生产一个之后,消费一个
不允许连续生产或者连续消费
但是如果不让生产线程 sleep 就会无法实现交替执行的效果 我是没想到是什么原因
public static void main(String[] args) throws IOException {
BlockingQueue<Integer> bq = new LinkedBlockingQueue<Integer>(1);
Producer p1 = new Producer(bq);
p1.setName("producer01");
Customer c1 = new Customer(bq);
c1.setName("customer01");
p1.start();
c1.start();
}
public class Producer extends Thread {
private BlockingQueue<Integer> bq;
public Producer(BlockingQueue<Integer> bq) {
this.bq = bq;
}
@Override
public void run() {
while (true) {
try {
bq.put(produce());
Thread.sleep(0,1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private Integer produce() {
Integer number = (new Random().nextInt(100));
System.out.println(getName() + ":produced =====> " + number);
return number;
}
}
public class Customer extends Thread {
private BlockingQueue<Integer> bq;
public Customer(BlockingQueue<Integer> bq) {
this.bq = bq;
}
@Override
public void run() {
while (true) {
try {
consume();
} catch (Exception e) {
e.printStackTrace();
}
}
}
private void consume() throws InterruptedException {
System.out.println(getName() + ":consumed:" + bq.take());
}
}
1
watzds 2018-07-19 00:07:35 +08:00 via Android
什么叫交替执行?看输出不准吧
|
2
chocotan 2018-07-19 00:09:02 +08:00
Thread.sleep(0,1) 实际上是 sleep 了 1ms 吧
bq.take()耗时小于 1ms,所以看起来是交替执行 去掉 sleep 之后,bq.take()拿到数据比循环到下一个 produce()时要慢,所以看起来不是交替执行 |
3
zhady009 OP producer01:produced =====> 63
customer01:consumed:63 producer01:produced =====> 70 customer01:consumed:70 producer01:produced =====> 16 customer01:consumed:16 producer01:produced =====> 25 customer01:consumed:25 像这样的如果不加 sleep 会如下, producer01:produced =====> 70 producer01:produced =====> 16 customer01:consumed:70 customer01:consumed:16 producer01:produced =====> 25 customer01:consumed:25 |
4
sagaxu 2018-07-19 00:15:32 +08:00 via Android
take 和 put 是交替执行的,但 println 不是
|
5
zhady009 OP Thread.sleep(0,1) 是一纳秒吧 Thread.sleep(1)才是 1 毫秒
put 方法如果队列满了,将阻塞当前线程 take 方法列队为空,将阻塞当前线程 |
6
chocotan 2018-07-19 00:20:36 +08:00
@zhady009 你看一下这个方法的源码
``` if (nanos >= 500000 || (nanos != 0 && millis == 0)) { millis++; } sleep(millis); ``` |
7
zhady009 OP 那如何让
System.out.println(getName() + ":consumed:" + bq.take()); 变成原子性 |
8
lcorange 2018-07-19 00:21:48 +08:00
比如这句 System.out.println(getName() + ":consumed:" + bq.take());
可以保证一定是 bq.take()之后,生产者才能 bq.put(),这个可以保证顺序 但是外层的 System.out.println 函数你是无法保证他一定会紧接着 bq.take()后面执行,拖延到生产者 sysout 后也是有可能的 |
10
pwrliang 2018-07-19 00:26:25 +08:00
我一开始也是认为 sysout 的问题,但是我统计了调用序列,也是交替的啊。
import java.io.IOException; import java.util.Random; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; public class Test { public static void main(String[] args) throws IOException, InterruptedException { BlockingQueue<Integer> bq = new LinkedBlockingQueue<Integer>(1); AtomicInteger seq =new AtomicInteger(0); Producer p1 = new Producer(bq,seq); p1.setName("producer01"); Customer c1 = new Customer(bq,seq); c1.setName("customer01"); p1.start(); c1.start(); } } class Producer extends Thread { AtomicInteger seq; private BlockingQueue<Integer> bq; public Producer(BlockingQueue<Integer> bq,AtomicInteger seq) { this.bq = bq; this.seq = seq; } @Override public void run() { while (true) { try { bq.put(produce()); // Thread.sleep(0,1); } catch (InterruptedException e) { e.printStackTrace(); } } } private Integer produce() { Integer number = (new Random().nextInt(100)); int sid=seq.addAndGet(1); System.out.println("seq:"+sid+getName() + ":produced =====> " + number); System.out.flush(); return number; } } class Customer extends Thread { AtomicInteger seq; private BlockingQueue<Integer> bq; public Customer(BlockingQueue<Integer> bq,AtomicInteger seq) { this.bq = bq; this.seq = seq; } @Override public void run() { while (true) { try { consume(); } catch (Exception e) { e.printStackTrace(); } } } private void consume() throws InterruptedException { int sid=seq.addAndGet(1); System.out.println("seq:" + sid + getName() + ":consumed:" + bq.take()); System.out.flush(); } } ---------------------------------------- seq:2producer01:produced =====> 44 seq:3producer01:produced =====> 97 seq:1customer01:consumed:44 seq:4customer01:consumed:97 seq:5producer01:produced =====> 19 seq:7producer01:produced =====> 88 seq:6customer01:consumed:19 seq:8producer01:produced =====> 90 seq:9customer01:consumed:88 seq:10producer01:produced =====> 93 seq:11customer01:consumed:90 seq:12producer01:produced =====> 40 |
12
lcorange 2018-07-19 00:38:30 +08:00 1
@zhady009 这个是无解的,除非整个函数都包上锁,这时这个队列就变得毫无疑义了
如果按照命令的顺序拆分,生产者分成 P,消费者分成 C P1 print number P2 bq.put(number) P3 print number P4 bq.put(number) P5 print number P6 bq.put(number) C1 bq.take() C2 print number C3 bq.take() C4 print number C5 bq.take() C6 print number 当按照以下顺序执行的时候 P1 P2 P3 C1 C2 P4 C3 C4 ...就会出现你所说的两条日志 其实内部的 P2 C1 P4 C3 还是保证了两边的顺序的 |
13
cheneydog 2018-07-19 00:41:36 +08:00
我觉得是打印输出的问题,队列本身应该没问题,只是两个线程共用一个输出流 System.out ,结果无法控制。
|
14
lcorange 2018-07-19 00:43:22 +08:00
@pwrliang AtomicInteger LinkedBlockingQueue 只保证调用这两个对象的函数时能够保证原子性,但是整个 product 和 consume 函数上没有这样的锁,所以执行顺序是不能保证的
|
16
lcorange 2018-07-19 00:50:54 +08:00
@zhady009 只是运气好加系统负载不大,sleep 的时间里让 print 函数有机会执行,加大负载,长时间测试一样会出现这个现象
|
17
sagaxu 2018-07-19 09:50:50 +08:00 via Android
@zhady009 因为 sleep 改变了占空比,cpu 大部分时间是空闲的,错开了你的两组操作。试想一下,往平底锅里,同时扔 8 个鸡蛋,鸡蛋之间一定会有碰撞,发生空间的争抢,但是同时扔 8 粒芝麻,很大概率是散落不碰撞的。
|
18
reus 2018-07-19 12:42:42 +08:00
线程是并发执行的,当然不能保证交替执行。
|
19
pwrliang 2018-07-19 22:12:15 +08:00
这回可以了,要保证 put+sysout, take+sysout 是原子性的,只能加个全局锁。
public class Test { public static void main(String[] args) throws IOException, InterruptedException { Lock lock = new ReentrantLock(); BlockingQueue<Integer> bq = new LinkedBlockingQueue<Integer>(1); AtomicInteger seq =new AtomicInteger(0); Producer p1 = new Producer(bq,seq,lock); p1.setName("producer01"); Customer c1 = new Customer(bq,seq,lock); c1.setName("customer01"); p1.start(); c1.start(); } } class Producer extends Thread { AtomicInteger seq; Lock lock; private BlockingQueue<Integer> bq; public Producer(BlockingQueue<Integer> bq,AtomicInteger seq,Lock lock) { this.bq = bq; this.seq = seq; this.lock = lock; } @Override public void run() { while (true) { try { if(bq.size()==1)continue; lock.lock(); bq.put(produce()); lock.unlock(); } catch (InterruptedException e) { e.printStackTrace(); } } } private Integer produce() { Integer number = (new Random().nextInt(100)); int sid=seq.addAndGet(1); System.out.println("seq:"+sid+getName() + ":produced =====> " + number); System.out.flush(); return number; } } class Customer extends Thread { AtomicInteger seq; private BlockingQueue<Integer> bq; Lock lock; public Customer(BlockingQueue<Integer> bq,AtomicInteger seq,Lock lock) { this.bq = bq; this.seq = seq; this.lock = lock; } @Override public void run() { while (true) { try { if (bq.size()==0)continue; lock.lock(); consume(); lock.unlock(); } catch (Exception e) { e.printStackTrace(); } } } private void consume() throws InterruptedException { int tk = bq.take(); int sid=seq.addAndGet(1); System.out.println("seq:" + sid + getName() + ":consumed:" + tk); System.out.flush(); } } ------------------------------------------------- public class Test { public static void main(String[] args) throws IOException, InterruptedException { Lock lock = new ReentrantLock(); BlockingQueue<Integer> bq = new LinkedBlockingQueue<Integer>(1); AtomicInteger seq =new AtomicInteger(0); Producer p1 = new Producer(bq,seq,lock); p1.setName("producer01"); Customer c1 = new Customer(bq,seq,lock); c1.setName("customer01"); p1.start(); c1.start(); } } class Producer extends Thread { AtomicInteger seq; Lock lock; private BlockingQueue<Integer> bq; public Producer(BlockingQueue<Integer> bq,AtomicInteger seq,Lock lock) { this.bq = bq; this.seq = seq; this.lock = lock; } @Override public void run() { while (true) { try { if(bq.size()==1)continue; lock.lock(); bq.put(produce()); lock.unlock(); } catch (InterruptedException e) { e.printStackTrace(); } } } private Integer produce() { Integer number = (new Random().nextInt(100)); int sid=seq.addAndGet(1); System.out.println("seq:"+sid+getName() + ":produced =====> " + number); System.out.flush(); return number; } } class Customer extends Thread { AtomicInteger seq; private BlockingQueue<Integer> bq; Lock lock; public Customer(BlockingQueue<Integer> bq,AtomicInteger seq,Lock lock) { this.bq = bq; this.seq = seq; this.lock = lock; } @Override public void run() { while (true) { try { if (bq.size()==0)continue; lock.lock(); consume(); lock.unlock(); } catch (Exception e) { e.printStackTrace(); } } } private void consume() throws InterruptedException { int tk = bq.take(); int sid=seq.addAndGet(1); System.out.println("seq:" + sid + getName() + ":consumed:" + tk); System.out.flush(); } } |
20
pwrliang 2018-07-19 22:13:26 +08:00
@pwrliang 刚刚结果粘贴错了
-------------------------------------------------- seq:1producer01:produced =====> 45 seq:2customer01:consumed:45 seq:3producer01:produced =====> 20 seq:4customer01:consumed:20 seq:5producer01:produced =====> 78 seq:6customer01:consumed:78 seq:7producer01:produced =====> 45 seq:8customer01:consumed:45 seq:9producer01:produced =====> 90 seq:10customer01:consumed:90 seq:11producer01:produced =====> 57 seq:12customer01:consumed:57 |