V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
Monad
V2EX  ›  Python

如何优雅的通知 multiprocessing.Pool 中的进程退出?

  •  
  •   Monad · 2018-04-15 00:21:28 +08:00 · 7332 次点击
    这是一个创建于 2407 天前的主题,其中的信息可能已经有所发展或是发生改变。

    目的是想在主进程中随时终止子进程的执行 目前的代码长这样 有个问题是如果我 Ctrl-C 的话 所有的子进程虽然会正常退出 但是主进程会一直挂起在 pool.join()上 求解决方案~

    #!/usr/bin/env python2
    
    from __future__ import print_function
    
    import time
    import signal
    import logging
    from multiprocessing import Manager
    from multiprocessing.pool import Pool
    from multiprocessing.queues import Queue
    
    
    def Fn(n, q, ns):
      if ns.done:
        return
      try:
        q.put(n)
      finally:
        return
    
    
    def main():
      handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
      pool = Pool(processes=5)
      signal.signal(signal.SIGINT, handler)
    
      total = 10000
      manager = Manager()
      q = manager.Queue()
      ns = manager.Namespace()
      ns.done = False
    
      for n in range(total):
        pool.apply_async(Fn, args=(n, q, ns))
      pool.close()
      try:
        received = 0
        while received != total:
          n = q.get(60)
          print('get {} from queue'.format(n))
          received += 1
      except KeyboardInterrupt:
        ns.done = True
        pass
      finally:
        pool.join()
    
    
    if __name__ == '__main__':
      main()
      
    
    15 条回复    2018-09-12 15:17:33 +08:00
    lolizeppelin
        1
    lolizeppelin  
       2018-04-15 09:37:48 +08:00
    openstack 里缩水的代码, 稍微简化了下
    https://github.com/lolizeppelin/simpleservice/blob/master/simpleservice/base.py

    看懂了就知道怎么处理了
    lolizeppelin
        2
    lolizeppelin  
       2018-04-15 09:41:28 +08:00
    def _pipe_watcher(self):
    # This will block until the write end is closed when the parent
    # dies unexpectedly
    self.readpipe.read(1)

    LOG.info('Parent process has died unexpectedly, exiting')

    if self.launcher:
    self.launcher.stop()
    sys.exit(1)
    Monad
        3
    Monad  
    OP
       2018-04-15 12:11:59 +08:00
    @lolizeppelin #2 这段代码在主进程创建 pipe, 然后设置进程退出时的 close fd, 通过这种方式来通知子进程 read 返回吧
    但是我现在是 hang 在了主进程的 join 呢 而且目前来看 子进程(应该)是都正常退出了 主进程没有返回
    lolizeppelin
        4
    lolizeppelin  
       2018-04-15 12:18:26 +08:00 via Android
    pipe 是主进程退出 子进程也收到能退出 是个退出保险

    主进程里确认子进程退出用 waitpid
    lolizeppelin
        5
    lolizeppelin  
       2018-04-15 12:24:14 +08:00
    啥叫“应该” 子进程是否结束是可以看到

    multiprocessing 我记得默认是用 socket 来父子进程通信的
    join 里应该是取了 socket 数据 并 wait 子进程结束

    好好看看处理信号的部分就知道怎么让子进程 exit 了

    当然如果你代码是 win 上的当我上面的的都没说
    Monad
        6
    Monad  
    OP
       2018-04-15 12:36:38 +08:00
    @lolizeppelin #5 我的意思是 ##应该正常##退出了 子进程是肯定退出了 ps 看过 至于是不是异常退出的 并没有确定
    如果不用 Pool 手动创建管理的话 waitpid 应该是 OK 的
    但是用 Pool 的话 惯用法应该不会去 os.getpid 然后手动 waitpid 吧
    lolizeppelin
        7
    lolizeppelin  
       2018-04-15 13:12:41 +08:00 via Android
    你代码有问题 子进程是不是正常退出的都不知道 直接 try 包一层都好啊

    子进程有信号处理没
    主进程收到 ctrl c 信号以后 给所有子进程发终止信号不就行了
    lolizeppelin
        8
    lolizeppelin  
       2018-04-15 13:15:00 +08:00 via Android
    打日志 好歹你要知道子进程怎么退出的
    Monad
        9
    Monad  
    OP
       2018-04-15 13:18:04 +08:00
    @lolizeppelin #7 不是子进程正不正常退出的问题 这只是一个描述问题 不用纠结 我用 try 能保证它退出 并且我上面贴的代码就是这样了

    现在的问题是 子进程全部退出了 父进程仍然在 pool.join()没有返回 这个问题
    Monad
        10
    Monad  
    OP
       2018-04-15 13:19:48 +08:00
    @lolizeppelin #8 按照我的理解 如果用 C 的做法实现 pool.join 子进程无论怎么正常 /异常退出 父进程都应该能够通过 waitpid 感知到子进程退出 所有退出之后 join 就可以返回了 所以 python 现在这个现象让我很不理解
    lolizeppelin
        11
    lolizeppelin  
       2018-04-15 14:45:04 +08:00
    看了下, 和父子进程一点关系都没.....

    自进程
    ns.done 没有捕获异常只是小问题

    主要在这 3 有问题
    manager = Manager()
    q = manager.Queue()
    ns = manager.Namespace()

    要解决得慢慢折腾里面代码 我随便弄了下不想弄了, 折腾 multiprocessing 不如自己写多进程代码还好控一点
    lolizeppelin
        12
    lolizeppelin  
       2018-04-16 10:13:19 +08:00
    @Monad
    大致搞定了 给你代码弄蒙了

    一开始叫你看信号是没错的,你信号用错了.................

    except KeyboardInterrupt 这是不对的,你注册正确的拦截信号以后,是不会收到这个错误的

    信号要处理 2 次,一次是在 fork 前,就是 multiprocessing 创建任务之前,拦截 SIGINT,拦截执行内容
    def empty(signo, frame):
    print 'do nothing!!!'
    这里的目的是让 multiprocessing 里的代码不会因为收到 SIGINT 抛出异常

    第二次处理时在 fork 后,拦截内容
    def stop(signo, frame):
    print 'stoped'
    ns.done = True

    这里拦截到信号以后设置 ns.done
    lolizeppelin
        13
    lolizeppelin  
       2018-04-16 10:16:43 +08:00
    顺便...这个 pool 用的有点问题 Fn 返回后还会生成新的 Fn 塞进去....具体你看看怎么停掉 pool 我就不看了
    fool079
        14
    fool079  
       2018-04-16 10:17:30 +08:00
    我印象中 setDaemon=True
    也就是设为守护进程就可以随着父进程的关闭而关闭了。。
    itfanr
        15
    itfanr  
       2018-09-12 15:17:33 +08:00
    参考 Samba 和 ctdb 代码吧
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   2900 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 26ms · UTC 07:32 · PVG 15:32 · LAX 23:32 · JFK 02:32
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.