现在 Python 脚本跑任务跑在一台机器上,已经跑了一天了,还没跑完。
感觉应该有办法并行处理
比如: 生产者一次性创建所有的任务( 1 ), 然后创建 N 个消费者(可以部署到多台机器上), 所有任务入 MQ,然后生产者通过 pull 去拿任务去消费
求推荐的框架或者更好的思路?
1
fansfans 2021-04-08 10:05:54 +08:00 1
celery
|
2
Vegetable 2021-04-08 10:08:16 +08:00
你这个任务之间互相独立的话,可以直接在取数据时做筛选,然后脚本直接放在不同的机器上跑。
举个例子,在数据库取数据时,A 机器只取 id % 2 == 1 的,B 取 id % 2 ==0 的,这样就实现了两台机器同时处理互不冲突了鸭 不过,你这个任务貌似还不需要多机器,看起来时 io 密集型的,非阻塞编程可能就够了 |
3
MegrezZhu 2021-04-08 10:13:03 +08:00
spark / map-reduce?
|
4
jiedadada 2021-04-08 10:13:49 +08:00
celery 可以完美解决你的需求,消息队列用 redis 。
|
5
zealinux OP @Vegetable 感谢,但我的 id 是 uuid,还不大好分片。
但是可以用工具分成 1w 个 id 一个数组,然后存成文件, 再把文件拷贝到另一个机器上,或者再起一个脚本进程然后跑。 但是我这种感觉好麻烦 |
6
shoaly 2021-04-08 10:14:45 +08:00
你可以反过来, 不要 消费者去取, 而是生产者去调用 ... 这样就不会冲突了
|
7
maocat 2021-04-08 10:16:14 +08:00
一台 redis,N 个 celery
|
8
wjidea 2021-04-08 10:24:30 +08:00
我目前做的是 rmq 和 celery 。 请问 rmq 和 redis 性能方面有差别吗?
|
9
ch2 2021-04-08 10:27:13 +08:00
为什么不尝试对第二步并行呢?第二步是 IO 密集型还是 CPU 密集型的任务?
|
10
hanssx 2021-04-08 10:36:47 +08:00
celery bug 贼多,用着用着就出现各种问题,解决了一个就会出现另一个,比如 https://github.com/celery/celery/issues/4226,不知道现在 5.x 好点了没,而且定时任务需要重启才能修改,有空可以试试 apscheduler + dramatiq
|
12
Mars2333 2021-04-08 10:42:58 +08:00
不知道 faust 合不合适
https://github.com/robinhood/faust |
13
ch2 2021-04-08 10:50:59 +08:00
@zealinux #11 IO 密集型单机到达带宽上限也是很可观的,多进程不需要,多线程反正 python 的线程只会吃一个核心,开个 64 线程的池子也无所谓
|
14
wuwukai007 2021-04-08 11:05:09 +08:00
celery 跟 redis 搭配,各种问题停不下来,建议跟 rabbitmq 搭配
|
15
MintZX 2021-04-08 11:26:56 +08:00
这个数据量大是有多大?一般情况下遇到极大的数据的情况我们都是直接做快照弄个三四天然后再增量把新增数据处理一下。一天弄不完挺正常的?至于 UUID 的问题可以新建一个 indexed column 写个 procedure 搞一个 int ID 出来。。
|
16
hanyceZ 2021-04-08 11:29:42 +08:00
celery 纯 beat 用了三个月,bug 不断,各种 bug,flower 卡在 started 状态,定时任务重复执行,还有心跳连接错误导致日志爆了。。。服了
|
17
liprais 2021-04-08 11:31:18 +08:00
处理后每条再重新更新到数据库中
积累几万条再一块 load 呗 |
18
lithiumii 2021-04-08 12:03:23 +08:00 via Android
rq 吧,celery 我反正是用不明白
|
19
dayeye2006199 2021-04-08 12:07:36 +08:00
这种一次性,简单并行循环的任务,建议可以考虑[dask]( https://dask.org/)框架。代码非常好写,改动很小,纯 python 框架,不依赖外部服务(数据库、消息队列等)。
建立一个 scheduler 和多台 worker 机器的集群也十分简单。 |
20
xchaoinfo 2021-04-08 12:30:22 +08:00 via Android
分批次取数据,例如,一次取 10w 条,pandas 处理后,在更新回去。
|
21
Jat001 2021-04-08 12:46:17 +08:00 via iPhone
@hanssx n 年前我用 celery 的时候也发现有一堆 bug,不知道是现在好点了还是一堆人跟风,实际没怎么用过,反正当时是真后悔选 celery 了
|
22
Selenium39 2021-04-08 12:49:16 +08:00
才用完 celery,一大堆坑,但是功能还是很强大
|
23
Jat001 2021-04-08 12:53:01 +08:00 via iPhone
https://github.com/celery/celery/issues/3864
17 年提的 feature request,一直在 milestones 里往后拖,现在已经在 5.2 的 todo list 里了😂 |
24
ipwx 2021-04-08 12:56:32 +08:00
这个场景不适合用任何工作队列。因为大部分时间在 IO 上。主线程读进来然后再分派是什么鬼?
---- 建议先读进来所有 ID,对 ID 分片,然后在子进程根据 ID 取数据、处理、然后写回去。如果可以,那就直接要求每个子进程读比如“尾部是 XXX” 的 ID 的记录。 |
25
nthhdy 2021-04-08 13:14:50 +08:00
一次性的任务,还是重复性的工作?
另外,你预计单机的算力能支持吗? 如果是一次性的任务,并且单机多核算力没问题,上 multiprocessing 就行了,多进程,代码很好写。 如果否,上任务队列吧。你这个需求简单,celery,或者 rq 都可以。 |
26
DoctorCat 2021-04-08 13:18:35 +08:00
如果调度策略很基础没复杂的设计,RQ 就行了 https://github.com/rq/rq
|
27
SjwNo1 2021-04-08 14:15:04 +08:00
concurrent.futures
|
28
tisswb 2021-04-08 16:01:10 +08:00
这种我一般都是 celery,效果,功效都不错
|
29
Hconk 2021-04-09 07:07:30 +08:00 via iPhone
试试 ray 呢, https://github.com/ray-project/ray
|
30
cco 2021-04-09 09:39:29 +08:00
pyspark 或许也可以。
|
31
sss495088732 2021-04-09 11:30:12 +08:00
kafka+Faust
|