from threading import Thread
import time
def action():
for item in range(40):
print('我是子线程')
time.sleep(0.5)
if __name__ == '__main__':
th_obj = Thread(target=action, args=())
# 设置 .daemon = True,子线程跟着主进程一起死
# 在 start() 之前写!!!
th_obj.daemon = True
th_obj.start()
# 主进程有限循环
for i in range(10):
time.sleep(1)
print('父进程')
# th_obj.join() # 主进程等待子线程完成
from threading import Thread, Semaphore
import time
import random
def action(th, sem):
# 控制只有 4 个线程在操作
# 与 Lock() 一样上锁
sem.acquire()
print(th, '进入了程序')
time.sleep(random.randint(3, 5))
print(th, '结束了程序')
sem.release()
if __name__ == '__main__':
th_list = []
# 只允许 5个 进程同时操作
sem_obj = Semaphore(5)
for i in range(1, 21):
th_obj = Thread(target=action, args=('线程-> {}'.format(i), sem_obj))
th_obj.start()
th_list.append(th_obj)
for th_obj in th_list:
th_obj.join()
# 只要是同一个锁对象,都可以管控全局线程
# 不同的进程在不同的函数内做自己的事儿
# 线程先后顺序不随机
# 谁先拿到第一把锁,则其他的锁都会全部先给第一个拿到第一把锁的人
# 需要多把锁的时候,防止出现 A 一把锁,B 一把锁 造成全局的死锁
from threading import Thread, RLock
import time
import random
def func(th, lock_1, lock_2):
time.sleep(3)
lock_1.acquire()
print(th, '拿到了 锁_1')
lock_2.acquire()
print(th, '拿到了 锁_2')
time.sleep(random.randint(1, 5))
print(th, '干了一些事')
lock_1.release()
print(th, '放回了 锁_1')
lock_2.release()
print(th, '放回了 锁_2')
print('********************')
# 不同的进程在不同的函数内做自己的事儿
# 线程先后顺序不随机
# 谁先拿到第一把锁,则其他的锁都会全部先给第一个拿到第一把锁的人
# 需要多把锁的时候,防止出现 A 一把锁,B 一把锁 造成全局的死锁
def action_1(th, lock_1, lock_2):
print('action_1 函数做一些别的事')
func(th, lock_1, lock_2)
def action_2(th, lock_1, lock_2):
print('action_2 函数做一些别的事')
func(th, lock_1, lock_2)
def action_3(th, lock_1, lock_2):
print('action_3 函数做一些别的事')
func(th, lock_1, lock_2)
if __name__ == '__main__':
lock1 = lock2 = RLock()
Thread(target=action_1, args=('线程1', lock1, lock2)).start()
Thread(target=action_2, args=('线程2', lock1, lock2)).start()
Thread(target=action_3, args=('线程3', lock1, lock2)).start()
from threading import Thread
from queue import Queue
import time
import random
def producer(th_name, que):
for num in range(1, 6):
time.sleep(random.randint(1, 3))
content = th_name + '创造了数据' + str(num)
print('>>>', content, '<<<')
# .put(),将生产的数据放进全局队列
que.put(content)
que.put(None) # 最后放一个 None,告知生产者已经结束生产
def consumer(th_name, que):
while 1:
# .get(),从队列中拿数据
# 拿一个队列中就少一个
# 先放进队列的数据,就先被拿出来
queue_content = que.get()
if queue_content is None:
que.task_done()
break
print('===', th_name, '拿到了', queue_content, '===')
if __name__ == '__main__':
# 队列是全局的,在不同进程之间可传递参数
que_obj = Queue()
# 生产者进程
for i in range(1, 4):
th_producer = Thread(target=producer, args=('生产者-> {} '.format(i), que_obj))
th_producer.start()
# 消费者进程
for i in range(1, 3):
th_consumer = Thread(target=consumer, args=('消费者-> {} '.format(i), que_obj))
th_consumer.start()
from threading import Thread, Event
import time
import random
# 模拟红绿灯,设置事件的阻塞状态
def light_action(event):
while 1:
# 事件的默认状态是 False
print('>>>红灯亮了<<<')
time.sleep(10)
event.set()
print('===绿灯亮了===')
time.sleep(10)
event.clear()
# 模拟车辆
def car_action(event, t_car):
# 模拟绿灯
while 1:
if event.is_set():
print(t_car, '===过去了===')
break
else:
print(t_car, '>>>在等待<<<')
event.wait()
time.sleep(random.randint(1, 5))
if __name__ == '__main__':
# 设置一个全局的事件,控制进程之间的阻塞
event_obj = Event()
# 一个进程用来模拟红绿灯,设置事件阻塞
th_light = Thread(target=light_action, args=(event_obj,))
th_light.start()
for i in range(1, 101):
# 随机来车辆
time.sleep(random.randint(1, 3))
th_car = Thread(target=car_action, args=(event_obj, '车辆-> {}'.format(i)))
th_car.start()
# 事件为 True,event.wait() 直接返回 True
# 事件为 False,event 会一直阻塞,不返回任何值
# 假设设置 timeout=1,则 event 等待一秒,如果一秒到了,事件依然为 False,则直接返回 False
import threading
import time
from threading import Thread
def worker(e: threading.Event):
if e.wait(timeout=1):
print("我干活了!")
else:
print("我等了一面,事件还没有变成 True,不等了!")
if __name__ == '__main__':
event = threading.Event()
t = Thread(target=worker, args=(event,))
t.start()
time.sleep(5)
# event.set()
# .add_done_callback() 使用回调函数,
# 该回调无法产生返回值, 即使函数中 return 也不行
# 接收结果反而报错
# t_pool.submit(action1, item).add_done_callback(action2)
from concurrent.futures import ThreadPoolExecutor
import time
from threading import Thread, get_ident
from queue import Queue
def producer(que): # 模拟动态创建数据(请求)
i = 1
while 1:
que.put('数据--> {}'.format(i))
if i == 20:
que.put(None)
i += 1
def consumer(item):
time.sleep(3) # 线程花了一些时间干了一些事
print('线程号', get_ident(), '拿到了', item)
return item + '的返回值'
def callback(item):
print('callback : ', item)
if __name__ == '__main__':
que_producer = Queue() # 生产者队列
th_producer = Thread(target=producer, args=(que_producer,))
th_producer.daemon = True
th_producer.start()
pool_obj = ThreadPoolExecutor(max_workers=3)
res_list = []
while 1:
data = que_producer.get()
if data is None:
pool_obj.shutdown() # 关闭线程池,使进程池不再接受新的任务,相当于进程池中 close() + join()
break
# res = pool_obj.submit(consumer, data).add_done_callback(callback) # 回调函数
res = pool_obj.submit(consumer, data) # 非阻塞的
# print(res.result()) # 千万不要在这里打印结果,否则变成了单线程,先 append ,全部完成后 .result() 取值
res_list.append(res)
for res in res_list:
print(res.result()) # 当有回调函数的时候,.result()取值会报错
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。