PyQt6基础_QThreadPool
目录
前置
代码:
运行
正常运行
运行报错
前置
1 当需要使用的子线程非常多时,如果使用QThread实现,就要面对以下两个问题:
1.1 根据硬件设备,创建的QThread最理想个数是多少
1.2 QThread的复用和回收要怎么处理
2 基于【1】中的考虑,可以使用 QThreadPool 来实现多线程。QThreadPool一般都和QRunnable一起使用。任务在QRunnable中执行,QThreadPool通过QRunnable实现对线程的管理。
3 主线程与子线程之间的交互需要通过信号和槽实现。QRunnable不是QObject的子类,不支持信号,因此需要额外定义一个继承自QObject的信号类,并将其实例作为QRunnable的成员变量使用。
4 只有当任务(run 方法)执行完毕时,QRunnable 所在的线程才会结束。所以如果要手动控制线程结束,需要定义一个标记,在QRunnable的任务循环中对该标记做判断,通过中断循环使任务提前结束,从而达到结束线程的目的。
代码:
import os,traceback,sys
from time import sleep
from datetime import datetime
from PyQt6.QtCore import (
QSize,
QObject,
pyqtSlot,
pyqtSignal,QTime,QRunnable,QThreadPool
)
from PyQt6.QtWidgets import (
    QApplication,
    QMainWindow,
    QLabel,
    QPushButton,
    QWidget,
    QVBoxLayout,
    QMessageBox,
    QGroupBox,
)


class WorkerSignals(QObject):
    """Signals a Worker uses to report back to the GUI.

    Worker derives from QRunnable and holds an instance of this class so it
    has signals to emit.
    """

    # Emitted when a task ends (normally, stopped or failed); carries thread_name.
    signal_finished = pyqtSignal(str)
    # Emitted when a task raises; carries
    # (thread_name, exctype, value, traceback.format_exc()).
    signal_error = pyqtSignal(tuple)
    # Emitted with task output; carries (thread_name, message).
    signal_result = pyqtSignal(object)


class Worker(QRunnable):
    """Demo task that reports progress and can be stopped through a flag."""

    def __init__(self, task_data: dict):
        """Store the task description.

        Args:
            task_data: must contain 'thread_name' (identifies the task in
                every emitted signal) and 'interval' (seconds slept between
                iterations).
        """
        super().__init__()
        self.task_data = task_data
        self.thread_name = task_data['thread_name']
        self.signals = WorkerSignals()
        # Polled inside run(); set through stop_run() to end the task early.
        self.is_stop = False

    @pyqtSlot()
    def run(self):
        """Run up to 10 iterations, reporting each one through signal_result.

        signal_finished is always emitted last, whether the task completed,
        was stopped through stop_run(), or raised.
        """
        done_message = f'{self.thread_name} 任务执行结束。'
        try:
            for i in range(10):
                if i == 5:
                    # Deliberate failure that demonstrates the error path.
                    raise ValueError('测试报错功能')
                if self.is_stop:
                    self.signals.signal_result.emit((self.thread_name, f"{self.thread_name} 任务手动终止。第 {i} 次。 {QTime.currentTime().toString('hh:mm:ss')}"))
                    break
                self.signals.signal_result.emit((self.thread_name, f"{self.thread_name} 第 {i} 次。 {QTime.currentTime().toString('hh:mm:ss')}"))
                sleep(self.task_data['interval'])
            else:
                # for/else: reached only when the loop was not left via
                # break, so a manual stop keeps its own message instead of
                # being overwritten by the completion message.
                self.signals.signal_result.emit((self.thread_name, done_message))
        except Exception as exc:
            # Exception rather than a bare except, so SystemExit and
            # KeyboardInterrupt are not swallowed here.
            traceback.print_exc()
            self.signals.signal_error.emit((self.task_data['thread_name'], type(exc), exc, traceback.format_exc()))
        finally:
            self.signals.signal_finished.emit(self.thread_name)

    def stop_run(self):
        """Ask the task to stop; run() notices at its next iteration."""
        self.is_stop = True


class MainWindow(QMainWindow):
    """Window that starts a batch of Worker tasks and shows their progress."""

    signal_worker = pyqtSignal(object)

    def __init__(self):
        """Build the widgets and initialise the thread-pool state."""
        super().__init__()
        self.setWindowTitle('QThreadPool')
        self.setMinimumSize(QSize(600, 800))

        # One status label per worker is added to this layout in btn_clicked().
        self.layout_groupbox = QVBoxLayout()
        groupbox = QGroupBox('线程执行显示')
        groupbox.setLayout(self.layout_groupbox)

        self.label_runnable = QLabel()
        self.label_runnable.setWordWrap(True)
        self.btn = QPushButton('启动线程池', clicked=self.btn_clicked)
        self.btn_stop = QPushButton('结束线程池', clicked=self.btn_stop_clicked)
        self.btn_main = QPushButton('main', clicked=self.btn_main_clicked)
        self.label_main = QLabel()

        layout = QVBoxLayout()
        layout.addWidget(groupbox)
        layout.addWidget(self.label_runnable)
        layout.addWidget(self.btn)
        layout.addWidget(self.btn_stop)
        layout.addWidget(self.btn_main)
        layout.addWidget(self.label_main)

        widget = QWidget()
        widget.setLayout(layout)
        self.setCentralWidget(widget)
        self.open_init()

    def open_init(self):
        """Create the thread pool and the bookkeeping dictionaries."""
        # True once the user confirmed exit while workers were still running.
        self.waitting_close = False
        # thread_name -> QLabel showing that worker's latest message.
        self.label_dict: dict = {}
        self.threadpool = QThreadPool()
        self.max_thread_count = self.threadpool.maxThreadCount()
        # Use half of the pool's maximum when it is above 4, otherwise all of it.
        self.thread_count = self.max_thread_count // 2 if self.max_thread_count > 4 else self.max_thread_count
        # thread_name -> Worker that has not reported signal_finished yet.
        self.runner_dict = {}

    def btn_clicked(self):
        """Reset the display and start thread_count workers."""
        self.label_runnable.setText('开启线程池')

        # Remove the labels left over from a previous run.
        while self.layout_groupbox.count():
            item = self.layout_groupbox.takeAt(0)
            widget = item.widget()
            if widget is not None:
                widget.deleteLater()
        self.runner_dict.clear()
        self.label_dict.clear()

        for i in range(self.thread_count):
            thread_name = f"thread {i}_"
            task_data = {'thread_name': thread_name, 'interval': i + 1}
            label = QLabel(f"{thread_name} 创建")
            self.label_dict[thread_name] = label
            self.layout_groupbox.addWidget(label)

            worker = Worker(task_data)
            worker.signals.signal_result.connect(self.thread_result_emit)
            worker.signals.signal_error.connect(self.thread_error_emit)
            worker.signals.signal_finished.connect(self.thread_finished_emit)
            self.runner_dict[thread_name] = worker
            self.threadpool.start(worker)

        # Re-enabled by thread_finished_emit() once every worker has finished.
        self.btn.setDisabled(True)

    def btn_stop_clicked(self):
        """Set the stop flag on every worker that has not finished yet."""
        for worker in self.runner_dict.values():
            worker.stop_run()

    def btn_main_clicked(self):
        """Show the pool's active thread count with the current time."""
        now_str = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        self.label_main.setText(f"当前还有 {self.threadpool.activeThreadCount()} 个线程在运行。{now_str}")

    def thread_result_emit(self, res: tuple):
        """Show a worker message; res is (thread_name, message)."""
        self.label_dict[res[0]].setText(res[1])

    def thread_error_emit(self, res: tuple):
        """Report a worker failure.

        res is (thread_name, exctype, value, formatted_traceback); the
        traceback goes to the console and the label shows a short notice.
        """
        print(f"{res[0]} 报错啦。 {res[-1]}")
        self.label_dict[res[0]].setText(f"{res[0]} 报错啦。错误详情看控制台。")

    def thread_finished_emit(self, res: str):
        """Record that worker `res` finished and react when it was the last one."""
        label = self.label_dict[res]
        label.setText(f"finished {label.text()}")
        self.label_runnable.setText(f"{res} finished\n{self.label_runnable.text()}")

        # Completion is tracked with runner_dict instead of
        # threadpool.activeThreadCount(): the finished signal is emitted from
        # inside run(), so the pool may still count that thread as active
        # when this slot executes, and the last worker could be missed.
        self.runner_dict.pop(res, None)
        if self.runner_dict:
            return

        self.btn.setDisabled(False)
        if self.waitting_close:
            self.close()

    def closeEvent(self, a0):
        """Confirm exit; when workers are running, stop them and close afterwards.

        Args:
            a0: the close event passed in by Qt.
        """
        if self.waitting_close and not self.runner_dict:
            # Exit was already confirmed and every worker has reported
            # finished, so close without asking a second time.
            a0.accept()
            return

        answer = QMessageBox.question(self, '确认退出?', '退出将中断操作,确定要退出么?', QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.No)
        if answer != QMessageBox.StandardButton.Yes:
            a0.ignore()
            return

        if not self.runner_dict:
            a0.accept()
            return

        # Workers are still running: postpone the close, ask them to stop and
        # let thread_finished_emit() call close() when the last one is done.
        self.waitting_close = True
        a0.ignore()
        self.btn_stop_clicked()
        QMessageBox.information(self, '提示', '正在关闭线程,请稍等', QMessageBox.StandardButton.Ok)


if __name__ == '__main__':
    app = QApplication([])
    window = MainWindow()
    window.show()
    app.exec()
运行
正常运行
运行报错
控制台打印