Column

コラム

  • 【テックコラム】Cloud Run で Cloud Comp...

【テックコラム】Cloud Run で Cloud Composer の重いタスクを分離する

● はじめに


こんにちは。テクノロジー本部の奥村です。

前回に引き続き Cloud Composer(Airflow)に関するコラムを投稿したいと思います!

Cloud Composer(Airflow)はワークフローエンジンであるため、一般的に重いタスクは実行するのに適していません。小さなタスクであれば PythonOperator などで実行すればよいですが、もし重いタスクの処理が必要な場合は、タスクの実行環境を用意しそちらで実行することが望ましいです。

今回は重たい処理を実行する際などに活用できる、処理を Cloud Run に外出しする方法を紹介したいと思います。

※実際にはマシンのスペックを増やしたり、ワーカーを増やしたりすることで、Airflow で重たいタスクでも実行できるかとは思いますが、コストパフォーマンスが良くないかなと思います。

●想定する読者(必要な前提知識)

Airflow, GCP, Docker を使用したことがある

●検証した環境

composer-2.0.9-airflow-2.2.3

● Cloud Run とは?


Google のスケーラブルなインフラストラクチャ上でコンテナを直接実行できるマネージドコンピューティングプラットフォームです。

https://cloud.google.com/run/docs/overview/what-is-cloud-run?hl=ja

● Cloud Run で API を作成する


タスクを実行する環境は Cloud Run を使用します。Cloud Run は Docker コンテナを動かすことができるので任意の言語を使用して処理を実装することが可能です。Cloud Run で API を作成して、Cloud Composer からその API を呼び出すことでタスクを実行します。今回は Python で処理を実装しました。作成手順は下記のようになります。

1.コンテナイメージを作成する

ディレクトリ構成

~/cloud_run_api/
  - Dockerfile 
  - main.py  
  - requirements.txt

Dockerfile

FROM python:3.10.8-slim-bullseye

# Change timezone
RUN ln -sf /usr/share/zoneinfo/Asia/Tokyo /etc/localtime

# Allow statements and log messages to immediately appear in the Knative logs
ENV PYTHONUNBUFFERED True

# Copy local code to the container image.
ENV APP_HOME /app
WORKDIR $APP_HOME
COPY . ./

# Install production dependencies.
RUN pip install --no-cache-dir -r requirements.txt

# Run the web service on container startup. Here we use the gunicorn
# webserver, with one worker process and 8 threads.
# For environments with multiple CPU cores, increase the number of workers
# to be equal to the cores available.
# Timeout is set to 0 to disable the timeouts of the workers to allow Cloud Run to handle instance scaling.
CMD exec gunicorn --bind :$PORT --workers 1 --threads 8 --timeout 0 main:app

main.py

import os

from flask import Flask, request

app = Flask(__name__)


@app.route('/test')
def test():
    req = request.args
    task = req.get('task')
    print(f'Execute task. [{task}]')

    return f'OK [{task}]', 200


if __name__ == '__main__':
    app.run(debug=False,
            host='0.0.0.0',
            port=int(os.environ.get('PORT', 8080)))

requirements.txt

Flask==2.2.2
gunicorn==20.1.0

Docker イメージを作成

docker image build -t asia-northeast1-docker.pkg.dev/******** .

イメージをプッシュ

docker image push asia-northeast1-docker.pkg.dev/********

※今回はArtifact Registryを使用しています。リポジトリは事前に作成し、「********」部分は作成した任意のリポジトリにしてください。

2. デプロイする

WEB コンソールや gcloud コマンドを使用して Cloud Run に作成したコンテナイメージでデプロイします。

※CPU やメモリ、インスタンス台数はタスクの重さなどによって適宜変更してください。

3. API の動作確認

Cloud Run の URL [https:/~.run.app/test?task=test_1] にブラウザなどでリクエストして、API が動作しているか確認します。

「OK [test_1]」とレスポンスがあれば API が正常に作成できています。

● Cloud Composer から API を呼び出す


API としてタスクを実行する準備ができたので、その API を呼ぶタスクを DAG に記載します。

Airflow には SimpleHttpOperator という HTTP リクエストを簡単に実行できるオペレータがあるので今回はそれを使用します。

DAG のサンプル

import airflow
from airflow.providers.http.operators.http import SimpleHttpOperator

# 開始日とスケジュールを設定
START_DATE = airflow.utils.dates.days_ago(1)
SCHEDULE_INTERVAL = "00 10 * * *"

default_args = {
    'depends_on_past': False,
    'retries': 0,
}

with airflow.DAG('test_http',
                 catchup=False,
                 schedule_interval=SCHEDULE_INTERVAL,
                 start_date=START_DATE,
                 default_args=default_args) as dag:

    http_req = SimpleHttpOperator(
        task_id='http_req',
        method='GET',
        endpoint='test',
        data={
            "task": 'test_task',
        },
        headers={},
        http_conn_id='cloud_run_test',
    )

    # タスク実行
    http_req

この DAG を実行する前に、Cloud Composer の Connections を下記の値で作成してください。

  • Connection Id: cloud_run_test
  • host: {作成した Cloud Run のエンドポイント}

DAG 実行後エラーにならなければ、Cloud Run でのタスクが問題なく実行できたかと思います。今回はテストなので簡単なタスクを Cloud Run で実行するようにしましたが、同じような手順で重いタスクも実行することが可能です。ただし、注意が必要なのが、Cloud Run の最大実行時間が現時点では1時間となっている点です。このためそれ以上に時間がかかるタスクは実行できません。その場合はタスクを分割したり、並列で実行したりと工夫が必要です。

● さいごに


このようにして、重いタスクを Cloud Composer から分離することができます。これによって、Airflow のリソースがタスクによってひっ迫する心配がなくります。また、Python 以外の処理を実行したり、開発を DAG の開発と API の作成側で分けることができるので、Python が少し分かる人限定になりますが非エンジニアの人にも DAG を作成してもらうことも可能になります。

実装する際にはぜひ参考にしてみて下さい!

また弊社では事業会社様のデジタルマーケティング活動の支援を行っています。

自社に専門人材がいない、リソースが足りない等の課題をお持ちの方に、エンジニア領域の支援サービス(Data Engineer Hub)をご提供しています。
お困りごとございましたら是非お気軽にご相談ください。

》サービスの詳細はこちら

人気のコラムランキング

PICK UP

【データプライバシーコラム】電気通信事業法改正の解説(2022年7月時点)

CMPPICK UP コラムデータプライバシーデータプライバシーコラム個人情報保護

CMP導入時の注意点

CMPPICK UP コラムデータプライバシーデータプライバシーコラム個人情報保護

Treasure Data CDPを活用したOneID(統合ID)構築4 日付の落とし穴

CDPCDP活用ID統合PICK UP コラム

今、CMPは導入するべきか?

CMPPICK UP コラムデータプライバシーデータプライバシーコラム個人情報保護

TOPへ
戻る