一个礼拜前粗略的学习了下 asyncio + aiohttp 实现异步爬虫,三天前为了联手写了一个 ins 下载爬虫。 爬虫思路草图:用文字描述把: 首先我参考了 aiohttp 官方的爬虫例子,官方爬虫例子在这:crawl.py,我的思路是这样的
1.获取 user id ; user id 是必须的,所以我把这个写成了一个方法在创建类的时候直接调用
2.获取页的数据;因为没发说是第几页,这个请求需要三个参数,一个是 user id,一个是一次获取到的数量,第三个是 end_cursor 用来获取下一页
3.解析数据;获取到的数据是 json 格式的,我需要获取两个东西,第一个是图片链接,第二个是 end_cursor,用来获取下一页
4.处理 url ;这个方法遍历 urls 调用 download 方法下载
5.下载;用到了 aiohttp 和 aiofiles,没有异常、下载完后我用 asyncio.Task(self.get_display_urls(end_cursor))回到了获取页数据的方法,以此循环,当然获取页数据的方法有判读 end_cursor 是否为空,直接 loop.stop()
为了实现抓取所有图片,我没有使用 run_until_complete,因为它只获取了一次就停了,我就是用的 run_forever
全部代码如下:
import aiohttp
import asyncio
import aiofiles
import aioredis
import re
import json
import os
import signal
import time
import logging
class Instagram(object):
def __init__(self, username, loop, depth=0, maxtasks=200):
"""
:param username: 用户名
:param loop:
:param depth: 下载页面数量
:param maxtasks: 最大并发限制
"""
self.down_tasks = set()
self.down_todo = set()
self.down_busy = set()
self.down_done = {}
self.loop = loop
self.sem = asyncio.Semaphore(maxtasks, loop=loop)
self.username = username
self.max_page = depth if depth >= 1 else -1
self.ROOT_URL = 'https://www.instagram.com/'
self.headers = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.131 Safari/537.36',
'Cookie': 'rur=ATN; mid=XM2K8QAEAAGy8fiEf1b2T05Pssas; fbm_124024574287414=base_domain=.instagram.com; fbsr_124024574287414=ns7o0TqnERhbPihnN390KYuDdI7xVM2vgUunMZT4URY.eyJjb2RlIjoiQVFESlVpaVhaSFNwWnBTZ2VGUE1nUGlfUXlsdElpRG9vOHJDdHB3Qm14Q25rNUx6YnJsNHdBX1JRVnowaDREU3J4ZzFGTWVHWHdlWFlhVGxuVi0yMk84ZXdlUVBNWTg5bVF6MFg5RG40b3psSEozTGk4WW40N1lPeFQzdE0yQUNJWkg5SWh1VmhpRHBoaXZ4ZXNMM3dhc2hMcHdQQ2RkSDZWR2FQMlR1QVM4V3U1SElGTERWaEpfYzl3akstem94TFl3QWRESE9wSjNwcDlhTjVhcXFBWGlWM0lfNTducGZ0cmpCWlFLd2xUZzlYZjBEbUlFdmR5RTBsMng3OEY0RkJ6Q1NtNWEzQ2RISTRYckVqNXB6LWVrYjRyNHRza05HOUhHUmZSaXAwS0hya1VqQ3l4T3YwNDBEU2txOHI4MGJvZG9GU3o4THFHelpSckZ4dldVMjNUWGhkZ2d6MTEzbHNfVnN5T1V5X01EUHZlSHVtUkQ5bXJ1V01ObGUxOFBuV2hvIiwidXNlcl9pZCI6IjEwMDAyNDA3NTU3MTE2NyIsImFsZ29yaXRobSI6IkhNQUMtU0hBMjU2IiwiaXNzdWVkX2F0IjoxNTU3MDQ0NTE0fQ; csrftoken=2JzdvnHL9iMuxbV7KiJcASk8RlKuYWAQ; shbid=2545; shbts=1557044558.2494695; ds_user_id=5561946202; sessionid=5561946202%3AwE5Vb00lI1bmIb%3A23; urlgen="{"2001:19f0:7001:1e1d:5400:1ff:fef7:67fd": 20473}:1hND0O:dQodCbp0SM_24vfenOyhBT-Curk"'
}
self.proxy = "http://localhost:8001"
t = asyncio.ensure_future(self.init(), loop=loop)
loop.run_until_complete(t)
async def run(self):
"""
:return:
"""
await self.init()
t = asyncio.ensure_future(self.addurls(), loop=self.loop)
while self.down_busy:
await asyncio.sleep(1, loop=self.loop)
await t
self.loop.close()
async def init(self):
"""
初始化必要参数:user id
:return:
"""
print('[init] 初始化参数...')
shared_data = await self.get_shared_data()
if not shared_data:
print('!!!!!!!')
exit(0)
self.user_id = re.findall('"logging_page_id":.?"profilePage_(.*?)"', shared_data)[0]
async def _http_request(self, url, **kwargs):
"""
http 请求
:param url: 请求链接
:param kwargs: 链接参数
:return: 网页 response
"""
params = dict()
if kwargs:
for k, v in kwargs.items():
params.update(v)
async with self.sem:
async with aiohttp.ClientSession() as session:
try:
async with session.get(url, timeout=10, proxy=self.proxy, headers=self.headers,
params=params) as response:
html = (await response.read()).decode('utf-8', 'replace')
return html
except Exception as exc:
logging.warning("[_http_request] 异常: {}".format(exc))
async def get_shared_data(self):
"""
获取 shared data
:return:
"""
html = await self._http_request(self.ROOT_URL + self.username)
if html:
shared_data = html.split("window._sharedData = ")[1].split(";</script>")[0]
return shared_data
def get_ends_cursor(self, html):
"""
:param html:
:return:
"""
if html:
edge_media = json.loads(html)['data']['user']['edge_owner_to_timeline_media']
edges = edge_media['edges']
if edges:
end_cursor = edge_media['page_info']['end_cursor']
has_next_page = edge_media['page_info']['has_next_page']
if has_next_page:
return end_cursor
return ''
async def get_display_url(self, max=50, end_cursor=""):
"""
解析 display url
:param max: 单次获取图片总量
:param end_cursor: end_cursor 是获取下一页的参数
:return: 包含{max}数量的图片链接列表
"""
pic_params = {
'query_hash': 'f2405b236d85e8296cf30347c9f08c2a',
'variables': '{{"id":"{0}","first":{1},"after":"{2}"}}'.format(self.user_id, max, end_cursor),
}
pic_url = self.ROOT_URL + 'graphql/query/'
html = await self._http_request(pic_url, parms=pic_params)
if html:
edge_media = json.loads(html)['data']['user']['edge_owner_to_timeline_media']
edges = edge_media['edges']
if edges:
display_urls = []
for edge in edges:
display_urls.append(edge['node']['display_url'])
return display_urls, self.get_ends_cursor(html)
async def download(self, url):
"""
下载到本地
:param url:
:return:
"""
print('processing:', url)
# try:
# async with self.sem: //如果使用 Semaphore 会卡住。。。虽然不会报错
self.down_todo.remove(url)
self.down_busy.add(url)
path = './instagram/' + self.username
if not os.path.exists(path):
os.makedirs(path)
filename = path + '/' + url.split('?')[0].split('/')[-1]
print('start download:', url)
async with aiohttp.ClientSession() as session:
try:
async with session.get(url, headers=self.headers, proxy=self.proxy) as resp:
if resp.status == 200:
f = await aiofiles.open(filename, 'wb')
await f.write(await resp.read())
await f.close()
await asyncio.Task(self.addurls(self.end_cursor))
resp.close()
self.down_done[url] = True
except Exception as exc:
logging.error('[download]下载异常,异常:{}\n 链接:{}'.format(repr(exc), url))
self.down_done[url] = False
self.down_busy.remove(url)
print(len(self.down_done), 'completed tasks,', len(self.down_tasks),
'still pending, todo', len(self.down_todo))
# 这个判断根本没有任何用,不会调用,直接卡住
if self.end_cursor is False:
print('下载完 la')
self.loop.close()
# except Exception as exc:
# logging.error('[download]下载异常,异常:{}\n 链接:{}'.format(repr(exc), url))
async def add_down_urls(self, urls):
print('[add_down_urls] 开始下载,数量:', len(urls))
async with asyncio.Semaphore()
for url in urls:
self.down_todo.add(url)
await self.sem.acquire()
task = asyncio.ensure_future(self.download(url), loop=self.loop)
task.add_done_callback(lambda t: self.sem.release())
task.add_done_callback(self.down_tasks.remove)
self.down_tasks.add(task)
async def addurls(self, end_cursor=""):
"""
:param end_cursor: 当前页面的标示 base64 加密,用于加载下一页,如果没有下一页改参数为 Fasle
:return:
"""
print("\n\n 开始获取下一页,end_cursor:", end_cursor)
display_urls, self.end_cursor = await self.get_display_url(end_cursor=end_cursor)
await self.add_down_urls(display_urls)
if not self.end_cursor:
return
'''
流程:
run() --> addurls() --> add_own_urls() --> download()
^ |
| |
<-------<-----<--------<-------<--
'''
if __name__ == '__main__':
start = time.time()
loop = asyncio.get_event_loop()
ins = Instagram('taeri__taeri', loop)
future = asyncio.ensure_future(ins.addurls(), loop=loop)
try:
loop.add_signal_handler(signal.SIGINT, loop.stop)
except RuntimeError:
pass
loop.run_forever()
# loop.run_until_complete(future)
# for i in future.result():
# print(">>>>", i)
# ins.main()
end = time.time()
print('耗时:', end - start)
我遇到的问题是不使用 Semaphore 的情况下一开始是疯狂下载,也的确是下载成功了,然后就直接卡住,也不停,就一直卡住(原谅我使用卡住这个词),希望能帮忙看一下错在哪,谢谢了
1
zckun OP 草图在这。。。为了找画图工具用了半个多小时 https://github.com/ZCKun/d/blob/master/a.png
|
2
zckun OP ins 是新号,所以 cookie 没去掉就算了
|
3
zckun OP 不能重新编辑么。。代码有些错误忘了删除,希望别介意
|
4
zckun OP emmmm
|
5
CSM 2019-05-09 14:15:46 +08:00 1
你好,研究了下你的代码,发现一个小问题
# 这个判断根本没有任何用,不会调用,直接卡住 if self.end_cursor is False: 这个是因为之前没有下一页的时候 end_cursor 是 '' 空字符串,而不是 False。 另外就是我觉得你的架构上有问题,这个问题是经典的生产者-消费者模型,请求并解析出图片链接作为生产者,然后启动多个消费者来下载这些链接就行了。我重构了一下你的代码,具体可见 https://gist.github.com/cshuaimin/4cf8d769b88e93fc805ceefb9af8c1f4 |
6
CSM 2019-05-09 14:25:25 +08:00
还有就是可以看到在 _http_request 方法里为每一个请求都生成了一个 ClientSession,这样太浪费了,建议只用一个 session。doc:
Session encapsulates a connection pool (connector instance) and supports keepalives by default. Unless you are connecting to a large, unknown number of different servers over the lifetime of your application, it is suggested you use a single session for the lifetime of your application to benefit from connection pooling. https://docs.aiohttp.org/en/stable/client_reference.html |
9
shawndev 2019-05-09 15:06:32 +08:00
GvR 有一个 500lines 项目你可以参考一下。另外楼上说的很对,这是典型的生产者消费者模式,另外似乎没有考虑去重、重定向和超时重试?
|