V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
YuuuZeee
V2EX  ›  Python

Celery 可以启动多个线程嘛=-=

  •  
  •   YuuuZeee · 2019-01-03 10:57:47 +08:00 · 4218 次点击
    这是一个创建于 2180 天前的主题,其中的信息可能已经有所发展或是发生改变。

    RT =-=小白想要提升一下现在的一个任务的性能

    假设我们有一个 api 多次调用的需求在一个 celery 的 task 里面

    最普通的方法可以是这样:

    @task
    def foo():
    	for i in range(20):
        	call_api_here(i)
        do_something_else()
    

    但是这个很耗时因为会卡在 api 调用上

    然后如果我们用 multiprocessing (不是一个好的注意,但是粗暴), celery 有自己的 library 叫做 billiard.

    然后我们可以这样做

    @task
    def foo():
    	with Pool(5) as p:
        	p.map(call_api_here, some_params)
        do_something_else()    
    

    但是这样的问题是,每一个 task 都会开启一个 pool 然后结束后这个 pool 并没有被释放,最终导致内存被各种占用

    所以我想了另外一个办法,用 multithreading

    def foo()
        threads = []
        for i in range(20):
                t = threading.Thread(target=call_api_here, args=(i,))
                threads.append(t)
    
        for t in threads:
            t.start()
    
        for t in threads:
            t.join()
    
    

    但是试了下发现,虽然线程会随着进程结束而被销毁,但是貌似在每个 task 里面只有前几个线程执行了。。。后面的都 gg 了。。。

    想问问各位大佬有什么好的方法嘛😂

    10 条回复    2019-01-12 07:34:58 +08:00
    fanhaipeng0403
        1
    fanhaipeng0403  
       2019-01-03 11:16:30 +08:00
    from time import sleep
    from concurrent.futures import ThreadPoolExecutor\ ProcessPoolExecutor
    def child_1():
    sleep(9)
    print(1)


    def child_2():
    sleep(2)
    print(2)



    def child_3():
    sleep(3)
    print(3)



    def child_4():
    sleep(1)
    print(4)


    def child_5():
    sleep(2)
    print(5)



    with ThreadPoolExecutor\ProcessPoolExecutor(max_workers=5) as executor:
    executor.submit(child_1)
    executor.submit(child_2)
    executor.submit(child_3)
    executor.submit(child_4)
    executor.submit(child_5)


    t1= executor.submit(child_1)
    t2=executor.submit(child_2)
    t3=executor.submit(child_3)
    t4=executor.submit(child_4)
    t5=executor.submit(child_5)

    print(t1.result())
    YuuuZeee
        2
    YuuuZeee  
    OP
       2019-01-03 11:17:22 +08:00
    @fanhaipeng0403 这样的话每个任务都会创建一个 pool 还是占着内存呀
    fanhaipeng0403
        3
    fanhaipeng0403  
       2019-01-03 11:17:28 +08:00
    y worker -A app.tasks.celery -l INFO -Q default -c 20 (每个队列多搞几个 worker ) -n default_worker.%%i
    fanhaipeng0403
        4
    fanhaipeng0403  
       2019-01-03 11:20:17 +08:00
    import asyncio

    async def slow_operation(n):
    await asyncio.sleep(n)
    print('Slow operation {} complete'.format(n))
    return n


    loop = asyncio.get_event_loop()
    done, _ = loop.run_until_complete(
    asyncio.wait([
    slow_operation(1),
    slow_operation(2),
    slow_operation(9),
    slow_operation(2),
    slow_operation(1),
    slow_operation(2),
    slow_operation(3),
    ]))
    for fut in done:
    print("return value is {}".format(fut.result()))

    然后用 uvloop
    Hstar
        5
    Hstar  
       2019-01-03 11:20:48 +08:00
    关键词 celery worker
    ipwx
        6
    ipwx  
       2019-01-03 11:21:43 +08:00
    不能拆成多个 celery 任务,让 celery 去管嘛?
    YuuuZeee
        7
    YuuuZeee  
    OP
       2019-01-03 11:23:38 +08:00
    @ipwx 你的意思是对 api 的调用也拆成 celery 任务嘛=-=?这个思路也有考虑
    优点是 每个任务都可以追踪
    缺点是 任务颗粒度太高,很吃存储
    wizardoz
        8
    wizardoz  
       2019-01-03 11:24:32 +08:00
    同意楼上的拆成很多任务
    ohyeah521
        9
    ohyeah521  
       2019-01-11 20:34:46 +08:00
    遇到同样的问题,一个 task 启动了,然后在 task 里面读取一个目录下面的所有文件,查找文件内容是否包含我要查找的字符串,如果不用 threading,就是一个文件一个文件的读取,效率真的很低,请问各位大佬该怎么办?


    每个文件启动一个 worker 感觉也很 low 啊
    YuuuZeee
        10
    YuuuZeee  
    OP
       2019-01-12 07:34:58 +08:00 via Android
    @ohyeah521 用 threading
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   1050 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 24ms · UTC 23:24 · PVG 07:24 · LAX 15:24 · JFK 18:24
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.