V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
WindyXu
V2EX  ›  Python

如何实现一个实时转发 Stream 流的接口

  •  
  •   WindyXu · 2023-03-08 17:26:57 +08:00 · 1603 次点击
    这是一个创建于 619 天前的主题,其中的信息可能已经有所发展或是发生改变。

    想写一个实时转发 Stream 流内容的接口,不怎么懂 stream 流相关的知识

    之前的接口是

    async def index(path: str, request: Request):
        data = await request.json()
        resp = c.send_request(path, data)
        headers = dict(resp.headers)
    
        async def generate():
            for chunk in resp.iter_lines():
                if chunk:
                    yield chunk + b'\n\n'
        return StreamingResponse(generate(), headers=headers)
    

    问题是,这个会等接受完全部的 Stream 流信息,才会转发,想实现的效果是实时转发 stream 流信息,具体一点就是,接收到一行或者指定大小的信息块,就转发出去。

    想法是将 resp = c.send_request(path, data)这块改造成异步的,然后实时转发。

    目前的代码是

    async def index(path: str, request: Request):
        data = await request.json()
        async with httpx.AsyncClient() as client:
            async with client.stream('POST', path, json=data) as response:
                resp_headers = dict(response.headers)
                async def generate():
                    async for line in response.aiter_lines():
                        if line:
                            yield line.encode('utf-8') + b'\n\n'
                return StreamingResponse(generate(), headers=resp_headers)
    

    但是会报:

    httpx.StreamClosed: Attempted to read or stream content, but the stream has been closed.

    不知道怎么改,求赐教

    5 条回复    2023-03-09 09:47:50 +08:00
    yuanxing008
        1
    yuanxing008  
       2023-03-08 17:45:14 +08:00
    这个为什么还要走代码。直接用 Nginx 转出去不就好了么 你是要有业务逻辑处理?
    noparking188
        2
    noparking188  
       2023-03-08 17:50:30 +08:00
    有过类似需求,需要对下载过程中的 stream 数据进行处理,搜到篇文章 [How to stream Microsoft SQL Server to S3 using BCP on linux]( https://dstan.medium.com/streaming-microsoft-sql-server-to-s3-using-bcp-35241967d2e0),原理就是借助 named pipe 来对不支持 pipe 的下载工具截取 stream 进行处理。

    基于这个原理我写了个迁移 SQL Server 到 S3 的小工具: https://github.com/zhiweio/StreamXfer

    楼主可以参考下,读于你描述的需求,需要一个进程实时写 stream 到 named pipe ,另一个进程实时读 stream 写到其它地方,类似:
    1. mkfifo /tmp/fifo
    2. wget 'https://www.stats.govt.nz/assets/Uploads/Annual-enterprise-survey/Annual-enterprise-survey-2021-financial-year-provisional/Download-data/annual-enterprise-survey-2021-financial-year-provisional-csv.csv' -o /tmp/fifo &
    3. cat /tmp/fifo | aws s3 cp - s3://bucket/xxx

    希望能够有帮助,如果可以的话欢迎给我一个 star ,谢谢 :)
    liprais
        3
    liprais  
       2023-03-08 17:51:52 +08:00
    找个用 splice api 的完事
    009694
        4
    009694  
       2023-03-08 18:55:38 +08:00 via iPhone
    不要用 async with 。这样 return 出去的时候连接就已经关了 。用 background 去处理 httpx 未关闭的连接
    ruanimal
        5
    ruanimal  
       2023-03-09 09:47:50 +08:00
    `data = await request.json()` 这里其实就取了全部数据了, 不存在实时转发吧
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   2915 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 27ms · UTC 14:25 · PVG 22:25 · LAX 06:25 · JFK 09:25
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.