用 AI 辅助修改了一下代码,然后加了一些 print,直观地了解了一下执行流程。
import time
import random
from collections import deque
from itertools import count
import heapq
# Shared counter, starting at 1, that supplies the number in each Future's
# "Future-N" label (Task instances get theirs through Future.__init__).
count_id = count(1)
class Future:
    """Placeholder for a result that will be supplied later.

    A coroutine can ``await`` a Future. Callbacks registered through
    :meth:`add_done_callback` are invoked, synchronously, when
    :meth:`set_result` is called.
    """

    def __init__(self):
        self._done = False
        self._result = None
        # Callbacks waiting for completion; each one is called as ``fn(self)``.
        self._callbacks = []
        # Not read or updated anywhere in the visible code.
        self._cancelled = False
        # Label used in the trace output, e.g. "Future-1".
        self._id = f'Future-{next(count_id)}'

    def add_done_callback(self, fn):
        """Arrange for ``fn(self)`` to be called once the future is done.

        If the future is already done, ``fn`` is called immediately.
        """
        if self._done:
            fn(self)
        else:
            self._callbacks.append(fn)

    def set_result(self, result):
        """Store ``result``, mark the future done and run pending callbacks.

        Raises:
            RuntimeError: If the future has already been completed.
                Completing it again would overwrite the result and fire
                every callback a second time.
        """
        if self._done:
            raise RuntimeError(f"{self._id} already has a result")
        self._result = result
        self._done = True
        # Detach the list first so the callbacks are released once they have
        # run instead of being kept alive by the finished future.
        pending, self._callbacks = self._callbacks, []
        for cb in pending:
            cb(self)

    def __await__(self):
        """Suspend the awaiting coroutine until a result is available.

        Returns:
            The value passed to :meth:`set_result`.
        """
        if not self._done:
            print(f"Future {self._id} is not done, waiting...")
            # Yield this Future itself to whoever is stepping the coroutine,
            # so the driver knows what the coroutine is waiting for.
            yield self
        return self._result
class Task(Future):
    """Drive a coroutine step by step on the module-level ``loop``.

    A Task is itself a Future: once the coroutine returns, the return value
    becomes the Task's result.
    """

    def __init__(self, coro):
        """Wrap ``coro`` and schedule its first step on the event loop."""
        super().__init__()
        self.coro = coro
        print(f"任务初始化, 任务 ID: {self._id}")
        # The first step is queued rather than run inline, so creating a
        # task never executes coroutine code directly.
        loop.call_soon(self.run)

    def run(self):
        """Advance the coroutine by one step.

        * If the coroutine finishes, its return value becomes this task's
          result.
        * If it yields a Future, the task is resumed when that Future is
          done.
        * If it yields anything else, the task is simply queued again for
          the next loop iteration; otherwise nothing would ever resume it.
        """
        try:
            result = self.coro.send(None)
        except StopIteration as e:
            print(f"任务 {self._id} 执行完毕, 结果: {e.value}")
            self.set_result(e.value)
        else:
            if isinstance(result, Future):
                result.add_done_callback(self._wakeup)
                print(f"Task {self._id} is waiting for Future {result._id}")
            else:
                # No Future means no done-callback will ever call _wakeup,
                # so hand control back to the loop and continue next round.
                loop.call_soon(self.run)

    def _wakeup(self, future: Future):
        """Done-callback for the awaited future: queue the next step.

        The step is scheduled on the loop instead of being run inline.
        """
        print(f"等待完成, 唤醒任务 {self._id}, 结果: {future._result}")
        loop.call_soon(self.run)
class EventLoop:
    """Minimal single-threaded event loop: a FIFO ready queue plus timers."""

    # Upper bound for a single idle sleep, so an extreme timer deadline can
    # never hand time.sleep() a value it cannot represent.
    _MAX_NAP = 1.0

    def __init__(self):
        # (callback, args) pairs to run on the next iteration, in FIFO order.
        self._ready = deque()
        # Min-heap of (deadline, sequence, callback, args) timer entries.
        self._scheduled = []
        self._stopped = False
        # Tie-breaker for timers with equal deadlines. Without it heapq
        # would fall back to comparing the callbacks, which raises TypeError.
        self._timer_seq = count()

    def call_soon(self, callback, *args):
        """Queue ``callback(*args)`` for the next loop iteration."""
        self._ready.append((callback, args))

    def call_later(self, delay, callback, *args):
        """Schedule ``callback(*args)`` to run ``delay`` seconds from now.

        Timers that fall due at the same instant run in the order in which
        they were scheduled.
        """
        # The monotonic clock is immune to system clock adjustments.
        deadline = time.monotonic() + delay
        heapq.heappush(
            self._scheduled,
            (deadline, next(self._timer_seq), callback, args),
        )

    def stop(self):
        """Ask :meth:`run_forever` to return after the current iteration."""
        self._stopped = True

    def create_task(self, coro):
        """Wrap ``coro`` in a :class:`Task` and return it."""
        return Task(coro)

    def run_forever(self):
        """Run loop iterations until :meth:`stop` is called."""
        while not self._stopped:
            self.run_once()
            if not self._stopped:
                self._sleep_until_next_timer()

    def run_once(self):
        """Run one iteration: promote due timers, then drain the ready queue.

        Only callbacks that were ready when the iteration started are run;
        callbacks queued while they execute wait for the next iteration.
        """
        now = time.monotonic()
        while self._scheduled and self._scheduled[0][0] <= now:
            _, _, cb, args = heapq.heappop(self._scheduled)
            self._ready.append((cb, args))
        for _ in range(len(self._ready)):
            # Take the next ready callback and execute it.
            cb, args = self._ready.popleft()
            print(f"----> 执行 {cb}({args}) ---->")
            cb(*args)

    def _sleep_until_next_timer(self):
        """Sleep while only timers are pending, instead of busy-polling."""
        if self._ready or not self._scheduled:
            return
        remaining = self._scheduled[0][0] - time.monotonic()
        if remaining > 0:
            time.sleep(min(remaining, self._MAX_NAP))
async def smallrun():
    """Simulate one I/O operation that takes a random time below one second.

    A Future is created, the module-level ``loop`` is asked to resolve it
    with ``None`` after the random delay, and the coroutine waits for it.

    Returns:
        The delay, in seconds, that was used.
    """
    print("Start smallrun")
    wait_for = random.random()
    # The Future stands for a result that is not known yet.
    io_done = Future()
    print(f"Future {io_done._id} created")
    # When the simulated I/O "finishes", the loop calls
    # io_done.set_result(None), which lets this coroutine continue.
    loop.call_later(wait_for, io_done.set_result, None)
    await io_done
    print("End smallrun after", wait_for)
    return wait_for
async def bigrun():
    """Await smallrun() and return its delay multiplied by ten."""
    print("Start bigrun")
    scaled = (await smallrun()) * 10
    print("End bigrun with", scaled)
    return scaled
async def main_task():
    """Top-level coroutine: await bigrun() and print the value it returns."""
    print("Main task start")
    print("Final result:", await bigrun())
if __name__ == "__main__":
    # Task.__init__, Task._wakeup and smallrun() look ``loop`` up as a module
    # global, so it must be bound here before the first task is created.
    loop = EventLoop()
    loop.create_task(main_task())
    # Stop the event loop after this many seconds. The value lives in one
    # place so the comment and the code cannot drift apart again.
    stop_after = 2.2
    loop.call_later(stop_after, loop.stop)
    loop.run_forever()