V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
ray1888
V2EX  ›  Python

Python 多进程问题

  •  
  •   ray1888 ·
    ray1888 · 2017-06-20 16:44:21 +08:00 · 2383 次点击
    这是一个创建于 2713 天前的主题,其中的信息可能已经有所发展或是发生改变。

    遇到一个问题,是关于多进程队列的问题。当队列放进去的数量不能被每次新建的进程数整除的情况下,如果把多余的进程关闭并且退出创建多进程的循环。这是我现在问题上面的一个简单抽象(本质一样,免除了需求环境),把场景抽象出来的代码如下: import multiprocessing

    def printt(q): if q.empty(): pass else: data = q.get() print data

    if name == "main": q = multiprocessing.JoinableQueue() for i in range(5): q.put() while 1: for i in range(3): process = multiprocessing.Process(target=printt, arg=(q,)) process.start() process.join() q.join()

    上面代码如何才能把数字完全打印出来并且不报错

    16 条回复    2017-06-21 22:18:39 +08:00
    ray1888
        1
    ray1888  
    OP
       2017-06-20 16:45:11 +08:00
    import multiprocessing

    def printt(q):
    if q.empty():
    pass
    else:
    data = q.get()
    print data

    if __name__ == "__main__":
    q = multiprocessing.JoinableQueue()
    for i in range(5):
    q.put()
    while 1:
    for i in range(3):
    process = multiprocessing.Process(target=printt, arg=(q,))
    process.start()
    process.join()
    q.join()

    代码变形了,重新贴一遍
    kingmo888
        2
    kingmo888  
       2017-06-20 16:59:07 +08:00
    尝试给楼主美化一波
    ```
    import multiprocessing

    def printt(q):
    if q.empty():
    pass
    else:
    data = q.get()
    print data

    if __name__ == "__main__":
    q = multiprocessing.JoinableQueue()
    for i in range(5):
    q.put()
    while 1:
    for i in range(3):
    process = multiprocessing.Process(target=printt, arg=(q,))
    process.start()
    process.join()
    q.join()
    ```
    kingmo888
        3
    kingmo888  
       2017-06-20 17:00:05 +08:00
    `
    import xx
    if __name__ == '__main__':
    test()
    `
    kingmo888
        4
    kingmo888  
       2017-06-20 17:05:57 +08:00
    唉,水楼了成了。放弃。抱歉了。
    EchoUtopia
        5
    EchoUtopia  
       2017-06-20 17:48:03 +08:00
    import multiprocessing


    def printt(q):
    while 1:
    if q.empty():
    break
    else:
    data = q.get()
    print data


    if __name__ == "__main__":
    q = multiprocessing.JoinableQueue()
    for i in range(5):
    q.put(i)
    while 1:
    for i in range(3):
    process = multiprocessing.Process(target=printt, args=(q,))
    process.start()
    q.join()
    EchoUtopia
        6
    EchoUtopia  
       2017-06-20 17:55:40 +08:00
    @EchoUtopia #5 没仔细检查,没退出
    import multiprocessing


    def printt(q):
    while 1:
    if q.empty():
    break
    else:
    data = q.get()
    q.task_done()
    print data


    if __name__ == "__main__":
    q = multiprocessing.JoinableQueue()
    for i in range(5):
    q.put(i)
    for i in range(3):
    process = multiprocessing.Process(target=printt, args=(q,))
    process.start()
    q.join()
    print "over"

    不知道这个满足楼主需求不
    ray1888
        7
    ray1888  
    OP
       2017-06-20 17:58:18 +08:00
    @EchoUtopia 不行,还是会阻塞
    ray1888
        8
    ray1888  
    OP
       2017-06-20 18:00:38 +08:00
    @EchoUtopia 这个可以了,想请问一下,我看了文档还是不太懂那个 task_done 的函数作用
    EchoUtopia
        9
    EchoUtopia  
       2017-06-20 18:12:49 +08:00   ❤️ 1
    @ray1888 #8 其实就是一个 semaphores,它有个计数器,put 一次就加一,调用一次 task_done 表示当前对 queue 里面 get 到的东西处理已经完成,计数器减一,当queue里面的所有任务都完成后,锁就是释放了,join 就会取消阻塞,我自己是这样理解的,不知道会不会误导你。。
    ray1888
        10
    ray1888  
    OP
       2017-06-20 20:22:22 +08:00
    @EchoUtopia 你的理解是对的。我刚刚认真看了官方文档也是类似这样描述的。翻译不太好,但是意思跟你的理解差不多。

    task_done()
    Indicate that a formerly enqueued task is complete. Used by queue consumer threads. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.
    If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).
    Raises a ValueError if called more times than there were items placed in the queue.
    表明之前入队的任务完成(即现在出栈的任务)。使用队列消费者线程。当每个 get 方法获取一个任务,随后调用 task_done 告诉队列 运行在那个线程上的任务已经完成
    当线程当前拥塞,当所有任务(个体)完成处理后,会继续(意味着 task_done 回调 是接收每个个体(任务)被放置到队列里面)
    当被调用的次数多余队列中的个体数,会提出 Raise ValueError 值错误。
    join()
    Block until all items in the queue have been gotten and processed.
    The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer thread calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.
    阻塞直到队列里面所有的任务(个体)被获取并且被处理。
    当队列里面被放入一个个体(任务)时,未完成任务数会+1,这个计数会随着一个消费进程调用 Task_done 去表示那个个体(任务)已经被取回,并且所有的队列里面的任务都已经完成。当未完成计数=0 时,队列不在阻塞
    araraloren
        11
    araraloren  
       2017-06-21 09:15:04 +08:00
    @kingmo888 python 的缩进 + V2EX 的格式 = 让人绝望。。
    ray1888
        12
    ray1888  
    OP
       2017-06-21 09:17:12 +08:00
    @araraloren 其实如果 V2 可以像 stackoverFlow 上面那样,一键把代码全部格式化,那挺好的
    araraloren
        13
    araraloren  
       2017-06-21 10:14:09 +08:00
    @ray1888 ...不 我只求不要把原来 的格式破坏了 这样就谢天谢地了。。
    atempcode
        14
    atempcode  
       2017-06-21 10:47:33 +08:00
    不应该判断 q.empty() 来决定是不是退出吧,应该是去 catch queue.empty() 这个 exception。
    ray1888
        15
    ray1888  
    OP
       2017-06-21 11:05:45 +08:00
    Multiprocessing.JoinableQueue 的 empty 方法只返回 True 和 False,没有 exception 产生
    atempcode
        16
    atempcode  
       2017-06-21 22:18:39 +08:00
    @ray1888
    1. q.empty() 由於多綫程 /多進程的特性,是不可相信的
    empty()
    Return True if the queue is empty, False otherwise. Because of multithreading/multiprocessing semantics, this is not reliable.
    2. 所以判斷 queue 裏面有沒有元素不能用 q.empty()來判斷
    3. 推薦的辦法是一直 get(False),直到抓住 queue.empty() 这个 exception。
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   1084 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 24ms · UTC 19:09 · PVG 03:09 · LAX 11:09 · JFK 14:09
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.