V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
janusle
V2EX  ›  Python

问个芹菜(Celery)的问题,我不知道我这样设计合不合理。有大虾指点下么?

  •  
  •   janusle · 2014-09-24 19:48:24 +08:00 · 4500 次点击
    这是一个创建于 3695 天前的主题,其中的信息可能已经有所发展或是发生改变。
    我现在在自己业余做一个社交网络分析的程序。主要是从Twitter拉用户的timeline然后统计这个用户哪一个时间点最活跃。

    我目前想到的设计是这样的

    有一个进程负责安排task A到celery用来从twitter拉数据。然后拉数据的task A把拉倒的数据放到另外一个AMQP(大约会用rabbitmq)中。然后另外有一个进程一直监测那个AMQP,一旦发现有新数据就安排一个task B到celery,这个新安排的task B做得是 根据数据统计下用户哪个时间点发帖最多,把结果更新到数据库中去。

    我有如下几个问题:

    1. celery有没有什么办法可以直接把task返回的值传到另外一个queue里面去嘛。我翻了一下文档
    貌似没有,看起来这个工作只能在定义task A的function里完成。

    2. 我因为第一次做这样的东西,好奇我这个设计合不合理,有没有办法改善。比如说让task A既拉数据又对数据做计算然后写入数据库?

    先谢谢啦。
    11 条回复    2014-09-24 22:49:32 +08:00
    wibile
        1
    wibile  
       2014-09-24 20:26:46 +08:00
    这不就是个爬虫嘛,没有大规模的业务不用上celery吧,celery做分布式用的。
    传数据可以直接在taskA里再生成一个带参数的任务给另一个队列,作为B来执行。也可以直接用memcache。
    janusle
        2
    janusle  
    OP
       2014-09-24 20:34:16 +08:00
    @wibile 因为可能会要拉很多个用户的数据所以才想到用celery。你是说在定义task A的function里调用定义task B的function然后task B入celery吗?

    你说的memcache也可以是怎么个用法呢?我从twitter拉下来的是用户的timeline所以就是一个很大的json,这个可以放进memcache?
    no13bus
        3
    no13bus  
       2014-09-24 20:40:31 +08:00
    @janusle 我最近一直在用celery。这样能不能解决。2个任务。taskA负责爬数据入库数据表tableA,taskB负责分析tableA里面的数据,合格的数据或者说分析之后的数据入数据表tableB。taskA和taskB都作为定时任务,前者比如1s执行一次,后者10分钟一次?
    janusle
        4
    janusle  
    OP
       2014-09-24 20:43:59 +08:00
    @no13bus 谢谢回答。你说的那个table A就相当于扮演一个queue的功能,这样是不是rabbitmq更适合呢?而且我想做成异步的,就是有数据就会分析。这样效率比较高。你觉得呢?
    starsoi
        5
    starsoi  
       2014-09-24 22:09:03 +08:00   ❤️ 1
    celery 可以用chain把前一个task的返回值传给下一个task:

    # (4 + 4) * 8 * 10
    >>> res = chain(add.s(4, 4), mul.s(8), mul.s(10))

    你可以用TaskA拉数据,如果有数据就返回数据,没有数据就返回None.
    TaskB取TaskA的返回值作参数,如果是None就直接返回,如果是非None就处理数据更新数据库
    wibile
        6
    wibile  
       2014-09-24 22:09:09 +08:00   ❤️ 1
    @janusle 只要是定义好的task,哪里都可以用的。在task A里直接调用task_b.apply_async(kwargs=your_json)就可以把这个任务放入B队列。没必要用table,实在不想用queue,就搞个memcache存中转数据。
    no13bus
        7
    no13bus  
       2014-09-24 22:18:10 +08:00   ❤️ 1
    @wibile 正解。celery可以这么用的。celery配合redis 兔子能够变成队列服务。
    @starsoi 觉得chain和taskA里面套用taskB效果差不多,taskB写在taskA代码里面的最后 其实作用相当于chain。我自己觉得。
    starsoi
        8
    starsoi  
       2014-09-24 22:27:15 +08:00   ❤️ 1
    @no13bus 用chain的好处是task之间的依赖关系比较清晰,直接对应业务逻辑; 并且celery会自己保存中间结果(TasksA的结果),可以直接用result.parent.get()查看
    no13bus
        9
    no13bus  
       2014-09-24 22:31:16 +08:00
    @starsoi 恩。任务之间耦合度低,逻辑清晰明了。感谢。
    janusle
        10
    janusle  
    OP
       2014-09-24 22:48:50 +08:00
    @starsoi 这个就是我想要的,我不需要用个queue存中转数据类 谢谢!
    janusle
        11
    janusle  
    OP
       2014-09-24 22:49:32 +08:00
    @wibile 感谢,我打算用chain来做这个了!
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   2946 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 23ms · UTC 14:53 · PVG 22:53 · LAX 06:53 · JFK 09:53
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.