概念 通俗点地理解:
对于操作系统来说,一个任务就是一个进程(process),比如打开一个浏览器就是启动一个浏览器进程,打开两个记事本就是打开两个记事本进程.
有些进程它同时不止干一件事,比如word,它同时可以打字,拼写检查,打印等.在一个进程内部可以同时运行多个”子任务”,则称之为线程(Thread).
由于每个进程至少要干一件事,所以每个进程至少要有一个线程.
进程(process) 进程是计算机中正在运行的程序的实例
它是操作系统分配资源的基本单位
所以每个进程都有自己的内存空间,系统资源和状态信息 线程(thread) 线程是进程内的执行单元
它是cpu调度和分配的基本单位
同一进程内多个线程共享该进程的资源 区别 资源占用 开销 通信 安全 并行 进程 每个进程独立 创建和切换开销大 进程间通信(IPC)比较复杂 进程之间不会相互影响 在多核处理器上实现真正并行 线程 每个线程共享进程的资源 创建和切换开销小 线程间通信比较简单,直接读写进程中的数据 一个线程的奔溃可能导致整个进程奔溃 在单核处理器上是并发执行,在多核上可以并行
并发与并行(concurrency & parallelism) 进程在多核处理器上实现真正并行: 多个进程在多核处理器上同时运行,每个核心执行一个进程,实现真正并行.操作系统将不同进程分配到不同的cpu核心上,这是在硬件级别实现的并行. 线程在单核处理器上是并发执行: 多线程通过时间片轮转(time-slicing)实现的并发,线程上下文会不停地,给人一种同时执行的错觉. 这种称为并发(concurrency),并不是真正的并行. 线程在多核上可以并行: 类似多进程,不同的线程分配到不同的核心上同时执行. 这种称为并行(parallelism) 那么,多任务的执行就有三种模型:
通常,同时执行多个任务时,各个任务之间并不是完全没有关联,通常都需要互相通信和协调,有时,任务1需要等待任务2完成后才能继续执行,有时任务3和4又不能同时执行,所以多进程和多线程程序的复杂度是很高的.
多进程 fork unix/linux 提供了一个fork()
调用. 它调用一次,返回两次.因为操作系统会自动把当前进程(父进程)复制一份(子进程),然后分别在父子进程内返回.在fork()
中:
python的os
模块有fork调用:
1 2 3 4 5 6 7 8 9 10 import osprint (f'process {os.getpid()} start...' ) pid = os.fork() if pid == 0 : print (f'I am child process {os.getpid()} , my pranent is {os.getppid()} ' )else : print (f'I {os.getpid} just create a child process {pid} .' )
输出如下:
1 2 3 Process (876) start... I (876) just created a child process (877). I am child process (877) and my parent is 876.
因为父进程需要记住每个子进程的pid以便后续:
等待子进程结束(wait) 向子进程发送信号(kill) 监控子进程状态 而子进程可以通过getppid()
获取父进程的pid,所以不需要在fork()时候特别记录
multiprocessing windows没有fork
调用,如果要在windows上使用python编写多进程程序.multiprocessing
就是一个跨平台版本的多进程模块.
1 2 3 4 5 6 7 8 9 10 11 12 13 from multiprocessing import Process import osdef run_proc (name ): print (f'Run child process {name} ({os.getpid()} )...' ) if __name__ == '__main__' : print (f'Parent process {os.getpid()} .' ) p = Process(traget=run_proc, args=('test' ,)) print ('Child process will start.' ) p.start() p.join() print ('Child process end.' )
Pool 需要大量创建子进程时,可以通过进程池的方式批量创建
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 from multiprocessing import Poolimport os, time, randomdef long_time_task (name ): print (f'Run task {name} ({os.getpid()} )' ) strat = time.time() time.sleep(random.random() * 3 ) end = time.time() print (f'Task {name} runs {(end - start)} seconds.' ) if __name__ == '__main__' : print (f'Parent process {os.getpid()} ' ) p = Pool(4 ) for i in range (5 ): p.apply_async(long_time_task, arg=(i,)) print ('Waitting for all subprocesses done...' ) p.close() p.join() print ('All subprocesses done.' )
1 2 3 4 5 6 7 8 9 10 11 12 13 Parent process 669. Waiting for all subprocesses done... Run task 0 (671)... Run task 1 (672)... Run task 2 (673)... Run task 3 (674)... # 这里服务器以4核为例,Pool(4)同时执行4个进程,所以task0-3同时执行 Task 2 runs 0.14 seconds. Run task 4 (673)... # task 4要等前面某个task完成后才能执行,如果你的服务器是8核,则要写Pool(9),才能看到等待效果 Task 1 runs 0.27 seconds. Task 3 runs 0.86 seconds. Task 0 runs 1.41 seconds. Task 4 runs 1.91 seconds. All subprocesses done.
subprocess 很多时候子进程是一个外部程序.我们创建子进程后还要控制子进程的输入输出.subprocess
模块可以很方便地做到.
1 2 3 4 5 import subprocessprint ('run nslookup www.python.org' ) r = subprocess.call(['nslookup' , 'www.python.org' ])print ('Exit code:' , r)
1 2 3 4 5 6 7 8 9 10 run nslookup www.python.org Server: 192.168.19.4 Address: 192.168.19.4#53 Non-authoritative answer: www.python.org canonical name = python.map.fastly.net. Name: python.map.fastly.net Address: 199.27.79.223 Exit code: 0
如果子进程需要输入,则可以通过communicate()
方法
1 2 3 4 5 6 7 import subprocessprint ('$ nslookup' ) p = subprocess.Popen(['nslookup' ], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) output, err = p.communicate(b'set q=mx\npython.org\nexit\n' )print (output.decode('utf-8' ))print ('Exit code:' , p.returncode)
实际实现的就是这种效果:
1 2 3 4 5 6 7 8 9 10 11 $ nslookup > set q=mx> python.org Server: 127.0.0.53 Address: 127.0.0.53#53 Non-authoritative answer: python.org mail exchanger = 50 mail.python.org. Authoritative answers can be found from:> exit
进程间通信(IPC) python的multiprocessing
模块包装了底层的机制,提供queue
, pipes
等多种方式交换数据.
特性 Queue Pipes 进程数量 支持多进程 仅支两个进程 通信方向 单向(FIFO) 双向 使用场景 多生产者/消费者(实际开发中如果不需要pipes的特殊功能,建议使用Queue,因为它更安全更易用) 点对点通信 安全性 进程安全(并发时不会出现数据混乱,因为有锁机制) 自行处理同步 性能 较慢(有锁机制) 较快
更多通讯方式:
通信方式 优点 缺点 适用场景 Queue 使用简单,进程安全 相对较慢 多生产者/消费者 Pipe 速度快,双向通讯 仅支持两个进程 两进程间频繁通信 共享内存 最快的IPC方式 需要自行处理同步 大量数据共享 Manager 支持复杂的python对象 慢 需要共享复杂数据结构 Value/Array 简单高效 仅支持基本数据类型 共享简单数据 文件系统 简单,持久化 最慢,需要IO操作 少量数据,需要持久化
下面以Queue
为例,在父进程中创建两个子进程,一个往Queue写数据,一个往Queue读数据.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 from multiprocessing import Process, Queueimport os, time, randomdef write (q ): print (f'Process to write: {os.getpid()} ' ) for value in ['A' , 'B' , 'C' ]: print (f'Put {value} to queue...' ) q.put(value) time.sleep(random.random()) def read (q ): print (f'Process to read {os.getpid()} ' ) while True : value = q.get(True ) print (f'Get {value} from queue' ) if __name__=='__main__' : q = Queue() pw = Process(target=write, args=(q,)) pr = Process(target=read, args=(q,)) pw.start() pr.start() pw.join() pr.terminate()
父进程和子进程间的通讯必须通过pickle序列化来传递数据.也就是说传入的args必须是可序列化的对象.比如:
基本数据类型(数字, 字符串, 布尔值) 列表,字典,元组等内置容器 类实例(该类需要可实例化) 以下类型则不能被实例化:
函数对象 线程对象 文件句柄 数据库连接 Socket连接 当你在windows系统下使用multiprocess遇到问题,首先检查是否涉及不能序列化的对象.
多线程 threading 一个进程至少有一个线程,线程是操作系统直接支持的执行单元.python的线程是真正的Posix Thread,不是模拟出来的线程.
python的标准库有两个模块: _thread
和threading
.
threading
是_thread
的高级封装,一般我们只要使用threading
就行.
任何进程
都会默认启动一个线程,称为主线程
,主线程又可以启动新的线程.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 import time, threadingdef loop (): thread_name = threading.current_thread().name print (f'thread {thread_name} is running...' ) n = 0 while n < 5 : n = n + 1 print (f'thread {thread_name} >>> {n} ' ) time.sleep(1 ) print (f'thread {thread_name} ended' ) print (f'thread {threading.current_thread().name} is running' ) t = threading.Thread(target=loop, name='LoopThread' ) t.start() t.join()print (f'thread {threading.current_thread().name} ended.' )
1 2 3 4 5 6 7 8 9 thread MainThread is running... thread LoopThread is running... thread LoopThread >>> 1 thread LoopThread >>> 2 thread LoopThread >>> 3 thread LoopThread >>> 4 thread LoopThread >>> 5 thread LoopThread ended. thread MainThread ended.
Lock 多进程中,同一个变量各自有一份拷贝存在于每个进程中,互不影响 多线程中,所有变量都是共享的,所以任何一个变量都可以被任何一个线程修改 下面是一个多线程同时操作导致变量数据混乱的例子
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 import time, threading balance = 0 def change_it (n ): global balance balance = balance + n balance = balance - n def run_thread (n ): for i in range (10000000 ): change_it(n) t1 = threading.Thread(target=run_thread, args=(5 ,)) t2 = threading.Thread(target=run_thread, args=(8 ,)) t1.start() t2.start() t1.join() t2.join()print (balance)
但是,由于线程的调度是由操作系统决定的,当t1
,t2
交替执行,balance就有出错的可能
因为,高级语言的一条语句,在CPU执行时,实际上是若干条语句.比如:
在实际执行时,会分两步
1 2 x = balance + n # 存入临时变量x balance = x # 把临时变量的值赋给balance
t1,t2交替执行,临时变量x
在两个线程中单独存在,可能会出现这样的结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 # 初始值 balance = 0 t1: x1 = balance + 5 # x1 = 0 + 5 = 5 t2: x2 = balance + 8 # x2 = 0 + 8 = 8 t2: balance = x2 # balance = 8 t1: balance = x1 # balance = 5 t1: x1 = balance - 5 # x1 = 5 - 5 = 0 t1: balance = x1 # balance = 0 t2: x2 = balance - 8 # x2 = 0 - 8 = -8 t2: balance = x2 # balance = -8# 结果 balance = -8
究其原因,是因为修改balance
需要多条语句,而执行这几条语句时,线程可能中断,从而导致把同一个对象的内容改乱.
那么要确保一个线程修改balance
的时候,别的线程一定不能改,此时就需要锁机制
.
锁只有一个,无论有多少线程,同一时刻最多只能由一个线程持有锁.从而避免并发冲突.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 import threading balance = 0 lock = threading.Lock()def run_thread (n ): for i in range (100000 ): lock.acquire() try : change_it(n) finally : lock.release()
当多个线程同时执行lock.acquire()
时,只有一个线程能成功地获取锁,然后继续执行代码,其他线程就要继续等待知道获得锁为止.使用try...finally
来保证锁的释放.
所以锁虽然可以保证一个线程从头到尾地完整执行,但是也会导致其他线程一直处于等待状态,没办法实现真正并发,大大降低效率.
另外,可以存在多个锁,不同的线程持有不同的锁,并试图获取对方的锁时,可能会导致死锁
,多个线程全部挂起,既不能执行,也无法结束,只能靠操作系统强制终止.
python的多线程 按理来说,多核CPU可以同时跑多个线程,假如创建n个死循环线程,那么n个核就会被100%跑满.C, C++, Java都是如此.但是如果用python来实现时:
1 2 3 4 5 6 7 8 9 10 import threading, multiprocessingdef loop (): x = 0 while True : x = x ^ 1 for i in range (multiprocessing.cpu_count()): t = threading.Thread(target=loop) t.start()
你会发现,cpu占用只去到102%,也就是仅跑满一个核.
因为python解释器CPython在设计时,引入了GIL(Global Interpreter Lock)全局锁: 任何python线程执行前,必须先获得GIL锁,每执行100条字节码,解释器就会自动释放GIL锁,这才轮到别的进程.有了GIL的存在,python便无法真正地实现多线程并发.
目前(2024),GIL还无法移除,因此如果要使用python实现并发提升效率,目前主要的解决方案是:
使用多进程编程 使用asyncio异步 使用C扩展在关键性能代码中释放GIL ThreadLocal 多线程下,每个线程都有自己的数据.一个线程使用自己的局部变量比使用全局变量好.局部变量只有线程自己能用,全局变量要加锁.
ThreadLocal
是一个特殊的对象,它能够为每个线程存储独立的数据副本.这意味着,即使多个线程同时访问同一个ThreadLocal
对象,每个线程都只能看到和操作自己的数据,不会影响其他线程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 import threadingfrom threading import local thread_local = local()def worker (): thread_local.value = threading.current_thread().name print (f'线程 {threading.current_theead().name} 设置的值: {thread_local.value} ' ) t1 = threading.Thread(target=worker, name='Thread-1' ) t2 = threading.Thread(target=worker, name='Thread-2' ) t1.start() t2.start() t1.join() t2.join()
主要用途包括:
线程隔离: 每个线程都有自己的数据副本,避免了线程间的数据竞争. 全局变量替代: 可以避免使用全局变量来传递线程相关数据. 常见应用场景:数据库连接管理: 每个线程维护自己的数据库连接 用户身份信息: 在web应用重保存当前请求用户的信息 事务管理: 维护线程本地的事务状态 ThreadLocal
在内存中的存储机制:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 class ThreadLocal : def __init__ (self ): self ._storage = {} def get (self ): thread_id = current_thread_id() return self ._storage.get(thread_id) def set (self, value ): thread_id = current_thread_id() self ._storage[thread_id] = value
存储的数据结构实际上类似这样:
1 2 3 4 5 { 'Thread-1' : {'id' : 0 , 'name' : 'User-0' }, 'Thread-2' : {'id' : 1 , 'name' : 'User-1' }, 'Thread-3' : {'id' : 2 , 'name' : 'User-0' }, }
进程 VS 线程 https://liaoxuefeng.com/books/python/process-thread/process-vs-thread/index.html
nginx是多进程+异步IO的Web服务器.
在多进程和多线程中应该优选多进程.因为进程更稳定,多线程下某个线程出问题很有可能导致整个进程都出问题.而且多进程可以分布到多台服务器上,而多进程只能同一台机器上的多个CPU.
分布式进程 python的multiprocessing
的子模块managers
支持把多进程分布到多台机器上.一个服务进程可以作为调度者,将任务分布到其他多个进程中,依靠网络来通信.
比如之前我们写了一个用Queue
通信的多进程程序,现在由于任务加重,希望把发送任务的进程和处理任务的进程分布到两台机器上.
通过managers
模块把Queue
通过网络暴露出去,就可以让其他机器访问Queue
了.
在一台机器上写多进程程序时,创建的Queue
可以直接拿来用,但是在分布式进程环境中,添加任务到Queue不可以直接对原始的task_queue
进行操作,这样就绕过了QueueManager
的封装,必须通过manager.get_task_queue()
获得的Queue
接口添加.
先在一台机器上启动任务进程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 import random, time, queuefrom multiprocessing.managers import BaseManager task_queue = queue.Queue() result_queue = queue.Queue()class QueueManager (BaseManager ): pass def get_task_queue (): return task_queuedef get_result_queue (): return result_queue QueueManager.register('get_task_queue' , callable = get_task_queue) QueueManager.register('get_result_queue' ,callable = get_result_queue) manager = QueueManager(address=('' , 5000 ), authkey=b'abc' ) manager.start() task = manager.get_task_queue() result = manager.get_result_queue()for i in range (10 ): n = random.randint(0 , 10000 ) print (f'Put task {n} ...' ) task.put(n)print ('try get results...' )for i in range (10 ): r = result.get(timeout=10 ) print (f'Result: {r} ' ) manager.shutdown()print ('master exit.' )
在另一台机器上启动任务进程(本次测试在本机也行)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 import time, sys, queuefrom multiprocessing.managers import BaseManagerclass QueueManager (BaseManager ): pass QueueManager.register('get_task_queue' ) QueueManager.register('get_result_queue' ) server_addr = '127.0.0.1' print (f'Connect to Server {server_addr} ...' ) m = QueueManager(address=(server_addr, 5000 ), authkey=b'abc' ) m.connect() task = m.get_task_queue() result = m.get_result_queue()for i in range (10 ): try : n = task.get(timeout=1 ) print (f'run task {n} * {n} ...' ) r = f'{n} * {n} = {n*n} ' time.sleep(1 ) result.put(r) except Queue.Empty: print ('task queue is empty.' )print ('worker exit.' )
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 │ ┌─────────────────────────────────────────┐ ┌──────────────────────────────────────┐ │task_master.py │ │ │task_worker.py │ │ │ │ │ │ task = manager.get_task_queue () │ │ │ task = manager.get_task_queue () │ │ result = manager.get_result_queue () │ │ result = manager.get_result_queue () │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ ▼ │ │ │ │ │ │ ┌─────────────────────────────────┐ │ │ │ │ │ │QueueManager │ │ │ │ │ │ │ │ ┌────────────┐ ┌──────────────┐ │ │ │ │ │ │ │ │ task_queue │ │ result_queue │ │◀───┼──┼──┼──────────────┘ │ │ │ └────────────┘ └──────────────┘ │ │ │ │ │ └─────────────────────────────────┘ │ │ │ │ └─────────────────────────────────────────┘ └──────────────────────────────────────┘ │ Network
Queue对象存储在task_master.py中 Queue可以通过网络访问是通过QueueManager实现的.由于QueueManager管理的不止一个Queue,所以要给每个Queue的网络调用接口起个名字,比如get_task_queue