之前自己遇到简单的需求需要异步并发的时候,经过 v2 大佬的指点之后用的 asyncio.Queue + aiohttp 搞定了,大概步骤是这样:(在下不是专业程序员,可能写的东西看起来像玩具,大佬们见谅)
async def worker(session):
while True:
uri = await queue.get()
uri, status_code = await delete_url(uri, session)
......
async def delete_url(uri, session):
......
async with session.delete(url, headers=headers) as response:
status_code = response.status
return uri, status_code
async def main():
async with aiohttp.ClientSession() as session:
tasks = []
for _ in range(max_worker):
task = asyncio.create_task(worker(session))
......
早几天看到有个大佬编的那个 [ Python 潮流周刊] 里面推荐一个异步队列 saq
github 地址: https://github.com/tobymao/saq
由于没玩过异步任务队列,就很想试试,结果遇到这样一个问题
async def delete_url(ctx, *, uri, session):
async with session.delete(url, headers=headers) as response:
status_code = response.status
return uri, status_code
settings = {
"functions": [delete_url],
"concurrency": 50
}
async def main():
async with aiohttp.ClientSession() as session:
with open(filename, 'r') as f:
for line in f:
uri = line.strip()
job = await queue.enqueue("delete_url", uri=uri, session=session)
session=session 不能这样传参,因为 saq 的这个 queue.enqueue 只能接收可序列化的作为参数, 而 aiohttp.ClientSession 不是一个可以 JSON 序列化的。这个 session 又不能写全局变量,要 session 共享的话,又不能写在 delete_url 函数里面,想问问大佬,这种情况要咋处理啊?
1
NessajCN 319 天前
写个类,session 是成员变量,delete_url 是成员函数
|
2
fzzff 319 天前
session 为什么不能写为全局变量?
|
3
Vegetable 319 天前 1
草草看了一下这个 saq ,他通过 redis 通信,和你本身实现的异步并不搭配,如果你之前能用 asyncio.queue ,证明你本来只是一个单进程的程序,这时候你在程序内部共享 session 是不错的做法。
但是 saq 通过 redis 通信,就意味着他的设计目的是实现分布式结构,分布式想共享对象就需要序列化了。但分布式之下共享 session 这个事儿其实没什么必要做了,至少不是一个好的方案。 |
4
ClericPy 318 天前
消费者那边每个进程共享一个 Session
|