V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
The Go Programming Language
http://golang.org/
Go Playground
Go Projects
Revel Web Framework
weiweiwitch
V2EX  ›  Go 编程语言

go 的 channel 的一个疑问

  •  1
     
  •   weiweiwitch · 2017-03-20 20:51:41 +08:00 · 1901 次点击
    这是一个创建于 2790 天前的主题,其中的信息可能已经有所发展或是发生改变。

    我们现在项目中使用 channel 来实现多个协程间通讯时遇到一个问题,想请教下这里的大牛。

    我们有多个生产端协程(数千个)会不断的产生数据,塞入到 channel 中,然后一个消费端协程不断的从 channel 中获取数据。

    现在,我们的多个生产端协程会不定时的产生短暂的峰值,这个峰值的量很大,而 channel 是有容量的。当 channel 的容量满了后,生产端就阻塞住了。这影响到了生产端的正常逻辑的执行。

    我们考虑过使用 select 和 default 的方式来避免阻塞。但如何在 channel 有空余容量时,生产端协程及时将积压的消息再次推入 channel ?

    由于消费端逻辑的特殊性,我们无法创建多个消费端协程来提高消费的速度。

    13 条回复    2017-03-30 21:28:22 +08:00
    Muninn
        1
    Muninn  
       2017-03-20 21:10:17 +08:00 via Android   ❤️ 2
    这和 golang 没关系 传统的队列一样的 只能加大缓冲或者提高消费能力

    评估下内存够用不 不够了需要改架构持久化
    weiweiwitch
        2
    weiweiwitch  
    OP
       2017-03-20 21:28:28 +08:00
    @Muninn 内存是够的。我们现在暂时使用了 go-datastructures 的无边界 queue 来缓解这个问题。只是这个数据结构和其他 channel 没法太好的搭配使用。

    绝大部分时间,生产端的产生速度是很缓慢的,所以为了偶尔的波峰为 channel 分配巨量的缓冲,感觉比较浪费。
    znood
        3
    znood  
       2017-03-20 21:31:11 +08:00   ❤️ 1
    如 1 楼所说,加大缓冲,如果消费能力不足的情况下最好在 channel 和消费者中间加个持久化队列,如 kafka ,如果对延迟要求不是很高可以直接把 channel 换成 kafka
    pkking
        4
    pkking  
       2017-03-20 21:36:06 +08:00
    可以借鉴下拥塞算法
    PhilC
        5
    PhilC  
       2017-03-20 22:06:27 +08:00
    你可以看看 nsq 的代码,用 select ,当 channel 满了就写到文件里
    jiumingmao
        6
    jiumingmao  
       2017-03-20 22:25:11 +08:00
    使用 channel 的 channel a , a 不满的时候生产者定义一个长度为 1 的子 channel b ,往 b 中放一个元素,然后放到 a ; b 满的时候,生产者定义一个长度比较大(需要估计一下峰值大概多大)的子 channel c ,然后数据放入 c ,直接 a 不满,把 c 放入 a 。
    jiumingmao
        7
    jiumingmao  
       2017-03-20 22:27:21 +08:00
    不过 channel 都会有一个问题,进程挂了就啥都没有了,使用 kafka 可以防止数据丢失。
    iot
        8
    iot  
       2017-03-20 22:48:43 +08:00
    生产者写入 channel 时候能不能判断下, 如果快满了就再创建一个更大的 channel 替换旧的
    ghbai
        9
    ghbai  
       2017-03-21 08:38:42 +08:00
    gocrawl(开源爬虫类库)的一种方案
    https://github.com/PuerkitoBio/gocrawl/blob/master/popchannel.go

    ```
    type popChannel chan []*URLContext
    // The stack function ensures the specified URLs are added to the pop channel
    // with minimal blocking (since the channel is stacked, it is virtually equivalent
    // to an infinitely buffered channel).
    func (pc popChannel) stack(cmd ...*URLContext) {
    toStack := cmd
    for {
    select {
    case pc <- toStack:
    return
    case old := <-pc:
    // Content of the channel got emptied and is now in old, so append whatever
    // is in toStack to it, so that it can either be inserted in the channel,
    // or appended to some other content that got through in the meantime.
    toStack = append(old, toStack...)
    }
    }
    }
    ```
    weiweiwitch
        10
    weiweiwitch  
    OP
       2017-03-21 09:17:45 +08:00
    @ghbai 如果我理解的没问题的话,这个方案是无法保证同一个生产者产生的 cmd 被有序的消费。
    ghbai
        11
    ghbai  
       2017-03-21 12:44:15 +08:00
    @weiweiwitch 对,是不能保证有序的。
    khowarizmi
        12
    khowarizmi  
       2017-03-21 15:13:59 +08:00
    在你的 unbounded queue 前后接上两个 channel ,然后用两个 worker 搬数据,伪装成 unbounded channel 。
    gwind
        13
    gwind  
       2017-03-30 21:28:22 +08:00
    好比一条 TCP 连接达到最大吞吐,你再塞就没有意义。
    建议考虑下 ZeroMQ, nanomsg 等,重新定义模型。
    纯 golang 的 nanomsg : https://github.com/go-mangos/mangos
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   2755 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 23ms · UTC 14:44 · PVG 22:44 · LAX 06:44 · JFK 09:44
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.