2026.02.20 コラム
【テックコラム】Dataplex のカスタムエントリーを試してみた
DataCurrent の金子です。今回は、Google Cloud の Dataplex でカスタムエントリーを作成する方法を試してみました。
※ この記事は、2026年2月時点の Google Cloud の仕様に基づいています。
Dataplex カスタムエントリーとは
Dataplex では、BigQuery や Cloud Storage といった Google Cloud リソースのメタデータを自動的にカタログ化します。一方で、Dataplex が標準でサポートしていないデータセットやリソースについても、コンソールや API を介して手動で登録することで、カタログ管理の対象とすることができます。これを実現するのが「カスタムエントリー」機能です。
カスタムエントリーを使用することで、組織内の多様なデータ資産を一元的に管理し、横断的な検索が可能になります。
試したこと
今回は、Dataplex で SQL クエリーが検索できるのかを試したかったこともあり、オープンソースのデータセットを使って、SQL クエリーを Dataplex のカスタムエントリーとして登録するフローを試してみました。
サンプルデータ
使用したデータセットは、BIRD-bench プロジェクトに含まれる mini_dev のデータ (mini_dev_sqlite.json) です。 以下のリポジトリーから取得しました。
https://github.com/bird-bench/mini_dev/tree/main/llm/mini_dev_data
このデータセットには、自然言語による質問 (question) と、その回答となる SQL クエリー (SQL) のペアが含まれています。 SQL のみをメタデータとして登録し、「その SQL が意図している内容(ユーザーの質問)」で検索してヒットするかを確認することで、Dataplex のセマンティック検索の精度を検証するための「正解データ」として利用できます。
mini_dev_sqlite.json の例:
[
{
"question_id": 1471,
"db_id": "debit_card_specializing",
"question": "What is the ratio of customers who pay in EUR against customers who pay in CZK?",
"evidence": "ratio of customers who pay in EUR against customers who pay in CZK = count(Currency = 'EUR') / count(Currency = 'CZK').",
"SQL": "SELECT CAST(SUM(IIF(Currency = 'EUR', 1, 0)) AS FLOAT) / SUM(IIF(Currency = 'CZK', 1, 0)) AS ratio FROM customers",
"difficulty": "simple"
},
...
]
今回は、このデータセットに含まれる SQL フィールドの値を、Dataplex 上で管理される「エントリー」として登録し、question フィールドの内容で検索ができるか試してみます。その他のフィールドは今回は使用しません。
実装の流れ
カスタムエントリーを登録するには、以下のステップが必要です。 この流れは、公式ドキュメント Manage entries and ingest from custom sources に記載されている統合プロセスに沿っています。
- カスタム Aspect Type の作成: エントリーに付与するメタデータのテンプレートを定義します。
- カスタム Entry Type の作成: エントリーの種類を定義し、必須となる Aspect Type を指定します。
- カスタム Entry Group の作成: エントリーを格納するコンテナ(箱)を作成します。
- エントリーの登録: 実際にデータをエントリーとして作成します。
以降のコードは、以下の変数が定義されていることを前提とします。
# 共通設定
project_id = "xxx" # Google Cloud プロジェクト ID
location = "asia-northeast1" # Dataplex ロケーション
PREFIX = "test-kaneko" # 検証用のリソース名プレフィックス
parent = f"projects/{project_id}/locations/{location}"
# 各リソースの ID
aspect_type_id = f"{PREFIX}-sql-metadata"
entry_type_id = f"{PREFIX}-sql-definition"
entry_group_id = f"{PREFIX}-sql"
# データセットのファイルパス
minidev_json = "mini_dev_sqlite.json"
client = dataplex_v1.CatalogServiceClient()
1. カスタム Aspect Type の作成
まず、SQL クエリーの情報を格納するためのメタデータ構造(Aspect Type)を定義します。 カスタム Aspect Type の詳細や作成方法については、公式ドキュメント Manage aspects and enrich metadata も併せてご参照ください。
今回はシンプルに、SQL 文自体を格納するフィールド sql_query を持つ Aspect Type を定義しました。Aspect Type の ID には、事前に定義した aspect_type_id({PREFIX}-sql-metadata)を使用しています。
# Aspect Type のテンプレート定義
SQL_ASPECT_TEMPLATE = {
"name": "sql_query_metadata",
"index": 1,
"type_": "record",
"record_fields": [{
"name": "sql_query",
"index": 1,
"type_": "string"
}],
}
# Aspect Type の作成
client.create_aspect_type(
parent=parent,
aspect_type_id=aspect_type_id,
aspect_type=dataplex_v1.AspectType(
description="SQL クエリメタデータ",
metadata_template=dataplex_v1.AspectType.MetadataTemplate(
**SQL_ASPECT_TEMPLATE),
),
).result()
2. カスタム Entry Type の作成
次に、この Aspect Type を利用する Entry Type を作成します。Entry Type の ID には entry_type_id({PREFIX}-sql-definition)を使用しています。 required_aspects に作成した Aspect Type を指定することで、この種のエントリーには必ず SQL クエリーのメタデータが付与されるよう強制します。
# Entry Type の作成
client.create_entry_type(
parent=parent,
entry_type_id=entry_type_id,
entry_type=dataplex_v1.EntryType(
description="SQL クエリ定義",
required_aspects=[
dataplex_v1.EntryType.AspectInfo(type_=f"{parent}/aspectTypes/{aspect_type_id}")
],
),
).result()
3. カスタム Entry Group の作成
エントリーをまとめるグループを作成します。Entry Group の ID には entry_group_id({PREFIX}-sql)を使用しています。
# Entry Group の作成
client.create_entry_group(
parent=parent,
entry_group_id=entry_group_id,
entry_group=dataplex_v1.EntryGroup(description="SQL エントリ"),
).result()
4. エントリーの登録
最後に、mini_dev_sqlite.json を読み込み、各クエリーをエントリーとして登録します。 JSON の SQL フィールドを sql_query アスペクトにマッピングします。
なお、aspects のキーには {project_id}.{location}.{aspect_type_id} の形式を指定する必要があります。これは Entry の API リファレンス に定義されている仕様です。
# JSON の読み込み
with open(minidev_json, encoding="utf-8") as f:
sql_queries = json.load(f)
for item in sql_queries:
entry_id = f"{PREFIX}-sql-{item['question_id']}"
# Aspect データの作成
sql_data = Struct()
sql_data.update({"sql_query": item["SQL"]})
try:
client.create_entry(
parent=f"{parent}/entryGroups/{entry_group_id}",
entry_id=entry_id,
entry=dataplex_v1.Entry(
entry_type=f"{parent}/entryTypes/{entry_type_id}",
entry_source=dataplex_v1.EntrySource(
system=f"{PREFIX}-sqlite", ),
aspects={
f"{project_id}.{location}.{aspect_type_id}":
dataplex_v1.Aspect(aspect_type=f"{parent}/aspectTypes/{aspect_type_id}",
data=sql_data),
},
),
)
print(f" Entry 作成: {entry_id}")
except Exception as e:
print(f" Entry 作成失敗 ({entry_id}): {e}")
このスクリプトを実行することで、JSON ファイル内の各 SQL クエリーが一つのエントリーとして Dataplex Catalog に登録されます。
Dataplex で検索してみる
登録が完了したエントリーに対して、自然言語の質問文で正しく検索できるかを検証します。 今回はデータセットの先頭 3 件の質問を使って、検索結果に該当するエントリーが含まれるかを確認してみました。
検索検証の仕組み
検証スクリプトでは、データセットの各質問(question)に対して以下の検索クエリーを発行し、Dataplex のセマンティック検索 API を呼び出します。
前述の登録時に system 属性を設定しているため、検索時に system=... でフィルタリングすることで、今回登録したエントリーのみを検索対象としています。 なお、自然言語検索におけるフィルター機能の詳細は、公式ドキュメント Search syntax for Data Catalog をご参照ください。
検証スクリプト (search_sql_entries.py)
# parent, client は登録時と同様に定義済み
with open(minidev_json, encoding="utf-8") as f:
sql_queries = json.load(f)[:3] # 先頭 3 件のみ検証
for item in sql_queries:
qid = item["question_id"]
expected_entry = f"{PREFIX}-sql-{qid}"
query = item["question"]
search_query = f"{query} system={PREFIX}-sqlite"
print(f"\n[qid={qid}] {search_query}")
request = dataplex_v1.SearchEntriesRequest(
name=f"projects/{project_id}/locations/global",
query=search_query,
page_size=10,
semantic_search=True,
)
results = list(client.search_entries(request=request))[:10]
found_rank = None
for rank, result in enumerate(results, 1):
entry_name = result.dataplex_entry.name.split("/")[-1]
if entry_name == expected_entry:
found_rank = rank
break
if found_rank:
print(f" => rank {found_rank}")
else:
print(" => ヒットなし")
検索結果
実行したところ、3 件中 3 件すべてがヒットしました。
[qid=1471] What is the ratio of customers who pay in EUR against customers who pay in CZK? system=test-kaneko-sqlite => rank 1 [qid=1472] In 2012, who had the least consumption in LAM? system=test-kaneko-sqlite => rank 1 [qid=1473] What was the average monthly consumption of customers in SME for the year 2013? system=test-kaneko-sqlite => rank 3
メタデータとして SQL の説明文を明示的に登録していなくても、Dataplex の自然言語検索を利用すれば、SQL クエリーの内容と質問との間の意味的な関連性を理解し、検索結果の上位にヒットするようです。
まとめ
Dataplex のカスタムエントリー機能を使うことで、標準ではサポートされていないデータや、今回のような SQL クエリー集などもカタログとして管理できるようになります。これにより、組織のナレッジを一ヶ所に集約し、検索性を高めることが期待できます。
最後に
自社に専門人材がいない、リソースが足りない等の課題をお持ちの方に、エンジニア領域の支援サービス(Data Engineer Hub)をご提供しています。お困りごとがございましたら是非お気軽にご相談ください。
本件に関するお問い合わせは下記にて承ります。
株式会社DataCurrent
info@datacurrent.co.jp