Как стать автором
Обновить

Асинхронный Python: различные формы конкурентности

Время на прочтение 8 мин
Количество просмотров 128K
Автор оригинала: Abu Ashraf Masnun
С появлением Python 3 довольно много шума об “асинхронности” и “параллелизме”, можно полагать, что Python недавно представил эти возможности/концепции. Но это не так. Мы много раз использовали эти операции. Кроме того, новички могут подумать, что asyncio является единственным или лучшим способом воссоздать и использовать асинхронные/параллельные операции. В этой статье мы рассмотрим различные способы достижения параллелизма, их преимущества и недостатки.

Определение терминов:


Прежде чем мы углубимся в технические аспекты, важно иметь некоторое базовое понимание терминов, часто используемых в этом контексте.

Синхронный и асинхронный:

В ​синхронных операциях задачи выполняются друг за другом. В асинхронных задачи могут запускаться и завершаться независимо друг от друга. Одна асинхронная задача может запускаться и продолжать выполняться, пока выполнение переходит к новой задаче. Асинхронные задачи ​не блокируют (не заставляют ждать завершения выполнения задачи) операции и обычно выполняются в фоновом режиме.

Например, вы должны обратиться в туристическое агентство, чтобы спланировать свой следующий отпуск. Вам нужно отправить письмо своему руководителю, прежде чем улететь. В синхронном режиме, вы сначала позвоните в туристическое агентство, и если вас попросят подождать, то вы будете ждать, пока вам не ответят. Затем вы начнёте писать письмо руководителю. Таким образом, вы выполняете задачи последовательно, одна за одной. [синхронное выполнение, прим. переводчика] Но, если вы умны, то пока вас попросили подождать [​повисеть на телефоне, прим. переводчика] вы начнёте писать e-mail и когда с вами снова заговорят вы приостановите написание, поговорите, а затем допишете письмо. Вы также можете попросить друга позвонить в агентство, а сами написать письмо. Это асинхронность, задачи не блокируют друг друга.

Конкурентность и параллелизм:

Конкурентность подразумевает, что две задачи выполняются совместно. В нашем предыдущем примере, когда мы рассматривали асинхронный пример, мы постепенно продвигались то в написании письма, то в разговоре с тур. агентством. Это ​конкурентность.

Когда мы попросили позвонить друга, а сами писали письмо, то задачи выполнялись ​параллельно.​

Параллелизм по сути является формой конкурентности. Но параллелизм зависит от оборудования. Например, если в CPU только одно ядро, то две задачи не могут выполняться параллельно. Они просто делят процессорное время между собой. Тогда это конкурентность, но не параллелизм. Но когда у нас есть несколько ядер [как друг в предыдущем примере, который является вторым ядром, прим. переводчика] мы можем выполнять несколько операций (в зависимости от количества ядер) одновременно.

Подытожим:

  • Синхронность: блокирует операции (блокирующие)
  • Асинхронность: не блокирует операции (неблокирующие)
  • Конкурентность: совместный прогресс (совместные)
  • Параллелизм: параллельный прогресс (параллельные)

Параллелизм подразумевает конкурентность. Но конкурентность не всегда подразумевает параллелизм.

Потоки и процессы


Python поддерживает потоки уже очень давно. Потоки позволяют выполнять операции конкурентно. Но есть проблема, связанная с Global Interpreter Lock (GIL) из-за которой потоки не могли обеспечить настоящий параллелизм. И тем не менее, с появлением multiprocessing можно использовать несколько ядер с помощью Python.

Потоки (Threads)

Рассмотрим небольшой пример. В нижеследующем коде функция worker будет выполняться в нескольких потоках асинхронно и одновременно.

import threading
import time
import random


def worker(number):
    sleep = random.randrange(1, 10)
    time.sleep(sleep)
    print("I am Worker {}, I slept for {} seconds".format(number, sleep))


for i in range(5):
    t = threading.Thread(target=worker, args=(i,))
    t.start()

print("All Threads are queued, let's see when they finish!")

А вот пример выходных данных:

$ python thread_test.py
All Threads are queued, let's see when they finish!
I am Worker 1, I slept for 1 seconds
I am Worker 3, I slept for 4 seconds
I am Worker 4, I slept for 5 seconds
I am Worker 2, I slept for 7 seconds
I am Worker 0, I slept for 9 seconds

Таким образом мы запустили 5 потоков для совместной работы и после их старта (т.е. после запуска функции worker) операция не ждёт завершения работы потоков прежде чем перейти к следующему оператору print. Это асинхронная операция.

В нашем примере мы передали функцию в конструктор Thread. Если бы мы хотели, то могли бы реализовать подкласс с методом (ООП стиль).

Дальнейшее чтение:

Чтобы узнать больше о потоках, воспользуйтесь ссылкой ниже:


Global Interpreter Lock (GIL)

GIL был представлен, чтобы сделать обработку памяти CPython проще и обеспечить наилучшую интеграцию с C(например, с расширениями). GIL — это механизм блокировки, когда интерпретатор Python запускает в работу только один поток за раз. Т.е. только один поток может исполняться в байт-коде Python единовременно. GIL следит за тем, чтобы несколько потоков не выполнялись параллельно.

Краткие сведения о GIL:

  • Одновременно может выполняться один поток.
  • Интерпретатор Python переключается между потоками для достижения конкурентности.
  • GIL применим к CPython (стандартной реализации). Но такие как, например, Jython и IronPython не имеют GIL.
  • GIL делает однопоточные программы быстрыми.
  • Операциям ввода/вывода GIL обычно не мешает.
  • GIL позволяет легко интегрировать непотокобезопасные библиотеки на C, благодаря GIL у нас есть много высокопроизводительных расширений/модулей, написанных на C.
  • Для CPU зависимых задач интерпретатор делает проверку каждые N тиков и переключает потоки. Таким образом один поток не блокирует другие.

Многие видят в GIL слабость. Я же рассматриваю это как благо, ведь были созданы такие библиотеки как NumPy, SciPy, которые занимают особое, уникальное положение в научном обществе.

Дальнейшее чтение:

Эти ресурсы позволят углубиться в GIL:


Процессы (Processes)

Чтобы достичь параллелизма в Python был добавлен модуль multiprocessing, который предоставляет API, и выглядит очень похожим, если вы использовали threading раньше.

Давайте просто пойдем и изменим предыдущий пример. Теперь модифицированная версия использует Процесс вместо Потока.

import multiprocessing
import time
import random


def worker(number):
    sleep = random.randrange(1, 10)
    time.sleep(sleep)
    print("I am Worker {}, I slept for {} seconds".format(number, sleep))


for i in range(5):
    t = multiprocessing.Process(target=worker, args=(i,))
    t.start()

print("All Processes are queued, let's see when they finish!")

Что же изменилось? Я просто импортировал модуль multiprocessing вместо threading. А затем, вместо потока я использовал процесс. Вот и всё! Теперь вместо множества потоков мы используем процессы которые запускаются на разных ядрах CPU (если, конечно, у вашего процессора несколько ядер).

С помощью класса Pool мы также можем распределить выполнение одной функции между несколькими процессами для разных входных значений. Пример из официальных документов:

from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    p = Pool(5)
    print(p.map(f, [1, 2, 3]))

Здесь вместо того, чтобы перебирать список значений и вызывать функцию f по одному, мы фактически запускаем функцию в разных процессах. Один процесс выполняет f(1), другой-f(2), а другой-f (3). Наконец, результаты снова объединяются в список. Это позволяет нам разбить тяжелые вычисления на более мелкие части и запускать их параллельно для более быстрого расчета.

Дальнейшее чтение:


Модуль concurrent.futures

Модуль concurrent.futures большой и позволяет писать асинхронный код очень легко. Мои любимчики ThreadPoolExecutor и ProcessPoolExecutor. Эти исполнители поддерживают пул потоков или процессов. Мы отправляем наши задачи в пул, и он запускает задачи в доступном потоке / процессе. Возвращается объект Future, который можно использовать для запроса и получения результата по завершении задачи.

А вот пример ThreadPoolExecutor:

from concurrent.futures import ThreadPoolExecutor
from time import sleep
 
def return_after_5_secs(message):
    sleep(5)
    return message
 
pool = ThreadPoolExecutor(3)
 
future = pool.submit(return_after_5_secs, ("hello"))
print(future.done())
sleep(5)
print(future.done())
print(future.result())

У меня есть статья о concurrent.futures masnun.com/2016/03/29/python-a-quick-introduction-to-the-concurrent-futures-module.html. Она может быть полезна при более глубоком изучении этого модуля.

Дальнейшее чтение:


Asyncio — что, как и почему?


У вас, вероятно, есть вопрос, который есть у многих людей в сообществе Python — что asyncio приносит нового? Зачем нужен был еще один способ асинхронного ввода-вывода? Разве у нас уже не было потоков и процессов? Давай посмотрим!

Зачем нам нужен asyncio?

Процессы очень дорогостоящие [с точки зрения потребления ресурсов, прим. переводчика] для создания. Поэтому для операций ввода/вывода в основном выбираются потоки. Мы знаем, что ввод-вывод зависит от внешних вещей — медленные диски или неприятные сетевые лаги делают ввод-вывод часто непредсказуемым. Теперь предположим, что мы используем потоки для операций ввода-вывода. 3 потока выполняют различные задачи ввода-вывода. Интерпретатор должен был бы переключаться между конкурентными потоками и давать каждому из них некоторое время по очереди. Назовем потоки — T1, T2 и T3. Три потока начали свою операцию ввода-вывода. T3 завершает его первым. T2 и T1 все еще ожидают ввода-вывода. Интерпретатор Python переключается на T1, но он все еще ждет. Хорошо, интерпретатор перемещается в T2, а тот все еще ждет, а затем перемещается в T3, который готов и выполняет код. Вы видите в этом проблему?

T3 был готов, но интерпретатор сначала переключился между T2 и T1 — это понесло расходы на переключение, которых мы могли бы избежать, если бы интерпретатор сначала переключился на T3, верно?

Что есть asynio?

Asyncio предоставляет нам цикл событий наряду с другими крутыми вещами. Цикл событий (event loop) отслеживает события ввода/вывода и переключает задачи, которые готовы и ждут операции ввода/вывода [цикл событий — программная конструкция, которая ожидает прибытия и производит рассылку событий или сообщений в программе, прим. переводчика].

Идея очень проста. Есть цикл обработки событий. И у нас есть функции, которые выполняют асинхронные операции ввода-вывода. Мы передаем свои функции циклу событий и просим его запустить их для нас. Цикл событий возвращает нам объект Future, словно обещание, что в будущем мы что-то получим. Мы держимся за обещание, время от времени проверяем, имеет ли оно значение (нам очень не терпится), и, наконец, когда значение получено, мы используем его в некоторых других операциях [т.е. мы послали запрос, нам сразу дали билет и сказали ждать, пока придёт результат. Мы периодически проверяем результат и как только он получен мы берем билет и по нему получаем значение, прим. переводчика].

Asyncio использует генераторы и корутины для остановки и возобновления задач. Прочитать детали вы можете здесь:


Как использовать asyncio?

Прежде чем мы начнём, давайте взглянем на пример:

import asyncio
import datetime
import random


async def my_sleep_func():
    await asyncio.sleep(random.randint(0, 5))


async def display_date(num, loop):
    end_time = loop.time() + 50.0
    while True:
        print("Loop: {} Time: {}".format(num, datetime.datetime.now()))
        if (loop.time() + 1.0) >= end_time:
            break
        await my_sleep_func()


loop = asyncio.get_event_loop()

asyncio.ensure_future(display_date(1, loop))
asyncio.ensure_future(display_date(2, loop))

loop.run_forever()

Обратите внимание, что синтаксис async/await предназначен только для Python 3.5 и выше. Пройдёмся по коду:

  • У нас есть асинхронная функция display_date, которая принимает число (в качестве идентификатора) и цикл обработки событий в качестве параметров.
  • Функция имеет бесконечный цикл, который прерывается через 50 секунд. Но за этот период, она неоднократно печатает время и делает паузу. Функция await может ожидать завершения выполнения других асинхронных функций (корутин).
  • Передаем функцию в цикл обработки событий (используя метод ensure_future).
  • Запускаем цикл событий.

Всякий раз, когда происходит вызов await, asyncio понимает, что функции, вероятно, потребуется некоторое время. Таким образом, он приостанавливает выполнение, начинает мониторинг любого связанного с ним события ввода-вывода и позволяет запускать задачи. Когда asyncio замечает, что приостановленный ввод-вывод функции готов, он возобновляет функцию.

Делаем правильный выбор


Только что мы прошлись по самым популярным формам конкурентности. Но остаётся вопрос — что следует выбрать? Это зависит от вариантов использования. Из моего опыта я склонен следовать этому псевдо-коду:

if io_bound:
    if io_very_slow:
        print("Use Asyncio")
    else:
       print("Use Threads")
else:
    print("Multi Processing")

  • CPU Bound => Multi Processing
  • I/O Bound, Fast I/O, Limited Number of Connections => Multi Threading
  • I/O Bound, Slow I/O, Many connections => Asyncio

[Прим. переводчика]

Теги:
Хабы:
+19
Комментарии 36
Комментарии Комментарии 36

Публикации

Истории

Работа

Python разработчик
130 вакансий
Data Scientist
66 вакансий

Ближайшие события

Московский туристический хакатон
Дата 23 марта – 7 апреля
Место
Москва Онлайн
Геймтон «DatsEdenSpace» от DatsTeam
Дата 5 – 6 апреля
Время 17:00 – 20:00
Место
Онлайн