用 asyncio 做了一个 UDP 传输性能测试工具,目前单进程服务端性能不够,流量大的时候处理不过来,服务端用的 asyncio.DatagramProtocol,怎么变成多进程的呢?试试了一下抢占式的写法,运行报错了,运行起来也只有一个进程工作,上代码
import asyncio
import time
import socket
from multiprocessing import Process
loop = asyncio.get_event_loop()
size = 0
class ServerProtocol(asyncio.DatagramProtocol):
def connection_made(self, transport):
self.transport = transport
def datagram_received(self, data, addr, args=None):
global size
data = data.decode()
message = data
index = data.find('\n')
if index > 0:
filename = data[0:index]
data = data[index+1::]
size += len(data)
task = self.WriteToFile(filename, data)
asyncio.run_coroutine_threadsafe(task, loop)
async def WriteToFile(self, f, data):
await asyncio.sleep(1)
return True
async def print_size():
global size
while True:
await asyncio.sleep(1)
print(size)
def start(sock):
listen = loop.create_datagram_endpoint(
ServerProtocol,
sock = sock
)
transport, protocol = loop.run_until_complete(listen)
task = print_size()
asyncio.run_coroutine_threadsafe(task, loop)
try:
loop.run_forever()
except KeyboardInterrupt:
pass
transport.close()
loop.close()
sock.close()
if __name__ == '__main__':
print("Starting UDP server")
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind(('0.0.0.0', 9873))
for i in range(1):
t = Process(target=start, args=(sock,))
t.deamon = True
t.start()
start(sock)
报错信息
Exception in callback BaseSelectorEventLoop._add_reader(6, <bound method..., bufsize=0>>>)
handle: <Handle BaseSelectorEventLoop._add_reader(6, <bound method..., bufsize=0>>>)>
Traceback (most recent call last):
File "/usr/local/python36/lib/python3.6/asyncio/selector_events.py", line 264, in _add_reader
key = self._selector.get_key(fd)
File "/usr/local/python36/lib/python3.6/selectors.py", line 191, in get_key
raise KeyError("{!r} is not registered".format(fileobj)) from None
KeyError: '6 is not registered'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/python36/lib/python3.6/asyncio/events.py", line 145, in _run
self._callback(*self._args)
File "/usr/local/python36/lib/python3.6/asyncio/selector_events.py", line 267, in _add_reader
(handle, None))
File "/usr/local/python36/lib/python3.6/selectors.py", line 412, in register
self._epoll.register(key.fd, epoll_events)
FileExistsError: [Errno 17] File exists
1
lieh222 OP 这。。。咋没人呢
|
2
lolizeppelin 2018-08-07 09:00:04 +08:00 via Android
把 fork 搞明白
一多进城就 multiprocessing 有没有想过要看 multiprocessing 的源码? |
3
lieh222 OP @lolizeppelin 感谢回复,已经解决,因为多进程中共用了一个事件循环,add_reader 重复注册了 socket.fileno,所以报错了,用 new_event_loop 解决,与 fork、mutiprocessing 的用法和是否看了源码没有关系,另外我认为像 fork,mutiprocessing 这种系统接口函数和系统接口的高级封装直接用就是了,除非遇到相关必须要解决的问题和抱着学习的目的,没有必要看这种源码
|