V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
Raul7
V2EX  ›  Python

Celery+Django 如何动态修改已被接收或已在队列中的任务?

  •  
  •   Raul7 · 2022-11-21 16:45:02 +08:00 · 2503 次点击
    这是一个创建于 729 天前的主题,其中的信息可能已经有所发展或是发生改变。

    项目遇到两种场景:

      1. 任务已被 Celery 接收;
      1. 任务已在队列中排队;

    队列配置了默认任务优先级,我想在任务被接收或者在队列中,动态修改任务的优先级,有什么比较好的方法吗?

    希望各位大佬不吝赐教。

    8 条回复    2022-12-07 15:34:44 +08:00
    crysislinux
        1
    crysislinux  
       2022-11-21 16:49:23 +08:00 via Android
    把任务处理做成幂等的,然后要改优先级就同样的参数但是更高的优先级再发一次。
    Raul7
        2
    Raul7  
    OP
       2022-11-21 16:51:05 +08:00
    @crysislinux 再发一次的话,之前的那个任务要清掉吗?应该不会覆盖掉吧?队列就需要执行两个任务了
    westoy
        3
    westoy  
       2022-11-21 17:01:10 +08:00
    不能修改
    revoke 之前那个 task id, 再重新创建一个,celery 运行到的时候会判断状态
    如果你本身要处理的数据会保存状态, 那也不用取消了, 执行的时候判断一下就行了
    celery 的优先级比较玄学, 也只有基于 rabbitmq 才支持
    Raul7
        4
    Raul7  
    OP
       2022-11-21 17:20:29 +08:00
    @westoy 现在新版本的 celery ,Redis 也支持的
    crysislinux
        5
    crysislinux  
       2022-11-21 17:24:27 +08:00 via Android
    @Raul7 不会覆盖,但是后面允许的那个处理的时候你的代码能判断出来处理过了就可以跳过
    akaHenry
        6
    akaHenry  
       2022-11-22 12:28:31 +08:00
    应该没有什么队列产品, 满足你这奇怪的需求. 不只是 celery.

    队列, 只是管道, 是无状态的.

    你想改 task 状态, 是业务需求. 需要自己单独设计方案.

    给个思路:

    1. task 数据包内, 定义 task_type 字段.
    2. redis, 根据 task_type, 动态调整优先级 level 值.
    3. 定义第一级 MQ 管道 + task worker, 此 worker 主要做调度器. 收到 task 时, 根据 task_type, 查询 redis, 判断实时优先级, 进而决定:
    - 立即执行
    - 转发到另一个队列中, 等待其他 task worker 处理.
    - 忽略丢弃
    4. 次一级的 MQ + task worker, 无脑执行.


    celery, 可以是个管道链. 其他消息队列, 都可按照此思路设计业务.
    akaHenry
        7
    akaHenry  
       2022-11-22 12:35:28 +08:00
    另外不要滥用 celery, 这东西, 本身是个调度器, 做定时任务用还行.

    常规的 MQ 需求, 踢开 celery, 直接写业务代码, 操作 rabbitmq/kafka 就好.

    当然, 如果是维护的不值钱代码, 当我没说.
    kelvin_fly
        8
    kelvin_fly  
       2022-12-07 15:34:44 +08:00
    队列是无状态的管道。已经进入后,不能再做修改。这应该是个业务上的问题,需要记录队里的 ID ,然后在任务分配那,和 worker 里做业务联动处理 i
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   3288 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 22ms · UTC 13:13 · PVG 21:13 · LAX 05:13 · JFK 08:13
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.