使用 Golang 实现了一个简单的消费者模式, 主要解决每分钟百万请求问题的技术方案。 基本原理:建立固定的工作线程去缓冲池中取数据处理。以此来控制固定时间内处理的请求数
https://github.com/qianguozheng/go-workerpool.git
实际使用场景来自 http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang/
国内最近翻译的也有,可以自行搜索。
1
kslr 2017-01-14 23:59:39 +08:00
Nice
|
2
pubby 2017-01-15 02:40:12 +08:00
实际上 MaxQueue 没起到作用,发生问题后任务压在这个相对轻量一些的 goroutine 上而已
for{ select{ case job:= <-JobQueue: // a job request has been received fmt.Println("Store a job into jobChannel") go func(job Job){ //try to obtain a worker job channel that is available. //this will block until a worker is idle jobChannel := <- d.WorkerPool //dispatch the job to the worker job channel jobChannel <- job }(job) } } |
3
qianguozheng OP @pubby 通过 jobqueue 的长度,可以控制工作者 routine 取数据的速率吧
|
4
pubby 2017-01-15 11:40:08 +08:00 via Android
@qianguozheng 你从 JobQueue 取任务后直接起 goroutine 来等待空闲 worker 。相当于把 JobQueue 的长度又不可控的延长了
|
5
qianguozheng OP @pubby JobQueue 的长度, workder 的数量都是可以调控的, 正是通过调控两者的长度来控制处理的速率。
|
6
pubby 2017-01-16 17:30:14 +08:00
@qianguozheng 处理速度是控制了
但是从 JobQueue 里取任务的速度没控制,前面不管有多少 job 进来,这边都在新开 go func()来等待 worker 如果因为其他原因所有 worker 都需要长时间处理的话, goroutine 数量就会猛增,只是这个 goroutine 比较轻量,只是等待空闲 worker 而已,所以系统资源没有那么快炸 |
7
qianguozheng OP @pubby 我觉得你理解有误。
1. 本程序的目标就是 通过调控 JobQueue 的长度, worker 的数量来达到最大处理能力,在输入压力再大的情况,处理速率都是不变的。 2. 从 JobQueue 里面取任务的是 worker ,而 worker 的数量是固定的。不会新增任何的 goroutine 来处理。你说的就永远不会发生。 |
8
pubby 2017-01-17 10:54:36 +08:00
@qianguozheng 是的,你输入压力再大, worker 处理速度也不变,那么再大的处理压力去哪里了呢?
你这些压力都去了这里: go func(job Job){ //try to obtain a worker job channel that is available. //this will block until a worker is idle jobChannel := <- d.WorkerPool //dispatch the job to the worker job channel jobChannel <- job }(job) 你 worker 数量有限,那么来不及处理的那些压力就会在这里变成 goroutine 来等待 <-d.WorkderPool |
9
qianguozheng OP |
10
ijustdo 2017-02-27 09:27:28 +08:00
没错 这个方法是简单粗暴有效的 我以前类似的 服务端程序 跑了 6 年多 还没嗝屁
全局任务队列 加锁 结果处理队列 加锁 N(参数)个处理任务的线程 每个线程 while 1 就是干活的 干什么由 task_handle 引用传入 每个线程有一个 event 控制暂停 启动 停止 有个参数控制 每个任务处理后的间隔 定时取任务线程(这个有必要, 当任务数量是不可预期的, 或者是动态的) 当然现在类似 rabbitmq 消息队列之类 好搞多了 为什么上面线程要加暂停呢 因为当取任务都没取到(干完了) 任务队列又为空 可以暂停所有干活的线程 等到 某时刻 来任务了 在通知兄弟们干起来 呵呵 处理结果线程(可以多个) |