V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
Tdy95
V2EX  ›  程序员

[求助] Python 调用 dll 的时候发现代码运行被阻塞了?

  •  
  •   Tdy95 ·
    mydaoyuan · 25 天前 · 2097 次点击

    业务说明

    使用 ctypes 对接 C++的 dll 接口,封装为 websocket 接口方便网络调用。使用了 Redis 的 PUB/SUB 来监听服务端主动发起的消息通知。

    使用了 asyncio 库做异步任务来处理 websocket 的请求通知,使用 aioredis 来进行管理 Redis 连接。

    案例代码

    入口代码如下:

    async def main():
        redis = aioredis.from_url(f'redis://:{redis_password}@{redis_host}:{redis_port}', encoding='utf-8')
        redis_task = asyncio.create_task(redis_subscriber_start_websocket(redis))
        server_task = websockets.serve(msdk_handler, "0.0.0.0", 8765)
        asyncio.create_task(monitor_loop())
        print("WebSocket 服务器启动在 ws://localhost:8765:redis 地址: ", redis_host, redis_port)
        await asyncio.gather(server_task, redis_task)
        
    
    if __name__ == "__main__":
        asyncio.run(main())
    
    

    大致的 websocket 处理流程如下:

           try:
                async for message in websocket:
                    data = json.loads(message)
                    # print(f"收到 websocket 消息: {data}")
                    await messageHandler(data, connected[random_id_str])
                print("WebSocket 连接正常关闭")  # 添加这行来确认循环是否正常结束
                await clearnWebscoket(random_id_str)
                
         
       async def messageHandler(data, connected):
          if data['action'] == 'something':
          # 切换位置
          results = await change_something(connected['client_id'], data)
          await connected['websocket'].send(json.dumps(results))
    
    	async def change_something(client_id):
    	    future = asyncio.Future()
    	    futures_change_chara_pos[client_id] = future
    	    # 调用 DLL 中的更改角色位置函数
    	    sdk.SDK_change_something(c_char_p(client_id.encode('utf-8')),callback_instance)    
    	    while not future.done():
    	        await asyncio.sleep(0)
    	    result = await future
    	    return result
    	    
    在 callback_instance 回调中,我会设置 future 的内容。
    
    def set_futures_status(client_id, futures_obj, data):
        if client_id in futures_obj:
            future = futures_obj.pop(client_id)
            if not future.done():
                future.set_result(data)
    

    问题

    对项目的异步和 Redis 消息有点不理解。

    1 、有一个播放音频的 C++接口,会频繁触发回调函数,我发现在播放音频的时候,接收了其他的 websocket 消息,发现好像没有处理,处理函数的日志没有打印出来 [偶现正常] 。不知道是不是我异步有问题,阻塞了主线程导致的。(作为一个前端开发,python 的异步好难懂,大佬们有推荐的书籍吗)

    2 、Redis 消息一会后,就不会接收到订阅的消息。 消息的发布和接收频道一致,并且重启 python 后服务正常。

    期望

    能帮忙解决一下目前碰到的 2 个问题,特别是异步的代码,我不知道自己写的是否正确。

    	    while not future.done():
    	        await asyncio.sleep(0)
    	    result = await future
    

    上面的代码就是我发现 await future 后会一直阻塞主进程代码运行。所以在 AI 的提示下,加上了代码后正常运行。但是 websocket 消息处理中的 await 又不会阻塞整个主进程运行,就很疑惑。

    报酬

    目前预算 500 ,可谈。代码地址: https://github.com/mydaoyuan/pythonAndDll

    联系方式:d29zaGl0ZHkxMjM0NTY=

    18 条回复    2024-08-29 11:57:55 +08:00
    zombiecong
        1
    zombiecong  
       25 天前   ❤️ 1
    python 的异步并不是多线程和多进程,需要并发还要用 ThreadPoolExecutor 或者 ProcessPoolExecutor
    https://docs.python.org/3/library/concurrent.futures.html
    mightybruce
        2
    mightybruce  
       25 天前   ❤️ 1
    调用 C++的 dll 接口 是会阻塞的,
    asyncio 是协程,线程阻塞了, 协程肯定是会阻塞的,

    sdk 相关的代码单独测试吧,可以搞一个并发多进程队列, 发数据扔给队列就继续处理接受数据,sdk 相关的代码不断从队列里面取数据。
    pursuer
        3
    pursuer  
       25 天前   ❤️ 1
    如果在使用 asyncio 的过程中使用了多线程回调时需要注意,asyncio 中很多 API 并不是线程安全的,不能跨线程调用

    比如 set_futures_status 这个函数就应该通过如下模式调用

    asyncio.run_coroutine_threadsafe(set_futures_status(a1,a2,a3),eventloop)
    pursuer
        4
    pursuer  
       25 天前   ❤️ 1
    顺便一提,此时 set_futures_status 应该改成 async def

    eventloop 是你创建 future 的线程的 eventloop ,可以通过 get_event_loop()获取
    这样可以解决无法用 future 唤醒导致的阻塞的问题。

    while not future.done():
    --await asyncio.sleep(0)
    result = await future
    这种写法就是个坑,相当于轮循,完全没发挥协程的优势,不建议使用这种写法
    zagfai
        5
    zagfai  
       25 天前   ❤️ 1
    @pursuer await sleep 并没有问题,在有其他协程需要用 cpu 的时候主动让出并不影响多少性能
    fds
        6
    fds  
       25 天前   ❤️ 1
    python 感觉还是写阻塞的代码比较流畅,异步是后面塞进来的,得对底层多一些了解。阻塞的逻辑得单独放在个线程里处理。要异步不能直接用 nodejs 吗?虽然 nodejs 写不好也可能阻塞,但毕竟设计之初就是异步模式,大部分常用 IO 也都包装好了。前端上手 js 也熟练些。
    pursuer
        7
    pursuer  
       25 天前   ❤️ 1
    @zagfai 对应用本身影响不大,但是会额外损耗其他进程的 CPU 资源,能用回调唤醒自然优先回调唤醒
    Tdy95
        8
    Tdy95  
    OP
       25 天前
    @fds node 对接 dll 和 so 的能力还是太弱了,python 这块方便很多。Python 之前么咋用过,全靠 AI 磕磕碰碰写出来的代码。结果被异步搞自闭了 T T
    fds
        9
    fds  
       25 天前
    @Tdy95 哦 这样呀。其实就像前面一些回复说的把 dll 调用扔到个线程就行。我也不太熟悉,问了下 gpt ,给出的代码是

    import asyncio
    from concurrent.futures import ThreadPoolExecutor

    executor = ThreadPoolExecutor(max_workers=4)

    def dll_call(client_id):
    # 同步调用 DLL 函数
    sdk.SDK_change_something(c_char_p(client_id.encode('utf-8')), callback_instance)
    # 假设这里返回结果
    return "result from dll"

    async def change_something(client_id):
    loop = asyncio.get_running_loop()
    # 在后台线程中执行 DLL 调用,避免阻塞事件循环
    result = await loop.run_in_executor(executor, dll_call, client_id)
    return result

    确实就跟前面几楼的回复一样呢,也算是挺清晰的。
    sujin190
        10
    sujin190  
       25 天前
    这还不简单,asyncio 的运行线程和 dll 运行线程不能是同一个啊,否则一个线程同一个时刻只能干一件事情,asyncio await 检测到有 io 阻塞会切换 python 栈帧,但是如果你是重 cpu 一直在计算 await 也会阻塞的
    LinePro
        11
    LinePro  
       25 天前
    问一下,印象中 Future 是可以直接 await 等待结果的。那 while not future.done() 这种轮询式的写法意义何在?
    Tdy95
        12
    Tdy95  
    OP
       25 天前
    @LinePro 我是发现在调用 dll 后,5s 后返回结果。 但是再等 5s 后才会执行 await 后的代码。我是让 AI 给出的 while not future.done() 代码
    Tdy95
        13
    Tdy95  
    OP
       25 天前
    @fds 谢谢老哥,我去试试看
    Tdy95
        14
    Tdy95  
    OP
       22 天前
    @pursuer 我已经修改了,发现 future 一直没变化。
    ```
    def callback_speak_by_audio(code, status, frame_id, client_id):
    client_id = client_id.decode('utf-8') # 解码客户端 ID
    status = json.loads(status.decode('utf-8')) # 假设 status 是 UTF-8 编码的字符串
    print(f"callback_speak_by_audio=====run, {client_id in futures_speak_by_audio_stream}")
    if status["data"]["FrameId"] == -1:
    if code == MSDKStatus.MSDK_SUCCESS_SPEAK_BY_AUDIO_FINISH.value:
    print(f"语音说话: {code}, 客户端 ID: {client_id}")
    asyncio.run_coroutine_threadsafe(set_futures_status_async(client_id, futures_speak_by_audio_stream, {"code": 204, "status": status, "name": "speak_by_audio", "success": True, "client_id": client_id}), websocketAll[client_id]['main_loop'])


    async def set_futures_status_async(client_id, futures_obj, data):
    if client_id in futures_obj:
    future = futures_obj.pop(client_id)
    if not future.done():
    future.set_result(data)


    ```

    外部调用形式为:
    ```
    if connected["audio_future"] is None:
    feature = asyncio.Future()
    connected["audio_future"] = feature
    print("Creating sendAudioEndData task")
    asyncio.create_task(sendAudioEndData(connected, feature))

    async def sendAudioEndData(connected, feature):
    print("发送音频结束数据")
    try:
    result = await feature # 等待 feature 完成
    print(f"发送音频结束数据: {result}")
    await connected['websocket'].send(json.dumps(result))
    except Exception as e:
    print(f"发送数据时发生错误: {e}")
    finally:
    connected["audio_future"] = None # 确保音频 future 被正确重置

    ```

    麻烦大佬再指点一下,是哪里不对呢?从日志结果看,“语音说话:”的日志已经执行。 但是后续的 sendAudioEndData 的 print(f"发送音频结束数据: {result}")没有执行。
    Tdy95
        15
    Tdy95  
    OP
       22 天前
    @pursuer 代码乱码了。我贴个图
    [Imgur]( https://imgur.com/Y46UeJ4)


    [Imgur]( https://imgur.com/2sNE2LK)
    pursuer
        16
    pursuer  
       22 天前
    @Tdy95 不知道你的 websocketAll[client_id]['main_loop']是哪里来的,future 正常唤醒是要求你的 future 创建和 set_result 在同一个线程/event_loop 中执行,要看着两个地方。
    Tdy95
        17
    Tdy95  
    OP
       22 天前
    @pursuer 我是直接在 message_handle.py 文件创建了一个全局变量。main_loop = asyncio.get_event_loop()
    pursuer
        18
    pursuer  
       22 天前
    @Tdy95 我看不到你怎么赋值 websocketAll[client_id]['main_loop']。也没法判断 futures_obj[client_id] 也不知道是不是和 connected["audio_future"]是同一个对象。或者在我上面说的两个地方打印下 get_running_loop() 和 future 看看是不是一致的
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   2153 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 26ms · UTC 00:49 · PVG 08:49 · LAX 17:49 · JFK 20:49
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.