Using Coroutines, Threads, and Processes in Python

A summary of how to use coroutines, threads, and processes in Python, based on Python 3.7.

Coroutines

Coroutines run in a single thread. What a coroutine buys you is that while function A is executing, it can be suspended at any point to run function B, and then resumed where it left off (control can switch back and forth freely). This switching is not an ordinary function call (there is no call statement); the whole process looks like multithreading, yet only one thread is ever executing.

Below are notes on how to use asyncio coroutines.
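
As a minimal sketch of this switching (not part of the original notes), two coroutines that each hit an await point are interleaved by the event loop, all on a single thread:

import asyncio


async def func_a():
    print('A: step 1')
    await asyncio.sleep(0)  # suspend A so the event loop can run B
    print('A: step 2')


async def func_b():
    print('B: step 1')
    await asyncio.sleep(0)  # suspend B, control goes back to A
    print('B: step 2')


async def main():
    await asyncio.gather(func_a(), func_b())


if __name__ == '__main__':
    asyncio.run(main())  # the A/B steps print interleaved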

Coroutines with a timeout

If the coroutines need a timeout, you can use asyncio.wait(). It does not raise asyncio.TimeoutError on timeout; instead, after the specified number of seconds, the Futures or Tasks that are still unfinished are returned in the pending set.

import asyncio


async def crawl_page(url):
    print('begin crawl page', url)
    sleep_time = int(url.split('_')[-1])
    await asyncio.sleep(sleep_time)
    print('end crawl page', url)
    return url.split('_')[0]


async def main(urls):
    tasks = {asyncio.create_task(crawl_page(url)): url for url in urls}
    # do not pass coroutine objects directly to wait(); pass Tasks instead
    completed, pending = await asyncio.wait(tasks, timeout=4)
    for task in pending:
        print(tasks[task], 'timeout')
        task.cancel()

    for task in completed:
        print(tasks[task], task.result())


if __name__ == '__main__':
    asyncio.run(main(['1_url_3', '2_url_1', '3_url_2', '4_url_4', '5_url_3']))

Output:

begin crawl page 1_url_3
begin crawl page 2_url_1
begin crawl page 3_url_2
begin crawl page 4_url_4
begin crawl page 5_url_3
end crawl page 2_url_1
end crawl page 3_url_2
end crawl page 1_url_3
end crawl page 5_url_3
4_url_4 timeout
5_url_3 5
1_url_3 1
3_url_2 3
2_url_1 2
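
For comparison (a sketch that is not in the original notes), asyncio.wait_for() takes the opposite approach to a timeout: it raises asyncio.TimeoutError and cancels the awaitable once the limit is exceeded. The sketch below reuses the crawl_page coroutine defined above:

import asyncio


async def fetch_with_limit(url, limit):
    # reuses crawl_page() from the example above
    try:
        return await asyncio.wait_for(crawl_page(url), timeout=limit)
    except asyncio.TimeoutError:
        print(url, 'timeout')
        return None


async def main():
    urls = ['1_url_3', '4_url_4']
    results = await asyncio.gather(*(fetch_with_limit(url, 3.5) for url in urls))
    print(results)  # ['1', None]: the 4-second page exceeds the limit and is cancelled


if __name__ == '__main__':
    asyncio.run(main())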

Coroutines without a timeout

import asyncio


async def crawl_page(url):
    print('begin crawl page', url)
    sleep_time = int(url.split('_')[-1])
    await asyncio.sleep(sleep_time)
    print('end crawl page', url)
    return url.split('_')[0]


async def main(urls):
    tasks = {asyncio.create_task(crawl_page(url)): url for url in urls}

    # await asyncio.gather(*tasks)
    for task in tasks:
        await task
        print(tasks[task], 'end and result is', task.result())


if __name__ == '__main__':
    asyncio.run(main(['1_url_3', '2_url_1', '3_url_2', '4_url_4', '5_url_3']))

Output:

begin crawl page 1_url_3
begin crawl page 2_url_1
begin crawl page 3_url_2
begin crawl page 4_url_4
begin crawl page 5_url_3
end crawl page 2_url_1
end crawl page 3_url_2
end crawl page 5_url_3
end crawl page 1_url_3
1_url_3 end and result is 1
2_url_1 end and result is 2
3_url_2 end and result is 3
end crawl page 4_url_4
4_url_4 end and result is 4
5_url_3 end and result is 5
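
The commented-out asyncio.gather() call above is the more common form. A short sketch (assuming the same crawl_page coroutine and the asyncio import from the example above), where gather() returns the results as a list in the same order as the input urls:

async def main(urls):
    # gather() schedules the coroutines concurrently and returns their
    # results in the order the awaitables were passed in
    results = await asyncio.gather(*(crawl_page(url) for url in urls))
    for url, result in zip(urls, results):
        print(url, 'end and result is', result)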

Drawbacks of asyncio

asyncio needs matching library support: the libraries you call must themselves be async-aware. For example, the requests library is not compatible with asyncio, while aiohttp is.

import asyncio
import aiohttp


async def crawl_page(url):
    print('begin crawl page', url)
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            print('end crawl page', url)
            return resp.content_length


async def main():
    base_url = 'https://www.baidu.com/s?wd={}'
    urls = [base_url.format(i) for i in range(10)]
    tasks = {asyncio.create_task(crawl_page(url)): url for url in urls}
    await asyncio.gather(*tasks)


if __name__ == '__main__':
    asyncio.run(main())

Output:

begin crawl page https://www.baidu.com/s?wd=0
begin crawl page https://www.baidu.com/s?wd=1
begin crawl page https://www.baidu.com/s?wd=2
begin crawl page https://www.baidu.com/s?wd=3
begin crawl page https://www.baidu.com/s?wd=4
begin crawl page https://www.baidu.com/s?wd=5
begin crawl page https://www.baidu.com/s?wd=6
begin crawl page https://www.baidu.com/s?wd=7
begin crawl page https://www.baidu.com/s?wd=8
begin crawl page https://www.baidu.com/s?wd=9
end crawl page https://www.baidu.com/s?wd=3
end crawl page https://www.baidu.com/s?wd=9
end crawl page https://www.baidu.com/s?wd=1
end crawl page https://www.baidu.com/s?wd=0
end crawl page https://www.baidu.com/s?wd=2
end crawl page https://www.baidu.com/s?wd=4
end crawl page https://www.baidu.com/s?wd=6
end crawl page https://www.baidu.com/s?wd=5
end crawl page https://www.baidu.com/s?wd=8
end crawl page https://www.baidu.com/s?wd=7
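
If a blocking library such as requests has to be used anyway, one common workaround (a sketch, not part of the original notes) is to hand the blocking call to a thread pool with loop.run_in_executor(), so the event loop itself is never blocked:

import asyncio
import requests


def blocking_get(url):
    # plain synchronous requests call; runs in a worker thread
    return requests.get(url).status_code


async def crawl_page(url):
    loop = asyncio.get_running_loop()
    # None selects the event loop's default ThreadPoolExecutor
    return await loop.run_in_executor(None, blocking_get, url)


async def main():
    urls = ['https://www.baidu.com/s?wd={}'.format(i) for i in range(3)]
    print(await asyncio.gather(*(crawl_page(url) for url in urls)))


if __name__ == '__main__':
    asyncio.run(main())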

Threads

Use concurrent.futures.ThreadPoolExecutor, as follows:

import concurrent.futures
import requests

UserAgent = 'Mozilla/5.0 (iPhone; CPU iPhone OS 11_0 like Mac OS X) AppleWebKit'
Referer = 'https://www.baidu.com'
Headers = {'User-Agent': UserAgent,
           'Referer': Referer}


def crawl_page(url):
    print('begin crawl page', url)
    req = requests.get(url, headers=Headers)
    print('end crawl page', url)
    return req.status_code


def crawl_all(urls):
    results = {}
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        future_to_url = {executor.submit(crawl_page, url): url for url in urls}
        for future in concurrent.futures.as_completed(future_to_url):
            url = future_to_url[future]
            results[url] = future.result()
    return results


def main():
    base_url = 'https://www.baidu.com/s?wd={}'
    urls = [base_url.format(i) for i in range(10)]
    results = crawl_all(urls)


if __name__ == '__main__':
    main()

Output:

begin crawl page https://www.baidu.com/s?wd=0
begin crawl page https://www.baidu.com/s?wd=1
begin crawl page https://www.baidu.com/s?wd=2
begin crawl page https://www.baidu.com/s?wd=3
begin crawl page https://www.baidu.com/s?wd=4
end crawl page https://www.baidu.com/s?wd=0
begin crawl page https://www.baidu.com/s?wd=5
end crawl page https://www.baidu.com/s?wd=4
begin crawl page https://www.baidu.com/s?wd=6
end crawl page https://www.baidu.com/s?wd=2
begin crawl page https://www.baidu.com/s?wd=7
end crawl page https://www.baidu.com/s?wd=1
begin crawl page https://www.baidu.com/s?wd=8
end crawl page https://www.baidu.com/s?wd=3
begin crawl page https://www.baidu.com/s?wd=9
end crawl page https://www.baidu.com/s?wd=7
end crawl page https://www.baidu.com/s?wd=6
end crawl page https://www.baidu.com/s?wd=5
end crawl page https://www.baidu.com/s?wd=9
end crawl page https://www.baidu.com/s?wd=8
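
When the results should line up with the input urls rather than with completion order, executor.map() is a simpler variant (a sketch that reuses the crawl_page function and the concurrent.futures import above):

def crawl_all(urls):
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        # map() yields results in the order of urls, not in completion order
        status_codes = list(executor.map(crawl_page, urls))
    return dict(zip(urls, status_codes))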

Processes

Processes are used much like threads, via concurrent.futures.ProcessPoolExecutor; max_workers can be omitted, in which case it defaults to the number of CPUs:

import time
import concurrent.futures


def cpu_bound(number):
    # print('begin', number)
    return sum(i * i for i in range(number))


def sum_all(numbers):
    # result list sized to the input; each slot is filled by index below
    results = list(range(len(numbers)))
    with concurrent.futures.ProcessPoolExecutor() as ps:
        ps_results = {ps.submit(cpu_bound, numbers[i]): i for i in range(len(numbers))}
        for process in concurrent.futures.as_completed(ps_results):
            results[ps_results[process]] = process.result()
    return results


def main():
    start_time = time.perf_counter()
    numbers = [10000000 + x for x in range(20)]
    results = sum_all(numbers)
    end_time = time.perf_counter()
    print('Finished in {} seconds'.format(end_time - start_time))
    # print(results)


if __name__ == '__main__':
    main()
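
The index bookkeeping in sum_all() can also be avoided with executor.map(), which already returns results in input order (a sketch assuming the same cpu_bound function and imports as above):

def sum_all(numbers):
    with concurrent.futures.ProcessPoolExecutor() as executor:
        # results come back in the same order as numbers
        return list(executor.map(cpu_bound, numbers))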