from __future__ import annotations
import sys
import threading
import time
from datetime import datetime
from random import randint
from PySide6.QtCore import QObject, QRunnable, QThreadPool, Signal, Slot
from PySide6.QtWidgets import QApplication, QLabel, QMainWindow, QProgressBar, QPushButton, QVBoxLayout, QWidget
def get_time_str() -> str:
return datetime.now().isoformat(sep = ' ')
class WorkerSignals(QObject):
begin = Signal(str, str)
finished = Signal(str, str)
progress_value = Signal(int) # 进度值
class MyWorker(QRunnable):
def __init__(self, *args, **kwargs):
# 传递参数给 Worker
super().__init__()
print('args=', args, 'kwargs=', kwargs)
# 初始化信号和槽
self.signals = WorkerSignals()
self.args = args
self.kwargs = kwargs
@Slot()
def run(self) -> None:
thread_id = str(threading.get_ident())
# 开始工作
self.signals.begin.emit(thread_id, get_time_str())
total_progress = 1000
for i in range(total_progress):
time.sleep(float(randint(100, 500)) / 1000) # 模拟耗时操作
self.signals.progress_value.emit(i) # 发送信号,更新进度值
# 结束工作
self.signals.finished.emit(thread_id, get_time_str())
class MyMainWindow(QMainWindow):
def __init__(self):
super().__init__()
self.setWindowTitle('Hello, PySide6!')
self.setToolTip('A PySide6 GUI Application Demo')
self.progress_bar = QProgressBar()
self.progress_bar.setRange(0, 1000)
self.label = QLabel('0.0%')
self.button = QPushButton('Start')
self.button.clicked.connect(self.start_threads)
v_layout = QVBoxLayout()
v_layout.addWidget(self.progress_bar)
v_layout.addWidget(self.label)
v_layout.addWidget(self.button)
container = QWidget()
container.setLayout(v_layout)
self.setCentralWidget(container)
self.threads = QThreadPool()
print('maximum threads: {}'.format(self.threads.maxThreadCount()))
def update_progress_value(self, value: int) -> None:
percentage = (value / 1000) * 100 # 计算百分比
self.label.setText(f'{percentage:.1f}%')
self.progress_bar.setValue(value)
def handle_worker_begin(self, thread_id: str, time_str: str) -> None:
print('BEGIN thread_id=', thread_id, ', time_str =', time_str)
def handle_worker_finished(self, thread_id: str, time_str: str) -> None:
print('FINISHED thread_id=', thread_id, ', time_str =', time_str)
def start_threads(self) -> None:
worker = MyWorker()
worker.signals.begin.connect(self.handle_worker_begin)
worker.signals.finished.connect(self.handle_worker_finished)
worker.signals.progress_value.connect(self.update_progress_value)
self.threads.start(worker)
if __name__ == "__main__":
app = QApplication(sys.argv)
window = MyMainWindow()
window.show()
app.exec()
from __future__ import annotations
import sys
import threading
import time
from datetime import datetime
from random import randint
from PySide6.QtCore import QObject, QRunnable, QThreadPool, Signal, Slot
from PySide6.QtWidgets import QApplication, QLabel, QMainWindow, QProgressBar, QPushButton, QVBoxLayout, QWidget
def get_time_str() -> str:
return datetime.now().isoformat(sep = ' ')
class WorkerSignals(QObject):
begin = Signal(str, str)
finished = Signal(str, str)
progress_value = Signal(int) # 进度值
class MyWorker(QRunnable):
def __init__(self, *args, **kwargs):
# 传递参数给 Worker
super().__init__()
print('args=', args, 'kwargs=', kwargs)
# 初始化信号和槽
self.signals = WorkerSignals()
self.args = args
self.kwargs = kwargs
@Slot()
def run(self) -> None:
thread_id = str(threading.get_ident())
# 开始工作
self.signals.begin.emit(thread_id, get_time_str())
total_progress = 1000
for i in range(total_progress):
time.sleep(float(randint(100, 500)) / 1000) # 模拟耗时操作
self.signals.progress_value.emit(i) # 发送信号,更新进度值
# 结束工作
self.signals.finished.emit(thread_id, get_time_str())
class MyMainWindow(QMainWindow):
def __init__(self):
super().__init__()
self.setWindowTitle('Hello, PySide6!')
self.setToolTip('A PySide6 GUI Application Demo')
self.progress_bar = QProgressBar()
self.progress_bar.setRange(0, 1000)
self.label = QLabel('0.0%')
self.button = QPushButton('Start')
self.button.clicked.connect(self.start_threads)
v_layout = QVBoxLayout()
v_layout.addWidget(self.progress_bar)
v_layout.addWidget(self.label)
v_layout.addWidget(self.button)
container = QWidget()
container.setLayout(v_layout)
self.setCentralWidget(container)
self.threads = QThreadPool()
print('maximum threads: {}'.format(self.threads.maxThreadCount()))
def update_progress_value(self, value: int) -> None:
percentage = (value / 1000) * 100 # 计算百分比
self.label.setText(f'{percentage:.1f}%')
self.progress_bar.setValue(value)
def handle_worker_begin(self, thread_id: str, time_str: str) -> None:
self.button.setEnabled(False) # 线程开始后将按钮设置为禁用状态
print('BEGIN thread_id=', thread_id, ', time_str =', time_str)
def handle_worker_finished(self, thread_id: str, time_str: str) -> None:
self.button.setEnabled(True) # 线程完成后将按钮设置为可用状态
print('FINISHED thread_id=', thread_id, ', time_str =', time_str)
def start_threads(self) -> None:
worker = MyWorker()
worker.signals.begin.connect(self.handle_worker_begin)
worker.signals.finished.connect(self.handle_worker_finished)
worker.signals.progress_value.connect(self.update_progress_value)
self.threads.start(worker)
if __name__ == "__main__":
app = QApplication(sys.argv)
window = MyMainWindow()
window.show()
app.exec()
from __future__ import annotations
import sys
import threading
import time
from datetime import datetime
from random import randint
from PySide6.QtCore import QEvent, QObject, QRunnable, QThreadPool, Signal, Slot
from PySide6.QtWidgets import QApplication, QLabel, QMainWindow, QMessageBox, QProgressBar, QPushButton, QVBoxLayout, QWidget
def get_time_str() -> str:
return datetime.now().isoformat(sep = ' ')
class WorkerSignals(QObject):
begin = Signal(str, str)
finished = Signal(str, str)
progress_value = Signal(int) # 进度值
class MyWorker(QRunnable):
def __init__(self, *args, **kwargs):
# 传递参数给 Worker
super().__init__()
print('args=', args, 'kwargs=', kwargs)
# 初始化信号和槽
self.signals = WorkerSignals()
self.args = args
self.kwargs = kwargs
@Slot()
def run(self) -> None:
thread_id = str(threading.get_ident())
# 开始工作
self.signals.begin.emit(thread_id, get_time_str())
total_progress = 1000
for i in range(total_progress):
time.sleep(float(randint(100, 500)) / 1000) # 模拟耗时操作
self.signals.progress_value.emit(i) # 发送信号,更新进度值
# 结束工作
self.signals.finished.emit(thread_id, get_time_str())
class MyMainWindow(QMainWindow):
def __init__(self):
super().__init__()
self.active_workers = 0
self.setWindowTitle('Hello, PySide6!')
self.setToolTip('A PySide6 GUI Application Demo')
self.progress_bar = QProgressBar()
self.progress_bar.setRange(0, 1000)
self.label = QLabel('0.0%')
self.button = QPushButton('Start')
self.button.clicked.connect(self.start_threads)
v_layout = QVBoxLayout()
v_layout.addWidget(self.progress_bar)
v_layout.addWidget(self.label)
v_layout.addWidget(self.button)
container = QWidget()
container.setLayout(v_layout)
self.setCentralWidget(container)
self.threads = QThreadPool()
print('maximum threads: {}'.format(self.threads.maxThreadCount()))
def closeEvent(self, event: QEvent) -> None:
if self.threads.activeThreadCount() > 0:
QMessageBox.critical(self, '任务正在执行', '还有进程在运行,不允许关闭',
QMessageBox.StandardButton.Ignore)
event.ignore() # 忽略关闭事件
def update_progress_value(self, value: int) -> None:
percentage = (value / 1000) * 100 # 计算百分比
self.label.setText(f'{percentage:.1f}%')
self.progress_bar.setValue(value)
def handle_worker_begin(self, thread_id: str, time_str: str) -> None:
self.button.setEnabled(False)
self.active_workers += 1
print('BEGIN thread_id=', thread_id, ', time_str =', time_str)
def handle_worker_finished(self, thread_id: str, time_str: str) -> None:
self.button.setEnabled(True)
self.active_workers -= 1
print('FINISHED thread_id=', thread_id, ', time_str =', time_str)
def start_threads(self) -> None:
worker = MyWorker()
worker.signals.begin.connect(self.handle_worker_begin)
worker.signals.finished.connect(self.handle_worker_finished)
worker.signals.progress_value.connect(self.update_progress_value)
self.threads.start(worker)
if __name__ == "__main__":
app = QApplication(sys.argv)
window = MyMainWindow()
window.show()
app.exec()
此时只能强制退出程序,且退出时仍然会有报错:
from __future__ import annotations
import sys
import threading
import time
from datetime import datetime
from random import randint
from PySide6.QtCore import QEvent, QObject, QRunnable, QThreadPool, Signal, Slot
from PySide6.QtWidgets import QApplication, QLabel, QMainWindow, QProgressBar, QPushButton, QVBoxLayout, QWidget
def get_time_str() -> str:
return datetime.now().isoformat(sep = ' ')
class WorkerSignals(QObject):
begin = Signal(str, str)
finished = Signal(str, str)
progress_value = Signal(int) # 进度值
class MyWorker(QRunnable):
def __init__(self, *args, **kwargs):
# 传递参数给 Worker
super().__init__()
print('args=', args, 'kwargs=', kwargs)
# 初始化信号和槽
self.signals = WorkerSignals()
self.args = args
self.kwargs = kwargs
self.killed = False
@Slot()
def run(self) -> None:
thread_id = str(threading.get_ident())
# 开始工作
self.signals.begin.emit(thread_id, get_time_str())
total_progress = 1000
for i in range(total_progress):
if self.killed:
print('worker quit....')
return
time.sleep(float(randint(100, 1000)) / 1000) # 模拟耗时操作
self.signals.progress_value.emit(i) # 发送信号,更新进度值
# 结束工作
self.signals.finished.emit(thread_id, get_time_str())
def kill(self) -> None:
print('set worker to killed')
self.killed = True
class MyMainWindow(QMainWindow):
def __init__(self):
super().__init__()
self.active_workers = 0
self.setWindowTitle('Hello, PySide6!')
self.setToolTip('A PySide6 GUI Application Demo')
self.progress_bar = QProgressBar()
self.progress_bar.setRange(0, 1000)
self.label = QLabel('0.0%')
self.button = QPushButton('Start')
self.button.clicked.connect(self.start_threads)
self.worker = MyWorker() # 创建一个 Worker 对象
v_layout = QVBoxLayout()
v_layout.addWidget(self.progress_bar)
v_layout.addWidget(self.label)
v_layout.addWidget(self.button)
container = QWidget()
container.setLayout(v_layout)
self.setCentralWidget(container)
self.threads = QThreadPool()
print('maximum threads: {}'.format(self.threads.maxThreadCount()))
def closeEvent(self, event: QEvent) -> None:
if self.active_workers > 0:
self.worker.kill()
self.threads.waitForDone()
self.active_workers -= 1
self.worker = None
event.accept()
def update_progress_value(self, value: int) -> None:
percentage = (value / 1000) * 100 # 计算百分比
self.label.setText(f'{percentage:.1f}%')
self.progress_bar.setValue(value)
def handle_worker_begin(self, thread_id: str, time_str: str) -> None:
self.button.setEnabled(False)
self.active_workers += 1
print('BEGIN thread_id=', thread_id, ', time_str =', time_str)
def handle_worker_finished(self, thread_id: str, time_str: str) -> None:
self.button.setEnabled(True)
self.active_workers -= 1
print('FINISHED thread_id=', thread_id, ', time_str =', time_str)
def start_threads(self) -> None:
if self.worker is not None:
self.worker.signals.begin.connect(self.handle_worker_begin)
self.worker.signals.finished.connect(self.handle_worker_finished)
self.worker.signals.progress_value.connect(self.update_progress_value)
self.threads.start(self.worker)
if __name__ == "__main__":
app = QApplication(sys.argv)
window = MyMainWindow()
window.show()
app.exec()
from __future__ import annotations
import sys
import threading
import time
from datetime import datetime
from random import randint
from PySide6.QtCore import QEvent, QObject, QRunnable, QThreadPool, Signal, Slot
from PySide6.QtWidgets import QApplication, QLabel, QMainWindow, QProgressBar, QPushButton, QVBoxLayout, QWidget
def get_time_str() -> str:
return datetime.now().isoformat(sep = ' ')
class WorkerSignals(QObject):
begin = Signal(str, str)
finished = Signal(str, str)
progress_value = Signal(int) # 进度值
class MyWorker(QRunnable):
def __init__(self, *args, **kwargs):
# 传递参数给 Worker
super().__init__()
print('args=', args, 'kwargs=', kwargs)
# 初始化信号和槽
self.signals = WorkerSignals()
self.args = args
self.kwargs = kwargs
self.killed = False
self.paused = False
@Slot()
def run(self) -> None:
thread_id = str(threading.get_ident())
# 开始工作
self.signals.begin.emit(thread_id, get_time_str())
total_progress = 1000
for i in range(total_progress + 1):
while self.paused:
time.sleep(0)
if self.killed:
print('worker quit....')
return
time.sleep(float(randint(1, 10)) / 1000) # 模拟耗时操作
self.signals.progress_value.emit(i) # 发送信号,更新进度值
# 结束工作
self.signals.finished.emit(thread_id, get_time_str())
def kill(self) -> None:
print('set worker to killed')
self.killed = True
def pause(self) -> None:
self.paused = True
def pause_resume(self) -> None:
self.paused = False
def is_paused(self) -> bool:
return self.paused
class MyMainWindow(QMainWindow):
def __init__(self):
super().__init__()
self.active_workers = 0
self.setWindowTitle('Hello, PySide6!')
self.setToolTip('A PySide6 GUI Application Demo')
self.progress_bar = QProgressBar()
self.progress_bar.setRange(0, 1000)
self.label = QLabel('0.0%')
self.button = QPushButton('Start')
self.button.clicked.connect(self.start_threads)
self.pause_button = QPushButton('Pause')
self.pause_button.clicked.connect(self.pause_resume)
self.pause_button.setEnabled(False)
self.worker = MyWorker() # 创建一个 Worker 对象
v_layout = QVBoxLayout()
v_layout.addWidget(self.progress_bar)
v_layout.addWidget(self.label)
v_layout.addWidget(self.button)
v_layout.addWidget(self.pause_button)
container = QWidget()
container.setLayout(v_layout)
self.setCentralWidget(container)
self.threads = QThreadPool()
print('maximum threads: {}'.format(self.threads.maxThreadCount()))
def closeEvent(self, event: QEvent) -> None:
if self.active_workers > 0:
self.worker.kill()
self.worker.pause_resume()
self.threads.waitForDone()
self.active_workers -= 1
self.worker = None
event.accept()
def update_progress_value(self, value: int) -> None:
if self.worker is not None and not self.worker.is_paused():
percentage = (value / 1000) * 100 # 计算百分比
self.label.setText(f'{percentage:.1f}%')
self.progress_bar.setValue(value)
def handle_worker_begin(self, thread_id: str, time_str: str) -> None:
self.button.setEnabled(False)
self.active_workers += 1
print('BEGIN thread_id=', thread_id, ', time_str =', time_str)
def handle_worker_finished(self, thread_id: str, time_str: str) -> None:
self.button.setEnabled(True)
self.active_workers -= 1
print('FINISHED thread_id=', thread_id, ', time_str =', time_str)
self.worker = MyWorker() # 创建一个 Worker 对象
self.pause_button.setEnabled(False)
def start_threads(self) -> None:
if self.worker is not None:
self.worker.signals.begin.connect(self.handle_worker_begin)
self.worker.signals.finished.connect(self.handle_worker_finished)
self.worker.signals.progress_value.connect(self.update_progress_value)
self.threads.start(self.worker)
self.pause_button.setEnabled(True)
def pause_resume(self) -> None:
if self.worker is not None:
if not self.worker.is_paused():
self.worker.pause()
self.pause_button.setText('Resume')
else:
self.worker.pause_resume()
self.pause_button.setText('Pause')
if __name__ == "__main__":
app = QApplication(sys.argv)
window = MyMainWindow()
window.show()
app.exec()
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。