22 threading多线程
有些任务需要"同时"做多件事——比如一边下载文件一边处理数据、一边接收请求一边写日志。threading模块就是干这个的,它让你在一个进程里开多个线程,并发执行任务(多个线程交替推进,不一定真正在同一时刻运行,原因见下面的GIL说明)。
注意:CPython(最常用的Python解释器)有GIL(全局解释器锁),同一时刻只有一个线程在执行Python字节码。不过线程在等待I/O(网络响应、磁盘读写等)时会释放GIL,让其他线程继续运行,所以threading适合I/O密集型任务(网络请求、文件读写);CPU密集型任务(大量计算)几乎不会因为多线程而变快,应该用multiprocessing。
一、创建线程
1.1 基本用法
python
import threading
import time


def worker(num):
    """Simulate a slow task: announce the start, sleep 2 seconds, announce completion.

    Args:
        num: Identifier included in the progress messages.
    """
    print(f"线程 {num} 开始工作")
    time.sleep(2)  # stand-in for a time-consuming (e.g. I/O) operation
    print(f"线程 {num} 完成")


# Create the thread: `target` is the callable to run, `args` its positional arguments.
t = threading.Thread(target=worker, args=(1,))
# start() arranges for worker(1) to run in a separate thread and returns immediately.
t.start()
# join() blocks the calling (main) thread until `t` has terminated.
t.join()
print("主线程继续")
1.2 多个线程
python
import threading
import time


def worker(num):
    """Print a start message, block for 2 seconds, then print a completion message.

    Args:
        num: Identifier included in the progress messages.
    """
    print(f"线程 {num} 开始")
    time.sleep(2)
    print(f"线程 {num} 完成")


# Start all five threads first so their 2-second waits overlap ...
threads = []
for i in range(5):
    t = threading.Thread(target=worker, args=(i,))
    threads.append(t)
    t.start()
# ... then wait for each one. Calling join() inside the loop above would make
# the threads run one after another instead.
for t in threads:
    t.join()
print("所有线程完成")
1.3 继承Thread类
python
import threading
import time


class MyThread(threading.Thread):
    """Thread subclass whose work is defined by overriding run()."""

    def __init__(self, num):
        # An overriding constructor must invoke Thread.__init__() before
        # doing anything else with the thread.
        super().__init__()
        self.num = num

    def run(self):
        """Body of the thread; invoked in the new thread by start()."""
        print(f"线程 {self.num} 开始")
        time.sleep(2)
        print(f"线程 {self.num} 完成")


# Usage: start() spawns the thread, which then calls run().
t = MyThread(1)
t.start()
t.join()
二、线程同步
2.1 Lock:互斥锁
保护共享资源,防止多个线程同时修改。
python
import threading

counter = 0
lock = threading.Lock()


def increment():
    """Add 1 to the shared ``counter`` 100000 times, holding ``lock`` for each update.

    The lock is taken with an explicit acquire(); the try/finally guarantees
    release() runs even if the protected statement raises.
    """
    global counter
    for _ in range(100000):
        lock.acquire()  # blocks until this thread owns the lock
        try:
            counter += 1  # critical section: only the lock holder executes this
        finally:
            lock.release()  # always hand the lock back


threads = [threading.Thread(target=increment) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"结果: {counter}") # 500000
更简洁的写法,用with语句:
python
# Same counter update as the acquire()/try/finally version above, written with
# `with lock:`. The lock's acquire() is called when the block is entered and
# release() when it is exited, even if the body raises.
# NOTE(review): indentation was lost in this listing; in runnable code each
# following line is nested one level deeper than the one before it.
def increment():
global counter
for _ in range(100000):
with lock:  # replaces lock.acquire() + try/finally + lock.release()
counter += 1
2.2 RLock:可重入锁
同一线程可以多次获取同一把锁,不会死锁。普通Lock如果被持有它的线程再次acquire,会永远阻塞。注意获取了几次就要释放几次,锁才会真正被释放。
python
import threading

rlock = threading.RLock()


def recursive_function(n):
    """Print a countdown from ``n`` down to 1, re-entering ``rlock`` at every level.

    Each recursive call enters ``with rlock`` while its caller still holds the
    lock. An RLock lets the owning thread do this; a plain Lock would block.
    """
    with rlock:
        if not (n > 0):  # countdown finished; nothing to print
            return
        print(f"递归 {n}")
        recursive_function(n - 1)


t = threading.Thread(target=recursive_function, args=(5,))
t.start()
t.join()
2.3 Condition:条件变量
线程间的通知机制:一个线程等待某个条件成立,另一个线程修改共享状态后通知它。Condition内部带一把锁:wait()会先释放这把锁再阻塞,被唤醒后重新获取锁再返回。
python
import threading
import time

condition = threading.Condition()
data_ready = False


def producer():
    """Produce the data, then publish it by setting ``data_ready`` and notifying."""
    global data_ready
    print("生产数据...")
    # Do the slow work *before* taking the lock. Sleeping while holding it would
    # block every thread that needs the condition (including the consumer's
    # check of data_ready) for the whole 2 seconds.
    time.sleep(2)
    with condition:
        data_ready = True   # change shared state only while holding the lock
        condition.notify()  # wake up one thread waiting on this condition, if any


def consumer():
    """Wait until the producer has set ``data_ready``, then consume the data."""
    with condition:
        # Re-check the predicate in a loop: a wake-up does not guarantee the
        # condition holds, and if data_ready is already True we must not wait.
        while not data_ready:
            condition.wait()  # releases the lock while blocked, re-acquires it on wake-up
        print("消费数据")


t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
t1.start()
t2.start()
t1.join()
t2.join()
2.4 Event:事件标志
简单的线程间通知。
python
import threading
import time

event = threading.Event()


def waiter():
    """Block until ``event`` is set, printing before and after the wait."""
    print("等待事件...")
    event.wait()  # blocks until another thread calls event.set()
    print("事件触发了!")


def setter():
    """Sleep 2 seconds, then set ``event``, waking every thread waiting on it."""
    time.sleep(2)
    event.set()


t1 = threading.Thread(target=waiter)
t2 = threading.Thread(target=setter)
t1.start()
t2.start()
t1.join()
t2.join()
2.5 Semaphore:信号量
限制同时访问资源的线程数量。
python
import threading
import time

# At most 3 threads may be inside `with semaphore:` at the same time.
semaphore = threading.Semaphore(3)


def limited_task(id):
    """Run one simulated 2-second task while holding a semaphore slot.

    Args:
        id: Task identifier for the messages. (It shadows the builtin ``id``;
            the name is kept so existing keyword callers keep working.)
    """
    with semaphore:  # blocks while 3 other tasks already hold slots
        print(f"任务 {id} 开始")
        time.sleep(2)
        print(f"任务 {id} 完成")


threads = [threading.Thread(target=limited_task, args=(i,)) for i in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()
2.6 Barrier:栅栏
等待所有线程到达某个点再一起继续。
python
import threading

# Threads calling barrier.wait() are all released once 3 of them have arrived.
barrier = threading.Barrier(3)


def worker(id):
    """Announce readiness, wait at the barrier for the other parties, then proceed."""
    print(f"线程 {id} 准备就绪")
    barrier.wait()
    print(f"线程 {id} 开始执行")


# One thread per barrier party (barrier.parties == 3).
threads = [threading.Thread(target=worker, args=(n,)) for n in range(barrier.parties)]
for t in threads:
    t.start()
for t in threads:
    t.join()
三、Timer:定时器
python
import threading


def delayed_action():
    """Callback run by the timer once its 3-second delay has elapsed."""
    print("3秒后执行的操作")


# Timer is a Thread subclass: start() begins the countdown in a background
# thread, and `function` is called in that thread when the interval expires.
timer = threading.Timer(interval=3.0, function=delayed_action)
timer.start()
print("主线程继续执行")
timer.join()
四、线程信息
4.1 获取当前线程
python
import threading


def worker():
    """Print the name and identifier of the thread this function is running in."""
    me = threading.current_thread()
    name, ident = me.name, me.ident
    print(f"当前线程: {name}, ID: {ident}")


# `name=` labels the thread; current_thread().name reports it inside worker().
t = threading.Thread(name="Worker-1", target=worker)
t.start()
t.join()
4.2 列出所有线程
python
import threading
import time


def worker():
    """Stay alive for 5 seconds so the thread shows up in the listing below."""
    time.sleep(5)


threads = [threading.Thread(target=worker, name=f"Worker-{i}") for i in range(3)]
for t in threads:
    t.start()
# These threads are non-daemon and never joined here, so the program waits
# for them before exiting.
# List every active thread; threading.enumerate() also includes the main thread.
for t in threading.enumerate():
    print(f"线程: {t.name}, 存活: {t.is_alive()}")
五、守护线程
当所有非守护线程(包括主线程)都结束后,程序会直接退出,不会等待守护线程,守护线程会被强行终止。因此守护线程持有的资源(打开的文件、数据库事务等)可能得不到正确释放,不要在守护线程里做必须完整执行的工作。
python
import threading
import time


def daemon_worker():
    """Print a heartbeat once per second, forever."""
    while True:
        print("守护线程运行中...")
        time.sleep(1)


t = threading.Thread(target=daemon_worker)
# Mark it as a daemon (must be done before start()): the interpreter does not
# wait for daemon threads at exit, so the infinite loop won't keep it alive.
t.daemon = True
t.start()

time.sleep(3)
print("主线程结束")
# 守护线程会随主线程一起退出
六、实战场景
6.1 并发下载
python
import threading
import time


def download(url):
    """Simulate downloading *url*: sleep 2 seconds and return fake content.

    Args:
        url: Address to "download".

    Returns:
        The (simulated) content as a string.
    """
    print(f"开始下载 {url}")
    time.sleep(2)  # stand-in for network I/O
    print(f"完成下载 {url}")
    return f"{url} 的内容"


# threading.Thread ignores its target's return value, so without this the
# downloaded content would be silently discarded.
results = {}


def _download_into_results(url):
    # Store download(url)'s return value under its URL. Each thread writes a
    # distinct key with a single dict assignment, which CPython performs
    # atomically, so no extra lock is needed here.
    results[url] = download(url)


urls = ["url1", "url2", "url3", "url4", "url5"]
threads = []
for url in urls:
    t = threading.Thread(target=_download_into_results, args=(url,))
    threads.append(t)
    t.start()
for t in threads:
    t.join()
# After the joins, `results` maps each URL to its content.
print("所有下载完成")
6.2 生产者-消费者
python
import queue
import threading
import time  # producer() calls time.sleep; without this import it dies with NameError

q = queue.Queue()


def producer():
    """Put the integers 0-9 on the queue, pausing 0.5 s between items."""
    for i in range(10):
        q.put(i)
        print(f"生产: {i}")
        time.sleep(0.5)


def consumer():
    """Take items off the queue and print them until the ``None`` sentinel arrives."""
    while True:
        item = q.get()
        if item is None:  # end-of-work sentinel put by the main thread
            # Match every get() with task_done(), the sentinel included, so a
            # later q.join() would not block forever.
            q.task_done()
            break
        print(f"消费: {item}")
        q.task_done()


t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
t1.start()
t2.start()
t1.join()
q.put(None)  # 发送结束信号 (send the end signal once the producer has finished)
t2.join()
七、常见问题
7.1 竞态条件
python
import threading  # used by threading.Lock() below; the snippet lacked this import

# Wrong: without a lock, concurrent updates can be lost (inconsistent data).
counter = 0


def increment():
    """Unsafe: add 1 to the global ``counter`` 100000 times without any locking."""
    global counter
    for _ in range(100000):
        # Not atomic: the load, add and store are separate bytecode
        # instructions, and another thread may run between them.
        counter += 1


# Right: protect the update with a lock.
lock = threading.Lock()
# Safe version: each increment runs while holding `lock`, so no other thread
# can interleave with this thread's read-modify-write of `counter`.
# NOTE(review): indentation was lost in this listing; in runnable code each
# following line is nested one level deeper than the one before it.
def safe_increment():
global counter
for _ in range(100000):
with lock:  # acquired on entry, released on exit even if the body raises
counter += 1
7.2 死锁
python
import threading  # the snippet used threading.Lock() without importing it

# Wrong: may deadlock.
lock1 = threading.Lock()
lock2 = threading.Lock()


def thread1():
    """Acquire lock1, then lock2 (the opposite order from thread2 below)."""
    with lock1:
        with lock2:  # waits for lock2 while still holding lock1
            pass


def thread2():
    """Acquire lock2, then lock1.

    If this runs concurrently with thread1, each thread can end up holding one
    lock while waiting forever for the other: a deadlock.
    """
    with lock2:
        with lock1:  # waits for lock1 while still holding lock2
            pass
# Right: every thread acquires the locks in the same order (lock1, then lock2).
def thread1():
    """Acquire lock1, then lock2, and release them in reverse order.

    ``with a, b:`` is equivalent to nesting ``with a:`` / ``with b:``.
    """
    with lock1, lock2:
        pass
# Uses the same lock order as thread1 (lock1 first, then lock2), so neither
# thread can hold one lock while waiting for a lock the other already holds.
# NOTE(review): indentation was lost in this listing; each following line is
# nested one level deeper than the one before it in runnable code.
def thread2():
with lock1: # also acquire lock1 first
with lock2:
pass
八、总结
threading模块的核心:
| 组件 | 用途 |
|---|---|
| Thread | 创建线程 |
| Lock | 互斥锁 |
| RLock | 可重入锁 |
| Condition | 条件变量 |
| Event | 事件标志 |
| Semaphore | 信号量 |
| Barrier | 栅栏同步 |
| Timer | 定时器 |
使用场景:
- I/O密集型任务(网络请求、文件读写)
- 需要"同时"处理多个任务
- 后台任务、定时任务
记住:Python的threading受GIL限制,同一时刻只有一个线程执行Python代码。CPU密集型任务用multiprocessing。