Автор: Ганеш Гс
Введение
В этой статье основное внимание уделяется подробному объяснению того, как можно уменьшить задержку для связанных операций ввода-вывода. Чаще всего мы наблюдаем повышенную задержку при чтении/записи данных на диск, в сеть или в любых режимах хранения данных. Эта задержка связана со временем, затрачиваемым на ожидание завершения операций ввода/вывода. Помня обо всем этом, давайте обсудим несколько методов, позволяющих добиться наилучшего «уменьшения задержки» при чтении данных из корзины s3 (в этом примере данные хранятся в корзине s3). , но тот же подход можно использовать независимо от того, где хранятся ваши данные).
Многопоточность
Многопоточность — это процесс одновременного выполнения нескольких потоков. Теперь давайте проверим многопоточность в контексте Python. Python имеет множество библиотек. Когда дело доходит до многопоточности, одной из популярных библиотек является ‘ concurrent.futures’. В этой библиотеке у нас есть важные подклассы Executor, называемые ThreadPoolExecutor и ProcessPoolExecutor.
Исполнитель пула потоков
ThreadPoolExecutor запускает каждого из ваших рабочих процессов в отдельных потоках внутри основного процесса. Этот подкласс класса Executor использует многопоточность и создает пул потоков для отправки задач. Этот пул назначает задачи доступным потокам и планирует их выполнение. Выбор количества воркеров/потоков зависит от пользователя или задач.
Исполнитель ProcessPool
ProcessPoolExecutor запускает каждого из ваших рабочих процессов в своем дочернем процессе. Этот подкласс класса Executor использует многопроцессорность, и мы создаем пул процессов для отправки задач. Затем этот пул назначает задачи доступным процессам и планирует их запуск.
Когда и где следует использовать эти процессоры?
Если вы имеете дело с операцией, связанной с вводом-выводом, многопоточность — хороший вариант, позволяющий сократить время ожидания ввода-вывода.
Если вы имеете дело с задачами, связанными с ЦП, многопроцессорность — хороший вариант, чтобы мы могли получить выгоду от нескольких ЦП.
Поскольку мы сосредоточены на операциях, связанных с вводом-выводом, многопоточность превзойдет многопроцессорность по следующим причинам:
я. Для программ, привязанных к процессору, оптимальным вариантом будет многопроцессорная обработка, а не многопоточность из-за глобальной блокировки интерпретатора (GIL).
ii. Для программ, связанных с вводом-выводом, многопроцессорность улучшит производительность, но накладные расходы, как правило, выше, чем при использовании многопоточности.
Глобальная блокировка интерпретатора
Глобальная блокировка интерпретатора или GIL — печально известная функция Python. Это позволяет только одному потоку контролировать интерпретатор Python, а это означает, что только один поток может находиться в состоянии выполнения в любой момент времени.
GIL в многопоточной версии
В многопоточной версии GIL предотвращает параллельное выполнение потоков, привязанных к ЦП. GIL не оказывает большого влияния на производительность многопоточных программ, связанных с вводом-выводом, поскольку блокировка распределяется между потоками, пока они ожидают ввода-вывода.
Чтобы было понятнее, давайте взглянем на диаграмму ниже:
Если мы выполним его последовательно, мы увидим, что много времени уходит на выполнение низкоуровневых операций ввода-вывода. Тогда как в многопоточной (параллельной) версии, когда поток 1 (T1) ожидает завершения операции ввода-вывода, он освобождает блокировку GIL, чтобы поток 2 (T2) мог ее захватить. После завершения ввода-вывода он будет ждать, пока T2 снимет блокировку, чтобы T1 мог завершить свое выполнение. В целом мы видим, что мы сократили время ожидания ввода/вывода.
С точки зрения операций, связанных с ЦП, лучше использовать многопроцессорность вместо многопоточности, потому что, в конце концов, хотя мы и используем многопоточность, она считается однопоточной программой из-за GIL. Если мы используем многопроцессорность для потоков, привязанных к процессору, у нас будет наш интерпретатор GIL для каждого подпроцесса.
Кэширование
Кэширование часто используемых данных является эффективным методом. Сочетание многопоточности с кэшированием имеет множество реализаций. Двумя такими приложениями являются Наименее часто используемые (LRU ) и Наименее часто используемые (LFU). LRU отбрасывает из кэша данные, которые использовались реже всего, чтобы освободить место для новых данных, в то время как LFU отбрасывает наименее часто используемые данные из кэша, чтобы освободить место для новых данных.
TTLCache
Что, если нам нужно кэшировать эти данные на несколько минут/часов/целый день?
Библиотека cachetools в Python соответствует реализации LRU вместе с атрибутом «время жизни». Это помогает дать время/жизнь каждому объекту в кэш-памяти, а также снижает задержку.
Примеры
Следующее упражнение было выполнено в экземпляре типа ml.t2.medium, где у нас есть два виртуальных ЦП (vCPU) и 4 ГБ ОЗУ.
Файлы рассмотрены
На приведенном выше изображении перечислены файлы примеров, которые находятся в s3. Давайте попробуем последовательно прочитать эти CSV-файлы с помощью традиционного клиента boto3. На изображении ниже показано время, затраченное на чтение всех трех файлов.
импортировать pandas как pd
import boto3
import time
import datetime#creating boto3 client
client = boto3.client('s3')
Bucket_name = ' checkreadtime'
keys = ['FILE1.csv','FILE2.csv','FILE3.csv','FILE4.csv','FILE5.csv','FILE6.csv']
# функция для чтения данных из s3
def read_data_from_s3(bucket_name,key):
obj = client.get_object(Bucket=bucket_name, Key=key)
input_df = pd.read_csv(obj['Body '])
return input_df#ИСПОЛЬЗУЕТСЯ МОДУЛЬ ВРЕМЕНИ ДЛЯ ИЗМЕРЕНИЯ ВРЕМЕНИ НАСТЕННЫХ ЧАСОВ, ТАКЖЕ МОЖНО ИСПОЛЬЗОВАТЬ ПРОФИЛИРОВАНИЕ ЛИНИИ
для ключей в ключах:
print(key , “ Has “ , len(read_data_from_s3 (bucket_name,ключ)) , «Записи»)
Ниже приведено среднее время, затраченное на приведенный выше код после выполнения примерно 20 итераций.
Общее затраченное время — 2,634592115879059
Приведенный выше метод занимает около 2,6 секунды, чтобы прочитать все файлы CSV из s3. Обратите внимание, что есть и другие форматы файлов, например перо, паркет и т. д.
Давайте попробуем использовать многопоточность.
import concurrent.futures
# рабочая функция каждого потока, который будет получать данные из s3
def download(job):
Bucket, Key, FileName = Job
s3_client = boto3.client('s3')
obj = s3_client.get_object(Bucket=bucket, Key=key)
input_df = pd.read_csv(obj['Body'])
return input_df # Создаем задания
jobs = [(bucket_name, key, key.replace('/', '_')) for key in keys[:]]
#создаем пул потоков для выполнения работы
pool = concurrent.futures.ThreadPoolExecutor(max_workers=5)
FILE1, FILE2, FILE3, FILE4, FILE5, FILE6 = pool.map(download, jobs)
Ниже приведено среднее время, затраченное на приведенный выше код после выполнения около 20 итераций.
Общее затраченное время — 2,0443724632263183
Судя по двум приведенным выше примерам, мы сокращаем почти 600 мс всего за несколько строк кода. Мы знаем, что по умолчанию потоки асинхронизированы. В приведенных выше результатах мы создаем пул из 5 потоков и назначаем задачи. Функция карты помогает нам получать синхронизированные результаты.
Давайте попробуем кэшировать вместе с многопоточностью.
import cachetools.func
@cachetools.func.ttl_cache(maxsize=None, ttl=60*10)
def download(job):
ведро, ключ, имя файла = job< br /> s3_client = boto3.client('s3')
obj = s3_client.get_object(Bucket=bucket, Key=key)
input_df = pd.read_csv(obj['Body'])
return input_dfjobs = [(bucket_name, key, key.replace('/', '_')) for key in keys[:]]
# создайте пул потоков для выполнения работы
pool = concurrent.futures.ThreadPoolExecutor(max_workers=5)
ФАЙЛ1,ФАЙЛ2,ФАЙЛ3,ФАЙЛ4,ФАЙЛ5,ФАЙЛ6 = pool.map(загрузка, задания)
Ниже показано среднее время, затрачиваемое на кэширование и многопоточность.
Общее затраченное время — 0,0005709171295166015
Этот метод здесь резко снижает задержку. Здесь мы использовали функцию decorator в Python для реализации этого механизма кэширования.
Обратите внимание, что мы также можем написать собственные функции кэширования, изменив следующие атрибуты:
Maxsize — мы можем задать максимальный размер кэша.
Ttl — (в секундах), где мы можем передать время для каждого объекта, который кэшируется.
Заключение
При правильной реализации многопоточности и кэширования мы можем увидеть повышение производительности в любых операциях, связанных с вводом-выводом.
Описанный выше процесс является одним из многих способов добиться снижения задержки. Мы можем настроить тот же процесс, увеличив/уменьшив количество рабочих процессов в подклассе ThreadPoolExecutor, чтобы уменьшить задержку. Можно также использовать другие доступные библиотеки кэширования. Использование приведенной выше иллюстрации в режиме реального времени с правильной архитектурой реализации может значительно сократить задержку в программе, связанной с вводом-выводом.
Первоначально опубликовано на https://www.tigeranalytics.com 6 августа 2020 г.