多任务是现代编程中的重要概念,可以显著提升程序效率。Python 中线程、进程和协程是三种主要并发方式,各有独特特点和适用场景。
01 进程(Process)
进程是操作系统进行资源分配和调度的基本单位。每个进程拥有独立的内存空间,可以绕过 GIL 实现真正的多核并行,非常适合 CPU 密集型任务。进程间通信需通过 Queue、Pipe 或共享内存等机制完成。
典型使用场景: 大规模数值计算、图像/视频批处理、机器学习数据预处理、密码学运算。
import multiprocessing
import time
def print_time(process_name):
count = 0
while count < 5:
time.sleep(1)
count += 1
print(f"{process_name}: {time.ctime(time.time())}")
# 创建两个进程
process1 = multiprocessing.Process(target=print_time, args=("Process-1",))
process2 = multiprocessing.Process(target=print_time, args=("Process-2",))
process1.start()
process2.start()
process1.join()
process2.join()
print("主进程退出")
避坑指南
坑 1:Windows / macOS 忘加 if __name__ == '__main__' 保护
Windows 和 macOS 使用 spawn 模式启动子进程,会重新导入主模块。若没有入口保护,子进程会再次执行顶层代码,无限递归创建进程,最终触发 RuntimeError。
✅ 所有进程创建代码必须放在
if __name__ == '__main__':内。Linux 使用 fork 模式表面上不报错,但仍建议加上以保持跨平台兼容性。
坑 2:进程数量过多,反而比单进程更慢
进程的创建和销毁开销远大于线程。若任务粒度太小(如每个任务只耗时几毫秒),进程间通信和调度的开销会远超计算收益。
✅ 使用
multiprocessing.Pool配合pool.map(),进程数控制在os.cpu_count()以内,并通过chunksize参数合并小任务。
坑 3:子进程中的异常被静默吞掉
直接使用 Process 时,子进程抛出的异常不会自动传播到主进程,排查问题极为困难。
✅ 使用
Pool.map()或concurrent.futures.ProcessPoolExecutor,异常会在主进程调用.result()时重新抛出。手动管理时,用 Queue 将异常对象传回主进程。
坑 4:进程间传递不可序列化对象
进程间通信依赖 pickle 序列化。lambda 函数、文件句柄、数据库连接、socket 等对象无法被 pickle,传递时会抛出 PicklingError。
✅ 只传递基本数据类型或可序列化的数据结构。数据库连接等资源应在子进程内部初始化,而非从主进程传入。
02 线程(Thread)
线程是操作系统调度的最小执行单位。同一进程的线程共享内存空间,数据共享方便,但也因此引入了竞态条件风险。
Python 存在 GIL(全局解释器锁),同一时刻只有一个线程执行 Python 字节码,多线程无法利用多核 CPU 进行 CPU 密集型并行。但 IO 操作会释放 GIL,多线程在 IO 等待期间可以真正并发切换。
典型使用场景: 网络爬虫、图像并行处理、GUI 后台任务、兼容旧同步库的并发改造。
import threading
import time
def print_time(thread_name, delay):
count = 0
while count < 5:
time.sleep(delay)
count += 1
print(f"{thread_name}: {time.ctime(time.time())}")
thread1 = threading.Thread(target=print_time, args=("Thread-1", 1))
thread2 = threading.Thread(target=print_time, args=("Thread-2", 2))
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print("主线程退出")
避坑指南
坑 1:共享变量操作不加锁,导致数据竞争
多线程同时读写同一变量时,即使是看似原子的 counter += 1,在字节码层面也是多步操作,极易发生数据错误且难以复现。
# ❌ 错误写法
counter = 0
def increment():
global counter
counter += 1 # 非线程安全!
# ✅ 正确写法
lock = threading.Lock()
def safe_increment():
global counter
with lock:
counter += 1
坑 2:守护线程(daemon=True)导致任务被强制终止
设置 daemon=True 的线程会在主线程退出时被强制杀死,即使它正在执行写文件、提交数据库事务等关键操作,数据完整性无法保证。
✅ 只对真正的后台辅助任务(如心跳检测、日志刷新)使用 daemon 线程。涉及 IO 或状态修改的线程必须显式
.join()。
坑 3:误用多线程处理 CPU 密集型任务,性能反而下降
受 GIL 限制,纯 Python 的 CPU 密集型任务在多线程下实际是串行执行,且线程切换还会引入额外开销,导致性能比单线程更差。
✅ 若任务中 CPU 时间占比超过 70%,应换用
multiprocessing或ProcessPoolExecutor。numpy/scipy 等扩展库在底层绕过 GIL,视具体库情况而定。
坑 4:线程池大小设置不合理
为 IO 密集型任务开几百个线程并非总是最优。线程本身有内存开销(默认栈约 8 MB),线程切换有 CPU 代价,下游服务也往往有并发连接限制。
✅ 使用
ThreadPoolExecutor(max_workers=N),IO 密集型场景 N 的经验值为 CPU 核数 × 5,建议通过压测确定最优值。
03 协程(Coroutine)
协程是在单线程内实现并发调度的轻量级机制。通过 async/await 语法,协程在等待 IO 时主动让出控制权,事件循环趁机执行其他协程,从而在没有线程切换开销的情况下高效处理大量并发 IO。
典型使用场景: 高并发 API 服务、异步爬虫、WebSocket 服务、微服务 RPC 调用。
import asyncio
import time
async def print_time(delay, task_name):
count = 0
while count < 5:
await asyncio.sleep(delay)
count += 1
print(f"{task_name}: {time.ctime(time.time())}")
async def main():
await asyncio.gather(
print_time(1, "Task-1"),
print_time(2, "Task-2"),
)
asyncio.run(main()) # Python 3.7+ 推荐写法
💡 推荐用
asyncio.run()替代旧版loop.get_event_loop()写法,前者自动管理事件循环的创建与关闭,避免资源泄漏。
避坑指南
坑 1:在协程中调用同步阻塞函数,卡死整个事件循环
这是 asyncio 最常见也最致命的坑。time.sleep()、requests.get()、同步数据库驱动等阻塞调用会霸占事件循环,导致所有其他协程挂起,整个程序丧失并发能力。
# ❌ 错误写法:阻塞整个事件循环
async def fetch(url):
return requests.get(url) # 同步阻塞!
# ✅ 正确写法:使用异步库
async def fetch(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.text()
# ✅ 或将同步函数卸载到线程池
async def fetch_sync(url):
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, requests.get, url)
坑 2:忘记 await,协程对象从未被执行
调用 async 函数时若漏写 await,返回的是协程对象,函数体根本不会执行。Python 只会给出 RuntimeWarning: coroutine 'xxx' was never awaited,非常容易被忽略。
✅ 开发时启用调试模式:
asyncio.run(main(), debug=True),会将此类 warning 升级为错误日志。
坑 3:大量协程无限制并发,耗尽连接或触发限流
asyncio.gather(*[task() for _ in range(100000)]) 会瞬间创建 10 万个并发任务,轻松耗尽数据库连接池或触发下游服务限流。
# ✅ 使用 Semaphore 控制最大并发数
sem = asyncio.Semaphore(50) # 最多 50 个并发
async def limited_task(url):
async with sem:
return await fetch(url)
坑 4:asyncio 与多线程/多进程混用姿势错误
从非事件循环线程中直接调用 await 会报错;在子线程中调用 asyncio.run() 则会报"已有事件循环在运行"。
✅ 在独立线程中运行 asyncio 时,使用
asyncio.run_coroutine_threadsafe(coro, loop)将协程提交到已存在的事件循环;或在子线程中用asyncio.new_event_loop()创建独立循环。
04 对比总结与选型建议
| 特性 | 进程 | 线程 | 协程 |
|---|---|---|---|
| 内存隔离 | 独立(安全) | 共享(需加锁) | 共享(单线程无竞争) |
| 并行能力 | 真正多核并行 | 受 GIL 限制 | 单线程并发 |
| 创建开销 | 高(MB 级) | 中(KB 级) | 极低(字节级) |
| 通信方式 | Queue / Pipe | 共享变量(加锁) | 直接共享(await 协作) |
| 适合场景 | CPU 密集型 | IO 密集型 + 兼容旧代码 | 高并发 IO / 网络服务 |
| 调试难度 | 中 | 高(竞态难复现) | 中(需理解事件循环) |
选型决策:
- IO 密集(API 调用 / 爬虫 / RPC) → 优先协程(
asyncio) - CPU 密集(数值计算 / 加密 / 数据处理) → 多进程(
multiprocessing) - 兼容同步库 / 简单并发改造 → 线程(
threading)
05 通用避坑清单
以下问题与具体并发模型无关,三种方式都可能踩到。
坑 1:没有超时控制,任务永久挂起
无论是 process.join()、thread.join() 还是 await coro(),不加超时时若任务卡住,主程序会永远等待。
✅ 始终设置超时:进程/线程用
.join(timeout=30),协程用asyncio.wait_for(coro(), timeout=30),网络请求用库自带的timeout参数。
坑 2:异常处理缺失,程序"假装正常"运行
并发场景下子任务的异常经常被吞掉或延迟抛出,程序表面运行正常,但实际结果已错误。
✅ 对
asyncio.gather()使用return_exceptions=True;对Future/Task添加add_done_callback;生产环境配置统一错误报警。
坑 3:资源未释放,连接池 / 文件句柄泄漏
并发任务中途报错,若没有使用 try/finally 或上下文管理器,数据库连接、HTTP session、文件句柄等资源会持续泄漏,最终耗尽系统资源。
✅ 使用
with/async with管理所有需要释放的资源,配合连接池统一管理。
坑 4:并发写日志,输出混乱或文件损坏
多个进程/线程同时 print() 或写文件时,输出会交叉穿插,既难以阅读,也可能损坏文件结构。
✅ 使用标准库的
logging模块(默认线程安全)。多进程场景改用logging.handlers.QueueHandler+ 独立日志进程。生产代码中禁止裸print()做日志输出。