- обновлен 6/8 с работающей SES
вот моя запись о том, как мы все это заработали. Внизу этого ответа есть небольшое резюме.
Пара важных моментов:
Мы решили не использовать Amazon SES, а использовать sendmail Теперь у нас есть и работает SES.
- Именно воздушный поток обслуживает функции
email_on_failure
и email_on_retry
. Вы можете сделать journalctl –u airflow-worker –f
для наблюдения за ним во время прогона Dag. На рабочем сервере вам НЕ нужно перезапускать airflow-worker после изменения airflow.cfg
с новыми настройками smtp - он должен быть загружен автоматически. Не нужно беспокоиться о том, чтобы испортить запущенные Dags.
Вот техническое описание того, как использовать sendmail:
Поскольку мы перешли с ses на sendmail на локальном хосте, нам пришлось изменить наши настройки smtp в файле airflow.cfg
.
Новый конфиг:
[email]
email_backend = airflow.utils.email.send_email_smtp
[smtp]
# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
# smtp server here
smtp_host = localhost
smtp_starttls = False
smtp_ssl = False
# Uncomment and set the user/pass settings if you want to use SMTP AUTH
#smtp_user = not used
#smtp_password = not used
smtp_port = 25
smtp_mail_from = [email protected]
Это работает как в производственных, так и в локальных случаях воздушного потока.
Некоторые распространенные ошибки, которые могут возникнуть, если их конфигурация не похожа на мою выше:
socket.error: [Errno 111] Connection refused
- вы должны изменить строку smtp_host
в airflow.cfg
на localhost
smtplib.SMTPException: STARTTLS extension not supported by server.
- вы должны изменить свой smtp_starttls
в airflow.cfg
на False
В моем локальном тестировании я попытался просто заставить воздушный поток показать журнал того, что происходило, когда он пытался отправить электронное письмо - я создал поддельный даг следующим образом:
# Airflow imports
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
# General imports
from datetime import datetime,timedelta
def throwerror():
raise ValueError("Failure")
SPARK_V_2_2_1 = '3.5.x-scala2.11'
args = {
'owner': ‘me’,
'email': ['me@myjob'],
'depends_on_past': False,
'start_date': datetime(2018, 5,24),
'end_date':datetime(2018,6,28)
}
dag = DAG(
dag_id='testemaildag',
default_args=args,
catchup=False,
schedule_interval="* 18 * * *"
)
t1 = DummyOperator(
task_id='extract_data',
dag=dag
)
t2 = PythonOperator(
task_id='fail_task',
dag=dag,
python_callable=throwerror
)
t2.set_upstream(t1)
Если вы выполните journalctl -u airflow-worker -f
, вы увидите, что работник говорит, что он отправил электронное письмо с предупреждением о сбое на адрес электронной почты в вашей DAG, но мы все еще не получали это электронное письмо. Затем мы решили заглянуть в почтовые журналы sendmail, выполнив cat /var/log/maillog
. Мы видели такой журнал:
Jun 5 14:10:25 production-server-ip-range postfix/smtpd[port]: connect from localhost[127.0.0.1]
Jun 5 14:10:25 production-server-ip-range postfix/smtpd[port]: ID: client=localhost[127.0.0.1]
Jun 5 14:10:25 production-server-ip-range postfix/cleanup[port]: ID: message-id=<randomMessageID@production-server-ip-range-ec2-instance>
Jun 5 14:10:25 production-server-ip-range postfix/smtpd[port]: disconnect from localhost[127.0.0.1]
Jun 5 14:10:25 production-server-ip-range postfix/qmgr[port]: MESSAGEID: from=<[email protected]>, size=1297, nrcpt=1 (queue active)
Jun 5 14:10:55 production-server-ip-range postfix/smtp[port]: connect to aspmx.l.google.com[smtp-ip-range]:25: Connection timed out
Jun 5 14:11:25 production-server-ip-range postfix/smtp[port]: connect to alt1.aspmx.l.google.com[smtp-ip-range]:25: Connection timed out
Так что это, наверное, самый большой момент "О, черт возьми". Здесь мы можем увидеть, что на самом деле происходит в нашей службе smtp. Мы использовали telnet, чтобы подтвердить, что нам не удалось подключиться к целевым диапазонам IP-адресов из Gmail.
Мы определили, что письмо было отправлено, но служба sendmail не смогла успешно подключиться к диапазонам IP-адресов.
Мы решили разрешить весь исходящий трафик через порт 25 в AWS (поскольку наша производственная среда воздушного потока является экземпляром ec2), и теперь она успешно работает. Теперь мы можем получать электронные письма о сбоях и повторных попытках (подсказка: email_on_failure
и email_on_retry
по умолчанию установлены как True
в вашей DAG Справочник по API - вам не нужно помещать его в свои аргументы, если вы не хотите, но по-прежнему рекомендуется явно указывать в нем True или False).
SES сейчас работает. Вот конфигурация воздушного потока:
[email]
email_backend = airflow.utils.email.send_email_smtp
[smtp]
# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
# smtp server here
smtp_host = emailsmtpserver.region.amazonaws.com
smtp_starttls = True
smtp_ssl = False
# Uncomment and set the user/pass settings if you want to use SMTP AUTH
smtp_user = REMOVEDAWSACCESSKEY
smtp_password = REMOVEDAWSSECRETACCESSKEY
smtp_port = 587
smtp_mail_from = [email protected] (Verified SES email)
Спасибо!
person
Zack
schedule
05.06.2018