Электронная почта при сбое с помощью AWS SES в Apache Airflow DAG

Я пытаюсь отправлять мне сообщения Airflow по электронной почте с помощью AWS SES всякий раз, когда задача в моей группе DAG не запускается или пытается ее запустить. Я использую свои учетные данные AWS SES, а не общие учетные данные AWS.

Мой текущий airflow.cfg

[email]
email_backend = airflow.utils.email.send_email_smtp


[smtp]
# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
# smtp server here
smtp_host = emailsmtpserver.region.amazonaws.com 
smtp_starttls = True
smtp_ssl = False
# Uncomment and set the user/pass settings if you want to use SMTP AUTH
smtp_user = REMOVEDAWSACCESSKEY
smtp_password = REMOVEDAWSSECRETACCESSKEY
smtp_port = 25
smtp_mail_from = [email protected]

Текущая задача в моей группе DAG, предназначенная для преднамеренного сбоя и повторной попытки:

testfaildag_library_install_jar_jdbc = PythonOperator(
    task_id='library_install_jar',
    retries=3,
    retry_delay=timedelta(seconds=15),
    python_callable=add_library_to_cluster,
    params={'_task_id': 'cluster_create', '_cluster_name': CLUSTER_NAME, '_library_path':s3000://fakepath.jar},
    dag=dag,
    email_on_failure=True,
    email_on_retry=True,
    email=’[email protected]’,
    provide_context=True
)

Все работает так, как задумано, поскольку задача повторяет заданное количество раз и в конечном итоге терпит неудачу, за исключением того, что электронные письма не отправляются. Я также проверил логи в упомянутой выше задаче, и smtp никогда не упоминается.

Я посмотрел на аналогичный вопрос здесь, но единственное решение там не сработало. Кроме того, документация Airflow, такая как их пример здесь, похоже, не работает. для меня тоже.

Работает ли SES с функциями Airflow email_on_failure и email_on_retry?

В настоящее время я думаю об использовании функции on_failure_callback для вызова сценария python, предоставленного AWS здесь, чтобы отправить электронное письмо в случае сбоя, но на данный момент это не лучший вариант.

Спасибо, ценю любую помощь.


person Zack    schedule 01.06.2018    source источник
comment
Вы проверили адрес отправителя в SES?   -  person stdunbar    schedule 01.06.2018
comment
@stdunbar да, проверены как from, так и to. Адрес отправителя - это адрес электронной почты нашей учетной записи для мониторинга, и мы уже получаем от него обновления, поэтому подтверждено, что он также работает.   -  person Zack    schedule 01.06.2018


Ответы (2)


- обновлен 6/8 с работающей SES

вот моя запись о том, как мы все это заработали. Внизу этого ответа есть небольшое резюме.

Пара важных моментов:

  1. Мы решили не использовать Amazon SES, а использовать sendmail Теперь у нас есть и работает SES.
  2. Именно воздушный поток обслуживает функции email_on_failure и email_on_retry. Вы можете сделать journalctl –u airflow-worker –f для наблюдения за ним во время прогона Dag. На рабочем сервере вам НЕ нужно перезапускать airflow-worker после изменения airflow.cfg с новыми настройками smtp - он должен быть загружен автоматически. Не нужно беспокоиться о том, чтобы испортить запущенные Dags.

Вот техническое описание того, как использовать sendmail:

Поскольку мы перешли с ses на sendmail на локальном хосте, нам пришлось изменить наши настройки smtp в файле airflow.cfg.

Новый конфиг:

[email]
email_backend = airflow.utils.email.send_email_smtp


[smtp]
# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
# smtp server here
smtp_host = localhost
smtp_starttls = False
smtp_ssl = False
# Uncomment and set the user/pass settings if you want to use SMTP AUTH
#smtp_user = not used
#smtp_password = not used
smtp_port = 25
smtp_mail_from =  [email protected]

Это работает как в производственных, так и в локальных случаях воздушного потока.

Некоторые распространенные ошибки, которые могут возникнуть, если их конфигурация не похожа на мою выше:

  • socket.error: [Errno 111] Connection refused - вы должны изменить строку smtp_host в airflow.cfg на localhost
  • smtplib.SMTPException: STARTTLS extension not supported by server. - вы должны изменить свой smtp_starttls в airflow.cfg на False

В моем локальном тестировании я попытался просто заставить воздушный поток показать журнал того, что происходило, когда он пытался отправить электронное письмо - я создал поддельный даг следующим образом:

# Airflow imports
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator

# General imports
from datetime import datetime,timedelta

def throwerror():
    raise ValueError("Failure")

SPARK_V_2_2_1 = '3.5.x-scala2.11'

args = {
    'owner': ‘me’,
    'email': ['me@myjob'],
    'depends_on_past': False,
    'start_date': datetime(2018, 5,24),
    'end_date':datetime(2018,6,28)
}

dag = DAG(
    dag_id='testemaildag',
    default_args=args,
    catchup=False,
    schedule_interval="* 18 * * *"
    )

t1 = DummyOperator(
    task_id='extract_data',
    dag=dag
)

t2 = PythonOperator(
    task_id='fail_task',
    dag=dag,
    python_callable=throwerror
)

t2.set_upstream(t1)

Если вы выполните journalctl -u airflow-worker -f, вы увидите, что работник говорит, что он отправил электронное письмо с предупреждением о сбое на адрес электронной почты в вашей DAG, но мы все еще не получали это электронное письмо. Затем мы решили заглянуть в почтовые журналы sendmail, выполнив cat /var/log/maillog. Мы видели такой журнал:

Jun  5 14:10:25 production-server-ip-range postfix/smtpd[port]: connect from localhost[127.0.0.1]
Jun  5 14:10:25 production-server-ip-range postfix/smtpd[port]: ID: client=localhost[127.0.0.1]
Jun  5 14:10:25 production-server-ip-range postfix/cleanup[port]: ID: message-id=<randomMessageID@production-server-ip-range-ec2-instance>
Jun  5 14:10:25 production-server-ip-range postfix/smtpd[port]: disconnect from localhost[127.0.0.1]
Jun  5 14:10:25 production-server-ip-range postfix/qmgr[port]: MESSAGEID: from=<[email protected]>, size=1297, nrcpt=1 (queue active)
Jun  5 14:10:55 production-server-ip-range postfix/smtp[port]: connect to aspmx.l.google.com[smtp-ip-range]:25: Connection timed out
Jun  5 14:11:25 production-server-ip-range postfix/smtp[port]: connect to alt1.aspmx.l.google.com[smtp-ip-range]:25: Connection timed out

Так что это, наверное, самый большой момент "О, черт возьми". Здесь мы можем увидеть, что на самом деле происходит в нашей службе smtp. Мы использовали telnet, чтобы подтвердить, что нам не удалось подключиться к целевым диапазонам IP-адресов из Gmail.

Мы определили, что письмо было отправлено, но служба sendmail не смогла успешно подключиться к диапазонам IP-адресов.

Мы решили разрешить весь исходящий трафик через порт 25 в AWS (поскольку наша производственная среда воздушного потока является экземпляром ec2), и теперь она успешно работает. Теперь мы можем получать электронные письма о сбоях и повторных попытках (подсказка: email_on_failure и email_on_retry по умолчанию установлены как True в вашей DAG Справочник по API - вам не нужно помещать его в свои аргументы, если вы не хотите, но по-прежнему рекомендуется явно указывать в нем True или False).

SES сейчас работает. Вот конфигурация воздушного потока:

[email]
email_backend = airflow.utils.email.send_email_smtp


[smtp]
# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
# smtp server here
smtp_host = emailsmtpserver.region.amazonaws.com 
smtp_starttls = True
smtp_ssl = False
# Uncomment and set the user/pass settings if you want to use SMTP AUTH
smtp_user = REMOVEDAWSACCESSKEY
smtp_password = REMOVEDAWSSECRETACCESSKEY
smtp_port = 587
smtp_mail_from = [email protected] (Verified SES email)

Спасибо!

person Zack    schedule 05.06.2018

В аналогичном случае я попытался выполнить тот же процесс отладки, но не получил вывода журнала. Кроме того, исходящее правило для моего экземпляра airflow ec2 открыто для всех портов и IPS, поэтому должны быть другие причины.

Я заметил, что когда вы создаете учетные данные SMTP из SES, он также создает пользователя IAM. Я не уверен, как работает воздушный поток в вашем случае (голый металл на экземпляре ec2 или завернутый в контейнеры) и как настроен доступ этого пользователя.

person Richard    schedule 06.05.2021