需求是服务端单向向客户端(浏览器)推送消息。由于会出现同一用户同时登录着多个客户端的情况,希望能把消息同时推送给所有的终端。所以我是这么做的:
async def handler(request):
admin = Admin(request)
ws = web.WebSocketResponse()
await ws.prepare(request)
request.app['websockets'][admin.id].add(ws)
try:
while True:
msg = await admin.get_unread_msg(admin.id)
for ws in request.app['websockets'][admin.id]:
await ws.send_json({'data': msg})
finally:
request.app['websockets'][admin.id].discard(ws)
await ws.close()
return ws
question:这么做有没有问题 = =
然后消息队列用的是 redis + aioredis,上面的 get_unread_msg 我是这么做的:
async def get_unread_msg(self, admin_id):
while True:
data = await self.redis.lpop(f'msgs:{admin_id}')
if data:
return data
await asyncio.sleep(0.1)
初始化 redis:
async def setup_redis(app):
redis_url = app['config']['REDIS_URL']
app['redis'] = await aioredis.create_redis_pool(redis_url)
yield
app['redis'].close()
await app['redis'].wait_closed()
question:我在这里使用 blpop 会阻塞其它请求,这是为什么 = =
至于哪里来的其它请求,是我写了另一个请求用来添加消息:
async def add_msg(request):
redis = request.app['redis']
msg_id = await redis.incr('msg_id')
import random
msg = {
'id': msg_id,
'text': f'hello~{msg_id}'
}
await redis.rpush('msgs', json.dumps(msg))
这里会把消息都写进一个大列表里,所以还有一个后台任务做消息分发:
@register_background_tasks
async def distribute_msgs(app):
redis = app['redis']
while True:
data = await redis.lpop('msgs')
if data:
json_data = json.loads(data)
to_admin = redis.rpush(f'msgs:{json_data["admin_id"]}', data)
await asyncio.gather(to_sa, to_admin)
await asyncio.sleep(0.1)
这里换成 blpop 也会阻塞请求= =
感谢大佬们读到这里,最后一个问题。。
如果我用 gunicorn 开启了多个 worker,会出现同一用户的多个请求发送到了不同的进程上,就没法保证所有连接都收到消息了。这种情况该怎么办?
1
gjquoiai OP o(╥﹏╥)o 路过的大佬请伸出你们的援手~
|