This article is the author's original work; please contact the author before reposting. Also published on SegmentFault and Jianshu.

Introduction

With the rise of Node.js, most of us have heard at least something about asynchronous programming over the past year. Although the Python community was slower than some others to embrace it, asyncio was added to the standard library in Python 3.4, Python 3.5 added syntax-level support with async/await, and in the freshly released Python 3.6 asyncio has graduated from provisional to stable. Below we'll use Python 3.4+ to explore the concepts of asynchronous programming and the usage of asyncio.


What is a coroutine?

Concurrency in Python is usually achieved with threads or processes. Because of the GIL, CPU-bound tasks generally go to multiple processes, while for IO-bound tasks we can rely on threads releasing the GIL while they perform IO, letting other threads run and giving us apparent concurrency.

For IO-bound tasks there is actually a third option: coroutines. Coroutines provide "concurrency" within a single thread, and one big advantage over threads is that they avoid the overhead of thread context switches, which makes them more efficient. Python's asyncio is itself built on coroutines. Before diving into asyncio, let's first see how plain generators can act as coroutines to achieve concurrency.

Example 1

Let's start with a small example of what a coroutine is. If you are not yet familiar with generators, it's worth reading the highly upvoted answer on Stack Overflow first.

>>> def coroutine():
...     reply = yield 'hello'
...     yield reply
...
>>> c = coroutine()
>>> next(c)
'hello'
>>> c.send('world')
'world'
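To make the two-way channel of send() more concrete, here is the classic running-average coroutine, a common textbook sketch rather than anything from asyncio itself (the name averager is my own):

```python
def averager():
    """Keep a running average of the values sent into the generator."""
    total = 0.0
    count = 0
    average = None
    while True:
        value = yield average  # hand back the current average, receive the next value
        total += value
        count += 1
        average = total / count

avg = averager()
next(avg)            # prime the coroutine: run up to the first yield
print(avg.send(10))  # 10.0
print(avg.send(20))  # 15.0
print(avg.send(30))  # 20.0
```

The priming next() call is easy to forget: until the generator has reached its first yield, send() with a non-None value raises a TypeError.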

Example 2

The next program simulates several students handing in homework to one teacher at the same time. Traditionally we might reach for threads or processes, but here generators acting as coroutines are enough to simulate the concurrency.

from collections import deque

def student(name, homeworks):
    for homework in homeworks.items():
        yield (name, homework[0], homework[1])  # the student "yields" homework to the teacher

class Teacher(object):
    def __init__(self, students):
        self.students = deque(students)

    def handle(self):
        """The teacher processes the students' homework"""
        while len(self.students):
            student = self.students.pop()
            try:
                homework = next(student)
                print('handling', homework[0], homework[1], homework[2])
            except StopIteration:
                pass
            else:
                self.students.appendleft(student)

Now let's run the program:

Teacher([
    student('Student1', {'math': '1+1=2', 'cs': 'operating system'}),
    student('Student2', {'math': '2+2=4', 'cs': 'computer graphics'}),
    student('Student3', {'math': '3+3=5', 'cs': 'compiler construction'})
]).handle()

Here is the output. With nothing but a simple generator we achieved concurrency. Note that this is concurrency, not parallelism, since the whole program runs in a single thread.

handling Student3 cs compiler construction
handling Student2 cs computer graphics
handling Student1 cs operating system
handling Student3 math 3+3=5
handling Student2 math 2+2=4
handling Student1 math 1+1=2


Implementing coroutines with the asyncio module

The asyncio module joined the standard library in Python 3.4; with it we can easily write coroutines that perform asynchronous IO.

A walkthrough of the code below: we define a coroutine display_date(num, loop), which uses the yield from keyword to wait for the result of the coroutine asyncio.sleep(2). During those two seconds it yields the CPU to other coroutines, until asyncio.sleep(2) returns its result. Finally, gather() (or wait()) collects the results of the futures.

# coroutine.py
import asyncio
import datetime

@asyncio.coroutine  # declare a coroutine
def display_date(num, loop):
    end_time = loop.time() + 10.0
    while True:
        print("Loop: {} Time: {}".format(num, datetime.datetime.now()))
        if (loop.time() + 1.0) >= end_time:
            break
        yield from asyncio.sleep(2)  # suspend until the coroutine sleep(2) returns

loop = asyncio.get_event_loop()  # get an event loop
tasks = [display_date(1, loop), display_date(2, loop)]
loop.run_until_complete(asyncio.gather(*tasks))  # "block" until all tasks complete
loop.close()

Here is the output. Notice the concurrency: the program takes only about 10 seconds from start to finish, and there is no threading or multiprocessing code anywhere. In a real project you would replace asyncio.sleep(seconds) with an actual IO task, such as a database query or disk file read/write.

ziwenxie :: ~ » python coroutine.py
Loop: 1 Time: 2016-12-19 16:06:46.515329
Loop: 2 Time: 2016-12-19 16:06:46.515446
Loop: 1 Time: 2016-12-19 16:06:48.517613
Loop: 2 Time: 2016-12-19 16:06:48.517724
Loop: 1 Time: 2016-12-19 16:06:50.520005
Loop: 2 Time: 2016-12-19 16:06:50.520169
Loop: 1 Time: 2016-12-19 16:06:52.522452
Loop: 2 Time: 2016-12-19 16:06:52.522567
Loop: 1 Time: 2016-12-19 16:06:54.524889
Loop: 2 Time: 2016-12-19 16:06:54.525031
Loop: 1 Time: 2016-12-19 16:06:56.527713
Loop: 2 Time: 2016-12-19 16:06:56.528102

Python 3.5 added more direct support for coroutines by introducing the async/await keywords. We can rewrite the code above with async in place of @asyncio.coroutine and await in place of yield from, which makes it noticeably cleaner and more readable.

import asyncio
import datetime

async def display_date(num, loop):  # declare a coroutine
    end_time = loop.time() + 10.0
    while True:
        print("Loop: {} Time: {}".format(num, datetime.datetime.now()))
        if (loop.time() + 1.0) >= end_time:
            break
        await asyncio.sleep(2)  # equivalent to yield from

loop = asyncio.get_event_loop()  # get an event loop
tasks = [display_date(1, loop), display_date(2, loop)]
loop.run_until_complete(asyncio.gather(*tasks))  # "block" until all tasks complete
loop.close()
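gather() is not the only way to wait for several tasks at once: asyncio.wait() returns (done, pending) sets of futures rather than an ordered list of results. A minimal sketch (the coroutine name work is my own; the coroutines are wrapped in tasks with ensure_future, since wait() expects futures):

```python
import asyncio

async def work(n):
    await asyncio.sleep(0.01 * n)
    return n * n

loop = asyncio.get_event_loop()
# wrap the coroutines in Tasks; wait() then returns (done, pending) sets
tasks = [asyncio.ensure_future(work(n)) for n in (1, 2, 3)]
done, pending = loop.run_until_complete(asyncio.wait(tasks))
print(sorted(t.result() for t in done))  # [1, 4, 9]
loop.close()
```

Unlike gather(), which preserves input order in its result list, the done set is unordered, so each task's result has to be read off the task object itself.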


Other asyncio APIs

There are two ways to run the event loop: one is to call run_until_complete, the other is to call run_forever. run_until_complete attaches its own done-callback internally; the advantage of run_forever is that you can register your own callback with add_done_callback. The two examples below show the difference.

run_until_complete()

import asyncio

async def slow_operation(future):
    await asyncio.sleep(1)
    future.set_result('Future is done!')

loop = asyncio.get_event_loop()
future = asyncio.Future()
asyncio.ensure_future(slow_operation(future))
print(loop.is_running())  # False
loop.run_until_complete(future)
print(future.result())
loop.close()
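Note that run_until_complete also accepts a bare coroutine directly: it wraps the coroutine in a task for you and returns its result. A quick sketch (the coroutine add is my own toy example):

```python
import asyncio

async def add(x, y):
    await asyncio.sleep(0.01)
    return x + y

loop = asyncio.get_event_loop()
result = loop.run_until_complete(add(1, 2))  # the coroutine is wrapped in a Task internally
print(result)  # 3
loop.close()
```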

run_forever()

Compared with run_until_complete, run_forever lets us attach add_done_callback, so that when the task (future) completes, a callback of our choosing handles the follow-up processing.

import asyncio

async def slow_operation(future):
    await asyncio.sleep(1)
    future.set_result('Future is done!')

def got_result(future):
    print(future.result())
    loop.stop()

loop = asyncio.get_event_loop()
future = asyncio.Future()
asyncio.ensure_future(slow_operation(future))
future.add_done_callback(got_result)
try:
    loop.run_forever()
finally:
    loop.close()

One more caveat: even if you call a coroutine function, nothing runs until the event loop is started. As the official documentation puts it (this one bit me recently):

Calling a coroutine does not start its code running – the coroutine object returned by the call doesn’t do anything until you schedule its execution. There are two basic ways to start it running: call await coroutine or yield from coroutine from another coroutine (assuming the other coroutine is already running!), or schedule its execution using the ensure_future() function or the AbstractEventLoop.create_task() method. Coroutines (and tasks) can only run when the event loop is running.
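A quick demonstration of that caveat: calling the coroutine function only builds a coroutine object, and the body runs only once the loop drives it (a toy sketch; greet is my own name):

```python
import asyncio

async def greet():
    print('running')
    return 42

coro = greet()  # nothing has run yet; this only creates a coroutine object
print(type(coro).__name__)  # coroutine

loop = asyncio.get_event_loop()
print(loop.run_until_complete(coro))  # now the loop runs it: prints 'running', then 42
loop.close()
```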

Call

call_soon()

import asyncio

def hello_world(loop):
    print('Hello World')
    loop.stop()

loop = asyncio.get_event_loop()
# Schedule a call to hello_world()
loop.call_soon(hello_world, loop)
# Blocking call interrupted by loop.stop()
loop.run_forever()
loop.close()

Here is the output. With call_soon we can register a callback ahead of time, and the Handle it returns also lets us cancel the call.

Hello World
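To illustrate the cancellation just mentioned, here is a sketch in which the Handle returned by call_soon is cancelled before the loop ever runs the callback (the callback names are my own):

```python
import asyncio

results = []

def cancelled_cb():
    results.append('cancelled')  # never runs

def kept_cb():
    results.append('kept')

loop = asyncio.get_event_loop()
handle = loop.call_soon(cancelled_cb)
handle.cancel()            # deschedule the callback before the loop runs it
loop.call_soon(kept_cb)
loop.call_soon(loop.stop)
loop.run_forever()
print(results)  # ['kept']
loop.close()
```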

call_later()

import asyncio
import datetime

def display_date(end_time, loop):
    print(datetime.datetime.now())
    if (loop.time() + 1.0) < end_time:
        loop.call_later(1, display_date, end_time, loop)
    else:
        loop.stop()

loop = asyncio.get_event_loop()
# Schedule the first call to display_date()
end_time = loop.time() + 5.0
loop.call_soon(display_date, end_time, loop)
# Blocking call interrupted by loop.stop()
loop.run_forever()
loop.close()

Tweaking the earlier example shows how call_later works. Note that there is no while loop this time; instead, call_later re-schedules display_date() to run one second later each time.

2016-12-24 19:17:13.421649
2016-12-24 19:17:14.422933
2016-12-24 19:17:15.424315
2016-12-24 19:17:16.425571
2016-12-24 19:17:17.426874
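call_later is itself built on call_at, which takes an absolute deadline measured on the loop's internal monotonic clock, loop.time(). A small sketch (my own toy example) showing both together:

```python
import asyncio

ticks = []

def tick(label):
    ticks.append(label)

loop = asyncio.get_event_loop()
# call_later(d, ...) is equivalent to call_at(loop.time() + d, ...)
loop.call_at(loop.time() + 0.05, tick, 'a')
loop.call_later(0.1, loop.stop)
loop.run_forever()
print(ticks)  # ['a']
loop.close()
```

Note that the deadline is on the loop's clock, not the wall clock, so datetime values cannot be passed to call_at directly.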

Chain coroutines

import asyncio

async def compute(x, y):
    print("Compute %s + %s ..." % (x, y))
    await asyncio.sleep(1.0)  # compute does not resume until the coroutine sleep returns
    return x + y

async def print_sum(x, y):
    result = await compute(x, y)  # print_sum does not resume until compute returns
    print("%s + %s = %s" % (x, y, result))

loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()

Here is the output:

ziwenxie :: ~ » python chain.py
Compute 1 + 2 ...
1 + 2 = 3


Queue

asyncio's Queue lets us model the producer-consumer pattern:

import asyncio
import random

async def produce(queue, n):
    for x in range(n):
        # produce an item
        print('producing {}/{}'.format(x, n))
        # simulate i/o operation using sleep
        await asyncio.sleep(random.random())
        item = str(x)
        # put the item in the queue
        await queue.put(item)

async def consume(queue):
    while True:
        # wait for an item from the producer
        item = await queue.get()
        # process the item
        print('consuming {}...'.format(item))
        # simulate i/o operation using sleep
        await asyncio.sleep(random.random())
        # notify the queue that the item has been processed
        queue.task_done()

async def run(n):
    queue = asyncio.Queue()
    # schedule the consumer
    consumer = asyncio.ensure_future(consume(queue))
    # run the producer and wait for completion
    await produce(queue, n)
    # wait until the consumer has processed all items
    await queue.join()
    # the consumer is still waiting for an item, so cancel it
    consumer.cancel()

loop = asyncio.get_event_loop()
loop.run_until_complete(run(10))
loop.close()
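One detail worth knowing about the example above: asyncio.Queue is unbounded by default, but passing maxsize turns put() into a backpressure point that suspends the producer whenever the queue is full. A minimal sketch:

```python
import asyncio

async def main():
    queue = asyncio.Queue(maxsize=2)  # bounded: put() suspends when the queue is full
    await queue.put(1)
    await queue.put(2)
    print(queue.full())          # True: a third put() would wait for a free slot
    print(queue.get_nowait())    # 1: non-blocking get, raises QueueEmpty if empty
    print(queue.qsize())         # 1

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
```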


Using asyncio for asynchronous IO in a crawler

Let's finish with a simple example of using asyncio in a Python crawler project. By the way: using requests inside asyncio is pointless, because requests is synchronous and currently shows no sign of adding asyncio support. To get the full benefit of async, use aiohttp instead, and make sensible use of the thread/process pools provided by the concurrent.futures module.

import aiohttp
import asyncio
import time

NUMBERS = range(12)
URL = 'http://httpbin.org/get?a={}'

async def fetch_async(a):
    async with aiohttp.request('GET', URL.format(a)) as r:
        data = await r.json()
        return data['args']['a']

start = time.time()
event_loop = asyncio.get_event_loop()
tasks = [fetch_async(num) for num in NUMBERS]
results = event_loop.run_until_complete(asyncio.gather(*tasks))
for num, result in zip(NUMBERS, results):
    print('fetch({}) = {}'.format(num, result))
print('Use asyncio+aiohttp cost: {}'.format(time.time() - start))

Here is the output:

ziwenxie :: ~ » python example1.py
fetch(0) = 0
fetch(1) = 1
fetch(2) = 2
fetch(3) = 3
fetch(4) = 4
fetch(5) = 5
fetch(6) = 6
fetch(7) = 7
fetch(8) = 8
fetch(9) = 9
fetch(10) = 10
fetch(11) = 11
Use asyncio+aiohttp cost: 0.8980867862701416

If we use traditional threads with a ThreadPool/ProcessPool instead, the switching overhead between threads/processes makes the same job considerably slower:

import requests
import time
from concurrent.futures import ThreadPoolExecutor

NUMBERS = range(12)
URL = 'http://httpbin.org/get?a={}'

def fetch(a):
    r = requests.get(URL.format(a))
    return r.json()['args']['a']

start = time.time()
with ThreadPoolExecutor(max_workers=3) as executor:
    for num, result in zip(NUMBERS, executor.map(fetch, NUMBERS)):
        print('fetch({}) = {}'.format(num, result))
print('Use requests+ThreadPoolExecutor cost: {}'.format(time.time() - start))

Result with the thread pool:

ziwenxie :: ~ » python example2.py
fetch(0) = 0
fetch(1) = 1
fetch(2) = 2
fetch(3) = 3
fetch(4) = 4
fetch(5) = 5
fetch(6) = 6
fetch(7) = 7
fetch(8) = 8
fetch(9) = 9
fetch(10) = 10
fetch(11) = 11
Use requests+ThreadPoolExecutor cost: 3.356502056121826

Result with the process pool:

fetch(0) = 0
fetch(1) = 1
fetch(2) = 2
fetch(3) = 3
fetch(4) = 4
fetch(5) = 5
fetch(6) = 6
fetch(7) = 7
fetch(8) = 8
fetch(9) = 9
fetch(10) = 10
fetch(11) = 11
Use requests+ProcessPoolExecutor cost: 3.2979931831359863
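As mentioned earlier, the concurrent.futures pools can also be combined with asyncio itself: loop.run_in_executor pushes a blocking call (such as requests.get) onto a pool thread and gives back an awaitable future, so synchronous libraries can still participate in an async program. A sketch using a stand-in for the blocking call (blocking_task and main are my own names):

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

def blocking_task(n):
    # stand-in for a blocking call such as requests.get(URL.format(n))
    return n * n

async def main(loop, executor):
    # each call runs on a pool thread; run_in_executor returns an awaitable future
    futures = [loop.run_in_executor(executor, blocking_task, n) for n in range(4)]
    return await asyncio.gather(*futures)

loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor(max_workers=3)
print(loop.run_until_complete(main(loop, executor)))  # [0, 1, 4, 9]
loop.close()
```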


References

Documentation of asyncio (1)
Documentation of asyncio (2)
Coroutines and async/await
Gold-Xitu
Stack Overflow
PyMOTW-3