V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
yuandj
V2EX  ›  Kafka

关于 PHP Rdkafka 消费者性能讨论

  •  
  •   yuandj · 2021-05-11 11:19:04 +08:00 · 2001 次点击
    这是一个创建于 1296 天前的主题,其中的信息可能已经有所发展或是发生改变。

    当下遇到的问题:

    服务提供商:
    1. 集群每个节点的吞吐量在 1.5 MB/s 左右,远小于服务的吞吐量
    2. 3 个节点每个 topic 设置 90 个分区, 3 副本,这个使用方式不太合理,服务需要对每个 topic 维护 90x3 个 replica 进程,io process 也要维护 90x3 个,原来顺序的读写也会退化为随机读写,网络 process 需要维护 90 个
    3. 看历史监控记录,副本延迟在过去是会频繁发生的
    4. 之前有建议您修改分区到 6 ~ 9 个 您这边反馈分区数调低之后消费者有延迟,实际您这边的吞吐量远没有达到服务应该有的吞吐量,怀疑是客户端方面有问题,需要您在消费端打印每次 poll 的时间和 poll 下来的消息条数,确定消费者行为,这样我们可以进一步分析
    
    现在我们这边的解决方案还是和之前的建议一样,topic 分区数调整到 6 ~ 9 个,消费延迟的问题需要从客户端出发解决
    
    开发者:
    调整为 6 个分区之后,不是消费延迟问题,是单个消费者的能力不足,跟不上生产的速度。之前已经试过了,10 来分钟就堆积了 100 万消息。
    
    服务提供商:
    6 个分区的话,可以使用 6 个消费者,6 个消费者的能力远不止这么差.
    max.poll.records,可以用于指定批量消费条数的
    配合配置 max.partition.fetch.byte 和 fetch.max.wait.ms 两个参数 可以实现批量消费 kafka 的消息。您看看 php 的客户端是否有设置这些参数的地方,或者有其他地方可以设置消费者的批量消费的,因为一条条的消费,效率是极低的
    
    开发者:
    rdkafka 扩展里,好像没这个相关的参数
    

    当前是 1 个 topic,90 个分区,分区数太多引起 kafka 集群副本同步时的性能下降问题。服务商建议减少分区数,但是减少分区数会有大量的消息堆积,rdkafka 如何提升单消费者的性能呢?

    消费者大致代码如下:

    $this->RdKafkaConf = new RdKafka\Conf();
    
    $this->RdKafkaConf->setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
                switch ($err) {
                    case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
                        $kafka->assign($partitions);
                        break;
                    case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
                        $kafka->assign(null);
                        break;
                    default:
                        throw new \Exception($err);
                }
            });
    
            $this->RdKafkaConf->set('group.id', $groupid);
            // Initial list of Kafka brokers
            $this->RdKafkaConf->set('metadata.broker.list', $configs);
            $this->RdKafkaConf->set('socket.keepalive.enable', 'true');
            $this->RdKafkaConf->set('enable.auto.commit', 'true');
            $this->RdKafkaConf->set('auto.commit.interval.ms', '100');
            $this->RdKafkaConf->set('auto.offset.reset', 'smallest');
    
            $topic = is_array($topic) ? $topic : [$topic];
            $consumer = new RdKafka\KafkaConsumer($this->RdKafkaConf);
            $consumer->subscribe($topic);
            while (true) {
                $message = $consumer->consume($timeout * 1000);
                switch ($message->err) {
                    case RD_KAFKA_RESP_ERR_NO_ERROR:
                        call_user_func_array($callback, [$message]);
    //                    $consumer->commitAsync($message);
                        break;
                    case RD_KAFKA_RESP_ERR__PARTITION_EOF:
    //                    Log::get('consumer')->info("No more messages; will wait for more");
                        break;
                    case RD_KAFKA_RESP_ERR__TIMED_OUT:
    //                    Log::get('consumer')->error("Timed out");
                        break;
                    default:
                        throw new \Exception($message->errstr(), $message->err);
                }
            }
    
    
    //callback function
    if (count(self::$queue) >= 10 || (time() - $this->lastWriteTimestamp) >= 1) {
                    self::$queue[] = $msg;
                    $queue = self::$queue;
                    self::$queue = [];
                    $this->lastWriteTimestamp = time();
                    $reportData = [];
                    
                    foreach ($queue as $message) {
                        $data = json_decode($message->payload, true);
                        // 入库
                    }
    } else {
                    self::$queue[] = $msg;
    }
    
    第 1 条附言  ·  2021-05-15 19:37:09 +08:00

    问题已解决:

    1. 从代码可以看到,callback已经采用了批处理,想要消费的更快,调大批量条数就可以了。
    2. kafka集群性能的问题,是由于kafka生产者写入数据时,没有使用连接池,每次请求都会新建一个tcp连接,当请求量上来时就导致tcp连接数过多,kafka集群CPU性能受到影响导致的。

    使用连接池前,API服务器tcp time_wait个数保持在3w左右,大多是kafka集群的连接。服务器cpu在50%左右。 使用连接池后,同样的请求量,tcp time_wait个数一直在1000内,cpu使用率降到了17%左右。。。

    又踩了tcp的坑。说到底还是自己功底不扎实。继续努力吧

    踩坑记录:

    1. hyperf的连接池中max_connections参数,是针对单woker设置的,如果想要在单服务器配置500个连接的话,需要配置500/woker_num
    2. nginx做代理时,指定http版本为1.1,开启长链接(减少后端服务器对于nginx服务器的time_wait状态个数)
    server {
        ...
    
        location /http/ {
            proxy_pass http://http_backend;
            proxy_http_version 1.1; 
            proxy_set_header Connection ""; 
            ...
        }
    }
    
    6 条回复    2021-05-11 14:51:59 +08:00
    iyaozhen
        1
    iyaozhen  
       2021-05-11 13:07:22 +08:00
    callback 慢呗,可以多进程( 1-2 倍分区数)同一个 group.id 并行消费。
    yuandj
        2
    yuandj  
    OP
       2021-05-11 13:17:05 +08:00
    @iyaozhen 用 swoole 的协程试过,多个协程之间会重复消费数据
    iyaozhen
        3
    iyaozhen  
       2021-05-11 13:40:02 +08:00
    @yuandj 不用携程,多进程最合适。确定是相同 group id ?
    yuandj
        4
    yuandj  
    OP
       2021-05-11 13:50:50 +08:00
    @iyaozhen 一个 topic 下的一个分区,在同一时间,不是只能被一个消费者消费吗?
    JKeita
        5
    JKeita  
       2021-05-11 14:16:14 +08:00
    一个 topic 每个分区只会被消费者组里的一个消费者消费。
    iyaozhen
        6
    iyaozhen  
       2021-05-11 14:51:59 +08:00 via Android
    @yuandj 是啊,但你可以 n 个主进程消费,然后扔给 task 进程入库
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   1395 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 19ms · UTC 17:32 · PVG 01:32 · LAX 09:32 · JFK 12:32
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.