V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
ray1888
V2EX  ›  Python

异步与多 worker 的问题

  •  
  •   ray1888 ·
    ray1888 · 2017-10-24 11:08:57 +08:00 · 1689 次点击
    这是一个创建于 2643 天前的主题,其中的信息可能已经有所发展或是发生改变。

    现在在写一个服务端的程序,主要有三个操作,1.是等待网络请求接收数据(使用 socket 而不是 http ),2.是对数据转格式,第三个把整理好格式的数据写入数据库中。现在是想把 1,2,3 都写到一个协程中,然后开多个 worker 来进行处理,怎样可以使协程开出多个 worker 来应对多并发呢?目前打算使用 asyncio,如果不支持可以考虑 tornado,求各位看看有什么解决方法

    2 条回复    2017-10-28 00:28:59 +08:00
    hcnhcn012
        1
    hcnhcn012  
       2017-10-24 11:43:36 +08:00 via iPhone
    Twisted
    hook923
        2
    hook923  
       2017-10-28 00:28:59 +08:00
    我有个类似的做法,我是这样解决的。
    因为我是多个线程共享一个数据库连接,每个线程都 execute,最后一起提交数据库。因此我在 savedata 中加了个锁

    import threading
    lock = threading.Lock()

    from concurrent.futures import ThreadPoolExecutor

    max_workers=64
    sock_pool =ThreadPoolExecutor(max_workers=max_workers) #注意这 3 个 max_workers 不必都相同的
    chgdata_pool = ThreadPoolExecutor(max_workers=max_workers)
    chgdata_future = []
    savedata_pool = ThreadPoolExecutor(max_workers=200)
    savedata_future = []

    def sock(参数):
    接收 shock 数据的代码
    chgdata_future.append(chgdata_pool.submit(chgdata,参数) ) ##异步委托一个清洗数据的函数 chgdata
    其它代码
    def chgdata(参数):
    清洗数据的代码
    savedata_future.append(savedata_pool.submit(savedata,参数)) ##异步委托一个保存数据的函数 savedata
    其它代码
    def savedata(参数):
    保存语句生成
    lock.acquire() #加个互斥锁
    保存到数据库
    lock.release() #释放锁
    其它代码

    if __name__ == '__main__':
    执行 sock()之前的代码
    sock_future = sock_pool.submit(sock,参数) for 参数 in 列表] ## 多个 sock 接收数据
    for f in sock_future:
    f.result()
    for f in chgdata_future:
    f.result()
    for f in saveda'ta_future:
    f.result()

    conn.commit() #。这步你可以视你的实际需求放在 savadata 中。

    每个 sock 接收数据后传递给 chgdata,不必等待 chgdata。每个 chgdata 清洗数据后传递给 savedata,不必等待 savedata。
    这应该是楼主想要的效果
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   1010 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 30ms · UTC 23:03 · PVG 07:03 · LAX 15:03 · JFK 18:03
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.