Column

コラム

  • 【テックコラム】Dataplex のカスタムエントリーを試し...

【テックコラム】Dataplex のカスタムエントリーを試してみた

DataCurrent の金子です。今回は、Google Cloud の Dataplex でカスタムエントリーを作成する方法を試してみました。

※ この記事は、2026年2月時点の Google Cloud の仕様に基づいています。

Dataplex カスタムエントリーとは

Dataplex では、BigQuery や Cloud Storage といった Google Cloud リソースのメタデータを自動的にカタログ化します。一方で、Dataplex が標準でサポートしていないデータセットやリソースについても、コンソールや API を介して手動で登録することで、カタログ管理の対象とすることができます。これを実現するのが「カスタムエントリー」機能です。

カスタムエントリーを使用することで、組織内の多様なデータ資産を一元的に管理し、横断的な検索が可能になります。

試したこと

今回は、Dataplex で SQL クエリーが検索できるのかを試したかったこともあり、オープンソースのデータセットを使って、SQL クエリーを Dataplex のカスタムエントリーとして登録するフローを試してみました。

サンプルデータ

使用したデータセットは、BIRD-bench プロジェクトに含まれる mini_dev のデータ (mini_dev_sqlite.json) です。 以下のリポジトリーから取得しました。

https://github.com/bird-bench/mini_dev/tree/main/llm/mini_dev_data

このデータセットには、自然言語による質問 (question) と、その回答となる SQL クエリー (SQL) のペアが含まれています。 SQL のみをメタデータとして登録し、「その SQL が意図している内容(ユーザーの質問)」で検索してヒットするかを確認することで、Dataplex のセマンティック検索の精度を検証するための「正解データ」として利用できます。

mini_dev_sqlite.json の例:

[
  {
    "question_id": 1471,
    "db_id": "debit_card_specializing",
    "question": "What is the ratio of customers who pay in EUR against customers who pay in CZK?",
    "evidence": "ratio of customers who pay in EUR against customers who pay in CZK = count(Currency = 'EUR') / count(Currency = 'CZK').",
    "SQL": "SELECT CAST(SUM(IIF(Currency = 'EUR', 1, 0)) AS FLOAT) / SUM(IIF(Currency = 'CZK', 1, 0)) AS ratio FROM customers",
    "difficulty": "simple"
  },
  ...
]

今回は、このデータセットに含まれる SQL フィールドの値を、Dataplex 上で管理される「エントリー」として登録し、question フィールドの内容で検索ができるか試してみます。その他のフィールドは今回は使用しません。

実装の流れ

カスタムエントリーを登録するには、以下のステップが必要です。 この流れは、公式ドキュメント Manage entries and ingest from custom sources に記載されている統合プロセスに沿っています。

  1. カスタム Aspect Type の作成: エントリーに付与するメタデータのテンプレートを定義します。
  2. カスタム Entry Type の作成: エントリーの種類を定義し、必須となる Aspect Type を指定します。
  3. カスタム Entry Group の作成: エントリーを格納するコンテナ(箱)を作成します。
  4. エントリーの登録: 実際にデータをエントリーとして作成します。

以降のコードは、以下の変数が定義されていることを前提とします。

# 共通設定
project_id = "xxx"  # Google Cloud プロジェクト ID
location = "asia-northeast1" # Dataplex ロケーション
PREFIX = "test-kaneko"  # 検証用のリソース名プレフィックス
parent = f"projects/{project_id}/locations/{location}"

# 各リソースの ID
aspect_type_id = f"{PREFIX}-sql-metadata"
entry_type_id = f"{PREFIX}-sql-definition"
entry_group_id = f"{PREFIX}-sql"

# データセットのファイルパス
minidev_json = "mini_dev_sqlite.json" 

client = dataplex_v1.CatalogServiceClient()

1. カスタム Aspect Type の作成

まず、SQL クエリーの情報を格納するためのメタデータ構造(Aspect Type)を定義します。 カスタム Aspect Type の詳細や作成方法については、公式ドキュメント Manage aspects and enrich metadata も併せてご参照ください。

今回はシンプルに、SQL 文自体を格納するフィールド sql_query を持つ Aspect Type を定義しました。Aspect Type の ID には、事前に定義した aspect_type_id{PREFIX}-sql-metadata)を使用しています。

# Aspect Type のテンプレート定義
SQL_ASPECT_TEMPLATE = {
    "name": "sql_query_metadata",
    "index": 1,
    "type_": "record",
    "record_fields": [{
        "name": "sql_query",
        "index": 1,
        "type_": "string"
    }],
}

# Aspect Type の作成
client.create_aspect_type(
    parent=parent,
    aspect_type_id=aspect_type_id,
    aspect_type=dataplex_v1.AspectType(
        description="SQL クエリメタデータ",
        metadata_template=dataplex_v1.AspectType.MetadataTemplate(
            **SQL_ASPECT_TEMPLATE),
    ),
).result()

2. カスタム Entry Type の作成

次に、この Aspect Type を利用する Entry Type を作成します。Entry Type の ID には entry_type_id{PREFIX}-sql-definition)を使用しています。 required_aspects に作成した Aspect Type を指定することで、この種のエントリーには必ず SQL クエリーのメタデータが付与されるよう強制します。

# Entry Type の作成
client.create_entry_type(
    parent=parent,
    entry_type_id=entry_type_id,
    entry_type=dataplex_v1.EntryType(
        description="SQL クエリ定義",
        required_aspects=[
            dataplex_v1.EntryType.AspectInfo(type_=f"{parent}/aspectTypes/{aspect_type_id}")
        ],
    ),
).result()

3. カスタム Entry Group の作成

エントリーをまとめるグループを作成します。Entry Group の ID には entry_group_id{PREFIX}-sql)を使用しています。

# Entry Group の作成
client.create_entry_group(
    parent=parent,
    entry_group_id=entry_group_id,
    entry_group=dataplex_v1.EntryGroup(description="SQL エントリ"),
).result()

4. エントリーの登録

最後に、mini_dev_sqlite.json を読み込み、各クエリーをエントリーとして登録します。 JSON の SQL フィールドを sql_query アスペクトにマッピングします。

なお、aspects のキーには {project_id}.{location}.{aspect_type_id} の形式を指定する必要があります。これは Entry の API リファレンス に定義されている仕様です。

# JSON の読み込み
with open(minidev_json, encoding="utf-8") as f:
    sql_queries = json.load(f)

for item in sql_queries:
    entry_id = f"{PREFIX}-sql-{item['question_id']}"

    # Aspect データの作成
    sql_data = Struct()
    sql_data.update({"sql_query": item["SQL"]})

    try:
        client.create_entry(
            parent=f"{parent}/entryGroups/{entry_group_id}",
            entry_id=entry_id,
            entry=dataplex_v1.Entry(
                entry_type=f"{parent}/entryTypes/{entry_type_id}",
                entry_source=dataplex_v1.EntrySource(
                    system=f"{PREFIX}-sqlite", ), 
                aspects={
                    f"{project_id}.{location}.{aspect_type_id}":
                    dataplex_v1.Aspect(aspect_type=f"{parent}/aspectTypes/{aspect_type_id}",
                                       data=sql_data),
                },
            ),
        )
        print(f"  Entry 作成: {entry_id}")
    except Exception as e:
        print(f"  Entry 作成失敗 ({entry_id}): {e}")

このスクリプトを実行することで、JSON ファイル内の各 SQL クエリーが一つのエントリーとして Dataplex Catalog に登録されます。

Dataplex で検索してみる

登録が完了したエントリーに対して、自然言語の質問文で正しく検索できるかを検証します。 今回はデータセットの先頭 3 件の質問を使って、検索結果に該当するエントリーが含まれるかを確認してみました。

検索検証の仕組み

検証スクリプトでは、データセットの各質問(question)に対して以下の検索クエリーを発行し、Dataplex のセマンティック検索 API を呼び出します。

前述の登録時に system 属性を設定しているため、検索時に system=... でフィルタリングすることで、今回登録したエントリーのみを検索対象としています。 なお、自然言語検索におけるフィルター機能の詳細は、公式ドキュメント Search syntax for Data Catalog をご参照ください。

検証スクリプト (search_sql_entries.py)

# parent, client は登録時と同様に定義済み

with open(minidev_json, encoding="utf-8") as f:
    sql_queries = json.load(f)[:3]  # 先頭 3 件のみ検証

for item in sql_queries:
    qid = item["question_id"]
    expected_entry = f"{PREFIX}-sql-{qid}"
    query = item["question"]
    search_query = f"{query} system={PREFIX}-sqlite"
    print(f"\n[qid={qid}] {search_query}")

    request = dataplex_v1.SearchEntriesRequest(
        name=f"projects/{project_id}/locations/global",
        query=search_query,
        page_size=10,
        semantic_search=True,
    )
    results = list(client.search_entries(request=request))[:10]

    found_rank = None
    for rank, result in enumerate(results, 1):
        entry_name = result.dataplex_entry.name.split("/")[-1]
        if entry_name == expected_entry:
            found_rank = rank
            break

    if found_rank:
        print(f"  => rank {found_rank}")
    else:
        print("  => ヒットなし")

検索結果

実行したところ、3 件中 3 件すべてがヒットしました。

[qid=1471] What is the ratio of customers who pay in EUR against customers who pay in CZK? system=test-kaneko-sqlite
  => rank 1

[qid=1472] In 2012, who had the least consumption in LAM? system=test-kaneko-sqlite
  => rank 1

[qid=1473] What was the average monthly consumption of customers in SME for the year 2013? system=test-kaneko-sqlite
  => rank 3

メタデータとして SQL の説明文を明示的に登録していなくても、Dataplex の自然言語検索を利用すれば、SQL クエリーの内容と質問との間の意味的な関連性を理解し、検索結果の上位にヒットするようです。

まとめ

Dataplex のカスタムエントリー機能を使うことで、標準ではサポートされていないデータや、今回のような SQL クエリー集などもカタログとして管理できるようになります。これにより、組織のナレッジを一ヶ所に集約し、検索性を高めることが期待できます。

最後に

自社に専門人材がいない、リソースが足りない等の課題をお持ちの方に、エンジニア領域の支援サービス(Data Engineer Hub)をご提供しています。お困りごとがございましたら是非お気軽にご相談ください。

本件に関するお問い合わせは下記にて承ります。
株式会社DataCurrent
info@datacurrent.co.jp

人気のコラムランキング

PICK UP

企業のDX推進におけるダッシュボード内製化について

DXmarketingPICK UP コラムダッシュボード内製化

企業のDX推進に向けた人材教育支援について

GA4marketingPICK UP コラム内製化

【データプライバシーコラム】電気通信事業法改正の解説(2022年7月時点)

CMPPICK UP コラムデータプライバシーデータプライバシーコラム個人情報保護

CMP導入時の注意点

CMPPICK UP コラムデータプライバシーデータプライバシーコラム個人情報保護

TOPへ
戻る