python笔记-异步IO

CPU速度远远快于磁盘,网络等IO.在一个线程中,cpu执行代码的速度极快,但一旦遇到IO操作,比如读写文件,发送网络数据时,就需要等待IO操作完成,才能继续进行下一步.这种情况称为同步IO.此时一个IO就阻塞了当前线程,导致代码无法继续,所以我们需要多线程或者多进程,为多个用户服务.每个用户分配一个线程,如果遇到IO导致线程被挂起,起码其他用户不会受影响.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 真正意义上的 异步IO 是说内核直接将数据拷贝至用户态的内存单元,再通知程序直接去读取数据。
# select / poll / epoll 都是同步IO的多路复用模式

# 1.同步和异步
# 同步和异步关注的是消息通信机制
# 所谓同步,就是在发出一个*调用*时,没得到结果之前,该*调用*就不返回。但是一旦调用返回就得到返回值了,*调用者*主动等待这个*调用*的结果
# 所谓异步,就是在发出一个*调用*时,这个*调用*就直接返回了,不管返回有没有结果。当一个异步过程调用发出后,*被调用者*通过状态,通知来通知*调用者*,或者通过回调函数处理这个调用

# 2.阻塞和非阻塞
# 阻塞和非阻塞关注的是程序在等待调用结果时的状态
# 阻塞调用是指调用结果返回之前,当前线程会被挂起。调用线程只有在得到结果之后才返回
# 非阻塞调用是指在不能立即得到结果之前,该调用不会阻塞当前线程

# 网络上的例子
#老张爱喝茶,废话不说,煮开水。
#出场人物:老张,水壶两把(普通水壶,简称水壶;会响的水壶,简称响水壶)。
#1 老张把水壶放到火上,立等水开。(同步阻塞);立等就是阻塞了老张去干别的事,老张得一直主动的看着水开没,这就是同步
#2 老张把水壶放到火上,去客厅看电视,时不时去厨房看看水开没有。(同步非阻塞);老张去看电视了,这就是非阻塞了,但是老张还是得关注着水开没,这也就是同步了
#3 老张把响水壶放到火上,立等水开。(异步阻塞);立等就是阻塞了老张去干别的事,但是老张不用时刻关注水开没,因为水开了,响水壶会提醒他,这就是异步了
#4 老张把响水壶放到火上,去客厅看电视,水壶响之前不再去看它了,响了再去拿壶。(异步非阻塞);老张去看电视了,这就是非阻塞了,而且,等水开了,响水壶会提醒他,这就是异步了
#所谓同步异步,只是对于水壶而言。普通水壶,同步;响水壶,异步。对应的也就是消息通信机制
#虽然都能干活,但响水壶可以在自己完工之后,提示老张水开了。这是普通水壶所不能及的。同步只能让调用者去轮询自己(情况2中),造成老张效率的低下。
#所谓阻塞非阻塞,仅仅对于老张而言。立等的老张,阻塞;对应的也就是程序等待结果时的状态
#看电视的老张,非阻塞。
#情况1和情况3中老张就是阻塞的,媳妇喊他都不知道。虽然3中响水壶是异步的,可对于立等的老张没有太大的意义。所以一般异步是配合非阻塞使用的,这样才能发挥异步的效用。

异步IO

多进程/多线程虽然可以解决并发问题,但系统不能无上限地增加线程.由于系统切换线程的开销也很大,所以一旦线程数量过多,cpu的时间就花在线程切换上,真正运行代码的时间就减少了,性能就会下降.我们要解决的核心问题是CPU和IO设备处理速度的严重不匹配.多进程/多线程只是解决这个问题的一个办法,而另一个办法就是异步IO.

异步IO:

当代码需要执行一个耗时的IO操作时,它只发出IO指令,并不等待IO结果,然后就直接执行其他代码.一段时间后,当IO返回结果时,再通知CPU进行处理.

异步IO模型需要一个消息循环,在这个循环中,主线程不断地重复”读取消息-处理消息”这个过程:

1
2
3
4
loop = get_event_loop()
while True:
event = loop.get_event()
process_event(event)

消息模型其实早在应用在桌面应用程序中了。一个GUI程序的主线程就负责不停地读取消息并处理消息。所有的键盘、鼠标等消息都被发送到GUI程序的消息队列中,然后由GUI程序的主线程处理。

由于GUI线程处理键盘、鼠标等消息的速度非常快,所以用户感觉不到延迟。某些时候,GUI线程在一个消息处理的过程中遇到问题导致一次消息处理时间过长,此时,用户会感觉到整个GUI程序停止响应了,敲键盘、点鼠标都没有反应。这种情况说明在消息模型中,处理一个消息必须非常迅速,否则,主线程将无法及时处理消息队列中的其他消息,导致程序看上去停止响应。

消息模型是如何解决同步IO必须等待IO操作这一问题的呢?当遇到IO操作时,代码只负责发出IO请求,不等待IO结果,然后直接结束本轮消息处理,进入下一轮消息处理过程。当IO操作完成后,将收到一条“IO完成”的消息,处理该消息时就可以直接获取IO操作结果。

在“发出IO请求”到收到“IO完成”的这段时间里,同步IO模型下,主线程只能挂起,但异步IO模型下,主线程并没有休息,而是在消息循环中继续处理其他消息。这样,在异步IO模型下,一个线程就可以同时处理多个IO请求,并且没有切换线程的操作。对于大多数IO密集型的应用程序,使用异步IO将大大提升系统的多任务处理能力。

协程

又称微线程/纤程.英文名Coroutine

子程序,或者称为函数,在所有语言中都是层级调用,比如A调用B,B又调用C,C执行完毕返回,B执行完毕返回,最后才是A执行完毕.

所以子程序调用是通过栈实现的,一个线程就是执行一个子程序.

子程序调用总是一个入口,一次返回,调用顺序是明确的.

但协程与子程序不同.携程看上去也是子程序,但是在执行过程中,子程序内部可中断,然后转而执行别的子程序,在适当的时候再返回来接着执行.这一点有点类似CPU的中断.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 比如子程序A和B
def A():
print('1')
print('2')
print('3')

def B():
print('x')
print('y')
print('z')

# 假如由携程执行,结果可能就是
1
2
x
y
3
z

协程的特点在于,它是一个线程执行,与多线程相比优势有:

  • 极高的执行效率: 子程序切换不是线程切换,而是由程序自身控制.因此没有切换的开销.与多线程相比,线程越多,协程的性能优势越明显.
  • 不需要线程的锁机制: 因为协程只有一个线程,在协程中控制共享资源不加锁,只需要判断状态,所以执行效率比多线程高.

由于协程是一个线程执行,要充分利用多核CPU,最简单的方法就是多线程+协程.

底层实现机制:

  1. python协程的本质是基于生成器(generator)实现的状态机
  2. 通过字节码操作来实现协程的挂起和恢复
  3. 事件循环负责调度和切换不同的协程

与操作系统的关系:

  1. 协程是用户态的轻量级线程
  2. 切换由程序自身控制,不需要系统介入
  3. 没有线程上下文切换的开销,无锁

来看一个例子:

传统生产者-消费者模型是一个线程写消息,一个线程取消息,通过锁机制控制队列和等待,但一不小心就会死锁.

如果改用协程:

  1. 生产者生产一条消息后通过yield让出CPU,切换到消费者
  2. 消费者开始处理消息,但生产者并不需要等待消费者处理完
  3. 消费者可以再处理过程中随时通过yield让出CPU,切换回生产者继续生产新消息
  4. 两者可以交替执行,互不阻塞
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def consumer():
r = '' # 初始化返回值
while True:
n = yield r # 核心语句: 让consumer函数变成一个生成器,接受发送过来的值,赋值给n,同时产出r
if not n: # n=None,if not n--> True, 表示想consumer 生成器不发送东西的话就会直接返回
return
print('[CONSUMER] Consuming %s...' % n)
r = '200 OK' # 设置返回值

def produce(c):
c.send(None) # 启动生成器,发送None
n = 0
while n < 5: # 生产5次
n = n + 1
print('[PRODUCER] Producing %s...' % n)
r = c.send(n) # 发送生产的值,会从yield语句继续执行,yield获取新的n并返回新的r
print('[PRODUCER] Consumer return: %s' % r)
c.close()

c = consumer()
produce(c)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
[PRODUCER] Producing 1...
[CONSUMER] Consuming 1...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 2...
[CONSUMER] Consuming 2...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 3...
[CONSUMER] Consuming 3...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 4...
[CONSUMER] Consuming 4...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 5...
[CONSUMER] Consuming 5...
[PRODUCER] Consumer return: 200 OK

这里最难理解的就是n = yield r

这一行的执行顺序是:

  • 先返回”返回值r”给调用者
  • 然后暂停在这里
  • 等待下一次send()时,把send的值赋给n

所以上面consumer的代码执行顺序大概这样:

  1. 执行到yield行,先返回”返回值(r)”,然后暂停在这里,等待下一个send
  2. 下一个send到来,把send的值赋给n,然后继续执行下面的代码,再次执行到yield行,再一次返回返回值(r),再一次暂停
  3. 等待再下一个send继续循环

理论对比

先来理一下协程,异步IO和asyncio三者:

  1. 协程(Coroutine)

    • 是一种编程概念,是实现异步编程的基础

    • 可以理解为能暂停的函数: 在等待IO时可以主动让出控制权

    • 使用async/await语法来定义

      1
      2
      3
      async def fetch_data():
      await asyncio.sleep(1) # 暂停,让出控制权
      return data
  2. 异步IO(Asynchronous IO)

    • 是一种IO操作模型

    • 发起IO请求后不等待结果,继续执行其他任务

    • 当IO结果就绪时再回来处理

      1
      2
      3
      4
      5
      # 例如读文件
      async def read_file():
      async with aiofiles.open('file.txt') as f:
      content = await f.read() # 不阻塞等待
      return content
  3. asyncio

    • python的异步编程框架

    • 提供运行和管理协程的完整工具集

    • 包含时间循环,任务调度,IO操作等功能

      1
      2
      3
      4
      5
      6
      7
      8
      9
      # 例如
      async def main():
      task1 = asyncio.create_task(fetch_data())
      task2 = asyncio.create_task(fread_file())
      # 并发执行多个协程 (虽然是并发,但实际上还是单线程在工作)
      results = await asyncio.gather(task1, task2)

      # 启动时间循环运行协程
      asyncio.run(main())
    • asyncio 是框架
    • 协程是核心机制
    • 异步IO是主要应用场景

    优点:

    1. 一个线程可以同时处理多个IO操作
    2. IO等待时间被利用起来做其他工作
    3. 避免多线程的复杂性

asyncio

简介

asyncio是python的异步编程框架,用于编写并发代码.它使用协程,事件循环和任务实现异步编程.

核心概念

  • 协程(Coroutine): 可以暂停执行的函数
  • 事件循环(Event Loop): 程序调度和执行异步代码的核心
  • 任务(Task): 协程的包装器,用于跟踪协程的执行
  • Future: 表示异步操作的最终结果
  • await: 等待一个协程完成

关键字和语法

基础关键字

  • async def: 定义一个协程函数
  • async with: 异步上下文管理器,相当于with的异步版本
  • await: 等待一个协程执行完成
  • asyncio.run(): 运行异步程序的入口点
  • asyncio.create_task(): 创建一个任务
  • asyncio.gather(): 并发运行多个协程
  • asyncio.sleep(n): 当前协程暂停执行n秒,不会阻塞事件循环,其他协程正常运行.

常用API

1
2
3
4
5
6
7
8
9
10
# 创建事件循环
loop = asyncio.get_event_loop()
# 运行协程
asyncio.run(main())
# 创建任务
task = asyncio.create_task(coro())
# 等待多个协程完成
await asyncio.gather(coro1(), coro2())
# 延时
await asyncio.sleep(1)

异步爬虫例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import asyncio # python标准库,提供异步编程的基础设施
import aiohttp # 第三方库,提供异步的http客户端和服务端
from bs4 import BeautifulSoup # 用于解析html和xml

# 这个函数使用aiohttp发送一个异步GET请求到指定的url
async def fetch_html(session, url): # async def 定义一个异步函数
async with session.get(url) as response: # session.get()发送一个GET请求到指定url并返回一个ClientResponse对象
# async with用于正确管理异步上下文,确保在请求完成后正确释放资源
if response.statue == 200:
return await response.text() # 获取响应的文本内容(HTML)
else:
return None

# 这里就是使用beautiful soup来解析html
"""
此函数其实没有执行任何异步操作,所以它也可以定义为一个普通的同步函数,但是把它也定义为异步函数也是没问题的,主要是为了:
1. 保持一致性
2. 未来扩展考虑, 可能未来需要在这个函数内加上异步操作
不过从性能角度考虑,如果一个函数内没有任何异步操作,最好还是直接定义为同步函数,不然会增加开销
"""
async def extract_links(html):
soup = Beautifulsoup(html, 'html.parser')
links = [a['href'] for a in soup.find_all('a', href=True)]
return links

# 递归爬取网页
async def crawl(url, max_depth=2, depth=0):
async with aiohttp.ClientSession() as session: # 创建一个ClientSession对象,用于发送HTTP请求
html = await fetch_html(session, url) # 调用fetch_html函数获取html
if html:
print(f"Crawling: {url}") # 如果获取到html就打印正在爬取的url
links = await extract_link(html) # 并调用extract_link提取连接
if depth < max_depth: # 递归调用,爬取depth深度的网页
for link in links:
await crawl(link, max_depth, depth+1)
# 主函数
async def main():
start_url = "https://example.com"
await crawl(start_url)

if __name__ == "__main__":
asyncio.run(main()) # 创建事件循环,并在该循环中运行main()

再来看一下关键字的用法:

  • async def:

    此关键字用于定义一个异步函数,需要在以下情况使用async def:

    1. 函数内部需要使用await等待一个耗时的异步操作完成,比如网络请求,文件IO等
    2. 函数需要调用另一个异步函数
  • await:

    此关键字只能在async def定义个异步函数内使用.作用是暂停当前异步函数的执行,等待一个耗时的异步操作完成,然后继续执行.一般在以下情况下使用:

    1. 发送网络请求,等待响应返回
    2. 读写文件,等待IO完成
    3. 调用另一个异步函数,等待它执行完毕
  • async with:

    当你使用了一个实现了__aenter____aexit__方法的异步上下文管理器对象,那么你应该使用async with来获取和释放该对象管理的资源.一些常见的异步上下文管理器包括但不限于:

    1. aiohttp.ClientSession
    2. asyncpg.Connection
    3. aiomysql.Connection
    4. aiofiles.open
  • asyncio.sleep():

    这是一个异步函数,可以用于模拟一个耗时的异步操作,让事件循环有机会切换到其他任务.在爬虫中如果你需要控制爬取的速度,避免给网站过高的负载,可以在每次爬取后使用await asyncio.sleep(delay)来暂停一段时间.

aiohttp

asyncio可以实现单线程并发IO.但是这是仅在客户端.如果把asyncio用来服务端,例如web服务器,http连接本身就是IO操作,所以可以通过单线程+async函数实现多用户的高并发操作.

asyncio实现了TCP,UDP,SSL等协议,而aiohttp则是基于asyncio实现的HTTP框架.

aiohttp是第三方库所以要用pip安装.

下面用aiohttp写一个http服务器,处理以下URL:

  • /: 首页返回index.html
  • /{name}: 根据URL 返回hello, {name}!
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from aiohttp import web

async def index(request):
text = "<h1>Index Page</h1>"
return web.Response(text=text, content_type='text/html')

async def hello(request):
name = request.match_info.get('name', 'World')
text = f'<h1>Hello, {name}</h1>'
return web.Response(text=text, content_type='text/html')

app = web.Application()

# 添加路由
app.add_routes([web.get("/", index), web.get("/{name}", hello)])

if __name__ == '__main__':
web.run_app(app)

python笔记-异步IO
http://example.com/2024/10/29/python-asyncio/
作者
Peter Pan
发布于
2024年10月29日
许可协议