Column

コラム

  • 【テックコラム】Airflowでslack通知を簡単に実装す...

【テックコラム】Airflowでslack通知を簡単に実装する方法

● はじめに


こんにちは。テクノロジー本部の奥村です。

最近、ワークフローの1つである「Airflow」を使用して、データ基盤の構築をする機会が増えてきました。Airflowでは各処理をタスクとして定義し、複雑な依存関係を表すことができます。今回はAirflowでのワークフローや各タスクの実行結果やエラーなどをslackに通知するときの実装方法を紹介したいと思います。

検証した環境

  • Airflowバージョン:2.2.3
  • apache-airflow-providers-slack=4.2.3

● Airflowでのslack通知の方法


Airflowでslack通知を行う方法としては大きく分けて下記の2つの方法があります。

  • SlackWebhookOperatorを使用
  • SlackのAPIを使用

それぞれ用途が異なるので状況に応じて使い分けるのが良いと思います。これからそれぞれの用途と実装方法を説明していきたいと思います。

● SlackWebhookOperatorを使用して通知する方法


AirflowにはSlackWebhookOperatorというオペレーターが用意されており、タスクの1つとしてslackに通知したいという用途であれば、この方法が一番楽に実装ができます。

下記がSlackWebhookOperatorの実装のサンプルになります。1つポイントとなるのがあらかじめAirflowのコネクションにslackのincoming webhook URLを設定しておき、http_conn_idに指定する必要がある点になります。このときコネクションのhostにincoming webhookのURLを設定してあげるだけで大丈夫です。

DAGのサンプル

from datetime import datetime, timedelta

import airflow
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator

# 開始日とスケジュールを設定
START_DATE = datetime(2022, 1, 1)
SCHEDULE_INTERVAL = timedelta(days=1)

default_args = {
    'depends_on_past': False,
    'retries': 0,
}

with airflow.DAG('test_slack',
                 catchup=False,
                 schedule_interval=SCHEDULE_INTERVAL,
                 start_date=START_DATE,
                 default_args=default_args) as dag:

    post_slack = SlackWebhookOperator(task_id='post_slack',
                                      http_conn_id='test_slack',
                                      message='Hello Airflow!',
                                      username='Airflow',
                                      icon_emoji=':bowtie:')

    # タスク実行
    post_slack

slack通知のイメージ


slack通知のイメージ

● SlackのAPIを使用して通知する方法


こちらはslackのincoming webhook URLに対してリクエストを送る処理を行うpythonのコードを作成する方法になります。基本的にはpythonの関数を実装し、各DAGで共通で使用するのが良いかと思います。AirflowのDAGオブジェクトの引数には「on_failure_callback」「on_success_callback」などが用意されており、タスクやDAGが成功したときや失敗したときに任意の関数を実行することができます。よく使用される用途としては「on_failure_callback」を設定し、タスクが失敗したときにslackに通知を送る場合になるかと思います。

DAGのサンプル

from datetime import datetime, timedelta

import airflow
from airflow.models import Variable
from airflow.operators.bash_operator import BashOperator

from notice.slack import post_error_slack

START_DATE = datetime(2022, 1, 1)
SCHEDULE_INTERVAL = timedelta(days=1)

def failure_callback(context):
    URL = Variable.get('slack_url')
    post_error_slack(URL, context)

default_args = {
    'depends_on_past': False,
    'retries': 0,
    'on_failure_callback': failure_callback,
}

with airflow.DAG('test_slack_error',
                 catchup=False,
                 schedule_interval=SCHEDULE_INTERVAL,
                 start_date=START_DATE,
                 default_args=default_args) as dag:

    # わざとエラーにする
    test_error = BashOperator(task_id='test_error', bash_command='exit 1')

    # タスク実行
    test_error

DAGや開発環境・本番環境によって、Slackの通知先を変えることなどがあると思うので通知先のURLはAirflow変数に登録しておき、それを呼び出す形にしてあげると便利です。

notice/slack.py

from slack_sdk.webhook import WebhookClient


def post_error_slack(url,
                     context,
                     username='Airflow',
                     icon_emoji=':bangbang:'):
    """
    'on_failure_callback'時にslackへの通知を行う

    Args:
        url (string): slackのIncoming Webhooks の URL
        context (dict): 実行したタスクのコンテキスト
        username (string): slack通知のユーザ名
        icon_emoji (string): slack通知のアイコン

    Returns:
    """

    webhook = WebhookClient(url)

    blocks = [{
        "type":
        "section",
        "text": {
            "type": "plain_text",
            "text": "Failed to execute DAG.",
        },
        "fields": [{
            "type": "mrkdwn",
            "text": "*DAG*"
        }, {
            "type": "mrkdwn",
            "text": "*Task*"
        }, {
            "type": "plain_text",
            "text": f"{context['dag']}"
        }, {
            "type": "plain_text",
            "text": f"{context['task']}"
        }]
    }]

    body = {
        'text': 'fallback',
        'blocks': blocks,
        'icon_emoji': icon_emoji,
        'username': username
    }

    response = webhook.send_dict(body=body)

slack通知のイメージ


slack通知のイメージ

● さいごに


このようにSlackWebhookOperatorや共通化した関数を作成することで簡単にslackへの通知を実装することができます。Airflowでワークフローを実装する際は参考にしていただけばと思います。

また弊社では事業会社様のデジタルマーケティング活動の支援を行っています。

自社に専門人材がいない、リソースが足りない等の課題をお持ちの方に、エンジニア領域の支援サービス(Data Engineer Hub)をご提供しています。
お困りごとございましたら是非お気軽にご相談ください。

》エンジニア領域支援サービスの詳細はこちら

Data Engineer Hubサービス 資料イメージ

本件に関するお問い合わせは下記にて承ります。
株式会社DataCurrent
info@datacurrent.co.jp

人気のコラムランキング

PICK UP

企業のDX推進に向けた人材教育支援について

GA4marketingPICK UP コラム内製化

【データプライバシーコラム】電気通信事業法改正の解説(2022年7月時点)

CMPPICK UP コラムデータプライバシーデータプライバシーコラム個人情報保護

CMP導入時の注意点

CMPPICK UP コラムデータプライバシーデータプライバシーコラム個人情報保護

Treasure Data CDPを活用したOneID(統合ID)構築4 日付の落とし穴

CDPCDP活用ID統合PICK UP コラム

TOPへ
戻る