from concurrent import futures
from multiprocessing import Manager
# 此函数无限循环,消费掉 sqltextqueen
def procSQL(sqltextqueen):
while True
while not sqltextqueen:
inserttext = sqltextqueen.get()
mysqldb =xxxx
mysqldb.insert(inserttext)
#略
time.sleep(3)
if __name__ == '__main__':
MainSQLProcess = futures.ProcessPoolExecutor(max_workers=1)
# 全局 SQL 队列
manager = Manager()
SQLQueen = manager.queue
MainSQLProcessRet = MainSQLProcess.sumit(procSQL,SQLQueen)
################ another.py ##################
TaskProcess = {}
TaskProcessRet = {}
# 提交任务
TaskinfoA = {
'TYPE': 'CPS'
'countt': countt,
'Rget': stRget,
'DLoption': DLoption,
'DTO': mtinfo.get('DTO'),
'errFlag': errFlag
# 字典内容引用的一些值,有些是从函数外几层的函数传过来的,距离 main()已经好 N 层了
}
TaskProcess['A'] = futures.ProcessPoolExecutor(max_workers=1)
TaskProcess['B'] = futures.ProcessPoolExecutor(max_workers=1)
# 交给进程池
TaskRet['A'] = TaskProcess['A'].submit(ProcessSuit,TaskinfoA)
TaskRet['B'] = TaskProcess['B'].submit(ProcessSuit,TaskinfoB)
ProcessSuit 函数里,产生的一些 SQL 语句,希望能及时送到全局队列里去消费,而不是通过 concurrent.futures 的回调 函数一层一层地往回 main 送到才处理...
我尝试了把 SQLQueen 引用在
TaskinfoA = {
'TYPE': 'CPS'
'countt': countt,
'Rget': stRget,
'DLoption': DLoption,
'DTO': mtinfo.get('DTO'),
'errFlag': errFlag,
'SQLQueen' : SQLQueen # <-----
}
是不行的,貌似是有个 concurrent.futures pick 锁什么的 请教如何可以让各个子进程,都能不冲突地,实时送到全局的队列里,各自进程都可以 put,get,但是又不冲突?
1
qazwsxkevin OP 最后重新改写了大部分函数
再用了 Manager().Queue() 作为队列 事件算是解决了。。。 |