注:2019.3.21补充完善

前言

所谓「异步 IO」,就是你发起一个 IO 操作,却不用等它结束,你可以继续做其他事情,当它结束时,你会得到通知。

Asyncio 是并发(concurrency)的一种方式。对 Python 来说,并发还可以通过线程(threading)和多进程(multiprocessing)来实现。

Asyncio 并不能带来真正的并行(parallelism)。当然,因为 GIL(全局解释器锁)的存在,Python 的多线程也不能带来真正的并行。

可交给 asyncio 执行的任务,称为协程(coroutine)。一个协程可以放弃执行,把机会让给其它协程(await)

运行协程

调用协程函数,协程并不会开始运行,只是返回一个协程对象

协程对象要运行有两种方式:

  • 在另一个已经运行的协程中用 await 等待它
  • 通过 ensure_future 函数计划它的执行

简单来说,只有 loop 运行了,协程才可能运行。
下面先拿到当前线程缺省的 loop ,然后把协程对象交给 loop.run_until_complete,协程对象随后会在 loop 里得到运行。

loop = asyncio.get_event_loop()
loop.run_until_complete(do_some_work(3))

是一个阻塞(blocking)调用,直到协程运行结束,它才返回。这一点从函数名不难看出。
run_until_complete 的参数是一个 future,但是我们这里传给它的却是协程对象,之所以能这样,是因为它在内部做了检查,通过 ensure_future 函数把协程对象包装(wrap)成了 future。所以,我们也可以把上面代码写得更明显一些:

loop.run_until_complete(asyncio.ensure_future(do_some_work(3)))   #手动包装一下

完整代码:

import asyncio

async def hello(x):
    print("Waiting " + str(x))
    await asyncio.sleep(x)

loop = asyncio.get_event_loop()
loop.run_until_complete(hello(3))
运行结果:

Waiting 3
<三秒后结束>

使用async可以定义协程对象,使用await可以针对耗时的操作进行挂起,就像生成器里的yield一样,函数让出控制权。协程遇到await,事件循环将会挂起该协程,执行别的协程,直到其他的协程也挂起或者执行完毕,再进行下一个协程的执行,协程的目的也是让一些耗时的操作异步化。

await后面跟的必须是一个Awaitable对象,或者实现了相应协议的对象,查看Awaitable抽象类的代码,表明了只要一个类实现了await方法,那么通过它构造出来的实例就是一个Awaitable,并且Coroutine类也继承了Awaitable。

多个协程

import asyncio
from time import strftime

async def hello():
    print(strftime('[%H:%M:%S]'), end=' ')
    print("begin")
    await asyncio.sleep(1)
    print(strftime('[%H:%M:%S]'), end=' ')
    print("end")

loop = asyncio.get_event_loop()
tasks = [hello(),hello()]
# loop.run_until_complete(asyncio.wait(tasks))
loop.run_until_complete(asyncio.gather(*tasks))

执行结果:
[15:08:30] begin
[15:08:30] begin
[15:08:31] end
[15:08:31] end
  • 两个协程并发执行所以总时间是两秒
  • gather 起聚合的作用,把多个 futures 包装成单个 future,因为 loop.run_until_complete 只接受单个 future。
  • asyncio.gather 和 asyncio.wait 功能相似。

需要注意的:

  • await语法只能出现在async修饰的函数中
  • 协程函数中,可以通过await语法来挂起自身的协程,并切换到下一个协程直到该协程返回结果。

协程中运行阻塞函数

爬虫中使用协程比较多,这里使用requesst这个阻塞模块

由于requests模块阻塞了客户代码与asycio事件循环的唯一线程,因此在执行调用时,整个应用程序都会冻结,但如果一定要用requests模块,可以使用事件循环对象的run_in_executor方法,通过run_in_executor方法来新建一个线程来执行耗时函数

#! /usr/bin/env python
# -*- coding: utf-8 -*-
# Author: Archerx
# @time: 2019/3/21 上午 11:20

import asyncio
import requests
import functools


async def run(url):
   print("start ", url)
   loop = asyncio.get_event_loop()
   # response = await loop.run_in_executor(None, requests.get, url)
   #functools实现多参数传入
   response = await loop.run_in_executor(None, functools.partial(requests.get,url=url,params='',timeout=1))
   print(response.status_code)


url_list = ['https://blog.ixuchao.cn/archives/54.html','https://blog.ixuchao.cn/archives/55.html','https://blog.ixuchao.cn/archives/53.html']
tasks = [asyncio.ensure_future(run(url)) for url in url_list]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

这个问题的解决方法是使用事件循环对象的run_in_executor方法。asyncio的事件循环在背后维护着一个ThreadPoolExecutor对象,我们可以调用run_in_executor方法,把可调用对象发给它执行,即可以通过run_in_executor方法来新建一个线程来执行耗时函数。

本质上类似于多线程跑IO密集型

run_in_executor方法参数如下:

    
AbstractEventLoop.run_in_executor(executor, func, *args)

executor 参数应该是一个 Executor 实例。如果为 None,则使用默认 executor。
func 就是要执行的函数
args 就是传递给 func 的参数

有了run_in_executor方法,就可以使用之前熟悉的模块创建协程并发了,而不需要使用特定的模块进行IO异步开发。

#! /usr/bin/env python
# -*- coding: utf-8 -*-
# Author: Archerx
# @time: 2019/3/21 上午 11:20

import asyncio
from time import sleep, strftime
from concurrent import futures
executor = futures.ThreadPoolExecutor(max_workers=5)
async def blocked_sleep(name, t):  #函数中有await  函数必须是异步函数,加上async修饰
    print(strftime('[%H:%M:%S]'),end=' ')
    print('sleep {} is running {}s'.format(name, t))
    loop = asyncio.get_event_loop()
    await loop.run_in_executor(executor, sleep, t)  #阻塞型函数必须这样来调用
    # await asyncio.sleep(t)   #或者是这样跟上面两行一个效果,5个sleep函数并行异步执行
    print(strftime('[%H:%M:%S]'),end=' ')
    print('sleep {} is end'.format(name))
    return t



tasks = [blocked_sleep(i,i) for i in range(1,6)]
tasks = [asyncio.ensure_future(task) for task in tasks]    #手动封装成task对象,调用task.result()获取结果。
loop = asyncio.get_event_loop()
# results = loop.run_until_complete(asyncio.wait(tasks))   #使用这个和下面代码功能类似
results = loop.run_until_complete(asyncio.gather(*tasks))


print('results: {}'.format(results))   #结果存入一个list



输出结果是

[19:49:32] sleep 3 is running 3s
[19:49:32] sleep 4 is running 4s
[19:49:32] sleep 1 is running 1s
[19:49:32] sleep 5 is running 5s
[19:49:32] sleep 2 is running 2s
[19:49:33] sleep 1 is end
[19:49:34] sleep 2 is end
[19:49:35] sleep 3 is end
[19:49:36] sleep 4 is end
[19:49:37] sleep 5 is end
result: [1, 2, 3, 4, 5]

tasks = (blocked_sleep(i, i) for i in range(1,6))产生一个生成器表达式,每个元素都是一个协程。我们将future传递给gather函数。

对于gather函数的使用方法如下:

asyncio.gather(*coros_or_futures, loop=None, return_exceptions=False)

gather返回一个包含task对象结果的list

  • 在asyncio中调用阻塞函数时,需要使用asyncio维护的线程池来另开线程运行阻塞函数,防止阻塞事件循环所在的线程。
函数传参返回值返回值顺序
asyncio.gather可以传递多个协程或者Futures,函数会自动将协程包装成task(也可以手动包装),例如协程生成器。包含Futures结果的list按照原顺序排列
asyncio.waita list of futures返回两个Future集合 (done, pending)无序(暂定)
asyncio.as_completeda list of futures返回一个协程迭代器按照完成顺序

获取协程结果

直接输出函数返回结果

import asyncio
async def test1():
    print("1")
    print("2")
    return "stop"

loop = asyncio.get_event_loop()
task = asyncio.ensure_future(test1())
loop.run_until_complete(task)
print(task.result())

由于协程对象不能直接运行,在注册事件循环的时候,其实是run_until_complete方法将协程包装成为了一个任务(task)对象。所谓task对象是Future类的子类,保存了协程运行后的状态,用于未来获取协程的结果。我们也可以手动将协程对象定义成task,使用task = loop.create_task(test1())

等待task状态为finish,然后调用result方法获取返回值。

回调函数

#! /usr/bin/env python
# -*- coding: utf-8 -*-
# Author: Archerx
# @time: 2019/3/21 上午 11:20

import asyncio
import functools

async def test1():
    print("1")
    print("2")
    return "stop"


def callback(param1,param2,future):  #回调函数中的future对象其实就是task对象
    print(param1,param2)
    print('CallBack:',future.result())   #future对象的result方法获取函数返回值


loop = asyncio.get_event_loop()
task = asyncio.ensure_future(test1())   #协程对象转换成task
# task.add_done_callback(callback)    #task绑定回调函数
task.add_done_callback(functools.partial(callback,"param1","param2"))  #回调函数如果需要接收多个参数,用偏函数导入
loop.run_until_complete(task)

future对象有几个状态:Pending、Running、Done、Cancelled。创建future的时候,task为pending,事件循环调用执行的时候当然就是running,调用完毕自然就是done,如果需要停止事件循环,就需要先把task取消,可以使用asyncio.Task获取事件循环的task。

整理一下概念和方法

  • event_loop事件循环:程序开启一个无限的循环,当把一些函数注册到事件循环上时,满足事件发生条件即调用相应的函数。
  • coroutine协程对象:指一个使用async关键字定义的函数,它的调用不会立即执行函数,而是会返回一个协程对象,协程对象需要注册到事件循环,由事件循环调用。
  • task任务:一个协程对象就是一个原生可以挂起的函数,任务则是对协程进一步封装,其中包含任务的各种状态。
  • future:代表将来执行或没有执行的任务的结果,它和task上没有本质的区别
  • async/await关键字:python3.5用于定义协程的关键字,async定义一个协程,await用于挂起阻塞的异步调用接口。

参考

已有 2 条评论

  1. 任翔:

    big master I love you

    2018-12-03 23:45 回复

preView