Как контролировать параллелизм или параллелизм установки Airflow?

В некоторых моих установках Apache Airflow группы DAG или запланированные задачи не запускаются, даже если планировщик не загружен полностью. Как я могу увеличить количество групп DAG или задач, которые могут выполняться одновременно?

Точно так же, если моя установка находится под высокой нагрузкой, и я хочу ограничить, насколько быстро мои рабочие Airflow будут извлекать задачи из очереди (например, чтобы уменьшить потребление ресурсов), что я могу настроить, чтобы уменьшить среднюю нагрузку?


person hexacyanide    schedule 30.05.2019    source источник


Ответы (3)


Вот расширенный список параметров конфигурации, доступных начиная с Airflow v1.10.2. Некоторые из них могут быть установлены для каждой группы DAG или для каждого оператора, но могут также вернуться к значениям по умолчанию для всей настройки, если они не указаны.


Параметры, которые можно указать для каждой группы данных:

  • concurrency: количество экземпляров задач, разрешенных для одновременного выполнения во всех активных запусках группы доступности базы данных, для которой это установлено. По умолчанию core.dag_concurrency, если не установлен
  • max_active_runs: максимальное количество активных запусков для этой группы DAG. Планировщик не будет создавать новые активные запуски DAG после достижения этого предела. По умолчанию core.max_active_runs_per_dag, если не установлен

Примеры:

# Only allow one run of this DAG to be running at any given time
dag = DAG('my_dag_id', max_active_runs=1)

# Allow a maximum of 10 tasks to be running across a max of 2 active DAG runs
dag = DAG('example2', concurrency=10, max_active_runs=2)

Параметры, которые можно указать для каждого оператора:

  • pool: пул для выполнения задачи. Пулы можно использовать для ограничения параллелизма для < em> только часть задач
  • task_concurrency: ограничение параллелизма для одной и той же задачи при нескольких запусках DAG

Пример:

t1 = BaseOperator(pool='my_custom_pool', task_concurrency=12)

Параметры, указанные для всей настройки Airflow:

  • core.parallelism: максимальное количество задач, выполняемых во всей установке Airflow
  • core.dag_concurrency: максимальное количество задач, которые могут выполняться в одной группе DAG (при нескольких запусках DAG)
  • core.non_pooled_task_slot_count: количество слотов задач, выделенных задачам, не выполняющимся в пуле
  • core.max_active_runs_per_dag: максимальное количество активных запусков DAG на DAG
  • scheduler.max_threads: сколько потоков должен использовать процесс планировщика для планирования DAG
  • celery.worker_concurrency: максимальное количество экземпляров задач, которые работник будет обрабатывать за раз при использовании CeleryExecutor
  • celery.sync_parallelism: количество процессов, которые CeleryExecutor должен использовать для синхронизации состояния задачи
person hexacyanide    schedule 30.05.2019
comment
Отличный ответ, большое вам спасибо! Четкое объяснение всех параметров, связанных с параллелизмом, в одном месте. - person Timur; 19.06.2019
comment
Я думаю, что task_concurrency неправильно определен, если вы посмотрите на комментарий ниже с иллюстрацией. Определение должно быть таким: ›ограничение параллелизма для выполнения одной и той же задачи при нескольких выполнениях. - person Philipp Johannis; 23.02.2021
comment
@PhilippJohannis Спасибо! Я отредактировал ответ. - person hexacyanide; 11.05.2021

Иллюстрация трех основных переменных управления параллелизмом:

иллюстрация

https://airflow.apache.org/docs/stable/faq.html#how-can-my-airflow-dag-run-faster.

person skwon    schedule 18.09.2020
comment
Спасибо за объяснение! Вполне понятно. - person DenisOgr; 23.04.2021

Проверьте конфигурацию воздушного потока, для которой используется core.executor. SequentialExecutor будет выполняться последовательно, поэтому вы можете выбрать Local Executor или Clery Executor, которые будут выполнять задачу параллельно. После этого вы можете использовать другие параметры, упомянутые @hexacyanide

person Salman Faris    schedule 17.06.2019