最近,在看 python 的异步编程( asyncio )部分,在使用 aiomysql 的时候遇到了困难,已经困惑我两三天了。可能是自己资质愚钝,看了 aiomysql 的官网例子( https://github.com/aio-libs/aiomysql/tree/master/examples ),我还是没能弄懂怎么才能多个数据库操作中复用 pool。下面是我的代码
import aiomysql import asyncio async def select(loop, sql): pool = await aiomysql.create_pool(host='127.0.0.1', port=3306, user='root', password='123456', db='test', loop=loop) async with pool.acquire() as conn: async with conn.cursor() as cur: await cur.execute(sql) r = await cur.fetchone() print(r) async def insert(loop, sql): pool = await aiomysql.create_pool(host='127.0.0.1', port=3306, user='root', password='123456', db='test', loop=loop) async with pool.acquire() as conn: async with conn.cursor() as cur: await cur.execute(sql) await conn.commit() async def main(loop): c1 = select(loop=loop, sql='select * from minifw') c2 = insert(loop=loop, sql="insert into minifw (name) values ('hello')") tasks = [ asyncio.ensure_future(c1), asyncio.ensure_future(c2) ] return await asyncio.gather(*tasks) if __name__ == '__main__': cur_loop = asyncio.get_event_loop() cur_loop.run_until_complete(main(cur_loop))
上面这样做的话,每次做数据库操作的时候,应该都会执行一次 create_pool
这个操作。现在我的问题是应该怎么改上面的代码,让连接池可以复用啊?
以前没怎么接触异步编程,希望大家能解答一下我的疑惑,感谢
1
qs 2017-06-11 18:46:22 +08:00 1
pool 放到全局变量里 创建前先判断下 pool 是否可用 具体可以看看单例的写法
|
2
resolvewang OP @qs 能否改一下给我看看,我用全局变量试过,应该是我能力不够吧,改过后调试不通。感谢
|
3
messense 2017-06-11 21:18:32 +08:00 1
把 pool 放到 main 里面?
import aiomysql import asyncio async def select(pool, sql): async with pool.acquire() as conn: async with conn.cursor() as cur: await cur.execute(sql) r = await cur.fetchone() print(r) async def insert(pool, sql): async with pool.acquire() as conn: async with conn.cursor() as cur: await cur.execute(sql) await conn.commit() async def main(loop): pool = await aiomysql.create_pool(host='127.0.0.1', port=3306, user='root', password='123456', db='test', loop=loop) c1 = select(pool, sql='select * from minifw') c2 = insert(pool, sql="insert into minifw (name) values ('hello')") tasks = [ asyncio.ensure_future(c1), asyncio.ensure_future(c2) ] return await asyncio.gather(*tasks) if __name__ == '__main__': cur_loop = asyncio.get_event_loop() cur_loop.run_until_complete(main(cur_loop)) |
4
resolvewang OP @messense 感谢,这样确实是可以的。我咋就没想到。
|
5
resolvewang OP @messense 大神,是否还有别的方法,可以把 pool 从 main() 中抽出来,这样写我又遇到问题了,比如我要写单元测试用例,应该怎么写啊,我写的运行使用不行
<pre> import asyncio import unittest import aiomysql from minifw.db import base_db class TestDB(unittest.TestCase): def setUp(self): self.loop = asyncio.new_event_loop() asyncio.set_event_loop(self.loop) self.pool = aiomysql.create_pool(host='127.0.0.1', port=3306, user='root', password='123456', db='test', loop=self.loop) def test_select(self): sql = 'select * from minifw where id = (%s)' rs = self.loop.run_until_complete(base_db.select(self.pool, sql, args=(1,), size=1)) self.assertEqual(len(rs), 1) def test_insert(self): sql = 'insert into `minifw` (`name`) values (?)' rs = self.loop.run_until_complete(base_db.insert(self.pool, sql, args=('test_val',))) self.assertEqual(rs, 1) def tearDown(self): self.loop.close() del self.loop if __name__ == '__main__': unittest.main() </pre> 这里的问题就是这个 pool 我不知道咋处理,如果我要运行 unnittest.main(),就会报错 `AttributeError: '_PoolContextManager' object has no attribute 'acquire'` |
6
messense 2017-06-11 23:40:15 +08:00 1
aiomysql.create_pool 需要 await 吧?没有 await 它返回的是个 coroutine,试试:
self.pool = self.loop.run_until_complete(aiomysql.create_pool(...)) |
7
resolvewang OP @messense 感谢您的耐心回复!通过你的回复,我对 asyncio 的理解进了一步,前两天看了差不多一天的文档,都还是有点懵。现在还有两个问题,我还是不是很懂,希望您能再帮忙解一下惑。
` async def select(pool, sql, args=(), size=None): async with pool.acquire() as conn: async with conn.cursor() as cur: await cur.execute(sql.replace('?', '%s'), args) if size: r = await cur.fetchmany(size) else: r = await cur.fetchall() return r ` 我在做关于这段代码的单元测试的时候,虽然执行正确,但是有一个警告,` ResourceWarning: Unclosed connection <aiomysql.connection.Connection object at 0x1030d1710> ResourceWarning)` 大概意思就是数据库连接没有关闭吧。但是这里用了上下文管理器啊,它不应该会关闭连接吗?我如果在 return 语句之前,加上 conn.close(), 就不会报这个警告了。 ` def test_select(self): sql = 'select * from minifw where id = (%s)' rs = self.loop.run_until_complete(base_db.select(self.pool, sql, args=(1,), size=1)) self.assertEqual(len(rs), 1) ` 这段代码是测试 select。 另外还有一个问题,我想问问 self.loop = asyncio.new_event_loop() asyncio.set_event_loop(self.loop) 和 self.loop = asyncio.get_event_loop() 的区别,因为在我执行 unnittest.main()的时候,用前者就可以执行,用后者就会报错,`Event loop is closed`,如果是测试单个数据库操作,后者就不会报错。 期望您的回复,感恩 |
8
messense 2017-06-12 12:08:54 +08:00 1
按道理说会自动关闭连接,不清楚 aiomysql 的具体实现细节。
ef tearDown(self): self.loop.close() del self.loop 你 tearDown 里面把 loop close 了。。。 |
9
messense 2017-06-12 12:10:19 +08:00
Python 3.6 以后已经不推荐传递 event loop 了,需要用到 event loop 的时候调用 asyncio.get_event_loop() 就好了。
|
10
resolvewang OP @messense 我一直以为 teardown 和 setup 在 unnittest.main()中只会运行一遍。。。
event loop 在程序的整个生命流程中,如果被 close 了,那么就不能使用 asyncio.get_event_loop()获取了吗?根据上面的代码,感觉是这样的。那这也就是说,我们只能在程序不用 event loop 的时候再关吗? 感觉自己对 event loop 模型还不是特别清楚。 |
11
messense 2017-06-12 15:16:26 +08:00
一般来说不需要手动 close event loop
|
12
resolvewang OP @messense 好的,明白了,感谢
|