多任务是现代编程中的重要概念,可以显著提升程序效率。Python 中线程、进程和协程是三种主要并发方式,各有独特特点和适用场景。


01 进程(Process)

进程是操作系统进行资源分配和调度的基本单位。每个进程拥有独立的内存空间,可以绕过 GIL 实现真正的多核并行,非常适合 CPU 密集型任务。进程间通信需通过 Queue、Pipe 或共享内存等机制完成。

典型使用场景: 大规模数值计算、图像/视频批处理、机器学习数据预处理、密码学运算。

import multiprocessing
import time

def print_time(process_name):
    count = 0
    while count < 5:
        time.sleep(1)
        count += 1
        print(f"{process_name}: {time.ctime(time.time())}")

# 创建两个进程
process1 = multiprocessing.Process(target=print_time, args=("Process-1",))
process2 = multiprocessing.Process(target=print_time, args=("Process-2",))

process1.start()
process2.start()
process1.join()
process2.join()

print("主进程退出")

避坑指南

坑 1:Windows / macOS 忘加 if __name__ == '__main__' 保护

Windows 和 macOS 使用 spawn 模式启动子进程,会重新导入主模块。若没有入口保护,子进程会再次执行顶层代码,无限递归创建进程,最终触发 RuntimeError

✅ 所有进程创建代码必须放在 if __name__ == '__main__': 内。Linux 使用 fork 模式表面上不报错,但仍建议加上以保持跨平台兼容性。

坑 2:进程数量过多,反而比单进程更慢

进程的创建和销毁开销远大于线程。若任务粒度太小(如每个任务只耗时几毫秒),进程间通信和调度的开销会远超计算收益。

✅ 使用 multiprocessing.Pool 配合 pool.map(),进程数控制在 os.cpu_count() 以内,并通过 chunksize 参数合并小任务。

坑 3:子进程中的异常被静默吞掉

直接使用 Process 时,子进程抛出的异常不会自动传播到主进程,排查问题极为困难。

✅ 使用 Pool.map()concurrent.futures.ProcessPoolExecutor,异常会在主进程调用 .result() 时重新抛出。手动管理时,用 Queue 将异常对象传回主进程。

坑 4:进程间传递不可序列化对象

进程间通信依赖 pickle 序列化。lambda 函数、文件句柄、数据库连接、socket 等对象无法被 pickle,传递时会抛出 PicklingError

✅ 只传递基本数据类型或可序列化的数据结构。数据库连接等资源应在子进程内部初始化,而非从主进程传入。


02 线程(Thread)

线程是操作系统调度的最小执行单位。同一进程的线程共享内存空间,数据共享方便,但也因此引入了竞态条件风险。

Python 存在 GIL(全局解释器锁),同一时刻只有一个线程执行 Python 字节码,多线程无法利用多核 CPU 进行 CPU 密集型并行。但 IO 操作会释放 GIL,多线程在 IO 等待期间可以真正并发切换。

典型使用场景: 网络爬虫、图像并行处理、GUI 后台任务、兼容旧同步库的并发改造。

import threading
import time

def print_time(thread_name, delay):
    count = 0
    while count < 5:
        time.sleep(delay)
        count += 1
        print(f"{thread_name}: {time.ctime(time.time())}")

thread1 = threading.Thread(target=print_time, args=("Thread-1", 1))
thread2 = threading.Thread(target=print_time, args=("Thread-2", 2))

thread1.start()
thread2.start()
thread1.join()
thread2.join()

print("主线程退出")

避坑指南

坑 1:共享变量操作不加锁,导致数据竞争

多线程同时读写同一变量时,即使是看似原子的 counter += 1,在字节码层面也是多步操作,极易发生数据错误且难以复现。

# ❌ 错误写法
counter = 0
def increment():
    global counter
    counter += 1  # 非线程安全!

# ✅ 正确写法
lock = threading.Lock()
def safe_increment():
    global counter
    with lock:
        counter += 1

坑 2:守护线程(daemon=True)导致任务被强制终止

设置 daemon=True 的线程会在主线程退出时被强制杀死,即使它正在执行写文件、提交数据库事务等关键操作,数据完整性无法保证。

✅ 只对真正的后台辅助任务(如心跳检测、日志刷新)使用 daemon 线程。涉及 IO 或状态修改的线程必须显式 .join()

坑 3:误用多线程处理 CPU 密集型任务,性能反而下降

受 GIL 限制,纯 Python 的 CPU 密集型任务在多线程下实际是串行执行,且线程切换还会引入额外开销,导致性能比单线程更差。

✅ 若任务中 CPU 时间占比超过 70%,应换用 multiprocessingProcessPoolExecutor。numpy/scipy 等扩展库在底层绕过 GIL,视具体库情况而定。

坑 4:线程池大小设置不合理

为 IO 密集型任务开几百个线程并非总是最优。线程本身有内存开销(默认栈约 8 MB),线程切换有 CPU 代价,下游服务也往往有并发连接限制。

✅ 使用 ThreadPoolExecutor(max_workers=N),IO 密集型场景 N 的经验值为 CPU 核数 × 5,建议通过压测确定最优值。


03 协程(Coroutine)

协程是在单线程内实现并发调度的轻量级机制。通过 async/await 语法,协程在等待 IO 时主动让出控制权,事件循环趁机执行其他协程,从而在没有线程切换开销的情况下高效处理大量并发 IO。

典型使用场景: 高并发 API 服务、异步爬虫、WebSocket 服务、微服务 RPC 调用。

import asyncio
import time

async def print_time(delay, task_name):
    count = 0
    while count < 5:
        await asyncio.sleep(delay)
        count += 1
        print(f"{task_name}: {time.ctime(time.time())}")

async def main():
    await asyncio.gather(
        print_time(1, "Task-1"),
        print_time(2, "Task-2"),
    )

asyncio.run(main())  # Python 3.7+ 推荐写法

💡 推荐用 asyncio.run() 替代旧版 loop.get_event_loop() 写法,前者自动管理事件循环的创建与关闭,避免资源泄漏。

避坑指南

坑 1:在协程中调用同步阻塞函数,卡死整个事件循环

这是 asyncio 最常见也最致命的坑。time.sleep()requests.get()、同步数据库驱动等阻塞调用会霸占事件循环,导致所有其他协程挂起,整个程序丧失并发能力。

# ❌ 错误写法:阻塞整个事件循环
async def fetch(url):
    return requests.get(url)  # 同步阻塞!

# ✅ 正确写法:使用异步库
async def fetch(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()

# ✅ 或将同步函数卸载到线程池
async def fetch_sync(url):
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(None, requests.get, url)

坑 2:忘记 await,协程对象从未被执行

调用 async 函数时若漏写 await,返回的是协程对象,函数体根本不会执行。Python 只会给出 RuntimeWarning: coroutine 'xxx' was never awaited,非常容易被忽略。

✅ 开发时启用调试模式:asyncio.run(main(), debug=True),会将此类 warning 升级为错误日志。

坑 3:大量协程无限制并发,耗尽连接或触发限流

asyncio.gather(*[task() for _ in range(100000)]) 会瞬间创建 10 万个并发任务,轻松耗尽数据库连接池或触发下游服务限流。

# ✅ 使用 Semaphore 控制最大并发数
sem = asyncio.Semaphore(50)  # 最多 50 个并发

async def limited_task(url):
    async with sem:
        return await fetch(url)

坑 4:asyncio 与多线程/多进程混用姿势错误

从非事件循环线程中直接调用 await 会报错;在子线程中调用 asyncio.run() 则会报"已有事件循环在运行"。

✅ 在独立线程中运行 asyncio 时,使用 asyncio.run_coroutine_threadsafe(coro, loop) 将协程提交到已存在的事件循环;或在子线程中用 asyncio.new_event_loop() 创建独立循环。


04 对比总结与选型建议

特性 进程 线程 协程
内存隔离 独立(安全) 共享(需加锁) 共享(单线程无竞争)
并行能力 真正多核并行 受 GIL 限制 单线程并发
创建开销 高(MB 级) 中(KB 级) 极低(字节级)
通信方式 Queue / Pipe 共享变量(加锁) 直接共享(await 协作)
适合场景 CPU 密集型 IO 密集型 + 兼容旧代码 高并发 IO / 网络服务
调试难度 高(竞态难复现) 中(需理解事件循环)

选型决策:

  • IO 密集(API 调用 / 爬虫 / RPC) → 优先协程(asyncio
  • CPU 密集(数值计算 / 加密 / 数据处理) → 多进程(multiprocessing
  • 兼容同步库 / 简单并发改造 → 线程(threading

05 通用避坑清单

以下问题与具体并发模型无关,三种方式都可能踩到。

坑 1:没有超时控制,任务永久挂起

无论是 process.join()thread.join() 还是 await coro(),不加超时时若任务卡住,主程序会永远等待。

✅ 始终设置超时:进程/线程用 .join(timeout=30),协程用 asyncio.wait_for(coro(), timeout=30),网络请求用库自带的 timeout 参数。

坑 2:异常处理缺失,程序"假装正常"运行

并发场景下子任务的异常经常被吞掉或延迟抛出,程序表面运行正常,但实际结果已错误。

✅ 对 asyncio.gather() 使用 return_exceptions=True;对 Future / Task 添加 add_done_callback;生产环境配置统一错误报警。

坑 3:资源未释放,连接池 / 文件句柄泄漏

并发任务中途报错,若没有使用 try/finally 或上下文管理器,数据库连接、HTTP session、文件句柄等资源会持续泄漏,最终耗尽系统资源。

✅ 使用 with / async with 管理所有需要释放的资源,配合连接池统一管理。

坑 4:并发写日志,输出混乱或文件损坏

多个进程/线程同时 print() 或写文件时,输出会交叉穿插,既难以阅读,也可能损坏文件结构。

✅ 使用标准库的 logging 模块(默认线程安全)。多进程场景改用 logging.handlers.QueueHandler + 独立日志进程。生产代码中禁止裸 print() 做日志输出。