前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【Python基础编程】高效并发编程及协程、线程、进程的交叉应用

【Python基础编程】高效并发编程及协程、线程、进程的交叉应用

作者头像
易辰君
发布2024-11-07 22:09:13
1050
发布2024-11-07 22:09:13
举报
文章被收录于专栏:CSDN

前言

上篇文章主要讲述了python的进程,进程池和进程与线程对比等知识,接下来这篇文章再唠唠python的协程,让我们继续往下看!

一、协程的使用

python 中的协程是一种用于处理并发任务的高效工具,它依赖于 asyncio 库以及 asyncawait 关键字来实现异步编程。协程与传统的多线程或多进程并发模型不同,它通过事件循环实现任务的调度,在单线程内并发执行多个任务,适用于 I/O 密集型任务,如网络请求、文件操作等。

(一)基本使用

协程的定义使用 async 关键字,调用协程函数不会立即执行,而是返回一个协程对象,只有通过 await 关键字来执行它。await 会暂停当前协程的执行,等待另一个协程完成后再继续。

示例:

代码语言:javascript
复制
import asyncio

# 定义一个协程函数
async def say_hello():
    print("Hello")
    await asyncio.sleep(1)  # 模拟耗时操作
    print("World")

# 定义主函数
async def main():
    await say_hello()

# 启动事件循环
asyncio.run(main())

代码解释:

  • async def say_hello() 定义了一个协程函数,它会在 await asyncio.sleep(1) 处暂停 1 秒,模拟一个耗时任务。
  • asyncio.run(main()) 启动了事件循环并执行协程。

(二)并发执行多个任务

协程的优势在于可以并发执行多个任务,避免顺序执行带来的阻塞。通过 asyncio.gather(),可以同时运行多个协程,从而提高程序效率。

示例:

代码语言:javascript
复制
import asyncio

async def task1():
    print("Task 1 started")
    await asyncio.sleep(2)  # 模拟耗时操作
    print("Task 1 finished")

async def task2():
    print("Task 2 started")
    await asyncio.sleep(1)
    print("Task 2 finished")

async def main():
    # 并发运行两个任务
    await asyncio.gather(task1(), task2())

asyncio.run(main())

代码解释:

  • asyncio.gather() 用于并发执行 task1()task2()。两个任务同时开始,而不会等待前一个任务完成再执行下一个。

(三)协程与异步I/O

协程在处理 I/O 密集型任务时表现尤为出色,例如网络请求、文件读取等。通过 aiohttp 等异步库,可以大大提高程序的响应速度。

示例:

代码语言:javascript
复制
import asyncio
import time

async def fetch_data(url):
    print(f"Fetching data from {url}")
    await asyncio.sleep(2)  # 模拟网络请求
    print(f"Received data from {url}")

async def main():
    start = time.time()
    # 并发执行多个网络请求
    await asyncio.gather(fetch_data('url1'), fetch_data('url2'), fetch_data('url3'))
    end = time.time()
    print(f"Total time: {end - start} seconds")

asyncio.run(main())

代码解释:

  • asyncio.gather() 并发运行多个网络请求的协程任务。
  • 由于 await asyncio.sleep(2) 是异步操作,因此多个请求会并发执行,从而显著减少总的执行时间。

(四)创建任务并独立执行

有时需要在不等待任务完成的情况下创建协程任务,可以使用 asyncio.create_task() 来创建一个独立执行的协程任务。

示例:

代码语言:javascript
复制
import asyncio

async def task1():
    print("Task 1 started")
    await asyncio.sleep(2)
    print("Task 1 finished")

async def main():
    # 创建任务,但不等待其完成
    task = asyncio.create_task(task1())
    print("Task 1 is running in the background")
    await asyncio.sleep(1)  # 主程序继续执行其他操作
    print("Main task completed")

asyncio.run(main())

代码解释:

  • asyncio.create_task() 创建了一个独立的协程任务,它会在后台执行,而主程序可以继续执行其他操作。
  • task1() 的执行不会阻塞主任务的执行。

(五)协程中的异常处理

在协程中同样可以使用 try/except 进行异常处理,这样可以确保即使某个协程抛出异常,程序依然可以继续执行。

示例:

代码语言:javascript
复制
import asyncio

async def faulty_task():
    try:
        print("Faulty task started")
        await asyncio.sleep(1)
        raise ValueError("An error occurred!")
    except ValueError as e:
        print(f"Error caught: {e}")

async def main():
    await faulty_task()

asyncio.run(main())

该示例展示了如何在协程中捕获并处理异常,避免程序因异常崩溃。

(六)超时控制

有时,某些任务可能会长时间运行或卡住,可以通过 asyncio.wait_for() 为协程任务设置超时时间,如果任务未在指定时间内完成,将抛出 asyncio.TimeoutError 异常。

示例:

代码语言:javascript
复制
import asyncio

async def long_task():
    print("Long task started")
    await asyncio.sleep(5)
    print("Long task finished")

async def main():
    try:
        # 设置超时时间为 3 秒
        await asyncio.wait_for(long_task(), timeout=3)
    except asyncio.TimeoutError:
        print("Task timed out!")

asyncio.run(main())

asyncio.wait_for() 设置任务的超时时间。如果 long_task() 在 3 秒内未完成,将抛出 TimeoutError

(七)总结

python 中的协程是一种高效处理并发任务的工具,特别适用于 I/O 密集型操作。通过 async/await 语法和 asyncio 库,可以在单线程环境下实现多任务并发,避免不必要的阻塞,极大提高程序的执行效率。

  • asyncawait 是定义和调用协程的基础。
  • asyncio.gather()asyncio.create_task() 实现并发任务。
  • 异常处理、超时控制、同步函数的异步化都可以在协程中灵活应用。

二、concurrent中的future对象

concurrent.futures 模块中,Future 对象是用于表示一个异步操作的结果,它可以帮助我们在多线程或多进程环境下跟踪任务的执行状态,并在任务完成后获取结果。Future 对象通常与线程池 ThreadPoolExecutor或进程池 ProcessPoolExecutor一起使用。

(一)概述

Future 对象是一个容器,用于存储异步任务的结果。它提供了多种方法和属性,用来检查任务的状态、获取任务的结果,或者等待任务完成。它的核心思想是:异步任务在后台执行,程序可以继续运行而不阻塞,而当我们需要结果时,可以通过 Future 对象访问该任务的执行状态和结果。

(二)使用场景

Future 对象一般与 concurrent.futures 模块中的线程池或进程池执行器executor一起使用,用来并发地执行多个任务。

ThreadPoolExecutor 示例

代码语言:javascript
复制
from concurrent.futures import ThreadPoolExecutor

def task(n):
    print(f"处理任务 {n}")
    return n * 2

# 使用线程池执行多个任务
with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(task, i) for i in range(5)]
    for future in futures:
        print(f"任务结果: {future.result()}")

在上面的代码中,executor.submit() 提交任务并返回一个 Future 对象。通过调用 future.result(),我们可以获取任务的结果。如果任务还未完成,result() 将会阻塞直到任务完成。

(三)Future 对象的属性与方法

Future 对象提供了几种方法和属性,用来跟踪和获取异步任务的状态和结果。

主要方法和属性

  • future.result(timeout=None):用于获取异步任务的结果。如果任务完成,立即返回结果;如果任务尚未完成,则会等待。如果设置了 timeout 参数,则最多等待 timeout 秒,超过时间将抛出 TimeoutError 异常。如果任务在执行过程中抛出了异常,result() 也会重新抛出该异常。
  • future.done():返回 True 表示任务已经完成(无论是成功完成还是抛出异常),否则返回 False
  • future.cancel():用于尝试取消异步任务。如果任务未开始执行,则可以取消并返回 True;如果任务已经开始,则无法取消,返回 False
  • future.cancelled():返回 True 表示任务已经被取消,返回 False 表示任务没有被取消。
  • future.running():返回 True 表示任务正在执行,返回 False 表示任务未执行或已经完成。
  • future.add_done_callback(fn):给 Future 对象添加一个回调函数 fn,当任务完成时会调用该函数。回调函数会接收 Future 对象作为参数。

示例:

代码语言:javascript
复制
from concurrent.futures import ThreadPoolExecutor
import time

def task(n):
    time.sleep(n)
    return n * 2

with ThreadPoolExecutor() as executor:
    future = executor.submit(task, 3)
    print(f"任务完成了吗? {future.done()}")
    result = future.result()  # 这里会阻塞直到任务完成
    print(f"任务结果:{result}")
    print(f"任务完成了吗? {future.done()}")

在这个例子中,future.done() 在任务开始执行时返回 False,而当任务完成后再调用 done() 则返回 True

(四)Future 对象的回调机制

Future 对象支持回调机制,通过 add_done_callback() 方法,我们可以在任务完成时自动调用指定的回调函数。回调函数会接收当前 Future 对象作为参数。

示例:

代码语言:javascript
复制
from concurrent.futures import ThreadPoolExecutor

def task(n):
    return n * 2

def callback(future):
    print(f"回调:任务结果是 {future.result()}")

with ThreadPoolExecutor() as executor:
    future = executor.submit(task, 3)
    future.add_done_callback(callback)

在这个示例中,当任务完成时,回调函数会自动被调用,并且可以通过传递的 Future 对象来获取任务结果。

(五)as_completedwait

concurrent.futures 模块还提供了 as_completed()wait() 函数,便于在多个 Future 对象上等待和检查结果。

  • as_completed(futures):返回一个迭代器,当每个 Future 对象完成时,它会按照完成的顺序返回。
  • wait(futures, timeout=None, return_when=ALL_COMPLETED):等待多个 Future 对象的完成,支持超时和条件返回。

示例:

代码语言:javascript
复制
from concurrent.futures import ThreadPoolExecutor, as_completed

def task(n):
    return n * 2

with ThreadPoolExecutor() as executor:
    futures = [executor.submit(task, i) for i in range(5)]
    for future in as_completed(futures):
        print(f"任务结果: {future.result()}")

(六)总结

concurrent.futures 模块中的 Future 对象为我们提供了处理异步任务的方式。它可以通过线程池或进程池来并发执行任务,并允许我们轻松地获取任务的执行状态、结果以及异常处理。Future 对象的灵活性使它成为并发编程中不可或缺的工具,适用于 I/O 密集型任务、CPU 密集型任务等场景。


三、协程与线程和进程的交叉使用

在 Python 编程中,协程、线程和进程是三种常用的并发编程方式。它们各自适用于不同的场景,但在一些复杂的应用中,可能需要将它们交叉使用,以充分发挥它们各自的优势,实现更高效的并发处理。

(一)协程、线程和进程的区别

协程

  • 轻量级并发:协程是由 Python 内部实现的用户级并发,基于事件循环。协程通过 asyncawait 关键字实现异步非阻塞的 I/O 操作,适合处理 I/O 密集型任务,如网络请求、文件读写等。
  • 单线程执行:协程通常在单个线程中执行,通过释放控制权 (await) 来提高程序的并发性,不会占用多个 CPU 核心。

线程

  • 操作系统级并发:线程由操作系统调度,可以在同一个进程内运行多个线程。线程可以利用多核 CPU 来并发执行代码。
  • 多线程共享内存:线程共享进程的内存资源,但这也带来了线程安全问题,因此需要使用锁(Lock)或其他同步机制来避免数据竞争。
  • 适合 I/O 密集型任务:对于 I/O 密集型任务,多线程可以有效提高程序的响应速度。

进程

  • 独立的内存空间:进程是完全独立的运行实体,拥有自己的内存空间和资源。通过 multiprocessing 模块可以在不同的 CPU 核心上并行运行进程。
  • 进程间通信(IPC):进程间不能直接共享内存,因此需要使用管道、队列等方式进行通信。
  • 适合 CPU 密集型任务:由于 Python 的全局解释器锁(GIL),在单个进程内无法同时运行多个 Python 字节码,但通过多进程可以避免 GIL 的影响,充分利用多核 CPU,适合 CPU 密集型任务如图像处理、大数据计算等。

(二)协/线/进程的交叉使用场景

  • 协程与线程的交叉使用

协程可以在单线程中提供高效的 I/O 并发处理,但有时需要同时进行一些阻塞的同步操作,或者需要利用多核 CPU 进行并发计算时,可以将协程和线程结合使用。

示例:在协程中运行阻塞的同步代码

代码语言:javascript
复制
import asyncio
from concurrent.futures import ThreadPoolExecutor
import time

# 阻塞的同步任务
def blocking_task():
    time.sleep(2)
    return "任务完成"

# 使用协程调度异步任务
async def main():
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor() as pool:
        # 在线程池中运行同步阻塞任务
        result = await loop.run_in_executor(pool, blocking_task)
        print(result)

# 启动事件循环
asyncio.run(main())

在这个例子中,blocking_task 是一个同步任务,通过 ThreadPoolExecutor 在单独的线程中运行,从而避免阻塞事件循环。协程通过 await 来异步等待线程中的任务完成,这种方式结合了协程的异步优势和线程的多核并发处理能力。

  • 协程与进程的交叉使用

在某些情况下,单线程中的协程可能无法满足 CPU 密集型任务的需求,因此可以结合进程来处理耗费 CPU 的任务。

示例:在协程中并发运行多进程任务

代码语言:javascript
复制
import asyncio
from concurrent.futures import ProcessPoolExecutor
import time

# CPU 密集型任务
def cpu_bound_task(n):
    total = 0
    for i in range(1000000 * n):
        total += i
    return total

# 使用协程调度异步任务
async def main():
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        # 在进程池中运行 CPU 密集型任务
        result = await loop.run_in_executor(pool, cpu_bound_task, 5)
        print(f"计算结果: {result}")

# 启动事件循环
asyncio.run(main())

在这个例子中,cpu_bound_task 是一个 CPU 密集型任务,它在协程环境中通过 ProcessPoolExecutor 来并发执行。协程负责调度和等待进程的结果返回,从而避免事件循环被阻塞。

  • 线程与进程的交叉使用

有时我们可能需要同时处理 I/O 密集型和 CPU 密集型任务,这时可以考虑将线程和进程结合使用。

示例:线程池和进程池的结合

代码语言:javascript
复制
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time

# I/O 密集型任务
def io_task():
    time.sleep(1)
    print("I/O 密集型任务完成")

# CPU 密集型任务
def cpu_task(n):
    total = 0
    for i in range(1000000 * n):
        total += i
    print(f"CPU 密集型任务结果: {total}")

# 使用线程池和进程池并发执行任务
def main():
    with ThreadPoolExecutor() as thread_pool, ProcessPoolExecutor() as process_pool:
        # 在线程池中运行 I/O 密集型任务
        for _ in range(3):
            thread_pool.submit(io_task)

        # 在进程池中运行 CPU 密集型任务
        for _ in range(2):
            process_pool.submit(cpu_task, 5)

if __name__ == "__main__":
    main()

在这个示例中,io_task 在线程池中并发执行,而 cpu_task 则在进程池中运行。这种设计模式使得我们可以在同一个程序中同时处理 I/O 密集型任务和 CPU 密集型任务,从而最大化利用系统资源。

(三)总结

协程、线程和进程各有其优点和适用场景:

  • 协程:适用于 I/O 密集型任务,尤其是大量网络请求、文件操作等场景,能够高效地进行异步非阻塞操作。
  • 线程:适用于 I/O 密集型任务,可以在多核 CPU 上并发执行,但需要注意线程安全和锁的问题。
  • 进程:适用于 CPU 密集型任务,能够充分利用多核 CPU,但进程间通信的开销较大。

四、总结

这篇文章主要讲的是协程的基本使用,原理,以及协程、线程和进程间的差别和交叉使用,根据不同的需求使用不同的功能,使得代码运行效率更高!

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2024-10-14,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 一、协程的使用
    • (一)基本使用
      • (二)并发执行多个任务
        • (三)协程与异步I/O
          • (四)创建任务并独立执行
            • (五)协程中的异常处理
              • (六)超时控制
                • (七)总结
                • 二、concurrent中的future对象
                  • (一)概述
                    • (二)使用场景
                      • (三)Future 对象的属性与方法
                        • (四)Future 对象的回调机制
                          • (五)as_completed 和 wait
                            • (六)总结
                            • 三、协程与线程和进程的交叉使用
                              • (一)协程、线程和进程的区别
                                • (二)协/线/进程的交叉使用场景
                                  • (三)总结
                                  • 四、总结
                                  相关产品与服务
                                  大数据
                                  全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
                                  领券
                                  问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档