V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
qazwsxkevin
V2EX  ›  Python

请教这个 concurrent.futures 多进程处理 SQL 队列,为什么只处理了第一个,就停了下来?

  •  
  •   qazwsxkevin · 2020-11-01 16:33:34 +08:00 · 2059 次点击
    这是一个创建于 1518 天前的主题,其中的信息可能已经有所发展或是发生改变。
    # coding=utf-8
    import time
    import pymysql
    import MySQLdb
    import AnalyFunc 
    
    from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, wait, ALL_COMPLETED, as_completed
    from dbutils.pooled_db import PooledDB
    
    GloSQLQueueBreakFlag = 1 # 处理队列退出判断信号
    
    # 处理 SQL 队列
    def procSQLcmd(sqlqueue):
        import time
        from dbutils.pooled_db import PooledDB
        import pymysql
    
        POOL = PooledDB(
            creator=pymysql,  # 使用链接数据库的模块
            maxconnections=80,  # 连接池允许的最大连接数,0 和 None 表示不限制连接数
            mincached=2,  # 初始化时,链接池中至少创建的空闲的链接,0 表示不创建
            maxcached=5,  # 链接池中最多闲置的链接,0 和 None 不限制
            blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待; False,不等待然后报错
            maxusage=None,  # 一个链接最多被重复使用的次数,None 表示无限制
            setsession=[],  # 开始会话前执行的命令列表。
            ping=0,  # ping MySQL 服务端,检查是否服务可用。
            host='192.168.89.48',
            port=3306,
            user='root',
            password='root123',
            database='eee',
            charset='utf8'
        )
    
        while True:
            while not sqlqueue.empty():
                print(GloSQLQueueBreakFlag:', str(GloSQLQueueBreakFlag))
                sqlTask = sqlqueue.get()
                DBconn = POOL.connection()
                cur = DBconn.cursor()
                print("sqlTask:",sqlTask)
                ses = cur.execute(sqlTask)
                cur.close()  # or del cur
                DBconn.close()  # or del db
                time.sleep(0.5)
            if GloSQLQueueBreakFlag == 0:
                break
            else:
                time.sleep(1)
        return
    
    if __name__ == '__main__':
        from concurrent import futures
        from multiprocessing import Manager
        from teFunc import TranDicttoSQLcmd
    
        SQLQueue = Manager().Queue()
        ProcessSQLQueue = futures.ProcessPoolExecutor(max_workers=1)
        ProcessSQLQueueRet = ProcessSQLQueue.submit(procSQLcmd,SQLQueue)
    
        # 导入测试数据,成为字典列表
        teList = eval(AnalyFunc.ReadFiletoStr('h:/testdict.dict'))
    
        for i in teList:
            print(i)
            TranDicttoSQLcmd('testSheet', i, SQLQueue)  # 把字典转换成 MySQL 的 INSERT 语句,同时把语句作为任务交到全局队列,交由独立进程的 procSQLcmd 函数去处理
    
    
        # 最后确保队列全部弄完,才完全退出整个程序
        waitSQLQueue = True
        while waitSQLQueue == True:
            time.sleep(0.5)
            SQLQueueCount = SQLQueue.qsize()
            print(f'SQLQueue 队列还有:{SQLQueueCount} 未处理完.')
            if SQLQueueCount == 0:
                GloSQLQueueBreakFlag = 0
                waitSQLQueue = False
    
    # 将字典转换成 SQL 语句
    def TranDicttoSQLcmd(tblName,DictObj,SQLQueue,printSQL=False):
        import time
        # 组合字段
        FiledStr = ''
        ValueStr = ''
        SQLText = ''
    
        # 生成 INSERT 语句
        SQLcmd = "INSERT INTO %s ({}) VALUE ({});" % tblName
    
        # 单一字典
        if isinstance(DictObj, dict):
            FiledStr = ''
            ValueStr = ''
            for k, v in DictObj.items():
                if v == None:
                    continue
                FiledStr = FiledStr + "`%s`" % (k) + ','
                ValueStr = ValueStr + "'%s'" % (str(v)) + ','
            FiledStr = FiledStr[:-1]
            ValueStr = ValueStr[:-1]
            SQLText = SQLcmd.format(FiledStr, ValueStr)
            if printSQL:
                print('TranDicttoSQLcmd:',SQLText)
            if SQLQueue:
                SQLQueue.put(SQLText)
            return SQLText
    
        # 字典列表
        if isinstance(DictObj, list):
            kvDict = {}
            ccount = 0
            for i in DictObj:
                FiledStr = ''
                ValueStr = ''
                ccount += 1
                for k,v in i.items():
                    if v == None:
                        continue
                    FiledStr = FiledStr + "`%s`" % (k) + ','
                    ValueStr = ValueStr + "'%s'" % (str(v)) + ','
                FiledStr = FiledStr[:-1]
                ValueStr = ValueStr[:-1]
                SQLText = SQLcmd.format(FiledStr, ValueStr)
                if printSQL:
                    print('TranDicttoSQLcmd:',SQLText)
                if SQLQueue:
                    SQLQueue.put(SQLText)
        return
    

    运行过程:

    SQLQueue 队列还有:538
    GloSQLQueueBreakFlag: 1
    sqlTask: INSERT INTO testSheet (`tename`,`amount`,`weight`) VALUE ('椰子','218','72170');
    SQLQueue 队列还有:537
    SQLQueue 队列还有:537
    SQLQueue 队列还有:537
    SQLQueue 队列还有:537
    SQLQueue 队列还有:537
    SQLQueue 队列还有:537
    SQLQueue 队列还有:537
    SQLQueue 队列还有:537
    SQLQueue 队列还有:537
    SQLQueue 队列还有:537
    SQLQueue 队列还有:537
    SQLQueue 队列还有:537
    #一直刷下去重复
    

    检查 testSheet 表,一个椰子内容被插入,处理 SQL 语句队列的进程函数工作不正常是什么原因呢?

    1 条回复    2020-11-06 02:51:44 +08:00
    abucus
        1
    abucus  
       2020-11-06 02:51:44 +08:00
    尝试把 `procSQLcmd` 方法里的 `break` 语句注释掉看看?
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   955 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 24ms · UTC 20:53 · PVG 04:53 · LAX 12:53 · JFK 15:53
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.