Column

コラム

  • 【テックコラム】BigQuery Data Transfer...

【テックコラム】BigQuery Data Transfer Service の運用にまつわる Tips ~監視編~

こんにちは、DataCurrent の不破です。

今回は、BigQuery Data Transfer Service の転送ジョブの監視方法についてご紹介したいと思います。

BigQuery Data Transfer Service とは、Amazon S3 や Google Cloud Storage 等、様々なデータソースから BigQuery にデータを転送することができる機能です。外部から BigQuery にデータを取り込みたい場合に、BigQuery Data Transfer Service でサポートされているデータソースであれば、コンソール上でポチポチと設定するだけで簡単にデータ転送を行うことができます。

BigQuery Data Transfer Service のデバッグ方法については、下記のコラムもぜひご参考ください。

【テックコラム】BigQuery Data Transfer Service の運用にまつわる Tips ~デバッグ編~

※この記事は、2025年4月時点に執筆したものであることをご留意ください。

BigQuery Data Transfer Service の監視方法

デバッグ編の記事でもお伝えしましたが、普段 BigQuery Data Transfer Service を利用してデータ転送を実施していると、時々転送エラーに遭遇することがあります。

例えば Cloud Storage 上のファイルを BigQuery に転送する場合、以下のような原因でエラーになるケースがあります。

  • 転送先の BigQuery テーブルのスキーマと、転送元のファイルのスキーマが一致していない
  • etc…

そうした時にいち早くエラーを検知して原因を確認できるよう、本番運用時には予めエラーを検知するための監視の仕組みを用意しておく必要があります。

監視方法として、私が担当する業務では転送ジョブの性質や状況に応じて大きく下記3つのパターンを使い分けています。

  • パターン1(難易度低):BigQuery Data Transfer Service の「通知オプション」から「メール通知」を「ON」にする
  • パターン2(難易度中):Cloud Logging から対象の BigQuery Data Transfer Service のエラーログを検索し、アラートポリシーに設定する
  • パターン3(難易度高):Cloud Run 関数に Slack 等に通知するためのコードをデプロイし、BigQuery Data Transfer Service の「通知オプション」から「Pub/Sub 通知」を「ON」にして、Pub/Sub 経由で Cloud Run 関数をトリガーする

各パターンの特徴や設定方法、メリット・デメリットについては以下にご説明します。

パターン1(難易度低):BigQuery Data Transfer Service の「通知オプション」から「メール通知」を「ON」にする

BigQuery Data Transfer Service の転送設定を作成する際に、「通知オプション」>「メール通知」という設定項目があります。こちらを「ON」に設定すると、転送ジョブが失敗した場合に、転送設定を作成した人(転送管理者)のメールアドレス宛にメールが通知されます。

■ 転送設定画面のイメージ

転送設定画面のイメージ

■ 通知メールのイメージ

通知メールのイメージ

パターン1 のメリットは何といっても設定が簡単で、BigQuery Data Transfer Service のコンソール上で設定を完結できることです。

デメリットとしては、現状は通知先を自由に選択することができず、転送設定作成者のメールアドレスのみにしか通知されないため、複数人で監視したい場合や、メール以外の手段で通知を受け取りたい場合は不向きになります。

例えば緊急性がそこまで高くないジョブや、複数人で確認する必要のないジョブ、失敗したとしても後続のタスクでカバーできるようなジョブにおいては、パターン1の方法で必要十分なケースもあるかと思います。

パターン2(難易度中):Cloud Logging から対象の BigQuery Data Transfer Service のエラーログを検索し、アラートポリシーに設定する

次にご紹介するのは、BigQuery Data Transfer Service で発生したエラーログを Cloud Logging のロギング機能で検知し、Cloud Monitoring のアラート機能を使用して指定した通知先へアラートを通知するという方法です。

アラートの通知先には複数のメールアドレスや Slack チャンネル等を任意で指定することができるため、例えば仮に会社のコミュニケーションツールとして Slack を使用している場合は、社内の関係者が参加する Slack チャンネル宛にリアルタイムでエラーを通知することもできます。

通知先に登録できるサービスの種類は、Google の公式ドキュメント(通知チャンネルを作成する)もご参考ください。

具体的なアラートの作成方法は以下の通りです。

  1. 通知したい BigQuery Transfer Service の「Config ID」を控えておく。
    a. Config ID:転送設定画面 >「構成」>「リソース名」に含まれる「Configs/…」以降の値をメモする
アラートの作成方法①
  1. 下記の形式でクエリを作成する。
    a. 「resource.labels.config_id」に、1 で控えておいた Config ID を指定する
      i. 複数の転送設定をまとめて監視・通知したい場合は、「resource.labels.config_id=(“Config ID1” OR “Config ID2”)」の形式で指定することも可能
    b. 「resource.labels.location」に指定するリージョンは、転送設定に指定したリージョンと合わせる(例:us)
resource.type="bigquery_dts_config"
resource.labels.config_id="控えておいた Config ID"
resource.labels.location="リージョン名"
severity>=ERROR
  1. Cloud Logging コンソール >「ログエクスプローラ」から上記クエリを実行し、ログ結果画面に表示される「操作」メニューから「ログアラートの作成」をクリックする
アラートの作成方法②
  1. 画面右側に表示される「ログベースのアラートポリシーの作成」から、アラート名や通知の頻度等を指定し、通知先のチャンネルにチェックを入れて保存する
    a. 通知先のチャンネルは、Cloud Monitoring コンソール >「アラート」>「Edit notification channels」から追加が可能
アラートの作成方法③
  1. エラーが発生すると、指定したチャンネル宛にエラー通知が送信される(以下は Slack の例)
アラートの作成方法④

パターン2 のメリットとしては、冒頭でもお伝えした通り、複数人でエラーを検知したい場合に任意のメールアドレスや Slack チャンネル等を指定できることです。また、パターン1 と同様に Google Cloud コンソール上で設定が完結するため、実装が比較的簡単になります。

デメリットとしては、選択できる通知先がメールアドレスや Slack 等の一部のサービスに限定されており、例えば Teams 等は現状サポートされていないため、別途通知システムを開発する等の対応が必要となります。また Google の公式ドキュメント(アラートの料金)によると、2026年4月以降、Cloud Monitoring でアラートに対する課金が開始されるというアナウンスがあり、アラートポリシーの条件あたり月額$1.50かかるとされているので、こちらも考慮が必要です。

監視設計に手間はかけたくないものの、複数人でしっかりエラーを検知してできるだけ早く対応したいという場合はパターン2 がお勧めです。

パターン3(難易度高):Cloud Run 関数に Slack 等に通知するためのコードをデプロイし、BigQuery Data Transfer Service の「通知オプション」から「Pub/Sub 通知」を「ON」にして、Pub/Sub 経由で Cloud Run 関数をトリガーする

最後にご紹介するのは、BigQuery Data Transfer Service の転送ジョブの実行イベントをトリガーとして Pub/Sub 経由で Cloud Run 関数を実行し、関数内でイベント内容を解析し Slack 等の外部ツールに通知する方法です。

以下に、簡単な構築の流れをご説明します。

  1. Pub/Sub トピックを作成
  2. BigQuery Transfer Service の転送設定画面 >「通知オプション」>「Pub/Sub 通知」を ON に設定し、1 で作成したトピックを設定する
  3. Cloud Run 関数を作成し、1 で作成したトピックをトリガーに設定する

以下は、S3 から BigQuery への転送ジョブのイベントデータを受け取り、内容を解析して Slack へ通知する Python コードのサンプルです。Slack への通知には、Incoming Webhook URL を使用しています。(※あくまでサンプルですので、動作を確認せずそのまま流用することはお控えください)

■ Python コードのサンプル

import base64
import logging
import json
import os

import requests

logger = logging.getLogger(__file__)
logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(filename)s:%(lineno)3s [%(levelname)s] %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)

WEB_HOOK_URL = os.environ['WEB_HOOK_URL']


def create_json_format(event_data_str):
    """
    Pub/Sub経由で渡された BQ Data Transfer Service の S3-BQ 転送ジョブイベントデータを受け取り
    Slack通知用のメッセージを作成
    """
    event_data = json.loads(event_data_str)
    source_s3_path = event_data["params"]["data_path"]
    destination_bq_table = event_data["params"]["destination_table_name_template"]
    transfer_state = event_data["state"]

    params = {
        'username': 'transfer-check',
        'icon_emoji': ':four_leaf_clover:',
        'title': 'データ転送結果',
        'status': f'ステータス: {transfer_state}',
        'message': f'転送元s3パス: {source_s3_path}\n転送先BQテーブル: {destination_bq_table}'
    }

    if transfer_state == 'SUCCEEDED':
        params['color'] = 'good'

    else:
        params['color'] = 'danger'
        params['message'] += f'\nエラーメッセージ: 転送が失敗しました。\nevent_data: {event_data}'

    attachments = [{
            'color': params['color'],
            'title': params['title'],
            'text': params['status'] + params['message']
        }]

    body = dict(
            username=params['username'],
            icon_emoji=params['icon_emoji'], 
            attachments=attachments
        )

    slack_json_data = json.dumps(body).encode('utf-8')

    return slack_json_data


def execute_slack_request(event_data):
    """
    結果をSlackへ通知
    """
    slack_json_data = create_json_format(event_data)
    headers = {'Content-Type': 'application/json'}

    logger.info('START: execute slack request')
    slack_res = requests.post(WEB_HOOK_URL, data=slack_json_data, headers=headers)

    if slack_res.status_code == 200:
        logger.info(f'END: Slack notice succeeded')
    else:
        logger.error(f'END: Slack notice failed: {slack_res.status_code}')


def send_slack(event, context):
    """
    Cloud Pub/Sub トピックのメッセージ通知でトリガー
    """
    event_data = base64.b64decode(event["data"]).decode("utf-8")
    execute_slack_request(event_data)

■ Slack 通知のイメージ

Slack 通知のイメージ

パターン1、2 と比べてコード開発を伴うので実装コストは一番高いですが、通知内容はコード内で比較的自由にカスタマイズすることができるので、ぱっと見で分かりやすいフォーマットに整えた上で通知することができるのはパターン3 の良いところです。

また Slack だけでなくコード開発を用いて通知が可能なサービス宛に通知したい場合も、同様のフローで監視の仕組みを構築することが可能です。

まとめ

今回は、BigQuery Data Transfer Service の監視方法について、いくつかのパターンをご紹介させて頂きました。

どのパターンを採用するかは、転送ジョブが失敗したときの緊急度や影響範囲、リカバリフローなどを考慮した上でご検討頂くことをお勧めいたします。

最後に

自社に専門人材がいない、リソースが足りない等の課題をお持ちの方に、エンジニア領域の支援サービス(Data Engineer Hub)をご提供しています。 お困りごとございましたら是非お気軽にご相談ください。

本記事に関するお問い合わせは下記にて承ります。
株式会社DataCurrent
info@datacurrent.co.jp

人気のコラムランキング

PICK UP

企業のDX推進におけるダッシュボード内製化について

DXmarketingPICK UP コラムダッシュボード内製化

企業のDX推進に向けた人材教育支援について

GA4marketingPICK UP コラム内製化

【データプライバシーコラム】電気通信事業法改正の解説(2022年7月時点)

CMPPICK UP コラムデータプライバシーデータプライバシーコラム個人情報保護

CMP導入時の注意点

CMPPICK UP コラムデータプライバシーデータプライバシーコラム個人情報保護

TOPへ
戻る