V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
V2EX 提问指南
fruitmonster
V2EX  ›  问与答

请教各位 Golang 大神一段代码

  •  
  •   fruitmonster · 322 天前 · 1733 次点击
    这是一个创建于 322 天前的主题,其中的信息可能已经有所发展或是发生改变。

    背景:

    这是一个批量接口,目的是接口接收数据,数据交给协程存入 Kafka ,接口立即响应成功,越快越好,请求频率每秒 70 次,一次请求数组携带 1000 条数据。

    压测的时候发现这个接口使用协程会导致服务内存暴涨,昨天查了一下午也没有头绪,即使什么也不做,只打印个数组元素的长度,发现如果不使用协程直接 for 循环处理 requestData 数据,内存就不会上涨,但这样会影响接口的响应速度,使用协程处理的话内存会立即上涨,这是为什么呢? 如果开启协程占用了内存,可是协程只有几 KB 呀,若协程内的数据处理占用,那不用协程也占用了内存啊,我的理解用了协程处理能力应该会提高才对啊,为什么反倒下降了呢?附上 pprof 内存图

    
    func PushBatch(c *gin.Context) {
    	appGin := app.Gin{C: c}
    
    	gameId := c.Query("game_id")
    	if gameId == "" {
    		// 记录错误日志···
    		return
    	}
    
    	// 读取请求体
    	bodyBytes, err := io.ReadAll(c.Request.Body)
    	if err != nil {
    		// 记录错误日志···
    		return
    	}
    
    	// JSON 反序列化
    	var requestData []map[string]interface{}
    	if err := json.Unmarshal(bodyBytes, &requestData); err != nil {
    		// 记录错误日志···
    		return
    	}
    
    
    	// 1.没有使用协程
    	for _, entry := range requestData {
    		fmt.Println(len(entry))
            
                    // Todo 数据写入 Kafaka
    	}
    
    	// 2.数据放入协程
    	//go func(requestData []map[string]interface{}) {
    	//	
    	//	for _, entry := range requestData {
    	//		//handleEntry(gameId, entry)
    	//		fmt.Println(len(entry))
    	//	}
    	//}(requestData)
    
    	// 返回状态 200
    	appGin.Response( http.StatusOK, e.SUCCESS, nil)
    }
    
    
    20 条回复    2024-01-08 10:13:23 +08:00
    lifei6671
        1
    lifei6671  
       322 天前
    使用了协程后,HTTP 请求会立即返回,如果 qps 很大或持续时间很长,会导致你的协程数量暴增,且每个协程都会保持一个 map 数据,当然会导致内存暴增了。
    建议使用协程池,保持一定的协程数量,每个协程用来处理写 kafka 的数据,这样可以控制协程数量不会暴涨。还有你这个 json 序列化也有问题,建议直接序列化为结构体,可以使用流式 json 解析,而不是 read 所有数据。
    GooMS
        2
    GooMS  
       322 天前 via Android
    其次携程也是要回收的,处理好错误和超时
    zsj1029
        3
    zsj1029  
       322 天前
    都已经用 gin 框架了,自带协程池处理请求,没必要自己写协程处理了,func 里面写正常的 Kafka insert 逻辑即可,一秒上千条的处理很轻松的
    zdt3476
        4
    zdt3476  
       322 天前
    你有看过请求测试的时候 goroutine 的数量吗?大概率就是开了太多 goroutine 了。你固定开几个 goroutine 执行 kafka 写入的操作。PushBatch 这个接口通过写入一个带 buffer 的 channel 把数据投递到前面的 goroutine 中就好了。
    iyaozhen
        5
    iyaozhen  
       322 天前
    你不要说多少条,一条多大呢

    你说的暴涨是多大呢,这个预期就是会暴涨的,不要说 2G 到 4G 。

    kafka 的吞吐到了多少呢,这里卡住也会影响性能的

    我猜你这个服务是接收日志或者打点上报的。我之前拿 php-Swoole 做过,3-4w qps 没有问题

    你给的信息太少了,大概猜测下你说的问题:
    1. 发现如果不使用协程直接 for 循环处理 requestData 数据,内存就不会上涨
    那是因为这样接口耗时会增加,这样发压端压力上不来。不知道你是什么发压模型
    2. 使用协程处理的话内存会立即上涨,这是为什么呢?
    同 1 ,耗时下降,这样单位时间内,你收到的请求更多了,请求 data 都在内存里,可不就上涨了

    我的建议是,发送到 kafka 就不要协程了,让这点消耗体现在接口耗时上
    sujin190
        6
    sujin190  
       322 天前
    好像 golang 的 goroutine 调度并不是平衡调度,所以并发很高接口提交速度这么快的话,消费不足肯定导致大量数据拥塞在内存中内存使用量肯定高了,不过你可以试试看其实应该没有一直涨,看你这量不平衡导致内存使用估计得接近 10G 级别了吧,线程的调度平衡性要好的多,但是量特别大也是要考虑线程调度的平衡性影响的

    但是换个方向如果你生成业务提交数据不是一直这么高的话,内存充足无所谓的吧,否则如果一直都这么高,或者对内存使用有极度需求,那么你使用 goroutine 提交到 Kafaka 并不能提高你整个系统的吞吐,整体来看对接口延时也帮助不大,整体还是受限于内存大小和 Kafaka 写入速度,异步 goroutine 提交没啥用吧,多余了
    fruitmonster
        7
    fruitmonster  
    OP
       322 天前
    确实,找到问题了,大量且时间长的请求,协程中的 for 循环会把 CPU 跑满,导致一直积压,不能及时处理,但请求依旧,依旧在启动新的协程,导致内存暴涨。因为接口的请求体字段是不固定的,不知道数量,不知道类型,所以我就想的是使用了 map

    ```go

    // 2.数据放入协程
    go func(requestData []map[string]interface{}) {

    for _, entry := range requestData {
    //handleEntry(gameId, entry)
    fmt.Println(len(entry))
    }
    }(requestData)

    ````
    fruitmonster
        8
    fruitmonster  
    OP
       322 天前
    @GooMS 确实,协程一直在启动 CPU 跑满,协程不能被正常释放,一直在处理 for 循环的数据
    FreeEx
        9
    FreeEx  
       322 天前
    1. 删除协程。
    2. 发送 kafka 改成异步批量发送。
    fruitmonster
        10
    fruitmonster  
    OP
       322 天前
    @iyaozhen
    一次请求,数据体的大小大概 500kb
    暴涨就是从服务启动的几十兆,到 7 G 到 10G ,只要给压力会持续暴涨,无限上涨,若服务不停,内存用完为止

    您猜测的没错,之前我的目的是为了接口尽可能快的返回数据,我就把数据丢给了协程,让携程去处理,并且在协程中愚蠢的增加了验证、解析的逻辑,等把这部分数据整理完之后再写入 kafka ,问题就出在整理这里,目前发现的问题是:当请求量非常大,原有代码的 for 循环,本身会占用 CPU 的能力,大量请求把 CPU 全部跑满,CPU 处理不了,数据就会在内存中越积越多,所以内存就会上涨
    fruitmonster
        11
    fruitmonster  
    OP
       322 天前
    @sujin190 是的,您分析的没错,我之前把数据交给协程去处理,因为我想在协程里加一些验证,能过通过验证才会写入 Kafka ,现在看来我完全是错的
    fruitmonster
        12
    fruitmonster  
    OP
       322 天前
    @zsj1029 大哥,请明示,没找到 Gin 自带协程池处理请求的相关内容哇
    kkbblzq
        13
    kkbblzq  
       322 天前
    你这写的有点无力吐槽。。
    1. 你这样开协程只是提前返回罢了,对处理效率并没有改进,完全没有必要开,你开了无非是多了一堆挂在后台跑的协程,内存不大才怪了。真开协程应该是按 kafka 单包大小进行分片,比如 100 条数据一个协程发送,且应该开 wg 等协程跑完再返回的。
    2. 既然你不需要了解结构,单纯解数组的话,[]json.RawMessage 就可以了,何必解成 map ,你发 kafka 又得序列化回去,反复的消耗 cpu 何必呢。
    ...
    fruitmonster
        14
    fruitmonster  
    OP
       322 天前
    @kkbblzq 条条在理,正在改
    leyoumake1997
        15
    leyoumake1997  
       322 天前
    将请求数据写入到队列,然后另外一个服务去消费到 kafka 里面
    leonshaw
        16
    leonshaw  
       322 天前
    读 request body 时就已经占了一部分内存了,不开新协程时是因为没有提前返回导致并发数受到了限制,确认一下这种情况下并发瓶颈是在客户端还是服务端。你需要的是在服务端读 request body 之前限制并发。
    lsk569937453
        17
    lsk569937453  
       321 天前
    这是一个批量接口,目的是接口接收数据,数据交给协程存入 Kafka ,接口立即响应成功,越快越好,请求频率每秒 70 次,一次请求数组携带 1000 条数据。单次请求数据大小 500kb 。
    ============================
    先说最简答的方案,加机器。后端服务本来就是无状态的,kafka 也绝对不是瓶颈。而且你自己也说了后端服务内存压力很大,那直接加机器就好了。

    其次,在不加机器的情况下,你的代码还有可以继续优化的空间。后端收到请求后不校验,直接完整的将 http 请求写入 kafka ,省去序列化( http 请求到 struct)和反序列化(struct 到 kafka 的 body)的 cpu 。由后续的消费者去一次消费 1000 条消息做处理/校验。

    技术没有银弹,完全看你怎么取舍。以前我可能采取第二种方案,现在年龄大了,不想折腾了,我只想会加机器。
    PiersSoCool
        18
    PiersSoCool  
       321 天前
    我处理每秒最高十几万 QPS ,这种不用开协程,直接用 gin 就好;这里直接用 k8s 扩容即可,甚至可以压测出 QPS 按照 QPS 自动扩容

    confluent kafka golang 那边,以及 kafka 公有云上会有 bug ,小心踩坑
    fruitmonster
        19
    fruitmonster  
    OP
       321 天前
    @PiersSoCool 请问怎么直接用 Gin 啊,我新手,没听太明白,谢谢啦~
    PiersSoCool
        20
    PiersSoCool  
       318 天前
    @fruitmonster gin 直接处理请求,和上面类似的不开协程,gin 每个 http 请求本身就是个协程,直接往 kafka 写数据
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   1177 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 25ms · UTC 18:06 · PVG 02:06 · LAX 10:06 · JFK 13:06
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.