linux rocketmq 4.7.1
rocketmq-spring-boot-starter 2.1.1
/**
 * Example producer that synchronously sends six tagged messages to the
 * {@code TopicTestFilter} topic and prints every send result.
 */
public class FilterProducerTag {

    /**
     * Starts a producer, sends six messages tagged {@code TagA} with keys
     * {@code KEY0}..{@code KEY5}, and shuts the producer down afterwards.
     *
     * @param args unused
     * @throws MQClientException            if the producer cannot be started or a send fails on the client side
     * @throws UnsupportedEncodingException if {@code RemotingHelper.DEFAULT_CHARSET} is not supported
     * @throws RemotingException            if a send fails at the remoting layer
     * @throws InterruptedException         if the sending thread is interrupted
     * @throws MQBrokerException            if the broker rejects a message
     */
    public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        producer.setNamesrvAddr("192.168.42.131:9876");
        producer.start();
        try {
            for (int i = 0; i < 6; i++) {
                Message message = new Message();
                message.setTopic("TopicTestFilter");
                // Every message carries the same tag; vary the tag per message
                // here to exercise the consumer-side tag filter.
                message.setTags("TagA");
                message.setKeys("KEY" + i);
                message.setBody(("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));

                SendResult sendResult = producer.send(message);
                System.out.println("=========================================");
                System.out.println(sendResult);
                System.out.println(message);
                System.out.println("=========================================");
            }
        } finally {
            // Always release the client, even when a send throws part-way
            // through the loop.
            producer.shutdown();
        }
    }
}
消费者代码如下:
/**
 * Example push consumer that subscribes to {@code TopicTestFilter} with a tag
 * filter and prints every message it receives.
 */
public class FilterConsumerTag {

    /**
     * Configures and starts a push consumer subscribed to messages tagged
     * {@code TagA} or {@code TagB}.
     *
     * @param args unused
     * @throws MQClientException if subscribing or starting the consumer fails
     */
    public static void main(String[] args) throws MQClientException {
        // NOTE(review): if another consumer instance is running with this same
        // group name, the topic's queues are presumably split between the
        // instances, so this process would only see part of the messages —
        // confirm that no other member of this group is alive when messages
        // appear to be missing.
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
        consumer.setNamesrvAddr("192.168.42.131:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        // Alternative: filter on a user property with SQL, e.g.
        //   consumer.subscribe("TopicTestFilter", MessageSelector.bySql("a between 0 and 2"));
        // which only subscribes to messages that have property a with a >= 0 and a <= 2.
        consumer.subscribe("TopicTestFilter", "TagA || TagB");

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            /**
             * Prints each delivered message and its decoded body, then
             * acknowledges the whole batch.
             */
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messageExtList, ConsumeConcurrentlyContext context) {
                for (MessageExt messageExt : messageExtList) {
                    System.out.println("========================================================");
                    System.out.println(messageExt);
                    // Decode with an explicit charset instead of the platform
                    // default; the producer encodes the body with
                    // RemotingHelper.DEFAULT_CHARSET ("UTF-8").
                    System.out.println(new String(messageExt.getBody(), java.nio.charset.StandardCharsets.UTF_8));
                    System.out.println("========================================================");
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.println("consumer");
    }
}
我在消费者端只收到了生产者发送的四条消息,按道理应该收到全部 6 条。日志如下:
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.InternalThreadLocalMap).
RocketMQLog:WARN Please initialize the logger system properly.
=========================================
SendResult [sendStatus=SEND_OK, msgId=64774C6B275C18B4AAC25A56EFC60000, offsetMsgId=C0A82A8300002A9F00000000000CF5C0, messageQueue=MessageQueue [topic=TopicTestFilter, brokerName=localhost.localdomain, queueId=0], queueOffset=31]
Message{topic='TopicTestFilter', flag=0, properties={KEYS=KEY0, UNIQ_KEY=64774C6B275C18B4AAC25A56EFC60000, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 48], transactionId='null'}
=========================================
=========================================
SendResult [sendStatus=SEND_OK, msgId=64774C6B275C18B4AAC25A56EFEE0001, offsetMsgId=C0A82A8300002A9F00000000000CF68F, messageQueue=MessageQueue [topic=TopicTestFilter, brokerName=localhost.localdomain, queueId=1], queueOffset=30]
Message{topic='TopicTestFilter', flag=0, properties={KEYS=KEY1, UNIQ_KEY=64774C6B275C18B4AAC25A56EFEE0001, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 49], transactionId='null'}
=========================================
=========================================
SendResult [sendStatus=SEND_OK, msgId=64774C6B275C18B4AAC25A56EFF80002, offsetMsgId=C0A82A8300002A9F00000000000CF75E, messageQueue=MessageQueue [topic=TopicTestFilter, brokerName=localhost.localdomain, queueId=2], queueOffset=23]
Message{topic='TopicTestFilter', flag=0, properties={KEYS=KEY2, UNIQ_KEY=64774C6B275C18B4AAC25A56EFF80002, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 50], transactionId='null'}
=========================================
=========================================
SendResult [sendStatus=SEND_OK, msgId=64774C6B275C18B4AAC25A56EFFB0003, offsetMsgId=C0A82A8300002A9F00000000000CF82D, messageQueue=MessageQueue [topic=TopicTestFilter, brokerName=localhost.localdomain, queueId=3], queueOffset=22]
Message{topic='TopicTestFilter', flag=0, properties={KEYS=KEY3, UNIQ_KEY=64774C6B275C18B4AAC25A56EFFB0003, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 51], transactionId='null'}
=========================================
=========================================
SendResult [sendStatus=SEND_OK, msgId=64774C6B275C18B4AAC25A56EFFD0004, offsetMsgId=C0A82A8300002A9F00000000000CF8FC, messageQueue=MessageQueue [topic=TopicTestFilter, brokerName=localhost.localdomain, queueId=0], queueOffset=32]
Message{topic='TopicTestFilter', flag=0, properties={KEYS=KEY4, UNIQ_KEY=64774C6B275C18B4AAC25A56EFFD0004, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 52], transactionId='null'}
=========================================
=========================================
SendResult [sendStatus=SEND_OK, msgId=64774C6B275C18B4AAC25A56F0020005, offsetMsgId=C0A82A8300002A9F00000000000CF9CB, messageQueue=MessageQueue [topic=TopicTestFilter, brokerName=localhost.localdomain, queueId=1], queueOffset=31]
Message{topic='TopicTestFilter', flag=0, properties={KEYS=KEY5, UNIQ_KEY=64774C6B275C18B4AAC25A56F0020005, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 53], transactionId='null'}
=========================================
13:00:47.034 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.42.131:10911] result: true
13:00:47.036 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.42.131:9876] result: true
Process finished with exit code 0
========================================================
========================================================
MessageExt [brokerName=localhost.localdomain, queueId=0, storeSize=207, queueOffset=31, sysFlag=0, bornTimestamp=1602997246918, bornHost=/192.168.42.1:6700, storeTimestamp=1602997246973, storeHost=/192.168.42.131:10911, msgId=C0A82A8300002A9F00000000000CF5C0, commitLogOffset=849344, bodyCRC=613185359, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTestFilter', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=33, KEYS=KEY0, CONSUME_START_TIME=1602997259086, UNIQ_KEY=64774C6B275C18B4AAC25A56EFC60000, CLUSTER=DefaultCluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 48], transactionId='null'}]
Hello RocketMQ 0
MessageExt [brokerName=localhost.localdomain, queueId=0, storeSize=207, queueOffset=32, sysFlag=0, bornTimestamp=1602997246973, bornHost=/192.168.42.1:6700, storeTimestamp=1602997247002, storeHost=/192.168.42.131:10911, msgId=C0A82A8300002A9F00000000000CF8FC, commitLogOffset=850172, bodyCRC=601994070, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTestFilter', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=33, KEYS=KEY4, CONSUME_START_TIME=1602997259086, UNIQ_KEY=64774C6B275C18B4AAC25A56EFFD0004, CLUSTER=DefaultCluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 52], transactionId='null'}]
========================================================
========================================================
========================================================
Hello RocketMQ 4
========================================================
MessageExt [brokerName=localhost.localdomain, queueId=1, storeSize=207, queueOffset=31, sysFlag=0, bornTimestamp=1602997246978, bornHost=/192.168.42.1:6700, storeTimestamp=1602997247011, storeHost=/192.168.42.131:10911, msgId=C0A82A8300002A9F00000000000CF9CB, commitLogOffset=850379, bodyCRC=1424393152, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTestFilter', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=32, KEYS=KEY5, CONSUME_START_TIME=1602997259086, UNIQ_KEY=64774C6B275C18B4AAC25A56F0020005, CLUSTER=DefaultCluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 53], transactionId='null'}]
MessageExt [brokerName=localhost.localdomain, queueId=1, storeSize=207, queueOffset=30, sysFlag=0, bornTimestamp=1602997246958, bornHost=/192.168.42.1:6700, storeTimestamp=1602997246992, storeHost=/192.168.42.131:10911, msgId=C0A82A8300002A9F00000000000CF68F, commitLogOffset=849551, bodyCRC=1401636825, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTestFilter', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=32, KEYS=KEY1, CONSUME_START_TIME=1602997259086, UNIQ_KEY=64774C6B275C18B4AAC25A56EFEE0001, CLUSTER=DefaultCluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 49], transactionId='null'}]
Hello RocketMQ 5
========================================================
Hello RocketMQ 1
========================================================