python笔记-进程和线程

概念

通俗点地理解:

对于操作系统来说,一个任务就是一个进程(process),比如打开一个浏览器就是启动一个浏览器进程,打开两个记事本就是打开两个记事本进程.

有些进程它同时不止干一件事,比如word,它同时可以打字,拼写检查,打印等.在一个进程内部可以同时运行多个”子任务”,则称之为线程(Thread).

由于每个进程至少要干一件事,所以每个进程至少要有一个线程.

进程(process)

  • 进程是计算机中正在运行的程序的实例
  • 它是操作系统分配资源的基本单位
    • 所以每个进程都有自己的内存空间,系统资源和状态信息

线程(thread)

  • 线程是进程内的执行单元
  • 它是cpu调度和分配的基本单位
  • 同一进程内多个线程共享该进程的资源
区别资源占用开销通信安全并行
进程每个进程独立创建和切换开销大进程间通信(IPC)比较复杂进程之间不会相互影响在多核处理器上实现真正并行
线程每个线程共享进程的资源创建和切换开销小线程间通信比较简单,直接读写进程中的数据一个线程的奔溃可能导致整个进程奔溃在单核处理器上是并发执行,在多核上可以并行

并发与并行(concurrency & parallelism)

  • 进程在多核处理器上实现真正并行:
    多个进程在多核处理器上同时运行,每个核心执行一个进程,实现真正并行.操作系统将不同进程分配到不同的cpu核心上,这是在硬件级别实现的并行.
  • 线程在单核处理器上是并发执行:
    多线程通过时间片轮转(time-slicing)实现的并发,线程上下文会不停地,给人一种同时执行的错觉.
    这种称为并发(concurrency),并不是真正的并行.
  • 线程在多核上可以并行:
    类似多进程,不同的线程分配到不同的核心上同时执行.
    这种称为并行(parallelism)

那么,多任务的执行就有三种模型:

  • 多进程
  • 多线程
  • 多进程+多线程

通常,同时执行多个任务时,各个任务之间并不是完全没有关联,通常都需要互相通信和协调,有时,任务1需要等待任务2完成后才能继续执行,有时任务3和4又不能同时执行,所以多进程和多线程程序的复杂度是很高的.

多进程

fork

unix/linux 提供了一个fork()调用. 它调用一次,返回两次.因为操作系统会自动把当前进程(父进程)复制一份(子进程),然后分别在父子进程内返回.在fork()中:

  • 子进程永远返回0

  • 父进程返回子进程的id

python的os模块有fork调用:

1
2
3
4
5
6
7
8
9
10
import os

print(f'process {os.getpid()} start...')

pid = os.fork() # 创建子进程

if pid == 0:
print(f'I am child process {os.getpid()}, my pranent is {os.getppid()}')
else:
print(f'I {os.getpid} just create a child process {pid}.')

输出如下:

1
2
3
Process (876) start...
I (876) just created a child process (877).
I am child process (877) and my parent is 876.

因为父进程需要记住每个子进程的pid以便后续:

  • 等待子进程结束(wait)
  • 向子进程发送信号(kill)
  • 监控子进程状态

而子进程可以通过getppid()获取父进程的pid,所以不需要在fork()时候特别记录

multiprocessing

windows没有fork调用,如果要在windows上使用python编写多进程程序.multiprocessing就是一个跨平台版本的多进程模块.

1
2
3
4
5
6
7
8
9
10
11
12
13
from multiprocessing import Process # multiprocess提供一个Process类来代表一个进程对象
import os

def run_proc(name):
print(f'Run child process {name} ({os.getpid()})...')

if __name__ == '__main__':
print(f'Parent process {os.getpid()}.')
p = Process(traget=run_proc, args=('test',)) # 输入执行函数和函数的参数,args需要以元组形式传入,所以即使只有一个元素也要加逗号
print('Child process will start.')
p.start() # 启动子进程
p.join() # 等待子进程结束后再继续往下运行,通常用于进程间的同步
print('Child process end.')

Pool

需要大量创建子进程时,可以通过进程池的方式批量创建

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from multiprocessing import Pool
import os, time, random

def long_time_task(name):
print(f'Run task {name} ({os.getpid()})')
strat = time.time()
time.sleep(random.random() * 3)
end = time.time()
print(f'Task {name} runs {(end - start)} seconds.')

if __name__ == '__main__': # windows下必须在if __name__ == '__main__':下使用进程池
print(f'Parent process {os.getpid()}')
p = Pool(4) # Pool同时执行的进程数,默认与cpu核数相同
for i in range(5):
p.apply_async(long_time_task, arg=(i,)) # 异步执行(不等待任务完成直接返回)
# 与之相对的是p.apply(),它会阻塞等待任务完成
print('Waitting for all subprocesses done...')
p.close() # 调用join()之前必须调用close(),执行了close()后就不能继续添加新的process了
p.join() # Pool对象调用join()方法会等待所有子进程执行完毕
print('All subprocesses done.')
1
2
3
4
5
6
7
8
9
10
11
12
13
Parent process 669.
Waiting for all subprocesses done...
Run task 0 (671)...
Run task 1 (672)...
Run task 2 (673)...
Run task 3 (674)... # 这里服务器以4核为例,Pool(4)同时执行4个进程,所以task0-3同时执行
Task 2 runs 0.14 seconds.
Run task 4 (673)... # task 4要等前面某个task完成后才能执行,如果你的服务器是8核,则要写Pool(9),才能看到等待效果
Task 1 runs 0.27 seconds.
Task 3 runs 0.86 seconds.
Task 0 runs 1.41 seconds.
Task 4 runs 1.91 seconds.
All subprocesses done.

subprocess

很多时候子进程是一个外部程序.我们创建子进程后还要控制子进程的输入输出.subprocess模块可以很方便地做到.

1
2
3
4
5
import subprocess

print('run nslookup www.python.org')
r = subprocess.call(['nslookup', 'www.python.org'])
print('Exit code:', r)
1
2
3
4
5
6
7
8
9
10
run nslookup www.python.org
Server: 192.168.19.4
Address: 192.168.19.4#53

Non-authoritative answer:
www.python.org canonical name = python.map.fastly.net.
Name: python.map.fastly.net
Address: 199.27.79.223

Exit code: 0

如果子进程需要输入,则可以通过communicate()方法

1
2
3
4
5
6
7
import subprocess

print('$ nslookup')
p = subprocess.Popen(['nslookup'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, err = p.communicate(b'set q=mx\npython.org\nexit\n')
print(output.decode('utf-8'))
print('Exit code:', p.returncode)

实际实现的就是这种效果:

1
2
3
4
5
6
7
8
9
10
11
$ nslookup
> set q=mx
> python.org
Server: 127.0.0.53
Address: 127.0.0.53#53

Non-authoritative answer:
python.org mail exchanger = 50 mail.python.org.

Authoritative answers can be found from:
> exit

进程间通信(IPC)

python的multiprocessing模块包装了底层的机制,提供queue, pipes等多种方式交换数据.

特性QueuePipes
进程数量支持多进程仅支两个进程
通信方向单向(FIFO)双向
使用场景多生产者/消费者(实际开发中如果不需要pipes的特殊功能,建议使用Queue,因为它更安全更易用)点对点通信
安全性进程安全(并发时不会出现数据混乱,因为有锁机制)自行处理同步
性能较慢(有锁机制)较快

更多通讯方式:

通信方式优点缺点适用场景
Queue使用简单,进程安全相对较慢多生产者/消费者
Pipe速度快,双向通讯仅支持两个进程两进程间频繁通信
共享内存最快的IPC方式需要自行处理同步大量数据共享
Manager支持复杂的python对象需要共享复杂数据结构
Value/Array简单高效仅支持基本数据类型共享简单数据
文件系统简单,持久化最慢,需要IO操作少量数据,需要持久化

下面以Queue为例,在父进程中创建两个子进程,一个往Queue写数据,一个往Queue读数据.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
from multiprocessing import Process, Queue
import os, time, random

# 写数据
def write(q):
print(f'Process to write: {os.getpid()}')
for value in ['A', 'B', 'C']:
print(f'Put {value} to queue...')
q.put(value)
time.sleep(random.random())

# 读数据
def read(q):
print(f'Process to read {os.getpid()}')
while True:
value = q.get(True)
print(f'Get {value} from queue')

if __name__=='__main__':
# 父进程创建Queue,并传入子进程
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
# 子进程qw启动,写入
pw.start()
# 子进程qr启动,读取
pr.start()
# 等到pw结束
pw.join()
# pr内是死循环,无法等待结束,只能强行终止
pr.terminate()
  1. 父进程和子进程间的通讯必须通过pickle序列化来传递数据.也就是说传入的args必须是可序列化的对象.比如:

    1. 基本数据类型(数字, 字符串, 布尔值)
    2. 列表,字典,元组等内置容器
    3. 类实例(该类需要可实例化)

    以下类型则不能被实例化:

    1. 函数对象
    2. 线程对象
    3. 文件句柄
    4. 数据库连接
    5. Socket连接

    当你在windows系统下使用multiprocess遇到问题,首先检查是否涉及不能序列化的对象.

多线程

threading

一个进程至少有一个线程,线程是操作系统直接支持的执行单元.python的线程是真正的Posix Thread,不是模拟出来的线程.

python的标准库有两个模块: _threadthreading.

threading_thread的高级封装,一般我们只要使用threading就行.

任何进程都会默认启动一个线程,称为主线程,主线程又可以启动新的线程.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import time, threading
def loop():
thread_name = threading.current_thread().name # current_thread()返回当前线程得实例,主线程名字就叫MainThread
print(f'thread {thread_name} is running...')
n = 0
while n < 5:
n = n + 1
print(f'thread {thread_name} >>> {n}')
time.sleep(1)
print(f'thread {thread_name} ended')

print(f'thread {threading.current_thread().name} is running')
t = threading.Thread(target=loop, name='LoopThread') # 创建子线程,执行loop函数,命名LoopThread,不命名的话默认是Thread-1,Thread-2,...
t.start()
t.join()
print(f'thread {threading.current_thread().name} ended.')
1
2
3
4
5
6
7
8
9
thread MainThread is running...
thread LoopThread is running...
thread LoopThread >>> 1
thread LoopThread >>> 2
thread LoopThread >>> 3
thread LoopThread >>> 4
thread LoopThread >>> 5
thread LoopThread ended.
thread MainThread ended.

Lock

  • 多进程中,同一个变量各自有一份拷贝存在于每个进程中,互不影响
  • 多线程中,所有变量都是共享的,所以任何一个变量都可以被任何一个线程修改

下面是一个多线程同时操作导致变量数据混乱的例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# multithread
import time, threading

# 假定这是你的银行存款
balance = 0

def change_it(n):
# 先存后取,结果应该为0
global balance # 定义一个共享变量balance
balance = balance + n
balance = balance - n

def run_thread(n):
for i in range(10000000):
change_it(n)

t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print(balance) # 理论上,因为先存后取,所以不管循环多少次,balance都应该为0

但是,由于线程的调度是由操作系统决定的,当t1,t2交替执行,balance就有出错的可能

因为,高级语言的一条语句,在CPU执行时,实际上是若干条语句.比如:

1
balance = balance + n

在实际执行时,会分两步

1
2
x = balance + n # 存入临时变量x
balance = x # 把临时变量的值赋给balance

t1,t2交替执行,临时变量x在两个线程中单独存在,可能会出现这样的结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 初始值 balance = 0

t1: x1 = balance + 5 # x1 = 0 + 5 = 5

t2: x2 = balance + 8 # x2 = 0 + 8 = 8
t2: balance = x2 # balance = 8

t1: balance = x1 # balance = 5
t1: x1 = balance - 5 # x1 = 5 - 5 = 0
t1: balance = x1 # balance = 0

t2: x2 = balance - 8 # x2 = 0 - 8 = -8
t2: balance = x2 # balance = -8

# 结果 balance = -8

究其原因,是因为修改balance需要多条语句,而执行这几条语句时,线程可能中断,从而导致把同一个对象的内容改乱.

那么要确保一个线程修改balance的时候,别的线程一定不能改,此时就需要锁机制.

锁只有一个,无论有多少线程,同一时刻最多只能由一个线程持有锁.从而避免并发冲突.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import threading

balance = 0
lock = threading.Lock()

def run_thread(n):
for i in range(100000):
# 先要获取锁:
lock.acquire()
try:
# 放心地改吧:
change_it(n)
finally:
# 改完了一定要释放锁:
lock.release()

当多个线程同时执行lock.acquire()时,只有一个线程能成功地获取锁,然后继续执行代码,其他线程就要继续等待知道获得锁为止.使用try...finally来保证锁的释放.

  • 所以锁虽然可以保证一个线程从头到尾地完整执行,但是也会导致其他线程一直处于等待状态,没办法实现真正并发,大大降低效率.

  • 另外,可以存在多个锁,不同的线程持有不同的锁,并试图获取对方的锁时,可能会导致死锁,多个线程全部挂起,既不能执行,也无法结束,只能靠操作系统强制终止.

python的多线程

按理来说,多核CPU可以同时跑多个线程,假如创建n个死循环线程,那么n个核就会被100%跑满.C, C++, Java都是如此.但是如果用python来实现时:

1
2
3
4
5
6
7
8
9
10
import threading, multiprocessing

def loop():
x = 0
while True:
x = x ^ 1

for i in range(multiprocessing.cpu_count()): # 启动与CPU核心数量相同的n个线程
t = threading.Thread(target=loop)
t.start()

你会发现,cpu占用只去到102%,也就是仅跑满一个核.

因为python解释器CPython在设计时,引入了GIL(Global Interpreter Lock)全局锁: 任何python线程执行前,必须先获得GIL锁,每执行100条字节码,解释器就会自动释放GIL锁,这才轮到别的进程.有了GIL的存在,python便无法真正地实现多线程并发.

目前(2024),GIL还无法移除,因此如果要使用python实现并发提升效率,目前主要的解决方案是:

  • 使用多进程编程
  • 使用asyncio异步
  • 使用C扩展在关键性能代码中释放GIL

ThreadLocal

多线程下,每个线程都有自己的数据.一个线程使用自己的局部变量比使用全局变量好.局部变量只有线程自己能用,全局变量要加锁.

ThreadLocal是一个特殊的对象,它能够为每个线程存储独立的数据副本.这意味着,即使多个线程同时访问同一个ThreadLocal对象,每个线程都只能看到和操作自己的数据,不会影响其他线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import threading
from threading import local

# 创建 ThreadLocal 对象
thread_local = local()

def worker():
# 每个线程设置自己的数据
thread_local.value = threading.current_thread().name
print(f'线程 {threading.current_theead().name} 设置的值: {thread_local.value}')

# 创建两个线程
t1 = threading.Thread(target=worker, name='Thread-1')
t2 = threading.Thread(target=worker, name='Thread-2')

t1.start()
t2.start()
t1.join()
t2.join()

主要用途包括:

  1. 线程隔离: 每个线程都有自己的数据副本,避免了线程间的数据竞争.
  2. 全局变量替代: 可以避免使用全局变量来传递线程相关数据.
  3. 常见应用场景:
    1. 数据库连接管理: 每个线程维护自己的数据库连接
    2. 用户身份信息: 在web应用重保存当前请求用户的信息
    3. 事务管理: 维护线程本地的事务状态

ThreadLocal在内存中的存储机制:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 简化的内部实现原理
class ThreadLocal:
def __init__(self):
# meige ThreadLocal 对象都有一个字典
# key 是线程ID, value是该线程的数据
self._storage = {}

def get(self):
thread_id = current_thread_id()
return self._storage.get(thread_id)

def set(self, value):
thread_id = current_thread_id()
self._storage[thread_id] = value

存储的数据结构实际上类似这样:

1
2
3
4
5
{
'Thread-1': {'id': 0, 'name': 'User-0'},
'Thread-2': {'id': 1, 'name': 'User-1'},
'Thread-3': {'id': 2, 'name': 'User-0'},
}

进程 VS 线程

https://liaoxuefeng.com/books/python/process-thread/process-vs-thread/index.html

nginx是多进程+异步IO的Web服务器.

在多进程和多线程中应该优选多进程.因为进程更稳定,多线程下某个线程出问题很有可能导致整个进程都出问题.而且多进程可以分布到多台服务器上,而多进程只能同一台机器上的多个CPU.

分布式进程

python的multiprocessing的子模块managers支持把多进程分布到多台机器上.一个服务进程可以作为调度者,将任务分布到其他多个进程中,依靠网络来通信.

比如之前我们写了一个用Queue通信的多进程程序,现在由于任务加重,希望把发送任务的进程和处理任务的进程分布到两台机器上.

通过managers模块把Queue通过网络暴露出去,就可以让其他机器访问Queue了.

在一台机器上写多进程程序时,创建的Queue可以直接拿来用,但是在分布式进程环境中,添加任务到Queue不可以直接对原始的task_queue进行操作,这样就绕过了QueueManager的封装,必须通过manager.get_task_queue()获得的Queue接口添加.

先在一台机器上启动任务进程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# task_master.py

import random, time, queue
from multiprocessing.managers import BaseManager

# 发送任务的队列
task_queue = queue.Queue()
# 接收结果的队列
result_queue = queue.Queue()

# 从BaseManager继承的QueueManager
class QueueManager(BaseManager):
pass

# 定义两个函数,用于获取任务队列和结果队列
def get_task_queue():
return task_queue
def get_result_queue():
return result_queue

# 注册这两个函数
QueueManager.register('get_task_queue', callable= get_task_queue)
QueueManager.register('get_result_queue',callable= get_result_queue)

# 绑定端口5000, 设置验证码'abc'
manager = QueueManager(address=('', 5000), authkey=b'abc')
# 启动Queue
manager.start()
# 通过网络访问Queue对象
task = manager.get_task_queue()
result = manager.get_result_queue()
# 放几个任务进去
for i in range(10):
n = random.randint(0, 10000)
print(f'Put task {n}...')
task.put(n)
# 从result读取任务
print('try get results...')
for i in range(10):
r = result.get(timeout=10)
print(f'Result: {r}')
# 关闭
manager.shutdown()
print('master exit.')

在另一台机器上启动任务进程(本次测试在本机也行)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# task_worker.py
import time, sys, queue
from multiprocessing.managers import BaseManager

# 创建类似的QueueManager
class QueueManager(BaseManager):
pass

# 由于这个QueueManager只从网络上获取Queue,所以注册时只提供名字
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')

# 连接到服务器,也就是运行task_master.py的机器
server_addr = '127.0.0.1'
print(f'Connect to Server {server_addr} ...')

# 端口和验证码注意保持与task_master.py设置的一致
m = QueueManager(address=(server_addr, 5000), authkey=b'abc')
# 从网络连接
m.connect()
# 获取Queue的对象
task = m.get_task_queue()
result = m.get_result_queue()
# 从task队列取任务,并把结果写入result队列
for i in range(10):
try:
n = task.get(timeout=1)
print(f'run task {n} * {n}...')
r = f'{n} * {n} = {n*n}'
time.sleep(1)
result.put(r)
except Queue.Empty:
print('task queue is empty.')
# 处理结果
print('worker exit.')
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

┌─────────────────────────────────────────┐ ┌──────────────────────────────────────┐
│task_master.py │ │ │task_worker.py
│ │ │ │
│ task = manager.get_task_queue() │ │ │ task = manager.get_task_queue() │
│ result = manager.get_result_queue() │ │ result = manager.get_result_queue() │
│ │ │ │ │ │ │
│ │ │ │ │ │
│ ▼ │ │ │ │ │
│ ┌─────────────────────────────────┐ │ │ │ │
│ │QueueManager │ │ │ │ │ │
│ │ ┌────────────┐ ┌──────────────┐ │ │ │ │ │
│ │ │ task_queue │ │ result_queue │ │◀───┼──┼──┼──────────────┘ │
│ │ └────────────┘ └──────────────┘ │ │ │ │
│ └─────────────────────────────────┘ │ │ │ │
└─────────────────────────────────────────┘ └──────────────────────────────────────┘


Network
  1. Queue对象存储在task_master.py中
  2. Queue可以通过网络访问是通过QueueManager实现的.由于QueueManager管理的不止一个Queue,所以要给每个Queue的网络调用接口起个名字,比如get_task_queue

python笔记-进程和线程
http://example.com/2024/08/28/python-process-thread/
作者
Peter Pan
发布于
2024年8月28日
许可协议