使用 hashlib 获取摘要的时候明显代码瓶颈在单核 cpu 上
profile 显示主要都在各个 hashobj 的 update()方法耗时最长
我看官方文档里有这么一句话:
Note For better multithreading performance, the Python GIL is released for data larger than 2047 bytes at object creation or on update.
然而并没有发现实际起作用 即 GIL 没有被释放 占用率仍然是 100%cpu
验证了不是 io 瓶颈 python3.9.2
总算写完了 感谢#1回复
是我把GIL想错了
陷入了cpu占用率100%就想多进程的误区
GIL是线程的概念 某线程释放了GIL之后其他线程也可以使用
而不是说主线程释放GIL之后就会接着主线程之后的代码
想成async类似的机制了
由于附言超字数了 引用库和线程启动的代码就略去了
BUF_SIZE = 4 * 1024 * 1024 # 4MiB
BUF_PRE_FETCH = 64 # 最多消耗 256MiB 额外内存
def cal_hashes(f: typing.IO):
f.seek(0)
res = {
'finish': False,
}
queues = {
'md5': Queue(),
'sha1': Queue(),
'sha256': Queue(),
}
def _producer(_f: typing.IO):
while True:
time.sleep(0.01)
empty_flag = True
for _, queue in queues.items():
if not queue.empty():
empty_flag = False
break
if empty_flag:
count = BUF_PRE_FETCH
while count > 0:
count -= 1
content = _f.read(BUF_SIZE)
if not content:
res['finish'] = True
return
for _, queue in queues.items():
queue.put_nowait(content)
# 合理的相信算完256M的数据
# 至少需要这么久
# 树莓派4: 120MiB/s
# 8代i5: 370MiB/s
time.sleep(0.3)
def _consumer(_algo: str):
hash_obj = hashlib.new(_algo)
while True:
if res['finish'] and queues[_algo].empty():
break
try:
content = queues[_algo].get(timeout=1)
except Empty:
continue
hash_obj.update(content)
queues[_algo].task_done()
res[_algo] = hash_obj.hexdigest().upper()
return res
1
LeeReamond 2022-04-17 17:29:48 +08:00 1
hashlib 是通过 ffi 调用实现的,不需要多进程,直接使用多线程即可释放 GIL ,你说不能释放 GIL 我感觉是你哪里错了。
|
2
Licsber OP @LeeReamond #1 感谢 我刚思考了一会好像理解我哪里想错了
我现在用 threading 库实现一下 过会贴代码 |
3
Licsber OP 终于写完了 单测也测完了 至少我是很满意的
```python3 BUF_SIZE = 4 * 1024 * 1024 # 4MiB BUF_PRE_FETCH = 64 # 最多消耗 256MiB 额外内存 def cal_hashes(f: typing.IO): f.seek(0) res = { 'finish': False, } queues = { 'md5': Queue(), 'sha1': Queue(), 'sha256': Queue(), } def _producer(_f: typing.IO): while True: time.sleep(0.01) empty_flag = True for _, queue in queues.items(): if not queue.empty(): empty_flag = False break if empty_flag: count = BUF_PRE_FETCH while count > 0: count -= 1 content = _f.read(BUF_SIZE) if not content: res['finish'] = True return for _, queue in queues.items(): queue.put_nowait(content) # 合理的相信算完 256M 的数据 # 至少需要这么久 # 树莓派 4:120MiB/s # 8 代 i5: 370MiB/s time.sleep(0.3) def _consumer(_algo: str): hash_obj = hashlib.new(_algo) while True: if res['finish'] and queues[_algo].empty(): break try: content = queues[_algo].get(timeout=1) except Empty: continue hash_obj.update(content) queues[_algo].task_done() res[_algo] = hash_obj.hexdigest().upper() ths = [] producer = threading.Thread(target=_producer, args=(f,)) producer.start() ths.append(producer) for algorithm in queues.keys(): consumer = threading.Thread(target=_consumer, args=(algorithm,)) consumer.start() ths.append(consumer) for th in ths: th.join() return res ``` |
4
JSPIXiaoHei 2022-04-18 10:12:52 +08:00
写的是 Forensic 工具?
|
5
Licsber OP @JSPIXiaoHei #4 不是( 没这么高级
|