V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
asuraa
V2EX  ›  Python

求助 Python 异步多线程下载又拍云图片问题

  •  3
     
  •   asuraa · 2017-09-03 21:10:06 +08:00 · 4654 次点击
    这是一个创建于 2666 天前的主题,其中的信息可能已经有所发展或是发生改变。

    代码

    import asyncio
    import base64
    import os
    import urllib
    
    import aiohttp
    
    # -----------------------
    # -----------------------
    bucket = 'xxx'
    username = 'xxx'
    password = 'xxxxxx'
    hostname = "xxxxxx"
    base_save_path = 'f:'
    # -----------------------
    
    headers = {}
    auth = base64.b64encode(f'{username}:{password}'.encode(encoding='utf-8'))
    headers['Authorization'] = 'Basic ' + str(auth)
    headers['User-Agent'] = "UPYUN_DOWNLOAD_SCRIPT"
    headers['x-list-limit'] = '300'
    
    thread_sleep = 1
    
    
    def is_dic(url):
        """判断 key 是否是目录 根据是否有后缀名判断"""
        url = url.replace('http://v0.api.upyun.com/', '')
        # print(len(url.split('.')))
        if len(url.split('.')) == 1:
            return True
        else:
            return False
    
    
    class Crawler:
        def __init__(self, init_key, hostname, max_tasks=10, pic_tsak=50):
            '''初始化爬虫'''
            self.loop = asyncio.get_event_loop()
            self.max_tries = 4 # 每个图片重试次数
            self.max_tasks = max_tasks # 接口请求进程数
            self.key_queue = asyncio.Queue(loop=self.loop) # 接口队列
            self.pic_queue = asyncio.Queue(loop=self.loop) # 图片队列
            self.session = aiohttp.ClientSession(loop=self.loop) #接口异步 http 请求
            self.pic_session = aiohttp.ClientSession(loop=self.loop) #图片异步 http 请求
            self.key_queue.put_nowait({'key': init_key, 'x-list-iter': None, 'hostname': hostname}) #初始化接口队列 push 需要下载的目录
            self.pic_tsak = pic_tsak #图片下载进程数(接口有调用频率限制,http 下载没有限制)
    
        def close(self):
            """回收 http session"""
            self.session.close()
            self.pic_session.close()
    
        async def work(self):
            """接口请求队列消费者"""
            try:
                while True:
                    url = await self.key_queue.get()
                    # print('key 队列数量:' + await self.key_queue.qsize())
                    await self.handle(url)
                    self.key_queue.task_done()
                    await asyncio.sleep(1)
            except asyncio.CancelledError:
                pass
    
        async def work_pic(self):
            """图片请求队列消费者"""
            try:
                while True:
                    url = await self.pic_queue.get()
                    await self.handle_pic(url)
                    self.pic_queue.task_done()
                    await asyncio.sleep(1)
            except asyncio.CancelledError:
                pass
    
        async def handle_pic(self, key):
            """处理图片请求"""
            url = (lambda x: x[0] == '/' and x or '/' + x)(key['key'])
            url = url.encode('utf-8')
            url = urllib.parse.quote(url)
    
            pic_url = key['hostname'] + url + '!s400'
    
            tries = 0
            while tries < self.max_tries:
                try:
                    response = await self.pic_session.get(pic_url)
                    break
                except aiohttp.ClientError:
                    pass
                tries += 1
            try:
                if is_dic(url):
                    # print('图片线程-目录 :{}'.format(url))
                    content = await response.text()
                    try:
                        iter_header = response.headers.get('x-upyun-list-iter')
                    except Exception as e:
                        iter_header = 'g2gCZAAEbmV4dGQAA2VvZg'
    
                    list_json_param = content + "`" + str(response.status) + "`" + str(iter_header)
                    self.do_file(self.get_list(list_json_param), key['key'], key['hostname'])
                else:
                    # print('图片线程-文件:{}'.format(key['save_path']))
                    with open(key['save_path'], 'wb') as f:
                        f.write(await response.read())
            finally:
                await response.release()
    
        async def handle(self, key):
    
            """处理接口请求"""
            url = '/' + bucket + (lambda x: x[0] == '/' and x or '/' + x)(key['key'])
            url = url.encode('utf-8')
            url = urllib.parse.quote(url)
    
            if key['x-list-iter'] is not None:
                if key['x-list-iter'] is not None or not 'g2gCZAAEbmV4dGQAA2VvZg':
                    headers['X-List-Iter'] = key['x-list-iter']
    
            tries = 0
            while tries < self.max_tries:
                try:
                    response = await self.session.get("http://v0.api.upyun.com" + url, headers=headers)
                    break
                except aiohttp.ClientError:
                    pass
                tries += 1
            try:
                if is_dic(url):
                    # print('目录线程-目录 :{}'.format(url))
                    content = await response.text()
                    try:
                        iter_header = response.headers.get('x-upyun-list-iter')
                    except Exception as e:
                        iter_header = 'g2gCZAAEbmV4dGQAA2VvZg'
    
                    list_json_param = content + "`" + str(response.status) + "`" + str(iter_header)
                    self.do_file(self.get_list(list_json_param), key['key'], key['hostname'])
                else:
                    # print('目录线程-文件:{}'.format(key['save_path']))
                    with open(key['save_path'], 'wb') as f:
                        f.write(await response.read())
            finally:
                await response.release()
    
        def get_list(self, content):
            # print(content)
            if content:
                content = content.split("`")
                items = content[0].split('\n')
                content = [dict(zip(['name', 'type', 'size', 'time'], x.split('\t'))) for x in items] + content[1].split() + \
                          content[2].split()
                return content
            else:
                return None
    
        def do_file(self, list_json, key, hostname):
            """处理接口数据"""
            for i in list_json[:-2]:
                if not i['name']:
                    continue
                new_key = key + i['name'] if key == '/' else key + '/' + i['name']
                try:
                    if i['type'] == 'F':
                        self.key_queue.put_nowait({'key': new_key, 'x-list-iter': None, 'hostname': hostname})
                    else:
                        try:
                            if not os.path.exists(bucket + key):
                                os.makedirs(bucket + key)
                        except OSError as e:
                            print('新建文件夹错误:' + str(e))
                        save_path = base_save_path + '/' + bucket + new_key
                        if not os.path.isfile(save_path):
                            print(f'请求图片:', new_key)
                            self.pic_queue.put_nowait(
                                {'key': new_key, 'save_path': save_path, 'x-list-iter': None, 'hostname': hostname})
                        else:
                            print(f'文件已存在:{save_path}')
                except Exception as e:
                    print('下载文件错误!:' + str(e))
                    with open('download_err.txt', 'a') as f:
                        f.write(new_key + '\n')
            if list_json[-1] != 'g2gCZAAEbmV4dGQAA2VvZg':
                self.key_queue.put_nowait({'key': key, 'x-list-iter': list_json[-1], 'hostname': hostname})
                # self.key_queue.put_nowait({'key': key, 'x-list-iter': list_json[-1], 'hostname': hostname})
    
        async def run(self):
            """初始化任务进程"""
            workers = [asyncio.Task(self.work(), loop=self.loop)
                       for _ in range(self.max_tasks)]
    
            workers_pic = [asyncio.Task(self.work_pic(), loop=self.loop)
                           for _ in range(self.pic_tsak)]
    
            await self.key_queue.join()
            await self.pic_queue.join()
    
            workers.append(workers_pic)
            for w in workers:
                w.cancel()
    
    
    if __name__ == '__main__':
        loop = asyncio.get_event_loop()
        crawler = Crawler('/', hostname, max_tasks=5, pic_tsak=150)
        loop.run_until_complete(crawler.run())
    
        crawler.close()
    
        loop.close()
    
    

    上面是代码

    问题

    1. 以上代码执行后没什么问题。但是当长时间执行后会卡主。。百思不得其解(猜测可能是队列问题?但是无法验证)。为何到达一定时间(大约 5 小时以上)脚本会卡死?
    2. 此脚本目的为了下载又拍云所有图片保存到本地。图片量非常大(大约 10T) 3 亿张左右。目前机器的下载宽带大概在 300M/下载速度大约 30M/S ,多次联系又拍云,又拍云表示只能这样下载。无法通过邮寄硬盘直接拷贝。我们也在杭州。但是又拍云无法拷贝 还有什么特殊方法可以快速下载所有图片?
    第 1 条附言  ·  2017-09-05 05:33:00 +08:00

    说下最终解决方案吧

    1. 使用 mq 替代自带的队列,这样也方便断点续传,还有个原因是其中一个 bucket 图片大约有 7T 而我们购买的硬盘单个只有 4T 又不能做磁盘阵列,所以需要两个同时下而且不重复,于是乎就改造成了分布式的了
    2. 拆分生产者和消费者,其中获得目录的服务既是消费者又是生产者,目前按照 24 楼仁兄的几个细节建议,改进了代码另外加了多处的异常判断和处理,目前非常健壮和稳定

    速度:单机跑 100 个线程 两台机器一起跑,不加延时大约每秒 400 张,但是带宽会爆满,然后 mq 会断掉,所以加了延时,每个线程每次下载后加了 0.05 秒的延时后 下载速度大约每秒 300 到 360 张浮动,带宽占用百分之 80 到 95 浮动。目前很稳定。。

    感谢各位

    第 2 条附言  ·  2017-09-11 09:44:12 +08:00

    再次补充下。最后的代码在这里

    python异步多线程超高性能爬虫爬取又拍云图片

    第 3 条附言  ·  2017-09-23 12:22:12 +08:00
    33 条回复    2017-09-08 01:42:07 +08:00
    asuraa
        1
    asuraa  
    OP
       2017-09-03 21:12:28 +08:00
    asuraa
        2
    asuraa  
    OP
       2017-09-03 21:16:17 +08:00
    昂 这么硬的问题木有人给解答下吗
    asuraa
        3
    asuraa  
    OP
       2017-09-03 21:19:26 +08:00
    难道是保存文件的时候引起的?
    asuraa
        4
    asuraa  
    OP
       2017-09-03 21:19:39 +08:00
    在线等。。。。急
    asuraa
        5
    asuraa  
    OP
       2017-09-03 21:20:02 +08:00
    asuraa
        6
    asuraa  
    OP
       2017-09-03 21:23:04 +08:00
    更正:
    ```python
    def is_dic(url):
    """判断 key 是否是目录 根据是否有后缀名判断"""
    url = url.replace('http://v0.api.upyun.com/', '')
    # print(len(url.split('.')))
    if len(url.split('.')) > 1:
    return True
    else:
    return False
    ```
    asuraa
        7
    asuraa  
    OP
       2017-09-03 21:29:02 +08:00
    擦。。。看错了 帖子主题的代码是正确的。。
    Lax
        8
    Lax  
       2017-09-03 21:35:24 +08:00   ❤️ 1
    脚本运行时,用 strace 看一下卡在哪里了

    strace -p <pid>
    asuraa
        9
    asuraa  
    OP
       2017-09-03 21:36:14 +08:00
    @Lax windows 下有这个玩意么
    asuraa
        10
    asuraa  
    OP
       2017-09-03 21:36:54 +08:00
    @Lax 脚本是跑在 windows 下的。。因为硬盘要用 ntfs 格式给某部门送过去
    Lax
        11
    Lax  
       2017-09-03 21:40:34 +08:00
    另外一个比较值得怀疑的一点是,你的所有文件操作没有关闭,有可能用尽 open files 限制。
    可以对比一下这三个值:
    用户限制:ulimit -n
    进程限制:cat /proc/<pid>/limits
    实际使用:ls /proc/<pid>/fd | wc -l
    Lax
        12
    Lax  
       2017-09-03 21:41:11 +08:00
    当我没说。。
    Lax
        13
    Lax  
       2017-09-03 21:42:13 +08:00
    cygwin / mingw 之类的可能有 strace
    asuraa
        14
    asuraa  
    OP
       2017-09-03 21:43:51 +08:00
    @Lax 这里使用了 with 语句,应该能保证 with 语句执行完毕后已经关闭了打开的文件句柄。应该不是这个问题呀。
    mengskysama
        15
    mengskysama  
       2017-09-03 21:53:26 +08:00 via iPhone   ❤️ 1
    加个 timeout 试试,要等 tcp 的 timeout 机制触发要好久的
    asuraa
        16
    asuraa  
    OP
       2017-09-03 21:55:42 +08:00
    @mengskysama 是在每次 http 请求的时候加的吗?
    asuraa
        17
    asuraa  
    OP
       2017-09-03 21:58:30 +08:00
    @mengskysama 但是是异步的啊 应该会等待请求完成的啊
    mengskysama
        18
    mengskysama  
       2017-09-03 21:58:52 +08:00 via iPhone
    asuraa
        19
    asuraa  
    OP
       2017-09-03 21:59:56 +08:00
    @mengskysama 对于单个进程而言 会等待的啊
    mengskysama
        20
    mengskysama  
       2017-09-03 22:01:56 +08:00 via iPhone
    @luodaoyi 一个可能是池子里面 task 全塞死了,
    asuraa
        21
    asuraa  
    OP
       2017-09-03 22:18:22 +08:00
    @mengskysama http 请求引起的卡死吗?
    asuraa
        22
    asuraa  
    OP
       2017-09-03 23:02:01 +08:00
    我觉得好像找到问题了。当 asyncio 队列满了之后 会阻塞线程。但是我这里用的 put_nowait

    http://python.usyiyi.cn/translate/python_352/library/asyncio-queue.html
    put_nowait(item)
    将项目放入队列而不阻塞。

    如果没有可用的空位,引发 QueueFull。
    NoAnyLove
        23
    NoAnyLove  
       2017-09-03 23:10:40 +08:00
    @luodaoyi 又没有设置 queue 的 max_size,怎么会 QueueFull
    NoAnyLove
        24
    NoAnyLove  
       2017-09-03 23:20:44 +08:00   ❤️ 1
    一些细节的东西:

    * 用一个 ClientSession 就好,或者多个 ClientSession 用同一个 TCPConnector
    * session.get 要加 timeout,我以前遇到过卡死过在请求上
    * response 可以用 async with 打开,可靠性和可读性都有提高
    * Windows 下要确认是不是有很多文件没有关闭,可以用 OpenedFilesView
    asuraa
        25
    asuraa  
    OP
       2017-09-03 23:25:12 +08:00
    另外写文件使用的 io 阻塞操作

    写文件使用 aiofiles 实现异步写操作

    async with aiofiles.open('download_err.txt', 'a') as f:
    await f.write(new_key + '\n')
    asuraa
        26
    asuraa  
    OP
       2017-09-03 23:25:28 +08:00
    跑跑看 有问题在此贴继续讨论
    asuraa
        27
    asuraa  
    OP
       2017-09-03 23:40:54 +08:00
    @NoAnyLove 感谢 我都试试看 谢谢
    asuraa
        28
    asuraa  
    OP
       2017-09-04 00:15:24 +08:00
    @NoAnyLove 按您说的几点
    1. ClientSession 已改为一个
    2. session.get timeout=60
    3. response 使用 async with 打开

    目前再跑 明日看看会不会还卡死
    ysc3839
        29
    ysc3839  
       2017-09-04 01:20:41 +08:00 via Android
    @luodaoyi Windows 的话,我只知道 Visual Studio 能直接附加调试 Python 代码,不知道是啥黑科技。
    mert472114271
        30
    mert472114271  
       2017-09-04 01:55:33 +08:00
    在怀疑的代码快前后加两个变量计数,程序加个中断信号控制器,打印计数的变量
    asuraa
        31
    asuraa  
    OP
       2017-09-04 12:02:22 +08:00
    @ysc3839 目前没卡死了 一台电脑因为队列太大内存爆了
    @mert472114271 目前没有卡死了 一直在运行
    asuraa
        32
    asuraa  
    OP
       2017-09-05 05:31:53 +08:00
    说下最终解决方案吧
    1. 使用 mq 替代自带的队列,这样也方便断点续传,还有个原因是其中一个 bucket 图片大约有 7T 而我们购买的硬盘单个只有 4T 又不能做磁盘阵列,所以需要两个同时下而且不重复,于是乎就改造成了分布式的了
    2. 拆分生产者和消费者,其中获得目录的服务既是消费者又是生产者,目前按照 24 楼仁兄的几个细节建议,改进了代码另外加了多处的异常判断和处理,目前非常健壮和稳定

    速度:单机跑 100 个线程 两台机器一起跑,不加延时大约每秒 400 张,但是带宽会爆满,然后 mq 会断掉,所以加了延时,每个线程每次下载后加了 0.05 秒的延时后 下载速度大约每秒 300 到 360 张浮动,带宽占用百分之 80 到 95 浮动。目前很稳定。。

    感谢各位
    xbtlin
        33
    xbtlin  
       2017-09-08 01:42:07 +08:00
    m
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   2693 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 25ms · UTC 11:32 · PVG 19:32 · LAX 03:32 · JFK 06:32
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.