我们现在项目中使用 channel 来实现多个协程间通讯时遇到一个问题,想请教下这里的大牛。
我们有多个生产端协程(数千个)会不断的产生数据,塞入到 channel 中,然后一个消费端协程不断的从 channel 中获取数据。
现在,我们的多个生产端协程会不定时的产生短暂的峰值,这个峰值的量很大,而 channel 是有容量的。当 channel 的容量满了后,生产端就阻塞住了。这影响到了生产端的正常逻辑的执行。
我们考虑过使用 select 和 default 的方式来避免阻塞。但如何在 channel 有空余容量时,生产端协程及时将积压的消息再次推入 channel ?
由于消费端逻辑的特殊性,我们无法创建多个消费端协程来提高消费的速度。
1
Muninn 2017-03-20 21:10:17 +08:00 via Android 2
这和 golang 没关系 传统的队列一样的 只能加大缓冲或者提高消费能力
评估下内存够用不 不够了需要改架构持久化 |
2
weiweiwitch OP @Muninn 内存是够的。我们现在暂时使用了 go-datastructures 的无边界 queue 来缓解这个问题。只是这个数据结构和其他 channel 没法太好的搭配使用。
绝大部分时间,生产端的产生速度是很缓慢的,所以为了偶尔的波峰为 channel 分配巨量的缓冲,感觉比较浪费。 |
3
znood 2017-03-20 21:31:11 +08:00 1
如 1 楼所说,加大缓冲,如果消费能力不足的情况下最好在 channel 和消费者中间加个持久化队列,如 kafka ,如果对延迟要求不是很高可以直接把 channel 换成 kafka
|
4
pkking 2017-03-20 21:36:06 +08:00
可以借鉴下拥塞算法
|
5
PhilC 2017-03-20 22:06:27 +08:00
你可以看看 nsq 的代码,用 select ,当 channel 满了就写到文件里
|
6
jiumingmao 2017-03-20 22:25:11 +08:00
使用 channel 的 channel a , a 不满的时候生产者定义一个长度为 1 的子 channel b ,往 b 中放一个元素,然后放到 a ; b 满的时候,生产者定义一个长度比较大(需要估计一下峰值大概多大)的子 channel c ,然后数据放入 c ,直接 a 不满,把 c 放入 a 。
|
7
jiumingmao 2017-03-20 22:27:21 +08:00
不过 channel 都会有一个问题,进程挂了就啥都没有了,使用 kafka 可以防止数据丢失。
|
8
iot 2017-03-20 22:48:43 +08:00
生产者写入 channel 时候能不能判断下, 如果快满了就再创建一个更大的 channel 替换旧的
|
9
ghbai 2017-03-21 08:38:42 +08:00
gocrawl(开源爬虫类库)的一种方案
https://github.com/PuerkitoBio/gocrawl/blob/master/popchannel.go ``` type popChannel chan []*URLContext // The stack function ensures the specified URLs are added to the pop channel // with minimal blocking (since the channel is stacked, it is virtually equivalent // to an infinitely buffered channel). func (pc popChannel) stack(cmd ...*URLContext) { toStack := cmd for { select { case pc <- toStack: return case old := <-pc: // Content of the channel got emptied and is now in old, so append whatever // is in toStack to it, so that it can either be inserted in the channel, // or appended to some other content that got through in the meantime. toStack = append(old, toStack...) } } } ``` |
10
weiweiwitch OP @ghbai 如果我理解的没问题的话,这个方案是无法保证同一个生产者产生的 cmd 被有序的消费。
|
11
ghbai 2017-03-21 12:44:15 +08:00
@weiweiwitch 对,是不能保证有序的。
|
12
khowarizmi 2017-03-21 15:13:59 +08:00
在你的 unbounded queue 前后接上两个 channel ,然后用两个 worker 搬数据,伪装成 unbounded channel 。
|
13
gwind 2017-03-30 21:28:22 +08:00
好比一条 TCP 连接达到最大吞吐,你再塞就没有意义。
建议考虑下 ZeroMQ, nanomsg 等,重新定义模型。 纯 golang 的 nanomsg : https://github.com/go-mangos/mangos |