Add Data Product Reference Example to FAST Data Platform Stage (#3211)

Add Data Product Reference Example to FAST Data Platform Stage

---------

Co-authored-by: lcaggio <lorenzo.caggioni@gmail.com>
This commit is contained in:
Jay Bana
2025-07-09 09:53:49 +01:00
committed by GitHub
parent 5029e078cd
commit 88051c2d5b
22 changed files with 1474 additions and 16 deletions

View File

@@ -19,7 +19,7 @@ locals {
for k, v in local.data_domains : k => merge(
{ region = var.location, short_name = v.short_name },
try(v.deploy_config.composer, {})
)
) if(v.deploy_config.composer != null)
}
dd_composer_keys = {
for k, v in local.dd_composer : k => try(

View File

@@ -22,7 +22,8 @@ automation:
- dp-product-a-0
deploy_config:
composer: {}
composer:
{}
# Uncomment for VPC Network Connectivity
# region defaults to var.location
# node_config:
@@ -56,11 +57,12 @@ project_config:
- datacatalog.googleapis.com
- dataplex.googleapis.com
- datalineage.googleapis.com
shared_vpc_service_config:
host_project: dev-net-spoke-0
service_agent_iam:
roles/composer.sharedVpcAgent:
- composer
# Uncomment for shared VPC Network configuration
# shared_vpc_service_config:
# host_project: dev-net-spoke-0
# service_agent_iam:
# roles/composer.sharedVpcAgent:
# - composer
folder_config:
iam_bindings:

View File

@@ -0,0 +1 @@
composer/variables.json

View File

@@ -0,0 +1,190 @@
# Data Product Reference Example
This folder contains a reference implementation of a Data Product showcasing the complete lifecycle from raw data ingestion to curated analytics-ready datasets. The example demonstrates how to create Data Products within the [Data Platform stage](../README.md) of Fabric FAST. It utilizes the automation service account and shared services created by the Data Platform stage.
Our example consists of a batch ELT pipeline that processes and joins individual CSV data files from Cloud Storage to BigQuery using the publicly available theLook eCommerce dataset:
## Components
This reference implementation includes:
- **Infrastructure as Code**: Terraform modules for deploying GCP resources
- **Data Schemas**: BigQuery table schemas in JSON format for structured data validation
- **Orchestration**: Cloud Composer (Apache Airflow) DAGs for automated pipeline execution
- **Sample Data**: Utility script to download theLook eCommerce reference tables
## Getting Started
### Prerequisites
- Google Cloud SDK installed and configured
- Terraform >= 1.9.0
- `jq` command-line JSON processor
- Access to the automation service account from the previous stage
Ensure that you are authenticated with the `gcloud` CLI using the user that has the relevant access to
both the Domain Shared Resources as well as the Data Product GCP projects:
```bash
gcloud auth login
gcloud auth application-default login
```
### 1. Infrastructure Setup
**1. Configure Terraform Variables**
```bash
cp terraform.tfvars.sample terraform.tfvars
# Edit terraform.tfvars with your specific values
```
**2. Deploy Infrastructure**
```bash
terraform init
terraform apply
```
### 2. Data Pipeline Setup
**1. Set Environment Variables**
```bash
export LANDING_BUCKET=$(terraform output -raw landing_gcs_bucket)
export COMPOSER_PROJECT_ID=$(terraform output -raw composer_project_id)
export COMPOSER_ENV_NAME=$(terraform output -raw composer_environment_name)
export LOCATION=$(terraform output -raw location)
```
**2. Deploy Data Schemas**
```bash
gcloud storage cp -r data/schemas/* gs://$LANDING_BUCKET/schemas
```
**3. Source Sample Data**
```bash
./data/get_thelook_data.sh gs://$LANDING_BUCKET
```
**4. Configure Composer Environment**
Update Composer environment variables from `composer/variables.json`:
> **Note**: This step may take several minutes to complete.
```bash
# Copy Airflow JSON variable file into Composer data folder
gcloud composer environments storage data import \
--project $COMPOSER_PROJECT_ID \
--environment=$COMPOSER_ENV_NAME \
--location $LOCATION \
--source="composer/variables.json"
# Import Airflow variables
gcloud composer environments run $COMPOSER_ENV_NAME \
--project $COMPOSER_PROJECT_ID \
--location $LOCATION \
variables \
-- import /home/airflow/gcs/data/variables.json
```
**5. Deploy Airflow DAGs**
```bash
gcloud composer environments storage dags import \
--project=$COMPOSER_PROJECT_ID \
--environment=$COMPOSER_ENV_NAME \
--location=$LOCATION \
--source="composer/DAG-dp0"
```
> **Note**: It may take several minutes for the DAGs to be parsed and become available in Composer.
### 3. Pipeline Execution
**1. Verify DAG Import**
Navigate to the Composer UI in the Domain Shared Resources project and confirm that the DAGs have been successfully imported.
**2. Execute Pipeline**
Trigger the DAGs in the following sequence (wait for each to complete):
1. **`gcs2bq_table_create`** - Creates BigQuery tables with proper schemas
2. **`gcs2bq_table_elt`** - Executes the ELT pipeline to process data
## Architecture Overview
The data product implements a three-tier architecture:
<p align="center">
<img src="diagram.png" alt="High level diagram.">
</p>
Curated data will be made accessible through authorized views within the `exposure` dataset.
### Data Storage Layers
- **Landing Zone** (`{prefix}-land-cs-0`): Raw CSV files stored in Cloud Storage
- **Raw Layer** (`{prefix}_lnd_bq_0`): Raw data loaded into BigQuery for processing
- **Curated Layer** (`{prefix}_cur_bq_0`): Processed, analytics-ready datasets
## Troubleshooting
### Common Issues
- **DAG Import Failures**: Ensure the Composer environment is fully initialized before importing DAGs
- **Permission Errors**: Verify that the user that you authenticated with via the `gcloud` CLI has the relevant permissions
- **Variable Configuration**: Double-check that `terraform.tfvars` is properly configured
### Useful Commands
```bash
# Check Terraform outputs
terraform output
# Verify bucket contents
gcloud storage ls gs://$LANDING_BUCKET --recursive
# Check Composer environment status
gcloud composer environments describe $COMPOSER_ENV_NAME \
--project $COMPOSER_PROJECT_ID \
--location $LOCATION
```
<!-- TFDOC OPTS files:1 show_extra:1 exclude:providers.tf -->
<!-- BEGIN TFDOC -->
## Files
| name | description | modules | resources |
|---|---|---|---|
| [main.tf](./main.tf) | Module-level locals and resources. | <code>bigquery-dataset</code> · <code>gcs</code> | |
| [outputs.tf](./outputs.tf) | Module outputs. | | <code>local_file</code> |
| [variables.tf](./variables.tf) | Module variables. | | |
## Variables
| name | description | type | required | default | producer |
|---|---|:---:|:---:|:---:|:---:|
| [authorized_dataset_on_curated](variables.tf#L16) | Authorized Dataset. | <code>string</code> | ✓ | | |
| [composer_config](variables.tf#L21) | Composer environment configuration. | <code title="object&#40;&#123;&#10; environment_name &#61; string&#10; project_id &#61; string&#10;&#125;&#41;">object&#40;&#123;&#8230;&#125;&#41;</code> | ✓ | | |
| [dp_processing_service_account](variables.tf#L30) | Service account for data processing via Composer impersonation. | <code>string</code> | ✓ | | |
| [impersonate_service_account](variables.tf#L47) | Service account to impersonate for Google Cloud providers. | <code>string</code> | ✓ | | |
| [prefix](variables.tf#L60) | Prefix used for resources that need unique names. Use a maximum of 9 chars for organizations, and 11 chars for tenants. | <code>string</code> | ✓ | | |
| [project_id](variables.tf#L69) | Project ID to deploy resources. | <code>string</code> | ✓ | | |
| [encryption_keys](variables.tf#L36) | Default encryption keys for services, in service => { region => key id } format. Overridable on a per-object basis. | <code title="object&#40;&#123;&#10; bigquery &#61; optional&#40;map&#40;string&#41;, &#123;&#125;&#41;&#10; composer &#61; optional&#40;map&#40;string&#41;, &#123;&#125;&#41;&#10; storage &#61; optional&#40;map&#40;string&#41;, &#123;&#125;&#41;&#10;&#125;&#41;">object&#40;&#123;&#8230;&#125;&#41;</code> | | <code>&#123;&#125;</code> | |
| [location](variables.tf#L53) | Default location used when no location is specified. | <code>string</code> | | <code>&#34;europe-west8&#34;</code> | |
## Outputs
| name | description | sensitive | consumers |
|---|---|:---:|---|
| [composer_environment_name](outputs.tf#L17) | The name of the Composer environment. | | |
| [composer_project_id](outputs.tf#L22) | The project ID where the Composer environment is located. | | |
| [dp_processing_service_account](outputs.tf#L27) | Service account for data processing. | | |
| [landing_gcs_bucket](outputs.tf#L32) | The name of the landing GCS bucket. | | |
| [location](outputs.tf#L37) | The location/region used for resources. | | |
<!-- END TFDOC -->

View File

@@ -0,0 +1,308 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
BigQuery ELT Pipeline DAG
This DAG implements a comprehensive customer purchases ELT pipeline that:
1. Loads data from GCS to BigQuery landing tables (users, orders, order_items, products)
2. Performs a 4-table join to create a comprehensive customer_purchases table
3. Creates an exposure view for analytics consumption
Dependencies: Requires gcs2bq_table_create DAG to complete first
"""
import datetime
import logging
import os
from airflow import models
from airflow.decorators import task
from airflow.models import Variable
from airflow.operators import empty
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
from airflow.providers.google.cloud.operators.bigquery import (
BigQueryInsertJobOperator,)
from airflow.providers.google.cloud.sensors.bigquery import (
BigQueryTableExistenceSensor,)
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import (
GCSToBigQueryOperator,)
from airflow.utils.task_group import TaskGroup
# Configuration
LANDING_TABLES = ["users", "orders", "order_items", "products"]
# Environment variables (set from composer/variables.json)
DP_PROJECT = os.environ.get("DP_PROJECT")
LAND_BQ_DATASET = os.environ.get("LAND_BQ_DATASET")
CURATED_BQ_DATASET = os.environ.get("CURATED_BQ_DATASET")
LAND_GCS = os.environ.get("LAND_GCS")
DP_PROCESSING_SERVICE_ACCOUNT = os.environ.get("DP_PROCESSING_SERVICE_ACCOUNT")
LOCATION = os.environ.get("LOCATION")
# Validate required environment variables
required_vars = {
"DP_PROJECT": DP_PROJECT,
"LAND_BQ_DATASET": LAND_BQ_DATASET,
"CURATED_BQ_DATASET": CURATED_BQ_DATASET,
"LAND_GCS": LAND_GCS,
"DP_PROCESSING_SERVICE_ACCOUNT": DP_PROCESSING_SERVICE_ACCOUNT,
"LOCATION": LOCATION,
}
missing_vars = [var for var, value in required_vars.items() if not value]
if missing_vars:
raise ValueError(f"Missing required environment variables: {missing_vars}")
logger = logging.getLogger(__name__)
def create_gcs_to_bq_task(table_name: str) -> GCSToBigQueryOperator:
"""
Factory function to create GCS to BigQuery load tasks.
Args:
table_name: Name of the table to load
Returns:
GCSToBigQueryOperator instance
"""
return GCSToBigQueryOperator(
task_id=f"{table_name}_load",
bucket=LAND_GCS,
source_objects=f"data/{table_name}/{table_name}_*.csv",
destination_project_dataset_table=
f"{DP_PROJECT}.{LAND_BQ_DATASET}.{table_name}",
source_format="CSV",
create_disposition="CREATE_IF_NEEDED",
write_disposition="WRITE_TRUNCATE",
schema_object=f"schemas/landing/{table_name}.json",
schema_object_bucket=LAND_GCS,
autodetect=False,
max_bad_records=1,
project_id=DP_PROJECT,
impersonation_chain=[DP_PROCESSING_SERVICE_ACCOUNT],
)
def create_table_validation_task(
table_name: str, dataset_name: str,
task_prefix: str = "validate") -> BigQueryTableExistenceSensor:
"""
Factory function to create table validation tasks using sensor.
Args:
table_name: Name of the table to validate
dataset_name: Name of the dataset
task_prefix: Prefix for task ID
Returns:
BigQueryTableExistenceSensor instance
"""
return BigQueryTableExistenceSensor(
task_id=f"{task_prefix}_{table_name}_exists",
project_id=DP_PROJECT,
dataset_id=dataset_name,
table_id=table_name,
poke_interval=30, # Check every 30 seconds
timeout=600, # Timeout after 10 minutes
mode="reschedule", # Release worker slot between checks
impersonation_chain=[DP_PROCESSING_SERVICE_ACCOUNT],
)
# DAG Definition
yesterday = datetime.datetime.now() - datetime.timedelta(days=1)
default_args = {
"owner": "data-platform-team",
"start_date": yesterday,
"depends_on_past": False,
"email": Variable.get("alert_email_list", default_var="").split(","),
"email_on_failure": True,
"email_on_retry": False,
"retries": 2,
"retry_delay": datetime.timedelta(minutes=5),
"sla": datetime.timedelta(hours=2),
}
with models.DAG(
"gcs2bq_elt",
default_args=default_args,
schedule_interval=None,
catchup=False,
max_active_runs=1,
tags=["bigquery", "elt", "data-platform", "customer-purchases"],
doc_md=__doc__,
dagrun_timeout=datetime.timedelta(hours=3),
) as dag:
# Start and end markers
start = empty.EmptyOperator(task_id="start", trigger_rule="all_success")
end = empty.EmptyOperator(task_id="end", trigger_rule="all_done")
# Validate that all required tables exist before starting data load
with TaskGroup(
"validate_prerequisites",
tooltip="Validate all landing tables exist before data load",
) as prerequisites_group:
prerequisite_validations = [
create_table_validation_task(
table_name=table,
dataset_name=LAND_BQ_DATASET,
task_prefix="validate_landing",
) for table in LANDING_TABLES
]
# Validate that the curated customer_purchases table exists from a previous run
validate_customer_purchases_prereq = create_table_validation_task(
table_name="customer_purchases",
dataset_name=CURATED_BQ_DATASET,
task_prefix="validate_curated",
)
# Load data from GCS to BigQuery landing tables
with TaskGroup("load_landing_data",
tooltip="Load all data files to landing tables") as load_group:
load_tasks = [
create_gcs_to_bq_task(table_name=table) for table in LANDING_TABLES
]
# Create comprehensive customer purchases join
customer_purchases_join = BigQueryInsertJobOperator(
task_id="create_customer_purchases",
project_id=DP_PROJECT,
configuration={
"jobType": "QUERY",
"query": {
"query":
f"""
SELECT
-- User information
u.id as user_id,
u.first_name,
u.last_name,
u.email,
u.age,
u.gender,
u.state,
u.street_address,
u.postal_code,
u.city,
u.country,
u.latitude,
u.longitude,
u.traffic_source,
u.created_at as user_created_at,
u.user_geom,
-- Order information
o.order_id,
o.status as order_status,
o.created_at as order_created_at,
o.returned_at as order_returned_at,
o.shipped_at as order_shipped_at,
o.delivered_at as order_delivered_at,
o.num_of_item,
-- Order item information
oi.id as order_item_id,
oi.product_id,
oi.inventory_item_id,
oi.status as order_item_status,
oi.sale_price,
oi.created_at as order_item_created_at,
oi.shipped_at as order_item_shipped_at,
oi.delivered_at as order_item_delivered_at,
oi.returned_at as order_item_returned_at,
-- Product information
p.cost,
p.category,
p.name,
p.brand,
p.retail_price,
p.department,
p.sku,
p.distribution_center_id
FROM `{DP_PROJECT}.{LAND_BQ_DATASET}.users` u
JOIN `{DP_PROJECT}.{LAND_BQ_DATASET}.orders` o
ON u.id = o.user_id
JOIN `{DP_PROJECT}.{LAND_BQ_DATASET}.order_items` oi
ON o.order_id = oi.order_id
JOIN `{DP_PROJECT}.{LAND_BQ_DATASET}.products` p
ON oi.product_id = p.id
""",
"destinationTable": {
"projectId": DP_PROJECT,
"datasetId": CURATED_BQ_DATASET,
"tableId": "customer_purchases",
},
"writeDisposition":
"WRITE_TRUNCATE",
"useLegacySql":
False,
},
},
impersonation_chain=[DP_PROCESSING_SERVICE_ACCOUNT],
)
@task(task_id="validate_customer_purchases_data")
def validate_customer_purchases_data_python():
"""
Checks if the customer_purchases table has data using BigQueryHook
for robust cross-project execution.
"""
project_id = DP_PROJECT
dataset_id = CURATED_BQ_DATASET
table_id = "customer_purchases"
impersonation_account = DP_PROCESSING_SERVICE_ACCOUNT
logging.info(
f"Executing data validation check on table: {project_id}.{dataset_id}.{table_id}"
)
# The hook will use the impersonation chain for all interactions
hook = BigQueryHook(
gcp_conn_id="google_cloud_default", # Assumes default connection
impersonation_chain=[impersonation_account],
location=LOCATION,
)
sql = f"SELECT COUNT(*) FROM `{project_id}.{dataset_id}.{table_id}`"
# Use insert_job for cross-project execution with explicit project_id
job_config = {"query": {"query": sql, "useLegacySql": False}}
job = hook.insert_job(configuration=job_config, project_id=project_id)
# Extract results from the completed job
results = job.result()
records = [list(row) for row in results]
if not records or not records[0] or records[0][0] == 0:
raise ValueError(
f"Data quality check failed: Table {project_id}.{dataset_id}.{table_id} is empty or has no rows."
)
else:
row_count = records[0][0]
logging.info(
f"Data quality check passed: Table {project_id}.{dataset_id}.{table_id} contains {row_count} rows."
)
validate_customer_purchases_data = validate_customer_purchases_data_python()
# Define dependencies
start >> prerequisites_group
prerequisites_group >> load_group
load_group >> customer_purchases_join
customer_purchases_join >> validate_customer_purchases_data >> end

View File

@@ -0,0 +1,224 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
BigQuery Table Creation DAG
This DAG creates BigQuery tables based on configuration stored in GCS.
It creates landing tables, curated tables, and an exposure view.
"""
import datetime
import logging
import os
from airflow import models
from airflow.models import Variable
from airflow.operators import empty
from airflow.providers.google.cloud.operators.bigquery import (
BigQueryCreateTableOperator,)
from airflow.providers.google.cloud.sensors.bigquery import (
BigQueryTableExistenceSensor,)
from airflow.utils.task_group import TaskGroup
# Configuration
LANDING_TABLES = ["users", "orders", "order_items", "products"]
CURATED_TABLES = ["customer_purchases"]
# Environment variables (set from Composer variables.json)
DP_PROJECT = os.environ.get("DP_PROJECT")
LAND_BQ_DATASET = os.environ.get("LAND_BQ_DATASET")
CURATED_BQ_DATASET = os.environ.get("CURATED_BQ_DATASET")
EXPOSURE_BQ_DATASET = os.environ.get("EXPOSURE_BQ_DATASET")
LAND_GCS = os.environ.get("LAND_GCS")
DP_PROCESSING_SERVICE_ACCOUNT = os.environ.get("DP_PROCESSING_SERVICE_ACCOUNT")
# Validate required environment variables
required_vars = {
"DP_PROJECT": DP_PROJECT,
"LAND_BQ_DATASET": LAND_BQ_DATASET,
"CURATED_BQ_DATASET": CURATED_BQ_DATASET,
"EXPOSURE_BQ_DATASET": EXPOSURE_BQ_DATASET,
"LAND_GCS": LAND_GCS,
"DP_PROCESSING_SERVICE_ACCOUNT": DP_PROCESSING_SERVICE_ACCOUNT,
}
missing_vars = [var for var, value in required_vars.items() if not value]
if missing_vars:
raise ValueError(f"Missing required environment variables: {missing_vars}")
logger = logging.getLogger(__name__)
def create_bq_table_task(table_name: str, dataset_name: str, schema_path: str,
task_prefix: str = "") -> BigQueryCreateTableOperator:
"""
Factory function to create BigQuery table tasks.
Args:
table_name: Name of the table to create
dataset_name: Name of the dataset
schema_path: Path to schema files in GCS
task_prefix: Prefix for task ID
Returns:
BigQueryCreateTableOperator instance
"""
task_id = (f"{task_prefix}_{table_name}_create"
if task_prefix else f"{table_name}_create")
return BigQueryCreateTableOperator(
task_id=task_id,
project_id=DP_PROJECT,
dataset_id=dataset_name,
table_id=table_name,
table_resource={},
if_exists="log",
gcs_schema_object=f"gs://{LAND_GCS}/{schema_path}/{table_name}.json",
impersonation_chain=[DP_PROCESSING_SERVICE_ACCOUNT],
)
def create_table_validation_task(
table_name: str, dataset_name: str,
task_prefix: str = "validate") -> BigQueryTableExistenceSensor:
"""
Factory function to create table validation tasks using sensor.
Args:
table_name: Name of the table to validate
dataset_name: Name of the dataset
task_prefix: Prefix for task ID
Returns:
BigQueryTableExistenceSensor instance
"""
return BigQueryTableExistenceSensor(
task_id=f"{task_prefix}_{table_name}_exists",
project_id=DP_PROJECT,
dataset_id=dataset_name,
table_id=table_name,
poke_interval=30, # Check every 30 seconds
timeout=600, # Timeout after 10 minutes
mode="reschedule", # Release worker slot between checks
impersonation_chain=[DP_PROCESSING_SERVICE_ACCOUNT],
)
# DAG Definition
yesterday = datetime.datetime.now() - datetime.timedelta(days=1)
default_args = {
"owner": "data-platform-team",
"start_date": yesterday,
"depends_on_past": False,
"email": Variable.get("alert_email_list", default_var="").split(","),
"email_on_failure": True,
"email_on_retry": False,
"retries": 2,
"retry_delay": datetime.timedelta(minutes=5),
"sla": datetime.timedelta(hours=1),
}
with models.DAG(
"gcs2bq_table_create",
default_args=default_args,
schedule_interval=None,
catchup=False,
max_active_runs=1,
tags=["bigquery", "table-creation", "data-platform"],
doc_md=__doc__,
dagrun_timeout=datetime.timedelta(hours=2),
) as dag:
# Start and end markers
start = empty.EmptyOperator(task_id="start", trigger_rule="all_success")
end = empty.EmptyOperator(task_id="end", trigger_rule="all_done")
# Create landing tables
with TaskGroup("create_landing_tables",
tooltip="Create all landing layer tables") as landing_group:
landing_tasks = []
for table in LANDING_TABLES:
task = create_bq_table_task(
table_name=table,
dataset_name=LAND_BQ_DATASET,
schema_path="schemas/landing",
task_prefix="land",
)
landing_tasks.append(task)
# Create curated tables
with TaskGroup("create_curated_tables",
tooltip="Create all curated layer tables") as curated_group:
curated_tasks = []
for table in CURATED_TABLES:
task = create_bq_table_task(
table_name=table,
dataset_name=CURATED_BQ_DATASET,
schema_path="schemas/curated",
task_prefix="curated",
)
curated_tasks.append(task)
# Validate all tables exist
with TaskGroup(
"validate_tables",
tooltip="Validate all tables were created") as validation_group:
# Create validation tasks for landing tables
landing_validations = [
create_table_validation_task(
table_name=table,
dataset_name=LAND_BQ_DATASET,
task_prefix="validate_landing",
) for table in LANDING_TABLES
]
# Create validation tasks for curated tables
curated_validations = [
create_table_validation_task(
table_name=table,
dataset_name=CURATED_BQ_DATASET,
task_prefix="validate_curated",
) for table in CURATED_TABLES
]
# Create exposure view
exposure_view = BigQueryCreateTableOperator(
task_id="exposure_view_create",
project_id=DP_PROJECT,
dataset_id=EXPOSURE_BQ_DATASET,
table_id="customer_purchases",
table_resource={
"view": {
"query":
f"SELECT * FROM `{DP_PROJECT}.{CURATED_BQ_DATASET}.customer_purchases`",
"useLegacySql":
False,
},
},
if_exists="log",
impersonation_chain=[DP_PROCESSING_SERVICE_ACCOUNT],
)
# Validate exposure view exists
validate_exposure_view = create_table_validation_task(
table_name="customer_purchases",
dataset_name=EXPOSURE_BQ_DATASET,
task_prefix="validate_exposure",
)
# Define dependencies
start >> [landing_group, curated_group]
[landing_group, curated_group] >> validation_group
validation_group >> exposure_view
exposure_view >> validate_exposure_view >> end

View File

@@ -0,0 +1,9 @@
{
"DP_PROJECT": "${dp_project}",
"LOCATION": "${location}",
"DP_PROCESSING_SERVICE_ACCOUNT": "${dp_processing_service_account}",
"LAND_GCS": "${land_gcs}",
"LAND_BQ_DATASET": "${land_bq_dataset}",
"CURATED_BQ_DATASET": "${curated_bq_dataset}",
"EXPOSURE_BQ_DATASET": "${exposure_bq_dataset}"
}

View File

@@ -0,0 +1,84 @@
#!/bin/bash
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Script to export data from BigQuery public dataset to GCS bucket
# Usage: ./export_bigquery_data.sh gs://your-bucket-name
set -e # Exit on error
# Check if argument is provided
if [ $# -eq 0 ]; then
echo "Error: No GCS bucket provided"
echo "Usage: $0 gs://your-bucket-name"
exit 1
fi
GCS_BUCKET=$1
# Validate that the bucket starts with gs://
if [[ ! "$GCS_BUCKET" =~ ^gs:// ]]; then
echo "Error: GCS bucket must start with gs://"
echo "Example: gs://your-bucket-name"
exit 1
fi
# Check if bq command is available
if ! command -v bq &>/dev/null; then
echo "Error: bq command not found. Please install Google Cloud SDK."
exit 1
fi
# Remove trailing slash if present
GCS_BUCKET=${GCS_BUCKET%/}
# Source project and dataset
SOURCE_PROJECT="bigquery-public-data"
SOURCE_DATASET="thelook_ecommerce"
# Tables to export
TABLES=("users" "orders" "order_items" "products")
echo "Starting export from ${SOURCE_PROJECT}.${SOURCE_DATASET} to $GCS_BUCKET"
echo "================================================"
# Export each table
for table in "${TABLES[@]}"; do
echo -n "Exporting $table..."
# Create destination path
DESTINATION="${GCS_BUCKET}/data/${table}/${table}_*.csv"
# Execute bq extract command
if bq extract \
--destination_format CSV \
--field_delimiter=',' \
--print_header=true \
"bigquery-public-data:thelook_ecommerce.${table}" \
"${DESTINATION}"; then
echo " SUCCESS"
else
echo " FAILED"
echo "Error: Failed to export $table"
exit 1
fi
done
echo "================================================"
echo "All tables exported successfully!"
echo ""
echo "Exported tables:"
for table in "${TABLES[@]}"; do
echo " - ${GCS_BUCKET}/data/${table}/"
done

View File

@@ -0,0 +1,201 @@
[
{
"mode": "NULLABLE",
"name": "user_id",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "first_name",
"type": "STRING"
},
{
"mode": "NULLABLE",
"name": "last_name",
"type": "STRING"
},
{
"mode": "NULLABLE",
"name": "email",
"type": "STRING"
},
{
"mode": "NULLABLE",
"name": "age",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "gender",
"type": "STRING"
},
{
"mode": "NULLABLE",
"name": "state",
"type": "STRING"
},
{
"mode": "NULLABLE",
"name": "street_address",
"type": "STRING"
},
{
"mode": "NULLABLE",
"name": "postal_code",
"type": "STRING"
},
{
"mode": "NULLABLE",
"name": "city",
"type": "STRING"
},
{
"mode": "NULLABLE",
"name": "country",
"type": "STRING"
},
{
"mode": "NULLABLE",
"name": "latitude",
"type": "FLOAT"
},
{
"mode": "NULLABLE",
"name": "longitude",
"type": "FLOAT"
},
{
"mode": "NULLABLE",
"name": "traffic_source",
"type": "STRING"
},
{
"mode": "NULLABLE",
"name": "user_created_at",
"type": "TIMESTAMP"
},
{
"name": "user_geom",
"type": "GEOGRAPHY"
},
{
"mode": "NULLABLE",
"name": "order_id",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "order_status",
"type": "STRING"
},
{
"mode": "NULLABLE",
"name": "order_created_at",
"type": "TIMESTAMP"
},
{
"mode": "NULLABLE",
"name": "order_returned_at",
"type": "TIMESTAMP"
},
{
"mode": "NULLABLE",
"name": "order_shipped_at",
"type": "TIMESTAMP"
},
{
"mode": "NULLABLE",
"name": "order_delivered_at",
"type": "TIMESTAMP"
},
{
"mode": "NULLABLE",
"name": "num_of_item",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "order_item_id",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "product_id",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "inventory_item_id",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "order_item_status",
"type": "STRING"
},
{
"mode": "NULLABLE",
"name": "order_item_created_at",
"type": "TIMESTAMP"
},
{
"mode": "NULLABLE",
"name": "order_item_shipped_at",
"type": "TIMESTAMP"
},
{
"mode": "NULLABLE",
"name": "order_item_delivered_at",
"type": "TIMESTAMP"
},
{
"mode": "NULLABLE",
"name": "order_item_returned_at",
"type": "TIMESTAMP"
},
{
"mode": "NULLABLE",
"name": "sale_price",
"type": "FLOAT"
},
{
"mode": "NULLABLE",
"name": "cost",
"type": "FLOAT"
},
{
"mode": "NULLABLE",
"name": "category",
"type": "STRING"
},
{
"mode": "NULLABLE",
"name": "name",
"type": "STRING"
},
{
"mode": "NULLABLE",
"name": "brand",
"type": "STRING"
},
{
"mode": "NULLABLE",
"name": "retail_price",
"type": "FLOAT"
},
{
"mode": "NULLABLE",
"name": "department",
"type": "STRING"
},
{
"mode": "NULLABLE",
"name": "sku",
"type": "STRING"
},
{
"mode": "NULLABLE",
"name": "distribution_center_id",
"type": "INTEGER"
}
]

View File

@@ -0,0 +1,57 @@
[
{
"mode": "NULLABLE",
"name": "id",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "order_id",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "user_id",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "product_id",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "inventory_item_id",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "status",
"type": "STRING"
},
{
"mode": "NULLABLE",
"name": "created_at",
"type": "TIMESTAMP"
},
{
"mode": "NULLABLE",
"name": "shipped_at",
"type": "TIMESTAMP"
},
{
"mode": "NULLABLE",
"name": "delivered_at",
"type": "TIMESTAMP"
},
{
"mode": "NULLABLE",
"name": "returned_at",
"type": "TIMESTAMP"
},
{
"mode": "NULLABLE",
"name": "sale_price",
"type": "FLOAT"
}
]

View File

@@ -0,0 +1,47 @@
[
{
"mode": "NULLABLE",
"name": "order_id",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "user_id",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "status",
"type": "STRING"
},
{
"mode": "NULLABLE",
"name": "gender",
"type": "STRING"
},
{
"mode": "NULLABLE",
"name": "created_at",
"type": "TIMESTAMP"
},
{
"mode": "NULLABLE",
"name": "returned_at",
"type": "TIMESTAMP"
},
{
"mode": "NULLABLE",
"name": "shipped_at",
"type": "TIMESTAMP"
},
{
"mode": "NULLABLE",
"name": "delivered_at",
"type": "TIMESTAMP"
},
{
"mode": "NULLABLE",
"name": "num_of_item",
"type": "INTEGER"
}
]

View File

@@ -0,0 +1,47 @@
[
{
"mode": "NULLABLE",
"name": "id",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "cost",
"type": "FLOAT"
},
{
"mode": "NULLABLE",
"name": "category",
"type": "STRING"
},
{
"mode": "NULLABLE",
"name": "name",
"type": "STRING"
},
{
"mode": "NULLABLE",
"name": "brand",
"type": "STRING"
},
{
"mode": "NULLABLE",
"name": "retail_price",
"type": "FLOAT"
},
{
"mode": "NULLABLE",
"name": "department",
"type": "STRING"
},
{
"mode": "NULLABLE",
"name": "sku",
"type": "STRING"
},
{
"mode": "NULLABLE",
"name": "distribution_center_id",
"type": "INTEGER"
}
]

View File

@@ -0,0 +1,81 @@
[
{
"mode": "NULLABLE",
"name": "id",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "first_name",
"type": "STRING"
},
{
"mode": "NULLABLE",
"name": "last_name",
"type": "STRING"
},
{
"mode": "NULLABLE",
"name": "email",
"type": "STRING"
},
{
"mode": "NULLABLE",
"name": "age",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "gender",
"type": "STRING"
},
{
"mode": "NULLABLE",
"name": "state",
"type": "STRING"
},
{
"mode": "NULLABLE",
"name": "street_address",
"type": "STRING"
},
{
"mode": "NULLABLE",
"name": "postal_code",
"type": "STRING"
},
{
"mode": "NULLABLE",
"name": "city",
"type": "STRING"
},
{
"mode": "NULLABLE",
"name": "country",
"type": "STRING"
},
{
"mode": "NULLABLE",
"name": "latitude",
"type": "FLOAT"
},
{
"mode": "NULLABLE",
"name": "longitude",
"type": "FLOAT"
},
{
"mode": "NULLABLE",
"name": "traffic_source",
"type": "STRING"
},
{
"mode": "NULLABLE",
"name": "created_at",
"type": "TIMESTAMP"
},
{
"name": "user_geom",
"type": "GEOGRAPHY"
}
]

Binary file not shown.

After

Width:  |  Height:  |  Size: 164 KiB

View File

@@ -0,0 +1,48 @@
/**
* Copyright 2025 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
module "land-cs-0" {
source = "../../../../modules/gcs"
project_id = var.project_id
prefix = var.prefix
name = "lnd-cs-0"
encryption_key = try(var.encryption_keys[var.location].storage, null)
location = var.location
storage_class = "REGIONAL"
force_destroy = true
}
module "land-bq-0" {
source = "../../../../modules/bigquery-dataset"
project_id = var.project_id
id = "${replace(var.prefix, "-", "_")}_lnd_bq_0"
encryption_key = try(var.encryption_keys[var.location].bigquery, null)
location = var.location
}
module "cur-bq-0" {
source = "../../../../modules/bigquery-dataset"
project_id = var.project_id
id = "${replace(var.prefix, "-", "_")}_cur_bq_0"
encryption_key = try(var.encryption_keys[var.location].bigquery, null)
location = var.location
authorized_datasets = [
{
project_id = var.project_id,
dataset_id = var.authorized_dataset_on_curated
}
]
}

View File

@@ -0,0 +1,53 @@
/**
* Copyright 2025 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
output "composer_environment_name" {
description = "The name of the Composer environment."
value = var.composer_config.environment_name
}
output "composer_project_id" {
description = "The project ID where the Composer environment is located."
value = var.composer_config.project_id
}
output "dp_processing_service_account" {
description = "Service account for data processing."
value = var.dp_processing_service_account
}
output "landing_gcs_bucket" {
description = "The name of the landing GCS bucket."
value = module.land-cs-0.name
}
output "location" {
description = "The location/region used for resources."
value = var.location
}
resource "local_file" "composer_variables" {
content = templatefile("composer/variables.tf.tpl", {
dp_project = var.project_id
location = var.location
dp_processing_service_account = var.dp_processing_service_account
land_gcs = module.land-cs-0.bucket.name
land_bq_dataset = module.land-bq-0.dataset_id
curated_bq_dataset = module.cur-bq-0.dataset_id
exposure_bq_dataset = var.authorized_dataset_on_curated
})
filename = "${path.module}/composer/variables.json"
}

View File

@@ -0,0 +1,24 @@
/**
* Copyright 2025 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
provider "google" {
impersonate_service_account = var.impersonate_service_account
}
provider "google-beta" {
impersonate_service_account = var.impersonate_service_account
}
# end provider.tf for data-product

View File

@@ -0,0 +1,10 @@
authorized_dataset_on_curated = "<replace-with-your-dataset-id>"
composer_config = {
environment_name = "<replace-with-domain-shared-composer-name",
project_id = "<replace-with-domain-shared-project-id>"
}
impersonate_service_account = "<replace-with-service-account-email>"
dp_processing_service_account = "<replace-with-service-account-email>"
location = "europe-west8"
prefix = "demo"
project_id = "<replace-with-your-project-id>"

View File

@@ -0,0 +1,73 @@
/**
* Copyright 2025 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
variable "authorized_dataset_on_curated" {
description = "Authorized Dataset."
type = string
}
variable "composer_config" {
description = "Composer environment configuration."
type = object({
environment_name = string
project_id = string
})
nullable = false
}
variable "dp_processing_service_account" {
description = "Service account for data processing via Composer impersonation."
type = string
nullable = false
}
variable "encryption_keys" {
description = "Default encryption keys for services, in service => { region => key id } format. Overridable on a per-object basis."
type = object({
bigquery = optional(map(string), {})
composer = optional(map(string), {})
storage = optional(map(string), {})
})
nullable = false
default = {}
}
variable "impersonate_service_account" {
description = "Service account to impersonate for Google Cloud providers."
type = string
nullable = false
}
variable "location" {
description = "Default location used when no location is specified."
type = string
nullable = false
default = "europe-west8"
}
variable "prefix" {
description = "Prefix used for resources that need unique names. Use a maximum of 9 chars for organizations, and 11 chars for tenants."
type = string
validation {
condition = try(length(var.prefix), 0) < 12
error_message = "Use a maximum of 9 chars for organizations, and 11 chars for tenants."
}
}
variable "project_id" {
description = "Project ID to deploy resources."
type = string
nullable = false
}

View File

@@ -2,17 +2,17 @@ location = "europe-west1"
central_project_config = {
iam_by_principals = {
"group:dp-platform-0@example.com" = [
dp-platform = [
"roles/datacatalog.categoryAdmin",
"roles/dataplex.catalogAdmin",
"roles/dataplex.aspectTypeOwner",
"roles/resourcemanager.tagViewer"
]
"group:dp-domain-a@example.com" = [
dp-domain-a = [
"roles/datacatalog.viewer",
"roles/dataplex.aspectTypeUser"
]
"group:dp-product-a-0@example.com" = [
dp-product-a-0 = [
"roles/datacatalog.viewer",
"roles/dataplex.aspectTypeUser"
]
@@ -22,10 +22,10 @@ central_project_config = {
factories_config = {
context = {
iam_principals = {
data-consumer-bi = "group:data-consumer-bi@example.com"
dp-product-a-0 = "group:dp-product-a-0@example.com"
dp-domain-a = "group:dp-domain-a@example.com"
dp-platform = "group:dp-platform-0@example.com"
dp-domain-a = "group:dp-domain-a@example.com"
dp-product-a-0 = "group:dp-product-a-0@example.com"
data-consumer-bi = "group:data-consumer-bi@example.com"
}
}
aspect_types = "data/aspect-types"

Binary file not shown.