1
antipro 2023-04-25 11:52:43 +08:00 via Android
给 B ,C ,D 各建一个 Kafka
|
2
aijam 2023-04-25 11:57:02 +08:00
1F +1
|
3
cloudzhou 2023-04-25 12:09:09 +08:00
给 B ,C ,D 各建一个 Topic 就可以
|
4
dddd1919 2023-04-25 12:10:29 +08:00
如果只让 BCD 接收到自己的消息,那就在 push 时分三个 topic ,直接把消息隔离开,缺点就是负载可能不均服务利用率降低
如果只让 BCD 处理自己要的消息并忽略掉无意义消息,可以在各自 consumer 加 filterStrategy 过滤掉无关消息 |
5
NoKey OP 建多个 topic 的麻烦点就是,后续要不断的增加 topic ,有没有办法,一个 topic 就可以解决呢?😂
|
6
ChaYedan666 2023-04-25 12:37:41 +08:00
@NoKey 不可能吧,不论怎么说,只要是都发一个 topic ,那么 BCD 就得把里面的消息拉过来做过滤,过滤后再消费自己的;或者另一种就是你自己说的第二种,同一个消费者组,监听不同分区,根据 key 发不同的分区,分区不均衡啥的就你得自己控制了
|
7
wuYin 2023-04-25 12:59:56 +08:00 via iPhone
也许可以用 2 个 kafka 集群,A 写集群 1 ,自己写个 connector 做消息解析与分发,写到集群 2 的三个 topic ,再由 B C D 各自消费。
这种做法引入了新的集群和组件,成本和维护代价更高。可行但不建议 |
8
securityCoding 2023-04-25 13:02:37 +08:00 via Android
kafka 不好做,换阿里云 rocketmq 加 tag
|
9
kaddusabagei38 2023-04-25 13:39:54 +08:00
建议换队列
|
10
urnoob 2023-04-25 13:47:16 +08:00
B C D 各自作为一个消费者组。
|
11
waitwait365 2023-04-25 13:51:24 +08:00
用 rabbitmq
|
12
zgzhang 2023-04-25 14:04:05 +08:00
kafka stream 做个任务来处理
|
13
WhereverYouGo 2023-04-25 14:12:41 +08:00
在消息体里定义 business_type: B 、C 、D ,然后引进一个中间层 X ,X 直接消费 A 发送的消息,并根据 business_type 决定调用( HTTP 或 RPC ) B 、C 、D 。(计算机科学中的每个问题都可以用一间接层解决 doge )
|
14
WhereverYouGo 2023-04-25 14:14:48 +08:00
但是上述方案有个问题:B 、C 、D 直接接受流量的冲击,没有 MQ 来缓冲,服务可能会被打爆
|
15
fkdog 2023-04-25 14:20:54 +08:00
明明有现成的高速公路,多建两个 topic 的事,你非得要自己单独再修一条路。我不知道怎么评价你这个需求。。
“为了方便”,请问改成 3 个 topic 不方便在哪里? |
16
awinds 2023-04-25 14:51:37 +08:00
除非你真的有需求有另外的 E 同时消费所有数据,不然就多个 topic 吧
|
17
lower 2023-04-25 14:53:23 +08:00
@WhereverYouGo 感觉问题不大,X 其实已经在 mq 后面了,慢慢一个一个取消息就行
|
18
Super8 2023-04-25 15:14:49 +08:00
可以在消息的 key 或者 value 中添加标识,例如在消息的 key 中添加 B 、C 、D 等标识,表示该消息是发给 B 、C 、D 的,然后在消费者端使用带有过滤条件的消费者来消费消息,只消费自己需要的消息。具体可以使用 Kafka 的 Consumer API 提供的 subscribe 方法中的参数来实现,例如使用 subscribe(Collections.singleton(topic), new MyPartitionAssignor()) 方法,其中 MyPartitionAssignor 实现了 PartitionAssignor 接口,可以根据标识来分配分区。另外,也可以使用 Kafka Streams 来实现消息过滤和分发。
|
19
Super8 2023-04-25 15:15:32 +08:00
rocketmq 中 tag 最适合这个场景
|
20
zhaoyy0513 2023-04-25 16:12:30 +08:00
@Super8 我创建的 KafkaConsumer 用到的 api 里面没有这两个参数的方法啊老哥,你说的这个 kafka 是哪个版本的啊
|
21
zhaoyy0513 2023-04-25 16:23:45 +08:00
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.Produced; import java.util.Properties; public class KafkaStreamsExample { public static void main(String[] args) { // 设置 Kafka Streams 属性 Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-processing-app"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // 创建 Kafka Streams 实例 StreamsBuilder builder = new StreamsBuilder(); KStream<String, String> stream = builder.stream("topic-A"); // 根据消息的 key 将消息路由到不同的分区中 stream.selectKey((key, value) -> key) .through("topic-A-shuffle") .groupByKey() .foreach((key, value) -> { // 处理消息 System.out.println("Processed message: " + value); }); // 将处理后的消息发送到下游服务 stream.mapValues(value -> "processed " + value) .to("topic-B", Produced.with(Serdes.String(), Serdes.String())); KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), props); kafkaStreams.start(); } } 在上面的代码中,首先使用 selectKey()方法将消息的 key 作为新的 key ,然后使用 through()方法将消息发送到一个新的 Topic 中,这个新的 Topic 会使用 Kafka 默认的分区策略将消息路由到不同的分区中。然后,我们使用 groupByKey()方法将同一个 key 的消息分组,确保每个消费者只消费自己需要的消息。最后,我们使用 foreach()方法处理分组后的消息,并使用 mapValues()方法将处理后的消息发送到下游服务。 需要注意的是,使用分流操作可能会导致数据倾斜(data skew)问题,因为某些 key 的消息可能比其他 key 的消息更频繁,从而导致某些分区比其他分区拥有更多的消息。为了解决这个问题,可以使用一些分区策略(partitioning strategy),例如随机分配、循环分配、哈希分配等。 |
22
burymme11 2023-04-25 16:34:11 +08:00
可以中间自己加一个路由层。
新建一个中间层 AA ,来监听 topic ,处理上游服务 A 的消息,在 AA 里面,自己写代码做负载均衡,比如根据消息 ID 取模,给 B ,C ,D 分配好不同的 key ,最后所有消息再往新的 NewTopic 里丢。这样 B ,C ,D 就监听 NewTopic 就行,以后要加薪的下游服务,你只要改动 AA 层分发路由的代码就好。 |
23
Dlin 2023-04-25 16:37:01 +08:00
kafka 的 topic 和 rabbitmq 的 topic 不一样么。
|
24
zhaoyy0513 2023-04-25 16:37:30 +08:00
要实现上游系统 A 将消息发送到下游系统 B 、C 、D ,并确保每个下游系统只处理自己需要处理的消息,同时还要确保消息只被消费一次,可以采用以下方案:
使用 Kafka 作为消息中间件,将上游系统 A 发送的消息发布到一个名为"topic-A"的 Kafka 主题中。 在下游系统 B 、C 、D 中,创建三个不同的消费者组,分别为"group-B"、"group-C"、"group-D",并订阅"topic-A"主题。 在消费者端,使用 Kafka 中的消息过滤器来过滤掉不需要的消息,只选择要处理的消息。可以使用 Kafka 中的消息键(key)来实现过滤。例如,下游系统 B 只想处理键(key)为"key-B"的消息,可以使用以下代码来实现: java Copy // 创建 Kafka 消费者 Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "group-B"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); // 订阅"topic-A"主题 consumer.subscribe(Collections.singletonList("topic-A")); // 消费消息 while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { if (record.key().equals("key-B")) { // 处理消息 } } consumer.commitSync(); } ``` 为了确保消息只被消费一次,将消费者的 auto.offset.reset 属性设置为"earliest",并启用自动提交偏移量。这将确保消费者在启动时从最早可用的偏移量开始消费,以避免漏掉任何消息,并且将自动提交偏移量以确保每个消息只被消费一次。例如,可以使用以下代码来实现: java Copy // 创建 Kafka 消费者 Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "group-B"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("auto.offset.reset", "earliest"); props.put("enable.auto.commit", "true"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); ``` 使用上述方案,上游系统 A 可以将消息发送到"topic-A"主题中,下游系统 B 、C 、D 可以使用 Kafka 消费者订阅该主题,并使用消息过滤器来过滤掉不需要的消息,只选择要处理的消息。自动提交偏移量将确保每个消息只被消费一次。 上面两条回复都是 chatgpt 回复的 |
25
PythonYXY 2023-04-25 16:46:13 +08:00
为什么不建多个 topic 呢,如果下游服务不固定可以做成配置式的啊
|