2022.04.27 コラム
- Airflow
- データ基盤構築
- テックコラム
【テックコラム】Airflowでslack通知を簡単に実装する方法
● はじめに
こんにちは。テクノロジー本部の奥村です。
最近、ワークフローの1つである「Airflow」を使用して、データ基盤の構築をする機会が増えてきました。Airflowでは各処理をタスクとして定義し、複雑な依存関係を表すことができます。今回はAirflowでのワークフローや各タスクの実行結果やエラーなどをslackに通知するときの実装方法を紹介したいと思います。
検証した環境
- Airflowバージョン:2.2.3
- apache-airflow-providers-slack=4.2.3
● Airflowでのslack通知の方法
Airflowでslack通知を行う方法としては大きく分けて下記の2つの方法があります。
- SlackWebhookOperatorを使用
- SlackのAPIを使用
それぞれ用途が異なるので状況に応じて使い分けるのが良いと思います。これからそれぞれの用途と実装方法を説明していきたいと思います。
● SlackWebhookOperatorを使用して通知する方法
AirflowにはSlackWebhookOperatorというオペレーターが用意されており、タスクの1つとしてslackに通知したいという用途であれば、この方法が一番楽に実装ができます。
下記がSlackWebhookOperatorの実装のサンプルになります。1つポイントとなるのがあらかじめAirflowのコネクションにslackのincoming webhook URLを設定しておき、http_conn_idに指定する必要がある点になります。このときコネクションのhostにincoming webhookのURLを設定してあげるだけで大丈夫です。
DAGのサンプル
from datetime import datetime, timedelta
import airflow
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
# 開始日とスケジュールを設定
START_DATE = datetime(2022, 1, 1)
SCHEDULE_INTERVAL = timedelta(days=1)
default_args = {
'depends_on_past': False,
'retries': 0,
}
with airflow.DAG('test_slack',
catchup=False,
schedule_interval=SCHEDULE_INTERVAL,
start_date=START_DATE,
default_args=default_args) as dag:
post_slack = SlackWebhookOperator(task_id='post_slack',
http_conn_id='test_slack',
message='Hello Airflow!',
username='Airflow',
icon_emoji=':bowtie:')
# タスク実行
post_slack
slack通知のイメージ

● SlackのAPIを使用して通知する方法
こちらはslackのincoming webhook URLに対してリクエストを送る処理を行うpythonのコードを作成する方法になります。基本的にはpythonの関数を実装し、各DAGで共通で使用するのが良いかと思います。AirflowのDAGオブジェクトの引数には「on_failure_callback」「on_success_callback」などが用意されており、タスクやDAGが成功したときや失敗したときに任意の関数を実行することができます。よく使用される用途としては「on_failure_callback」を設定し、タスクが失敗したときにslackに通知を送る場合になるかと思います。
DAGのサンプル
from datetime import datetime, timedelta
import airflow
from airflow.models import Variable
from airflow.operators.bash_operator import BashOperator
from notice.slack import post_error_slack
START_DATE = datetime(2022, 1, 1)
SCHEDULE_INTERVAL = timedelta(days=1)
def failure_callback(context):
URL = Variable.get('slack_url')
post_error_slack(URL, context)
default_args = {
'depends_on_past': False,
'retries': 0,
'on_failure_callback': failure_callback,
}
with airflow.DAG('test_slack_error',
catchup=False,
schedule_interval=SCHEDULE_INTERVAL,
start_date=START_DATE,
default_args=default_args) as dag:
# わざとエラーにする
test_error = BashOperator(task_id='test_error', bash_command='exit 1')
# タスク実行
test_error
DAGや開発環境・本番環境によって、Slackの通知先を変えることなどがあると思うので通知先のURLはAirflow変数に登録しておき、それを呼び出す形にしてあげると便利です。
notice/slack.py
from slack_sdk.webhook import WebhookClient
def post_error_slack(url,
context,
username='Airflow',
icon_emoji=':bangbang:'):
"""
'on_failure_callback'時にslackへの通知を行う
Args:
url (string): slackのIncoming Webhooks の URL
context (dict): 実行したタスクのコンテキスト
username (string): slack通知のユーザ名
icon_emoji (string): slack通知のアイコン
Returns:
"""
webhook = WebhookClient(url)
blocks = [{
"type":
"section",
"text": {
"type": "plain_text",
"text": "Failed to execute DAG.",
},
"fields": [{
"type": "mrkdwn",
"text": "*DAG*"
}, {
"type": "mrkdwn",
"text": "*Task*"
}, {
"type": "plain_text",
"text": f"{context['dag']}"
}, {
"type": "plain_text",
"text": f"{context['task']}"
}]
}]
body = {
'text': 'fallback',
'blocks': blocks,
'icon_emoji': icon_emoji,
'username': username
}
response = webhook.send_dict(body=body)
slack通知のイメージ

● さいごに
このようにSlackWebhookOperatorや共通化した関数を作成することで簡単にslackへの通知を実装することができます。Airflowでワークフローを実装する際は参考にしていただけばと思います。
また弊社では事業会社様のデジタルマーケティング活動の支援を行っています。
自社に専門人材がいない、リソースが足りない等の課題をお持ちの方に、エンジニア領域の支援サービス(Data Engineer Hub)をご提供しています。
お困りごとございましたら是非お気軽にご相談ください。

本件に関するお問い合わせは下記にて承ります。
株式会社DataCurrent
info@datacurrent.co.jp