2022.11.21 コラム
【テックコラム】Cloud Run で Cloud Composer の重いタスクを分離する
● はじめに
こんにちは。テクノロジー本部の奥村です。
前回に引き続き Cloud Composer(Airflow)に関するコラムを投稿したいと思います!
Cloud Composer(Airflow)はワークフローエンジンであるため、一般的に重いタスクは実行するのに適していません。小さなタスクであれば PythonOperator などで実行すればよいですが、もし重いタスクの処理が必要な場合は、タスクの実行環境を用意しそちらで実行することが望ましいです。
今回は重たい処理を実行する際などに活用できる、処理を Cloud Run に外出しする方法を紹介したいと思います。
※実際にはマシンのスペックを増やしたり、ワーカーを増やしたりすることで、Airflow で重たいタスクでも実行できるかとは思いますが、コストパフォーマンスが良くないかなと思います。
●想定する読者(必要な前提知識)
Airflow, GCP, Docker を使用したことがある
●検証した環境
composer-2.0.9-airflow-2.2.3
● Cloud Run とは?
Google のスケーラブルなインフラストラクチャ上でコンテナを直接実行できるマネージドコンピューティングプラットフォームです。
https://cloud.google.com/run/docs/overview/what-is-cloud-run?hl=ja
● Cloud Run で API を作成する
タスクを実行する環境は Cloud Run を使用します。Cloud Run は Docker コンテナを動かすことができるので任意の言語を使用して処理を実装することが可能です。Cloud Run で API を作成して、Cloud Composer からその API を呼び出すことでタスクを実行します。今回は Python で処理を実装しました。作成手順は下記のようになります。
1.コンテナイメージを作成する
ディレクトリ構成
~/cloud_run_api/ - Dockerfile - main.py - requirements.txt
Dockerfile
FROM python:3.10.8-slim-bullseye # Change timezone RUN ln -sf /usr/share/zoneinfo/Asia/Tokyo /etc/localtime # Allow statements and log messages to immediately appear in the Knative logs ENV PYTHONUNBUFFERED True # Copy local code to the container image. ENV APP_HOME /app WORKDIR $APP_HOME COPY . ./ # Install production dependencies. RUN pip install --no-cache-dir -r requirements.txt # Run the web service on container startup. Here we use the gunicorn # webserver, with one worker process and 8 threads. # For environments with multiple CPU cores, increase the number of workers # to be equal to the cores available. # Timeout is set to 0 to disable the timeouts of the workers to allow Cloud Run to handle instance scaling. CMD exec gunicorn --bind :$PORT --workers 1 --threads 8 --timeout 0 main:app
main.py
import os from flask import Flask, request app = Flask(__name__) @app.route('/test') def test(): req = request.args task = req.get('task') print(f'Execute task. [{task}]') return f'OK [{task}]', 200 if __name__ == '__main__': app.run(debug=False, host='0.0.0.0', port=int(os.environ.get('PORT', 8080)))
requirements.txt
Flask==2.2.2 gunicorn==20.1.0
Docker イメージを作成
docker image build -t asia-northeast1-docker.pkg.dev/******** .
イメージをプッシュ
docker image push asia-northeast1-docker.pkg.dev/********
※今回はArtifact Registryを使用しています。リポジトリは事前に作成し、「********」部分は作成した任意のリポジトリにしてください。
2. デプロイする
WEB コンソールや gcloud コマンドを使用して Cloud Run に作成したコンテナイメージでデプロイします。
※CPU やメモリ、インスタンス台数はタスクの重さなどによって適宜変更してください。
3. API の動作確認
Cloud Run の URL [https:/~.run.app/test?task=test_1] にブラウザなどでリクエストして、API が動作しているか確認します。
「OK [test_1]」とレスポンスがあれば API が正常に作成できています。
● Cloud Composer から API を呼び出す
API としてタスクを実行する準備ができたので、その API を呼ぶタスクを DAG に記載します。
Airflow には SimpleHttpOperator という HTTP リクエストを簡単に実行できるオペレータがあるので今回はそれを使用します。
DAG のサンプル
import airflow from airflow.providers.http.operators.http import SimpleHttpOperator # 開始日とスケジュールを設定 START_DATE = airflow.utils.dates.days_ago(1) SCHEDULE_INTERVAL = "00 10 * * *" default_args = { 'depends_on_past': False, 'retries': 0, } with airflow.DAG('test_http', catchup=False, schedule_interval=SCHEDULE_INTERVAL, start_date=START_DATE, default_args=default_args) as dag: http_req = SimpleHttpOperator( task_id='http_req', method='GET', endpoint='test', data={ "task": 'test_task', }, headers={}, http_conn_id='cloud_run_test', ) # タスク実行 http_req
この DAG を実行する前に、Cloud Composer の Connections を下記の値で作成してください。
- Connection Id: cloud_run_test
- host: {作成した Cloud Run のエンドポイント}
DAG 実行後エラーにならなければ、Cloud Run でのタスクが問題なく実行できたかと思います。今回はテストなので簡単なタスクを Cloud Run で実行するようにしましたが、同じような手順で重いタスクも実行することが可能です。ただし、注意が必要なのが、Cloud Run の最大実行時間が現時点では1時間となっている点です。このためそれ以上に時間がかかるタスクは実行できません。その場合はタスクを分割したり、並列で実行したりと工夫が必要です。
● さいごに
このようにして、重いタスクを Cloud Composer から分離することができます。これによって、Airflow のリソースがタスクによってひっ迫する心配がなくります。また、Python 以外の処理を実行したり、開発を DAG の開発と API の作成側で分けることができるので、Python が少し分かる人限定になりますが非エンジニアの人にも DAG を作成してもらうことも可能になります。
実装する際にはぜひ参考にしてみて下さい!
また弊社では事業会社様のデジタルマーケティング活動の支援を行っています。
自社に専門人材がいない、リソースが足りない等の課題をお持ちの方に、エンジニア領域の支援サービス(Data Engineer Hub)をご提供しています。
お困りごとございましたら是非お気軽にご相談ください。