Автор: Ганеш Гс

Введение

В этой статье основное внимание уделяется подробному объяснению того, как можно уменьшить задержку для связанных операций ввода-вывода. Чаще всего мы наблюдаем повышенную задержку при чтении/записи данных на диск, в сеть или в любых режимах хранения данных. Эта задержка связана со временем, затрачиваемым на ожидание завершения операций ввода/вывода. Помня обо всем этом, давайте обсудим несколько методов, позволяющих добиться наилучшего «уменьшения задержки» при чтении данных из корзины s3 (в этом примере данные хранятся в корзине s3). , но тот же подход можно использовать независимо от того, где хранятся ваши данные).

Многопоточность

Многопоточность — это процесс одновременного выполнения нескольких потоков. Теперь давайте проверим многопоточность в контексте Python. Python имеет множество библиотек. Когда дело доходит до многопоточности, одной из популярных библиотек является ‘ concurrent.futures’. В этой библиотеке у нас есть важные подклассы Executor, называемые ThreadPoolExecutor и ProcessPoolExecutor.

Исполнитель пула потоков

ThreadPoolExecutor запускает каждого из ваших рабочих процессов в отдельных потоках внутри основного процесса. Этот подкласс класса Executor использует многопоточность и создает пул потоков для отправки задач. Этот пул назначает задачи доступным потокам и планирует их выполнение. Выбор количества воркеров/потоков зависит от пользователя или задач.

Исполнитель ProcessPool

ProcessPoolExecutor запускает каждого из ваших рабочих процессов в своем дочернем процессе. Этот подкласс класса Executor использует многопроцессорность, и мы создаем пул процессов для отправки задач. Затем этот пул назначает задачи доступным процессам и планирует их запуск.

Когда и где следует использовать эти процессоры?

Если вы имеете дело с операцией, связанной с вводом-выводом, многопоточность — хороший вариант, позволяющий сократить время ожидания ввода-вывода.

Если вы имеете дело с задачами, связанными с ЦП, многопроцессорность — хороший вариант, чтобы мы могли получить выгоду от нескольких ЦП.

Поскольку мы сосредоточены на операциях, связанных с вводом-выводом, многопоточность превзойдет многопроцессорность по следующим причинам:

я. Для программ, привязанных к процессору, оптимальным вариантом будет многопроцессорная обработка, а не многопоточность из-за глобальной блокировки интерпретатора (GIL).
ii. Для программ, связанных с вводом-выводом, многопроцессорность улучшит производительность, но накладные расходы, как правило, выше, чем при использовании многопоточности.

Глобальная блокировка интерпретатора

Глобальная блокировка интерпретатора или GIL — печально известная функция Python. Это позволяет только одному потоку контролировать интерпретатор Python, а это означает, что только один поток может находиться в состоянии выполнения в любой момент времени.

GIL в многопоточной версии

В многопоточной версии GIL предотвращает параллельное выполнение потоков, привязанных к ЦП. GIL не оказывает большого влияния на производительность многопоточных программ, связанных с вводом-выводом, поскольку блокировка распределяется между потоками, пока они ожидают ввода-вывода.

Чтобы было понятнее, давайте взглянем на диаграмму ниже:

Если мы выполним его последовательно, мы увидим, что много времени уходит на выполнение низкоуровневых операций ввода-вывода. Тогда как в многопоточной (параллельной) версии, когда поток 1 (T1) ожидает завершения операции ввода-вывода, он освобождает блокировку GIL, чтобы поток 2 (T2) мог ее захватить. После завершения ввода-вывода он будет ждать, пока T2 снимет блокировку, чтобы T1 мог завершить свое выполнение. В целом мы видим, что мы сократили время ожидания ввода/вывода.

С точки зрения операций, связанных с ЦП, лучше использовать многопроцессорность вместо многопоточности, потому что, в конце концов, хотя мы и используем многопоточность, она считается однопоточной программой из-за GIL. Если мы используем многопроцессорность для потоков, привязанных к процессору, у нас будет наш интерпретатор GIL для каждого подпроцесса.

Кэширование

Кэширование часто используемых данных является эффективным методом. Сочетание многопоточности с кэшированием имеет множество реализаций. Двумя такими приложениями являются Наименее часто используемые (LRU ) и Наименее часто используемые (LFU). LRU отбрасывает из кэша данные, которые использовались реже всего, чтобы освободить место для новых данных, в то время как LFU отбрасывает наименее часто используемые данные из кэша, чтобы освободить место для новых данных.

TTLCache

Что, если нам нужно кэшировать эти данные на несколько минут/часов/целый день?

Библиотека cachetools в Python соответствует реализации LRU вместе с атрибутом «время жизни». Это помогает дать время/жизнь каждому объекту в кэш-памяти, а также снижает задержку.

Примеры

Следующее упражнение было выполнено в экземпляре типа ml.t2.medium, где у нас есть два виртуальных ЦП (vCPU) и 4 ГБ ОЗУ.

Файлы рассмотрены

На приведенном выше изображении перечислены файлы примеров, которые находятся в s3. Давайте попробуем последовательно прочитать эти CSV-файлы с помощью традиционного клиента boto3. На изображении ниже показано время, затраченное на чтение всех трех файлов.

импортировать pandas как pd
import boto3
import time
import datetime#creating boto3 client
client = boto3.client('s3')
Bucket_name = ' checkreadtime'
keys = ['FILE1.csv','FILE2.csv','FILE3.csv','FILE4.csv','FILE5.csv','FILE6.csv']
# функция для чтения данных из s3
def read_data_from_s3(bucket_name,key):
obj = client.get_object(Bucket=bucket_name, Key=key)
input_df = pd.read_csv(obj['Body '])
return input_df#ИСПОЛЬЗУЕТСЯ МОДУЛЬ ВРЕМЕНИ ДЛЯ ИЗМЕРЕНИЯ ВРЕМЕНИ НАСТЕННЫХ ЧАСОВ, ТАКЖЕ МОЖНО ИСПОЛЬЗОВАТЬ ПРОФИЛИРОВАНИЕ ЛИНИИ
для ключей в ключах:
print(key , “ Has “ , len(read_data_from_s3 (bucket_name,ключ)) , «Записи»)

Ниже приведено среднее время, затраченное на приведенный выше код после выполнения примерно 20 итераций.

Общее затраченное время — 2,634592115879059

Приведенный выше метод занимает около 2,6 секунды, чтобы прочитать все файлы CSV из s3. Обратите внимание, что есть и другие форматы файлов, например перо, паркет и т. д.

Давайте попробуем использовать многопоточность.

import concurrent.futures
# рабочая функция каждого потока, который будет получать данные из s3
def download(job):
Bucket, Key, FileName = Job
s3_client = boto3.client('s3')
obj = s3_client.get_object(Bucket=bucket, Key=key)
input_df = pd.read_csv(obj['Body'])
return input_df # Создаем задания
jobs = [(bucket_name, key, key.replace('/', '_')) for key in keys[:]]
#создаем пул потоков для выполнения работы
pool = concurrent.futures.ThreadPoolExecutor(max_workers=5)
FILE1, FILE2, FILE3, FILE4, FILE5, FILE6 = pool.map(download, jobs)

Ниже приведено среднее время, затраченное на приведенный выше код после выполнения около 20 итераций.

Общее затраченное время — 2,0443724632263183

Судя по двум приведенным выше примерам, мы сокращаем почти 600 мс всего за несколько строк кода. Мы знаем, что по умолчанию потоки асинхронизированы. В приведенных выше результатах мы создаем пул из 5 потоков и назначаем задачи. Функция карты помогает нам получать синхронизированные результаты.

Давайте попробуем кэшировать вместе с многопоточностью.

import cachetools.func
@cachetools.func.ttl_cache(maxsize=None, ttl=60*10)
def download(job):
ведро, ключ, имя файла = job< br /> s3_client = boto3.client('s3')
obj = s3_client.get_object(Bucket=bucket, Key=key)
input_df = pd.read_csv(obj['Body'])
return input_dfjobs = [(bucket_name, key, key.replace('/', '_')) for key in keys[:]]
# создайте пул потоков для выполнения работы
pool = concurrent.futures.ThreadPoolExecutor(max_workers=5)
ФАЙЛ1,ФАЙЛ2,ФАЙЛ3,ФАЙЛ4,ФАЙЛ5,ФАЙЛ6 = pool.map(загрузка, задания)

Ниже показано среднее время, затрачиваемое на кэширование и многопоточность.

Общее затраченное время — 0,0005709171295166015

Этот метод здесь резко снижает задержку. Здесь мы использовали функцию decorator в Python для реализации этого механизма кэширования.

Обратите внимание, что мы также можем написать собственные функции кэширования, изменив следующие атрибуты:
Maxsize — мы можем задать максимальный размер кэша.
Ttl — (в секундах), где мы можем передать время для каждого объекта, который кэшируется.

Заключение

При правильной реализации многопоточности и кэширования мы можем увидеть повышение производительности в любых операциях, связанных с вводом-выводом.

Описанный выше процесс является одним из многих способов добиться снижения задержки. Мы можем настроить тот же процесс, увеличив/уменьшив количество рабочих процессов в подклассе ThreadPoolExecutor, чтобы уменьшить задержку. Можно также использовать другие доступные библиотеки кэширования. Использование приведенной выше иллюстрации в режиме реального времени с правильной архитектурой реализации может значительно сократить задержку в программе, связанной с вводом-выводом.

Первоначально опубликовано на https://www.tigeranalytics.com 6 августа 2020 г.