V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
V2EX 提问指南
black11black
V2EX  ›  问与答

Python 设计一个能够自动释放未占用的资源的线程池

  •  
  •   black11black · 2020-12-27 21:19:18 +08:00 · 1568 次点击
    这是一个创建于 1470 天前的主题,其中的信息可能已经有所发展或是发生改变。

    关联问题: https://www.v2ex.com/t/739399

    问题描述:Python 默认的 concurrent.futures.ThreadPoolExecutor 这个线程池,不具备自动释放资源的功能。也就是该线程池有两个问题,其一是当线程池中线程数量未达到设计上限的时候,每次新增任务都会创建新的线程池,不会利用旧的,即使旧的线程已经空闲。其二是线程池中线程数量只能增加不能减少,即使线程已经空闲也不会释放,导致占用过多系统资源。

    的确现在折腾线程这种东西没什么意思,毕竟 python 已经全面进入异步宇宙了,基本能用 IO 复用的都不会用线程。但是不妨碍偶尔还是会用到线程池,而且很好用。

    比如我遇到的这个场景:Github 找了一圈,没找到什么比较成熟的 Python 可以异步连接 Oracle 的方案,那么要如何将 Oracle 接入 python 的异步生态呢?显然好办法是直接用线程封装原先的同步阻塞库,写个几十行代码,基本就堪用了。同理,如果以后遇到 mongodb 没有连接库啊,或者是什么新的数据库,比如各种时序数据库没有异步 api,都可以用这个方案。

    ==================================================================

    所以一个好用的线程池显得很重要。

    上个问题中几个回答都语焉不详的,还是 v 友不给力,只能自己去看了一下源码。

    concurrent.futures.TreadPoolExecutor 的目录在 Lib\concurrent\futures\thread.py

    里面还用到一个 SimpleQueue,来自 Lib\queue.py ,这个包又引用了 DLLS\_queue.pyd ,这个文件就不能打开看了,再往前看得翻 C 了,实在没那个兴趣。

    还有就是引用了 threading 里的 Thread

    大概翻了翻,TreadPoolExecutor 的实现挺简单的,每次新增任务都新创建一个线程。该线程利用 threading.Thread 的标准接口,但是目标函数并不是用户的业务函数,而是在其基础上加了一层封装。该封装输入一个 SimpleQueue,内部逻辑是 While True 循环,执行到 SimpleQueue.get()时阻塞,如果 queue 中有新任务,就提取出来执行,否则一直阻塞。以此实现一个 FIFO 的线程池。

    由于存储线程使用的数据结构是集合,没有删除功能,故一开始就没考虑释放线程的问题。其二是由于使用 queue 和循环来实现 FIFO,每个任务并不能对应到具体的线程,故无法根据任务结束与否对线程进行摧毁或是其他什么操作。

    ==================================================================

    大概想了一下,如果要实现两个目标:其一是线程可以复用,当有空闲线程时不创建新线程。其二是线程自动摧毁,当某线程已经空闲一段时间以后将其关闭,感觉还是可以实现。想了个简单思路,大家给参谋参谋。

    我的大概想法就是创建线程池的时候直接创建一个守护线程(同步)用来管理线程池状态,每隔固定时间唤醒一次,比如一分钟唤醒一次,检查检查线程池中每个线程已经空载多长时间了,如果空载比如五分钟,那就关闭该线程。

    判断线程是否空载的方法,大概是利用 python threading.Thread 实现时的 future,这个对象可以追踪完成状态,以实现业务任务与线程的对应?

    目前感觉直接修改 python 內建库的话应该能把上述思路写出来。但是一般使用的时候基本是依靠第三方库,我还没太想好怎么搞,很多依赖关系都是隐式的,不开放导入。

    15 条回复    2020-12-29 15:46:08 +08:00
    lpts007
        1
    lpts007  
       2020-12-27 21:36:32 +08:00
    https://github.com/ydf0509/threadpool_executor_shrink_able


    看见你另外一个帖子,我去搜了下,好像是你想要的。就是 shrink_able 写法有点脏.....
    renmu123
        2
    renmu123  
       2020-12-27 23:38:16 +08:00 via Android
    我记得好像有异步的 MySQL 的实现,你去看看咋做的
    black11black
        3
    black11black  
    OP
       2020-12-28 04:35:04 +08:00 via Android
    @renmu123 aiomysql 重写协议了。异步基本几种做法,一种完全原生重写,一种猴子补丁,我没用过,不知道能不能跟原生接口的兼容,再一种像我这样封装线程
    black11black
        4
    black11black  
    OP
       2020-12-28 09:24:46 +08:00
    @lpts007 感谢,看了一下代码,启发很大。感觉实现的并不脏,逻辑上没什么冗余,就是写法不太好看而已。我根据原生库重写了一份,已经传 Github 了 https://github.com/GoodManWEN/ThreadPoolExecutorPlus
    lpts007
        5
    lpts007  
       2020-12-28 10:15:14 +08:00 via Android
    我是说单词写法,他把单词给拆了
    black11black
        6
    black11black  
    OP
       2020-12-28 10:17:06 +08:00
    @lpts007 绝了,但他这个库我发现锁里有一个问题,容易导致死锁。在超时之后,结束阻塞之后,催回线程之前,默认线程都还是激活的,不知道怎么解决。
    black11black
        7
    black11black  
    OP
       2020-12-28 13:59:40 +08:00   ❤️ 1
    @lpts007 这个作者也是逗比,我去他那里 issue,他根本都没看懂我说的问题是什么,这 repo 真的是他写的么。。
    lpts007
        8
    lpts007  
       2020-12-28 15:16:38 +08:00
    @black11black 我又给他描述了一遍。老哥水平很高啊,一眼就看出来这种问题了。
    lpts007
        9
    lpts007  
       2020-12-28 15:20:15 +08:00
    不过他语气还算正常,就是一开始没看懂你说的啥而已,老哥别生气
    black11black
        10
    black11black  
    OP
       2020-12-28 15:25:05 +08:00
    @lpts007 谢谢你,其实也没咋生气,其实还行。主要是第一遍描述他上来就很不客气,我也很不客气地回了一下,怕他没听懂就又描述了一遍,结果他还是没听懂,我无语顺便吐槽。

    他这个遇到的问题我以前还真没遇到过,完全不知道怎么解决。之前以为线程锁啥的挺单纯的东西,线程之间逻辑理顺了不会出什么问题。但是他这个现在类似于事件驱动的回调,没有办法按顺序执行的方法加锁,在异常抛出的一瞬间已经没有监听功能了,但是不能马上调整统计状态。不知道怎么解决,也不知道怎么跟别人讨论,甚至不知道怎么跟别人描述。。
    black11black
        11
    black11black  
    OP
       2020-12-28 16:09:14 +08:00
    大概翻了一下,目前结论基本以我的技术力来说是解决不了了。他这个问题解决办法只能是进入回调当中修改,在线程抛出异常之前修改计数。翻了翻,SimpleQueue 引用自 DLLS\_queue.pyd 来自源码包里的 _queuemodule.c.h 和 _queuemodule.c, 如果用纯 python 实现的话,关键的计时唤醒功能来自 threading.Semaphore() , 里面的实现调用的內建库 _thread, 这个库没有__file__属性,找不到源码,具体细节就涉及 python 本身编译了,以前没搞过,看不懂了。

    解决方法应该是要传入参数,在回调前调用这个统计计数减,并且在 concurrent.futures.ThreadPoolExecutor 调用 _adjust_thread_count 时,其中的判断语句也要加锁。完成这两个步骤应该可以解决问题,但编译的部分搞不定,算了。

    目前能做的大概是增大最小线程数量,随时保有一定线程数的话可以减小故障发生概率,只要不是所有既存线程全部满载,都会用空载线程推动前进。
    lpts007
        12
    lpts007  
       2020-12-28 21:41:22 +08:00
    我在这里回你吧,改了源码,放大了问题。否则确实没法控制时间片。运行结果表明剩余任务数一直不变了。
    https://gist.github.com/lypro09539/fd10fa057964d42ff25ee9cea9cdc47f

    Java 的 ThreadPoolExecutor 源码应该是答案,我去看看。
    black11black
        13
    black11black  
    OP
       2020-12-28 23:29:49 +08:00
    @lpts007 确实,直接 sleep 就行了。我脑袋里一直在想单个语句拆解成多个 byte code 解释,限制了思路
    black11black
        14
    black11black  
    OP
       2020-12-28 23:33:04 +08:00
    @lpts007 他这个解决方案就是问题发生在回调的位置,比如到时间以后 queue 抛出异常,只需要在抛出异常之前修改计数就可以了,没什么复杂的。只是 python 这个回调都是用 c 实现的,不好修改。
    beifengzhishen
        15
    beifengzhishen  
       2020-12-29 15:46:07 +08:00
    @lpts007
    @black11black

    写好了就加个 q,我要引用你的 pool,
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   3007 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 26ms · UTC 14:09 · PVG 22:09 · LAX 06:09 · JFK 09:09
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.