V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
182247236
V2EX  ›  Python

Django 中 Python 多线程连接数据问题请教

  •  
  •   182247236 · 2021-12-30 17:06:10 +08:00 · 2522 次点击
    这是一个创建于 1051 天前的主题,其中的信息可能已经有所发展或是发生改变。

    需求: 需要请求数据库中 1 个月的带宽数据。
    遇到问题: 我的思路是按天请求数据,一个月 30 天,for 语句循环 30 次连接数据库,每次时间大约是 0.6 秒左右,30 次大概是 20 秒,可以完成查询。但毕竟是展示数据,感觉 20 秒时间太长了,想要缩短到 10 秒内。
    我的解决方式: 想利用 python 的多线程( threading ),如果能同时请求 5 天的数据那么速度就能降到 5 秒内。
    我遇到的问题: 使用的多线程,但并没有使查询时间缩短。
    我是用的框架:Django
    我的视图函数代码(代码有点多,但啥好办法了,逻辑是没问题的,跑出结果 20 秒,和 for 没区别):

    def cdn_detail(request):

    """
    获取域名带宽
    """
    company_id = request.GET['company_id']
    domain_id = request.GET['domain_id'].split(",")
    domain = request.GET['domain_id']
    daterange = request.GET['daterange'].split(' ~ ')
    """
    filter(xx__in = list)
    """
    
    import datetime
    t1=time.time() 
    def sql_execut(company_id,domain,date,backValue):
        with connection.cursor() as cursor:
            sql_select = f"SELECT timestrap, bps FROM cdn_bandwidth WHERE (company_id = {company_id} AND domain_id IN ({domain}) AND time >= '{date} 00:00:00' AND time <= '{date} 23:59:59')"
            cursor.execute(sql_select)
            row = cursor.fetchall()
            fr_row = pd.DataFrame(list(row), columns=['timestrap', 'bps'])
            backValue.put(fr_row)
    
    day_range = pd.date_range(start=daterange[0], end=daterange[1]).strftime("%Y-%m-%d").to_list()#创建日期范围 list
    import threading
    from queue import Queue
    threads =[]
    n = range(len(day_range))
    backValue = Queue()
    frame = pd.DataFrame(columns=['timestrap', 'bps'])#创建一个空 DataFrame
    for i in n:
        t=threading.Thread(target=sql_execut,args=(company_id,domain,day_range[i],backValue))
        t.start()
        threads.append(t)
    for i in threads:
        i.join()
    for _ in n:
        frame = frame.append(backValue.get()).groupby('timestrap')['bps'].sum().reset_index()
    frame_sum = frame.values.tolist()
    t2=time.time()
    print("相差",(datetime.datetime.fromtimestamp(t2)-datetime.datetime.fromtimestamp(t1)).seconds,"秒")
    point = int(len(frame_sum)/100*95)#95 值的点
    value_95 = sorted(frame_sum, key = lambda k:k[1])[point][1]#95 值
    context = {
        'datas': frame_sum,
        'value_95' : value_95
    }
    datas = json.dumps(context, ensure_ascii=False)
    return HttpResponse(datas, content_type="application/json")
    
    26 条回复    2022-03-18 17:43:06 +08:00
    Kinnice
        1
    Kinnice  
       2021-12-30 17:09:44 +08:00
    一次性从数据库把一个月的取出来,然后在程序里面去做按天的筛选。
    Kinnice
        2
    Kinnice  
       2021-12-30 17:15:47 +08:00
    分组统计也可
    nmzcbkof
        3
    nmzcbkof  
       2021-12-30 17:25:56 +08:00 via iPhone
    我最近看书看到说 由于 python 有一个 GIL 锁,导致多线程无法利用多核 ,得利用多进程
    182247236
        4
    182247236  
    OP
       2021-12-30 18:07:52 +08:00
    @Kinnice 一次性把月数据取出来这个动作就需要 120 秒了,所以没法这么做的
    182247236
        5
    182247236  
    OP
       2021-12-30 18:08:29 +08:00
    @Kinnice 我现在用 for 就是分批次获取数据,但是 20 秒还是没法接收。
    Marinata
        6
    Marinata  
       2021-12-30 18:17:08 +08:00
    小白理解,欢迎大佬纠正:Python 伪多线程,上进程池吧,把单次请求+处理封装成函数
    Kinnice
        7
    Kinnice  
       2021-12-30 18:31:35 +08:00 via Android
    @182247236 分组统计指的是使用 mysql 本身的 group by
    RRRoger
        8
    RRRoger  
       2021-12-30 18:46:28 +08:00
    多线程的并发在 Python 中就是一个美丽的梦。 -- 廖雪峰
    shyrock
        9
    shyrock  
       2021-12-30 18:51:21 +08:00
    既然是展示数据,对实时性要求应该不高。
    最简单的做法就是每个小时把数据提取出来计算好,结果放到 redis ,展示页面直接用 redis 数据就行了。

    回到你说的 thread 问题,如果耗时一样,很像是多个 thread 互相阻塞了,比如说 cursor 是不是有锁,多个 thread 只有一个能得到 cursor 执行,实际上串行化了。

    你可以打一些日志看看,多个 thread 的时序,到底卡在哪里了。
    lybcyd
        10
    lybcyd  
       2021-12-30 18:54:56 +08:00
    @182247236 多大的数据量?一条 SQL 居然需要这么久,我觉得应该考虑一下几个问题

    1. 优化查询的性能,即便是一天的数据也要 600ms ,一个月就要 120s ,这个应该是有不小的优化空间的
    2. 一个月数据的结果集有多大,是不是有必要取出这么多数据?粗看代码就是分析一下,做 group by sum 之类的统计,重新思考一下业务逻辑,看看有哪些可以优化的点,可不可以直接使用数据库查询来解决
    3.你这个拼接 SQL 的方式是有 SQL 注入风险的,要使用参数化查询
    meiyoumingzi6
        11
    meiyoumingzi6  
       2021-12-30 19:16:24 +08:00
    数据量大吗. 如果 30 天一次查询很大的话,
    1.可以尝试 每两天一次?
    2.pd.DataFrame 这行可以单独写到一个线程 /进程里面(之前就遇见过 sql 还是很快执行完,但是 pandas 处理时间要久一些的)
    3. sql 看看能不能优化一下, explain 看看
    4. 还有是否能避免使用 pandas ?
    noparking188
        12
    noparking188  
       2021-12-30 22:52:43 +08:00
    先分析 SQL 相关,表结构怎么设计的,数据量多大
    然后再告诉大家想计算啥,这样就可以告诉你怎么写更好了
    neoblackcap
        13
    neoblackcap  
       2021-12-30 23:21:45 +08:00
    据我了解,Django 是一个请求对应一个数据库连接,你这边的数据库多线程查询是如何连接数据库的?是自己重新创建连接了吗?
    chuanqirenwu
        14
    chuanqirenwu  
       2021-12-31 00:51:21 +08:00
    瓶颈不在并发数吧,先看数据量多大,优化 SQL 查询比较好。
    huazhaozhe
        15
    huazhaozhe  
       2021-12-31 08:10:41 +08:00 via Android
    这个我见过,数据量大又要实时的话
    先是数据库优化,查询语句优化,甚至需要的数据单独建表优化
    另一个每天一个定时任务跑之前的数据,按照年度月付日分别统计,所以只需要查当天的数据再加之前已经统计好的数据就可以了
    Anivial
        16
    Anivial  
       2021-12-31 09:23:11 +08:00
    楼上正解,如果你获取一天的数据都需要很长时间,那如果要一个月统计数据最好单独建立一张统计表,后台定时维护数据。 然后说一句,你这代码真不怕 sql 注入吗?
    julyclyde
        17
    julyclyde  
       2021-12-31 14:21:19 +08:00
    cdn 带宽一般是五分钟采样一次?一个月也就才 8640 个数据啊?
    即使一分钟一次,也就才 4 万多条

    要不你先看看数据库性能有没有毛病?
    julyclyde
        18
    julyclyde  
       2021-12-31 14:21:53 +08:00
    还有,你这个 sql 有注入漏洞
    182247236
        19
    182247236  
    OP
       2022-01-02 22:58:58 +08:00
    @Kinnice 没有用 mysql 的 group by 但是测试过了这个过程我用 pandas 计算非常快了。
    182247236
        20
    182247236  
    OP
       2022-01-02 23:02:42 +08:00
    @shyrock 使用场景注定了用 redis 不行,数据库方面由于数据量真的太大了,所以我放弃了在这上面优化,多线程这块我再研究研究。
    182247236
        21
    182247236  
    OP
       2022-01-02 23:06:44 +08:00
    @lybcyd 一个月的数据大概有 80 万条左右,用 mysql 做 group by sum 真的估计更慢了,这步我用 pandas 处理其实挺好的了,这个业务是内部运维用的,SQL 注入这些因为我不是专业的,所以暂时先不理会吧
    182247236
        22
    182247236  
    OP
       2022-01-02 23:07:45 +08:00   ❤️ 1
    @meiyoumingzi6 我的处理结果是 SQL 慢了,pandas 还是很给力的。。。哭
    182247236
        23
    182247236  
    OP
       2022-01-02 23:10:12 +08:00
    @neoblackcap 我的做法是写多线程,每个线程都请求数据库,但你说的这个很有可能是我遇到的问题,我节后好好看看
    182247236
        24
    182247236  
    OP
       2022-01-02 23:12:53 +08:00
    @Anivial 这个方式最近有想过,但是没有专业学过数据库,又着急,所以往后可能会考虑分表,SQL 注入这个超纲了。。。
    182247236
        25
    182247236  
    OP
       2022-01-02 23:17:35 +08:00
    @julyclyde 我这边有 100 多个域名,一个月的数据就是 80 多万条了,查起来确实太慢了。所以想用 python 的多线程解决。。。SQL 注入漏洞的问题后面再解决吧。。
    patrickpu
        26
    patrickpu  
       2022-03-18 17:43:06 +08:00
    性能慢主要慢在两个方面,一个是 python 的 for 循环,一个是获取 sql 查询数据。
    pymysql 是纯 python 的,而 python 的 for 循环性能是很低的,低的惨不忍听,数据量不大还好,当你一次要处理+10w 数据的时候性能就很感人了,最好的方法是用 cython 把.py 转成动态链接库.so 的形式,会有明显的加速效果。
    当你查询的数据量大了后,pymysql 等 python 客户端执行的结果集获取是通过游标分块获取的,也就是说查询 1w 的数据,数据库往返请求可能会有 100 次,这些请求都是串行的是透明的,优化的方向可以考虑通过多线程并发,按 id 分块读,同时指定 limit
    见 pymysql 的 cursor_iter:
    def cursor_iter(cursor, sentinel, col_count, itersize):
    """
    Yield blocks of rows from a cursor and ensure the cursor is closed when
    done.
    """
    try:
    for rows in iter((lambda: cursor.fetchmany(itersize)), sentinel):
    yield rows if col_count is None else [r[:col_count] for r in rows]
    finally:
    cursor.close()
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   2874 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 23ms · UTC 06:31 · PVG 14:31 · LAX 22:31 · JFK 01:31
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.