Python version: 3.10.1 or 3.10.2
Code:
import asyncio
import signal

# setup_logging, log_filename, Consumer and shutdown are defined elsewhere in the script (not shown)


def main():
    log_listener = setup_logging(log_filename)
    e = asyncio.Event()
    consumer = Consumer(e)
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT)
    for sig in signals:
        # s=sig binds the current signal; a plain closure would only ever see the last one
        loop.add_signal_handler(
            sig, lambda s=sig: asyncio.create_task(shutdown(s, loop, consumer))
        )
    tasks = consumer.run()  # Dict[str, Coroutine]
    try:
        for name, task in tasks.items():
            loop.create_task(task, name=name)
        loop.run_forever()
    finally:
        loop.close()
        log_listener.stop()
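The `consumer.run()` method returns a dict of type Dict[str, Coroutine]. At first I assumed the high CPU usage came from a bug in my own coroutine implementation. I then removed the coroutines from `tasks` one at a time, and even when `tasks` ended up empty, CPU usage was still at 100%.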
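The scheduling pattern described above (a dict of named coroutines turned into tasks, stopped through an event) can be sketched with `asyncio.run()` and `asyncio.gather()`, so the loop exits by itself once every task has finished instead of relying on `run_forever()` plus an explicit `loop.stop()`. `worker` and `build_tasks` are hypothetical stand-ins for the real `Consumer`; this is an illustration under those assumptions, not the OP's code.

```python
import asyncio
import signal
from typing import Coroutine, Dict, List


async def worker(name: str, stop: asyncio.Event, seen: List[str]) -> None:
    # Hypothetical worker: records its name, then awaits so other tasks can run.
    while not stop.is_set():
        seen.append(name)
        await asyncio.sleep(0.01)


def build_tasks(stop: asyncio.Event, seen: List[str]) -> Dict[str, Coroutine]:
    # Hypothetical stand-in for consumer.run(): a name -> coroutine mapping.
    return {f"worker-{i}": worker(f"worker-{i}", stop, seen) for i in range(2)}


async def main(run_for: float = 0.1) -> List[str]:
    loop = asyncio.get_running_loop()
    stop = asyncio.Event()
    seen: List[str] = []
    for sig in (signal.SIGTERM, signal.SIGINT):
        try:
            # Ask every worker to finish instead of tearing the loop down.
            loop.add_signal_handler(sig, stop.set)
        except (NotImplementedError, RuntimeError):
            pass  # not supported on Windows, or when not in the main thread
    tasks = [
        asyncio.create_task(coro, name=name)
        for name, coro in build_tasks(stop, seen).items()
    ]
    loop.call_later(run_for, stop.set)  # demo only: stop on its own after run_for seconds
    await asyncio.gather(*tasks)  # returns once every worker has exited
    return seen


if __name__ == "__main__":
    print(sorted(set(asyncio.run(main()))))  # → ['worker-0', 'worker-1']
```

`asyncio.run()` also closes the loop, which removes the signal handlers it installed.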
Profiling with cProfile gives:
   Ordered by: cumulative time
   List reduced from 3038 to 10 due to restriction <10>

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
    423/1    0.005    0.000   44.862   44.862 {built-in method builtins.exec}
        1    0.000    0.000   44.862   44.862 myscript.py:1(<module>)
        1    0.000    0.000   44.219   44.219 myscript.py:54(main)
        1    0.000    0.000   44.017   44.017 /home/xxx/.pyenv/versions/3.10.1/lib/python3.10/asyncio/base_events.py:582(run_forever)
        3    0.000    0.000   44.016   14.672 /home/xxx/.pyenv/versions/3.10.1/lib/python3.10/asyncio/base_events.py:1806(_run_once)
        3    0.000    0.000   43.983   14.661 /home/xxx/.pyenv/versions/3.10.1/lib/python3.10/selectors.py:452(select)
        3   43.983   14.661   43.983   14.661 {method 'poll' of 'select.epoll' objects}
    484/4    0.006    0.000    0.642    0.161 <frozen importlib._bootstrap>:1022(_find_and_load)
    483/4    0.003    0.000    0.642    0.161 <frozen importlib._bootstrap>:987(_find_and_load_unlocked)
    447/5    0.003    0.000    0.641    0.128 <frozen importlib._bootstrap>:664(_load_unlocked)
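In the profile above, almost all of the ~44 s inside `run_forever` is spent in `epoll.poll`, over only 3 calls to `_run_once`. That is the loop idle-waiting, not computing. Two facts follow from this: an event loop with nothing to do does not return from `run_forever()` until `stop()` is called, and `cProfile` only records the thread it was enabled in, so CPU burned by other threads (executor or worker threads) never shows up in this table. The sketch below measures wall time and CPU time for an idle loop; `time.process_time()` counts CPU across all threads of the process. `idle_loop_usage` is an illustrative name.

```python
import asyncio
import time
from typing import Tuple


def idle_loop_usage(seconds: float = 0.5) -> Tuple[float, float]:
    """Run an event loop with nothing scheduled; return (wall, cpu) seconds."""
    loop = asyncio.new_event_loop()
    try:
        # Only a stop timer is pending, so run_forever() spends the time
        # blocked in the selector (epoll on Linux) waiting for it to fire.
        loop.call_later(seconds, loop.stop)
        wall0, cpu0 = time.perf_counter(), time.process_time()
        loop.run_forever()
        return time.perf_counter() - wall0, time.process_time() - cpu0
    finally:
        loop.close()


if __name__ == "__main__":
    wall, cpu = idle_loop_usage()
    print(f"wall={wall:.2f}s cpu={cpu:.3f}s")
```

If `cpu` stays near zero here while the real process sits at 100%, the CPU is most likely being consumed outside the event loop thread.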
Has anyone here run into something similar? Or am I using it wrong?
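import asyncio
import logging
import time
from typing import List

import kafka
from kafka.consumer.fetcher import ConsumerRecord


# method of Consumer; a plain (synchronous) def, not async def
def _start_kafka_client(self) -> None:
    logging.debug(f"[_start_kafka_client] id(event)={id(self._event)}")
    i = 0
    try:
        while not self._event.is_set():
            # with kafka-python's KafkaConsumer, waits up to timeout_ms when no data is buffered
            msg_packs = self._kafka_client.poll(
                timeout_ms=1000,
                max_records=5000,
            )
            if not msg_packs:
                continue
            # msg_packs maps each TopicPartition to a list of ConsumerRecord objects
            tp: kafka.TopicPartition
            msgs: List[ConsumerRecord]
            for tp, msgs in msg_packs.items():
                self._data_queue.put_nowait(msgs)
                i += len(msgs)
                # i grows by len(msgs), so this only fires when the total lands exactly on a multiple of 1000
                if i % 1000 == 0:
                    logging.info(f"count of msgs: {i}")
    # note: these handlers sit outside the while loop, so a full queue ends polling for good
    except asyncio.QueueFull:
        logging.debug("data queue is full")
        time.sleep(1)
    except Exception as e:
        logging.error(f"error: {e}")
    finally:
        logging.debug("_start_kafka_client terminates")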
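The post doesn't show how `_start_kafka_client` is scheduled. Because it is a plain `def` that blocks in `poll()` and `time.sleep()`, calling it directly on the event loop thread would stall every other task. One common arrangement is to run such a loop in a worker thread via `loop.run_in_executor()` and hand each batch back with `loop.call_soon_threadsafe()`, since the asyncio docs state that `asyncio.Queue` and `asyncio.Event` are not thread-safe. In this sketch, `fake_poll` is a hypothetical stand-in for `KafkaConsumer.poll()`, and a `threading.Event` replaces the `asyncio.Event` as the stop flag.

```python
import asyncio
import threading
import time
from typing import Dict, List


def fake_poll(timeout_ms: int) -> Dict[str, List[int]]:
    # Hypothetical stand-in for KafkaConsumer.poll(): waits, then returns a batch.
    time.sleep(timeout_ms / 1000)
    return {"topic-0": [1, 2, 3]}


def poll_forever(loop: asyncio.AbstractEventLoop, queue: asyncio.Queue,
                 stop: threading.Event) -> None:
    # Runs in a worker thread, so blocking calls here never stall the event loop.
    while not stop.is_set():
        for msgs in fake_poll(timeout_ms=10).values():
            # asyncio.Queue is not thread-safe: schedule the put on the loop thread.
            loop.call_soon_threadsafe(queue.put_nowait, msgs)


async def demo() -> List[int]:
    loop = asyncio.get_running_loop()
    queue: asyncio.Queue = asyncio.Queue()  # unbounded, so put_nowait never raises QueueFull
    stop = threading.Event()
    producer = loop.run_in_executor(None, poll_forever, loop, queue, stop)
    received: List[int] = []
    while len(received) < 9:
        received.extend(await queue.get())
    stop.set()
    await producer  # wait for the thread to notice the stop flag and return
    return received


if __name__ == "__main__":
    print(asyncio.run(demo()))  # → [1, 2, 3, 1, 2, 3, 1, 2, 3]
```

The unbounded queue is a simplification. With a bounded queue, the thread could apply backpressure by blocking on `asyncio.run_coroutine_threadsafe(queue.put(msgs), loop).result()` rather than calling `put_nowait`.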
1
netcan 2022-04-07 17:20:05 +08:00
If tasks is empty, `loop.run_forever()` just returns right away. I'd suggest posting the full code, e.g. Consumer.
2
Nitroethane OP @netcan #1 Added. It just receives data from Kafka, parses it, and writes it to Elasticsearch.
3
makerbi 2022-04-07 18:24:09 +08:00
Probably because there's no sleep inside the while loop.
Even time.sleep(0.1) is better than no sleep at all.
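Whether a `while` loop pins a core depends on whether anything in its body blocks. The sketch below compares the CPU time of a loop that never blocks with the same loop sleeping 0.1 s per pass; `spin`, `nap` and `cpu_seconds` are illustrative names. Note that if `_kafka_client` is a kafka-python `KafkaConsumer`, whose `poll()` documents `timeout_ms` as time spent waiting when no data is buffered, the posted loop should already be blocking on each empty poll.

```python
import time
from typing import Callable


def cpu_seconds(fn: Callable[[float], None], duration: float) -> float:
    """CPU time (all threads of this process) consumed while fn runs."""
    start = time.process_time()
    fn(duration)
    return time.process_time() - start


def spin(duration: float) -> None:
    # Nothing in the body blocks, so the thread never gives up the CPU.
    deadline = time.monotonic() + duration
    while time.monotonic() < deadline:
        pass


def nap(duration: float) -> None:
    # Same loop, but each pass sleeps and hands the CPU back to the OS.
    deadline = time.monotonic() + duration
    while time.monotonic() < deadline:
        time.sleep(0.1)


if __name__ == "__main__":
    print(f"spin: {cpu_seconds(spin, 0.5):.2f}s CPU, nap: {cpu_seconds(nap, 0.5):.3f}s CPU")
```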
4
jenlors 2022-04-07 18:32:26 +08:00
When reading messages from Kafka, set block to True so the read blocks the loop.
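The reply doesn't say which API's `block` flag it refers to. As one illustration (an interpretation, not necessarily what the commenter meant), Python's standard `queue.Queue.get()` accepts `block` and `timeout`: with `block=True` the calling thread sleeps inside `get()` until an item arrives instead of spinning. `drain` is a hypothetical consumer loop.

```python
import queue
import threading
from typing import List


def drain(q: "queue.Queue[int]", stop: threading.Event, out: List[int]) -> None:
    """Consume items until stop is set, sleeping inside get() when idle."""
    while not stop.is_set():
        try:
            # Blocks without using CPU until an item arrives or 0.5 s elapse;
            # the timeout lets the loop re-check stop periodically.
            item = q.get(block=True, timeout=0.5)
        except queue.Empty:
            continue
        out.append(item)
        q.task_done()


if __name__ == "__main__":
    q: "queue.Queue[int]" = queue.Queue()
    stop, out = threading.Event(), []
    t = threading.Thread(target=drain, args=(q, stop, out))
    t.start()
    for n in range(3):
        q.put(n)
    q.join()  # wait until every item has been marked done
    stop.set()
    t.join()
    print(out)  # → [0, 1, 2]
```

The timeout is the trade-off: a longer one means fewer wake-ups but a slower reaction to `stop`.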
5
Nitroethane OP
6
Richard14 2022-04-10 07:35:22 +08:00
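The question is too long and lacks a minimal reproducible example. The code in the original post has a lot of parts whose implementation isn't shown, so I really don't feel like reading it. The question also reads a lot like an XY problem.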
7
Nitroethane OP @Richard14 #6 Well, I don't think I forced you to read it 😁. If you don't want to read it, just don't reply :)
8
lolizeppelin 2022-04-12 23:35:44 +08:00
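I wouldn't recommend sleep 0.1, and even sleep 0.001 isn't a good fit.
The usual approach is to wait on an event fd, so the thread sleeps but can still respond promptly.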
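Waiting on a file descriptor lets a thread sleep in `select()`/`epoll` at no CPU cost and still wake immediately when signalled. asyncio's selector-based loop uses the same trick (a socketpair) to wake itself when `call_soon_threadsafe()` is called. The sketch below applies the self-pipe trick with `socket.socketpair()` and `selectors`; `Waker` and `worker` are illustrative names. On Linux, Python 3.10+ also offers `os.eventfd()` for the same purpose.

```python
import selectors
import socket
import threading
from typing import List


class Waker:
    """Self-pipe trick: a socketpair whose read end a selector can watch."""

    def __init__(self) -> None:
        self._r, self._w = socket.socketpair()
        self._r.setblocking(False)

    def fileno(self) -> int:
        # Lets selectors.register() accept a Waker directly.
        return self._r.fileno()

    def wake(self) -> None:
        # Writing one byte makes the read end readable, ending any select().
        self._w.send(b"\0")

    def drain(self) -> None:
        # Consume pending wake-up bytes so the next select() blocks again.
        try:
            while self._r.recv(4096):
                pass
        except BlockingIOError:
            pass

    def close(self) -> None:
        self._r.close()
        self._w.close()


def worker(waker: Waker, log: List[str]) -> None:
    sel = selectors.DefaultSelector()
    sel.register(waker, selectors.EVENT_READ)
    try:
        # Sleeps inside select() using no CPU; returns as soon as wake() is called.
        while not sel.select(timeout=10):
            log.append("timeout")  # periodic work would go here
        waker.drain()
        log.append("woken")
    finally:
        sel.close()


if __name__ == "__main__":
    waker, log = Waker(), []
    t = threading.Thread(target=worker, args=(waker, log))
    t.start()
    waker.wake()
    t.join()
    waker.close()
    print(log)  # → ['woken']
```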