# coding=utf-8
import time
import pymysql
import MySQLdb
import AnalyFunc
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, wait, ALL_COMPLETED, as_completed
from dbutils.pooled_db import PooledDB
GloSQLQueueBreakFlag = 1 # 处理队列退出判断信号
# 处理 SQL 队列
def procSQLcmd(sqlqueue):
import time
from dbutils.pooled_db import PooledDB
import pymysql
POOL = PooledDB(
creator=pymysql, # 使用链接数据库的模块
maxconnections=80, # 连接池允许的最大连接数,0 和 None 表示不限制连接数
mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0 表示不创建
maxcached=5, # 链接池中最多闲置的链接,0 和 None 不限制
blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待; False,不等待然后报错
maxusage=None, # 一个链接最多被重复使用的次数,None 表示无限制
setsession=[], # 开始会话前执行的命令列表。
ping=0, # ping MySQL 服务端,检查是否服务可用。
host='192.168.89.48',
port=3306,
user='root',
password='root123',
database='eee',
charset='utf8'
)
while True:
while not sqlqueue.empty():
print(GloSQLQueueBreakFlag:', str(GloSQLQueueBreakFlag))
sqlTask = sqlqueue.get()
DBconn = POOL.connection()
cur = DBconn.cursor()
print("sqlTask:",sqlTask)
ses = cur.execute(sqlTask)
cur.close() # or del cur
DBconn.close() # or del db
time.sleep(0.5)
if GloSQLQueueBreakFlag == 0:
break
else:
time.sleep(1)
return
if __name__ == '__main__':
from concurrent import futures
from multiprocessing import Manager
from teFunc import TranDicttoSQLcmd
SQLQueue = Manager().Queue()
ProcessSQLQueue = futures.ProcessPoolExecutor(max_workers=1)
ProcessSQLQueueRet = ProcessSQLQueue.submit(procSQLcmd,SQLQueue)
# 导入测试数据,成为字典列表
teList = eval(AnalyFunc.ReadFiletoStr('h:/testdict.dict'))
for i in teList:
print(i)
TranDicttoSQLcmd('testSheet', i, SQLQueue) # 把字典转换成 MySQL 的 INSERT 语句,同时把语句作为任务交到全局队列,交由独立进程的 procSQLcmd 函数去处理
# 最后确保队列全部弄完,才完全退出整个程序
waitSQLQueue = True
while waitSQLQueue == True:
time.sleep(0.5)
SQLQueueCount = SQLQueue.qsize()
print(f'SQLQueue 队列还有:{SQLQueueCount} 未处理完.')
if SQLQueueCount == 0:
GloSQLQueueBreakFlag = 0
waitSQLQueue = False
# 将字典转换成 SQL 语句
def TranDicttoSQLcmd(tblName,DictObj,SQLQueue,printSQL=False):
import time
# 组合字段
FiledStr = ''
ValueStr = ''
SQLText = ''
# 生成 INSERT 语句
SQLcmd = "INSERT INTO %s ({}) VALUE ({});" % tblName
# 单一字典
if isinstance(DictObj, dict):
FiledStr = ''
ValueStr = ''
for k, v in DictObj.items():
if v == None:
continue
FiledStr = FiledStr + "`%s`" % (k) + ','
ValueStr = ValueStr + "'%s'" % (str(v)) + ','
FiledStr = FiledStr[:-1]
ValueStr = ValueStr[:-1]
SQLText = SQLcmd.format(FiledStr, ValueStr)
if printSQL:
print('TranDicttoSQLcmd:',SQLText)
if SQLQueue:
SQLQueue.put(SQLText)
return SQLText
# 字典列表
if isinstance(DictObj, list):
kvDict = {}
ccount = 0
for i in DictObj:
FiledStr = ''
ValueStr = ''
ccount += 1
for k,v in i.items():
if v == None:
continue
FiledStr = FiledStr + "`%s`" % (k) + ','
ValueStr = ValueStr + "'%s'" % (str(v)) + ','
FiledStr = FiledStr[:-1]
ValueStr = ValueStr[:-1]
SQLText = SQLcmd.format(FiledStr, ValueStr)
if printSQL:
print('TranDicttoSQLcmd:',SQLText)
if SQLQueue:
SQLQueue.put(SQLText)
return
运行过程:
SQLQueue 队列还有:538
GloSQLQueueBreakFlag: 1
sqlTask: INSERT INTO testSheet (`tename`,`amount`,`weight`) VALUE ('椰子','218','72170');
SQLQueue 队列还有:537
SQLQueue 队列还有:537
SQLQueue 队列还有:537
SQLQueue 队列还有:537
SQLQueue 队列还有:537
SQLQueue 队列还有:537
SQLQueue 队列还有:537
SQLQueue 队列还有:537
SQLQueue 队列还有:537
SQLQueue 队列还有:537
SQLQueue 队列还有:537
SQLQueue 队列还有:537
#一直刷下去重复
检查 testSheet 表,一个椰子内容被插入,处理 SQL 语句队列的进程函数工作不正常是什么原因呢?
1
abucus 2020-11-06 02:51:44 +08:00
尝试把 `procSQLcmd` 方法里的 `break` 语句注释掉看看?
|