diff --git a/fast/stages/3-data-platform-dev/data-domains-composer.tf b/fast/stages/3-data-platform-dev/data-domains-composer.tf index 5403ba906..79fc675ff 100644 --- a/fast/stages/3-data-platform-dev/data-domains-composer.tf +++ b/fast/stages/3-data-platform-dev/data-domains-composer.tf @@ -19,7 +19,7 @@ locals { for k, v in local.data_domains : k => merge( { region = var.location, short_name = v.short_name }, try(v.deploy_config.composer, {}) - ) + ) if(v.deploy_config.composer != null) } dd_composer_keys = { for k, v in local.dd_composer : k => try( diff --git a/fast/stages/3-data-platform-dev/data/data-domains/domain-0/_config.yaml b/fast/stages/3-data-platform-dev/data/data-domains/domain-0/_config.yaml index c35248371..df5a0581b 100644 --- a/fast/stages/3-data-platform-dev/data/data-domains/domain-0/_config.yaml +++ b/fast/stages/3-data-platform-dev/data/data-domains/domain-0/_config.yaml @@ -22,7 +22,8 @@ automation: - dp-product-a-0 deploy_config: - composer: {} + composer: + {} # Uncomment for VPC Network Connectivity # region defaults to var.location # node_config: @@ -56,11 +57,12 @@ project_config: - datacatalog.googleapis.com - dataplex.googleapis.com - datalineage.googleapis.com - shared_vpc_service_config: - host_project: dev-net-spoke-0 - service_agent_iam: - roles/composer.sharedVpcAgent: - - composer + # Uncomment for shared VPC Network configuration + # shared_vpc_service_config: + # host_project: dev-net-spoke-0 + # service_agent_iam: + # roles/composer.sharedVpcAgent: + # - composer folder_config: iam_bindings: diff --git a/fast/stages/3-data-platform-dev/demo/.gitignore b/fast/stages/3-data-platform-dev/demo/.gitignore new file mode 100644 index 000000000..c6077b91e --- /dev/null +++ b/fast/stages/3-data-platform-dev/demo/.gitignore @@ -0,0 +1 @@ +composer/variables.json diff --git a/fast/stages/3-data-platform-dev/demo/README.md b/fast/stages/3-data-platform-dev/demo/README.md new file mode 100644 index 000000000..373a338ff --- /dev/null +++ 
b/fast/stages/3-data-platform-dev/demo/README.md @@ -0,0 +1,190 @@ +# Data Product Reference Example + +This folder contains a reference implementation of a Data Product showcasing the complete lifecycle from raw data ingestion to curated analytics-ready datasets. The example demonstrates how to create Data Products within the [Data Platform stage](../README.md) of Fabric FAST. It utilizes the automation service account and shared services created by the Data Platform stage. + +Our example consists of a batch ELT pipeline that processes and joins individual CSV data files from Cloud Storage to BigQuery using the publicly available theLook eCommerce dataset: + +## Components + +This reference implementation includes: + +- **Infrastructure as Code**: Terraform modules for deploying GCP resources +- **Data Schemas**: BigQuery table schemas in JSON format for structured data validation +- **Orchestration**: Cloud Composer (Apache Airflow) DAGs for automated pipeline execution +- **Sample Data**: Utility script to download theLook eCommerce reference tables + +## Getting Started + +### Prerequisites + +- Google Cloud SDK installed and configured +- Terraform >= 1.9.0 +- `jq` command-line JSON processor +- Access to the automation service account from the previous stage + +Ensure that you are authenticated with the `gcloud` CLI using the user that has the relevant access to +both the Domain Shared Resources as well as the Data Product GCP projects: + +```bash +gcloud auth login +gcloud auth application-default login +``` + +### 1. Infrastructure Setup + +**1. Configure Terraform Variables** + + ```bash + cp terraform.tfvars.sample terraform.tfvars + # Edit terraform.tfvars with your specific values + ``` + +**2. Deploy Infrastructure** + + ```bash + terraform init + terraform apply + ``` + +### 2. Data Pipeline Setup + +**1. 
Set Environment Variables** + + ```bash + export LANDING_BUCKET=$(terraform output -raw landing_gcs_bucket) + export COMPOSER_PROJECT_ID=$(terraform output -raw composer_project_id) + export COMPOSER_ENV_NAME=$(terraform output -raw composer_environment_name) + export LOCATION=$(terraform output -raw location) + ``` + +**2. Deploy Data Schemas** + + ```bash + gcloud storage cp -r data/schemas/* gs://$LANDING_BUCKET/schemas + ``` + +**3. Source Sample Data** + + ```bash + ./data/get_thelook_data.sh gs://$LANDING_BUCKET + ``` + +**4. Configure Composer Environment** + + Update Composer environment variables from `composer/variables.json`: + + > **Note**: This step may take several minutes to complete. + + ```bash + # Copy Airflow JSON variable file into Composer data folder + gcloud composer environments storage data import \ + --project $COMPOSER_PROJECT_ID \ + --environment=$COMPOSER_ENV_NAME \ + --location $LOCATION \ + --source="composer/variables.json" + + # Import Airflow variables + gcloud composer environments run $COMPOSER_ENV_NAME \ + --project $COMPOSER_PROJECT_ID \ + --location $LOCATION \ + variables \ + -- import /home/airflow/gcs/data/variables.json + ``` + +**5. Deploy Airflow DAGs** + + ```bash + gcloud composer environments storage dags import \ + --project=$COMPOSER_PROJECT_ID \ + --environment=$COMPOSER_ENV_NAME \ + --location=$LOCATION \ + --source="composer/DAG-dp0" + ``` + + > **Note**: It may take several minutes for the DAGs to be parsed and become available in Composer. + +### 3. Pipeline Execution + +**1. Verify DAG Import** + + Navigate to the Composer UI in the Domain Shared Resources project and confirm that the DAGs have been successfully imported. + +**2. Execute Pipeline** + + Trigger the DAGs in the following sequence (wait for each to complete): + + 1. **`gcs2bq_table_create`** - Creates BigQuery tables with proper schemas + 2. 
**`gcs2bq_elt`** - Executes the ELT pipeline to process data + +## Architecture Overview + +The data product implements a three-tier architecture: + +

+ High level diagram. +

+ +Curated data will be made accessible through authorized views within the `exposure` dataset. + +### Data Storage Layers + +- **Landing Zone** (`{prefix}-land-cs-0`): Raw CSV files stored in Cloud Storage +- **Raw Layer** (`{prefix}_lnd_bq_0`): Raw data loaded into BigQuery for processing +- **Curated Layer** (`{prefix}_cur_bq_0`): Processed, analytics-ready datasets + +## Troubleshooting + +### Common Issues + +- **DAG Import Failures**: Ensure the Composer environment is fully initialized before importing DAGs +- **Permission Errors**: Verify that the user that you authenticated with via the `gcloud` CLI has the relevant permissions +- **Variable Configuration**: Double-check that `terraform.tfvars` is properly configured + +### Useful Commands + +```bash +# Check Terraform outputs +terraform output + +# Verify bucket contents +gcloud storage ls gs://$LANDING_BUCKET --recursive + +# Check Composer environment status +gcloud composer environments describe $COMPOSER_ENV_NAME \ + --project $COMPOSER_PROJECT_ID \ + --location $LOCATION +``` + + + +## Files + +| name | description | modules | resources | +|---|---|---|---| +| [main.tf](./main.tf) | Module-level locals and resources. | bigquery-dataset · gcs | | +| [outputs.tf](./outputs.tf) | Module outputs. | | local_file | +| [variables.tf](./variables.tf) | Module variables. | | | + +## Variables + +| name | description | type | required | default | producer | +|---|---|:---:|:---:|:---:|:---:| +| [authorized_dataset_on_curated](variables.tf#L16) | Authorized Dataset. | string | ✓ | | | +| [composer_config](variables.tf#L21) | Composer environment configuration. | object({…}) | ✓ | | | +| [dp_processing_service_account](variables.tf#L30) | Service account for data processing via Composer impersonation. | string | ✓ | | | +| [impersonate_service_account](variables.tf#L47) | Service account to impersonate for Google Cloud providers. 
| string | ✓ | | | +| [prefix](variables.tf#L60) | Prefix used for resources that need unique names. Use a maximum of 9 chars for organizations, and 11 chars for tenants. | string | ✓ | | | +| [project_id](variables.tf#L69) | Project ID to deploy resources. | string | ✓ | | | +| [encryption_keys](variables.tf#L36) | Default encryption keys for services, in service => { region => key id } format. Overridable on a per-object basis. | object({…}) | | {} | | +| [location](variables.tf#L53) | Default location used when no location is specified. | string | | "europe-west8" | | + +## Outputs + +| name | description | sensitive | consumers | +|---|---|:---:|---| +| [composer_environment_name](outputs.tf#L17) | The name of the Composer environment. | | | +| [composer_project_id](outputs.tf#L22) | The project ID where the Composer environment is located. | | | +| [dp_processing_service_account](outputs.tf#L27) | Service account for data processing. | | | +| [landing_gcs_bucket](outputs.tf#L32) | The name of the landing GCS bucket. | | | +| [location](outputs.tf#L37) | The location/region used for resources. | | | + diff --git a/fast/stages/3-data-platform-dev/demo/composer/DAG-dp0/gcs2bq_elt.py b/fast/stages/3-data-platform-dev/demo/composer/DAG-dp0/gcs2bq_elt.py new file mode 100644 index 000000000..46178cf67 --- /dev/null +++ b/fast/stages/3-data-platform-dev/demo/composer/DAG-dp0/gcs2bq_elt.py @@ -0,0 +1,308 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
# See the License for the specific language governing permissions and
# limitations under the License.
"""
BigQuery ELT Pipeline DAG

This DAG implements a customer purchases ELT pipeline that:
1. Validates that the landing and curated tables already exist
2. Loads data from GCS to BigQuery landing tables (users, orders, order_items, products)
3. Performs a 4-table join to (re)build the curated customer_purchases table
4. Checks that the curated customer_purchases table contains rows

The exposure view is created by the gcs2bq_table_create DAG, not by this one.

Dependencies: Requires gcs2bq_table_create DAG to complete first
"""

import datetime
import logging
import os

from airflow import models
from airflow.decorators import task
from airflow.models import Variable
from airflow.operators import empty
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryInsertJobOperator,)
from airflow.providers.google.cloud.sensors.bigquery import (
    BigQueryTableExistenceSensor,)
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import (
    GCSToBigQueryOperator,)
from airflow.utils.task_group import TaskGroup

# Configuration
LANDING_TABLES = ["users", "orders", "order_items", "products"]


def _get_setting(name: str):
  """
  Resolve a pipeline setting by name.

  The process environment is checked first, so Composer environment
  variables keep working. When the name is not set there, the Airflow
  Variable of the same name is used; this is how composer/variables.json
  reaches the DAG when it is loaded with `variables import`.

  Args:
      name: Setting name, e.g. "DP_PROJECT"

  Returns:
      The configured value, or None when it is defined in neither place
  """
  value = os.environ.get(name)
  if value:
    return value
  return Variable.get(name, default_var=None)


# Settings (environment variables or Airflow Variables from composer/variables.json)
DP_PROJECT = _get_setting("DP_PROJECT")
LAND_BQ_DATASET = _get_setting("LAND_BQ_DATASET")
CURATED_BQ_DATASET = _get_setting("CURATED_BQ_DATASET")
LAND_GCS = _get_setting("LAND_GCS")
DP_PROCESSING_SERVICE_ACCOUNT = _get_setting("DP_PROCESSING_SERVICE_ACCOUNT")
LOCATION = _get_setting("LOCATION")

# Fail at DAG import time, with the full list, if anything is missing
required_vars = {
    "DP_PROJECT": DP_PROJECT,
    "LAND_BQ_DATASET": LAND_BQ_DATASET,
    "CURATED_BQ_DATASET": CURATED_BQ_DATASET,
    "LAND_GCS": LAND_GCS,
    "DP_PROCESSING_SERVICE_ACCOUNT": DP_PROCESSING_SERVICE_ACCOUNT,
    "LOCATION": LOCATION,
}

missing_vars = [var for var, value in required_vars.items() if not value]
if missing_vars:
  raise ValueError(f"Missing required environment variables: {missing_vars}")
logger = logging.getLogger(__name__)


def create_gcs_to_bq_task(table_name: str,
                          skip_leading_rows: int = 1) -> GCSToBigQueryOperator:
  """
  Factory function to create GCS to BigQuery load tasks.

  The CSV files produced by data/get_thelook_data.sh are exported with a
  header row, so the header is skipped explicitly instead of being absorbed
  by the bad-record allowance (which breaks as soon as an export is sharded
  into several files, each carrying its own header).

  Args:
      table_name: Name of the table to load
      skip_leading_rows: Number of header rows to skip in every source file

  Returns:
      GCSToBigQueryOperator instance
  """
  return GCSToBigQueryOperator(
      task_id=f"{table_name}_load",
      bucket=LAND_GCS,
      # Wildcard matches every shard written by the export script
      source_objects=[f"data/{table_name}/{table_name}_*.csv"],
      destination_project_dataset_table=
      f"{DP_PROJECT}.{LAND_BQ_DATASET}.{table_name}",
      source_format="CSV",
      skip_leading_rows=skip_leading_rows,
      create_disposition="CREATE_IF_NEEDED",
      write_disposition="WRITE_TRUNCATE",
      schema_object=f"schemas/landing/{table_name}.json",
      schema_object_bucket=LAND_GCS,
      autodetect=False,
      max_bad_records=1,
      project_id=DP_PROJECT,
      impersonation_chain=[DP_PROCESSING_SERVICE_ACCOUNT],
  )


def create_table_validation_task(
    table_name: str, dataset_name: str,
    task_prefix: str = "validate") -> BigQueryTableExistenceSensor:
  """
  Factory function to create table validation tasks using sensor.

  Args:
      table_name: Name of the table to validate
      dataset_name: Name of the dataset
      task_prefix: Prefix for task ID

  Returns:
      BigQueryTableExistenceSensor instance
  """
  return BigQueryTableExistenceSensor(
      task_id=f"{task_prefix}_{table_name}_exists",
      project_id=DP_PROJECT,
      dataset_id=dataset_name,
      table_id=table_name,
      poke_interval=30,  # Check every 30 seconds
      timeout=600,  # Timeout after 10 minutes
      mode="reschedule",  # Release worker slot between checks
      impersonation_chain=[DP_PROCESSING_SERVICE_ACCOUNT],
  )
# DAG Definition
# NOTE(review): start_date is computed at parse time; with
# schedule_interval=None the DAG is only triggered manually. Consider a
# fixed date if a schedule is ever added.
yesterday = datetime.datetime.now() - datetime.timedelta(days=1)

# Drop empty entries so an unset alert_email_list yields [] rather than [""]
alert_emails = [
    address.strip()
    for address in Variable.get("alert_email_list", default_var="").split(",")
    if address.strip()
]

default_args = {
    "owner": "data-platform-team",
    "start_date": yesterday,
    "depends_on_past": False,
    "email": alert_emails,
    "email_on_failure": True,
    "email_on_retry": False,
    "retries": 2,
    "retry_delay": datetime.timedelta(minutes=5),
    "sla": datetime.timedelta(hours=2),
}

with models.DAG(
    "gcs2bq_elt",
    default_args=default_args,
    schedule_interval=None,
    catchup=False,
    max_active_runs=1,
    tags=["bigquery", "elt", "data-platform", "customer-purchases"],
    doc_md=__doc__,
    dagrun_timeout=datetime.timedelta(hours=3),
) as dag:
  # Start and end markers. `end` is the only leaf task, so it must not use
  # "all_done": that would let it succeed after an upstream failure and the
  # whole run would be reported as successful.
  start = empty.EmptyOperator(task_id="start", trigger_rule="all_success")
  end = empty.EmptyOperator(task_id="end", trigger_rule="all_success")

  # Validate that all required tables exist before starting data load
  with TaskGroup(
      "validate_prerequisites",
      tooltip="Validate all landing tables exist before data load",
  ) as prerequisites_group:
    prerequisite_validations = [
        create_table_validation_task(
            table_name=table,
            dataset_name=LAND_BQ_DATASET,
            task_prefix="validate_landing",
        ) for table in LANDING_TABLES
    ]
    # Validate that the curated customer_purchases table exists from a previous run
    validate_customer_purchases_prereq = create_table_validation_task(
        table_name="customer_purchases",
        dataset_name=CURATED_BQ_DATASET,
        task_prefix="validate_curated",
    )

  # Load data from GCS to BigQuery landing tables
  with TaskGroup("load_landing_data",
                 tooltip="Load all data files to landing tables") as load_group:
    load_tasks = [
        create_gcs_to_bq_task(table_name=table) for table in LANDING_TABLES
    ]

  # Create comprehensive customer purchases join; the curated table is
  # replaced on every run (WRITE_TRUNCATE)
  customer_purchases_join = BigQueryInsertJobOperator(
      task_id="create_customer_purchases",
      project_id=DP_PROJECT,
      configuration={
          "jobType": "QUERY",
          "query": {
              "query":
                  f"""
                    SELECT
                        -- User information
                        u.id as user_id,
                        u.first_name,
                        u.last_name,
                        u.email,
                        u.age,
                        u.gender,
                        u.state,
                        u.street_address,
                        u.postal_code,
                        u.city,
                        u.country,
                        u.latitude,
                        u.longitude,
                        u.traffic_source,
                        u.created_at as user_created_at,
                        u.user_geom,

                        -- Order information
                        o.order_id,
                        o.status as order_status,
                        o.created_at as order_created_at,
                        o.returned_at as order_returned_at,
                        o.shipped_at as order_shipped_at,
                        o.delivered_at as order_delivered_at,
                        o.num_of_item,

                        -- Order item information
                        oi.id as order_item_id,
                        oi.product_id,
                        oi.inventory_item_id,
                        oi.status as order_item_status,
                        oi.sale_price,
                        oi.created_at as order_item_created_at,
                        oi.shipped_at as order_item_shipped_at,
                        oi.delivered_at as order_item_delivered_at,
                        oi.returned_at as order_item_returned_at,

                        -- Product information
                        p.cost,
                        p.category,
                        p.name,
                        p.brand,
                        p.retail_price,
                        p.department,
                        p.sku,
                        p.distribution_center_id

                    FROM `{DP_PROJECT}.{LAND_BQ_DATASET}.users` u
                    JOIN `{DP_PROJECT}.{LAND_BQ_DATASET}.orders` o
                        ON u.id = o.user_id
                    JOIN `{DP_PROJECT}.{LAND_BQ_DATASET}.order_items` oi
                        ON o.order_id = oi.order_id
                    JOIN `{DP_PROJECT}.{LAND_BQ_DATASET}.products` p
                        ON oi.product_id = p.id
                    """,
              "destinationTable": {
                  "projectId": DP_PROJECT,
                  "datasetId": CURATED_BQ_DATASET,
                  "tableId": "customer_purchases",
              },
              "writeDisposition":
                  "WRITE_TRUNCATE",
              "useLegacySql":
                  False,
          },
      },
      impersonation_chain=[DP_PROCESSING_SERVICE_ACCOUNT],
  )

  @task(task_id="validate_customer_purchases_data")
  def validate_customer_purchases_data_python():
    """
    Fail the run when the curated customer_purchases table has no rows.

    Runs a COUNT(*) query through BigQueryHook, impersonating the data
    processing service account and submitting the job in DP_PROJECT.

    Raises:
        ValueError: If the query returns no result or a count of zero
    """
    project_id = DP_PROJECT
    dataset_id = CURATED_BQ_DATASET
    table_id = "customer_purchases"
    impersonation_account = DP_PROCESSING_SERVICE_ACCOUNT

    logger.info(
        f"Executing data validation check on table: {project_id}.{dataset_id}.{table_id}"
    )

    # The hook will use the impersonation chain for all interactions
    hook = BigQueryHook(
        gcp_conn_id="google_cloud_default",  # Assumes default connection
        impersonation_chain=[impersonation_account],
        location=LOCATION,
    )

    sql = f"SELECT COUNT(*) FROM `{project_id}.{dataset_id}.{table_id}`"

    # Use insert_job for cross-project execution with explicit project_id
    job_config = {"query": {"query": sql, "useLegacySql": False}}

    job = hook.insert_job(configuration=job_config, project_id=project_id)

    # Extract results from the completed job
    records = [list(row) for row in job.result()]

    if not records or not records[0] or records[0][0] == 0:
      raise ValueError(
          f"Data quality check failed: Table {project_id}.{dataset_id}.{table_id} is empty or has no rows."
      )

    row_count = records[0][0]
    logger.info(
        f"Data quality check passed: Table {project_id}.{dataset_id}.{table_id} contains {row_count} rows."
    )

  validate_customer_purchases_data = validate_customer_purchases_data_python()

  # Define dependencies
  start >> prerequisites_group
  prerequisites_group >> load_group
  load_group >> customer_purchases_join
  customer_purchases_join >> validate_customer_purchases_data >> end
"""
BigQuery Table Creation DAG

This DAG creates BigQuery tables based on configuration stored in GCS.
It creates landing tables, curated tables, and an exposure view.
"""

import datetime
import logging
import os

from airflow import models
from airflow.models import Variable
from airflow.operators import empty
from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryCreateTableOperator,)
from airflow.providers.google.cloud.sensors.bigquery import (
    BigQueryTableExistenceSensor,)
from airflow.utils.task_group import TaskGroup

# Configuration
LANDING_TABLES = ["users", "orders", "order_items", "products"]
CURATED_TABLES = ["customer_purchases"]


def _get_setting(name: str):
  """
  Resolve a pipeline setting by name.

  The process environment is checked first, so Composer environment
  variables keep working. When the name is not set there, the Airflow
  Variable of the same name is used; this is how composer/variables.json
  reaches the DAG when it is loaded with `variables import`.

  Args:
      name: Setting name, e.g. "DP_PROJECT"

  Returns:
      The configured value, or None when it is defined in neither place
  """
  value = os.environ.get(name)
  if value:
    return value
  return Variable.get(name, default_var=None)


# Settings (environment variables or Airflow Variables from composer/variables.json)
DP_PROJECT = _get_setting("DP_PROJECT")
LAND_BQ_DATASET = _get_setting("LAND_BQ_DATASET")
CURATED_BQ_DATASET = _get_setting("CURATED_BQ_DATASET")
EXPOSURE_BQ_DATASET = _get_setting("EXPOSURE_BQ_DATASET")
LAND_GCS = _get_setting("LAND_GCS")
DP_PROCESSING_SERVICE_ACCOUNT = _get_setting("DP_PROCESSING_SERVICE_ACCOUNT")

# Fail at DAG import time, with the full list, if anything is missing
required_vars = {
    "DP_PROJECT": DP_PROJECT,
    "LAND_BQ_DATASET": LAND_BQ_DATASET,
    "CURATED_BQ_DATASET": CURATED_BQ_DATASET,
    "EXPOSURE_BQ_DATASET": EXPOSURE_BQ_DATASET,
    "LAND_GCS": LAND_GCS,
    "DP_PROCESSING_SERVICE_ACCOUNT": DP_PROCESSING_SERVICE_ACCOUNT,
}

missing_vars = [var for var, value in required_vars.items() if not value]
if missing_vars:
  raise ValueError(f"Missing required environment variables: {missing_vars}")

logger = logging.getLogger(__name__)
def create_bq_table_task(table_name: str, dataset_name: str, schema_path: str,
                         task_prefix: str = "") -> BigQueryCreateTableOperator:
  """
  Factory function to create BigQuery table tasks.

  Args:
      table_name: Name of the table to create
      dataset_name: Name of the dataset
      schema_path: Path to schema files in GCS
      task_prefix: Prefix for task ID

  Returns:
      BigQueryCreateTableOperator instance
  """
  task_id = (f"{task_prefix}_{table_name}_create"
             if task_prefix else f"{table_name}_create")

  return BigQueryCreateTableOperator(
      task_id=task_id,
      project_id=DP_PROJECT,
      dataset_id=dataset_name,
      table_id=table_name,
      table_resource={},
      if_exists="log",
      # Schema JSON is read from the landing bucket
      gcs_schema_object=f"gs://{LAND_GCS}/{schema_path}/{table_name}.json",
      impersonation_chain=[DP_PROCESSING_SERVICE_ACCOUNT],
  )


def create_table_validation_task(
    table_name: str, dataset_name: str,
    task_prefix: str = "validate") -> BigQueryTableExistenceSensor:
  """
  Factory function to create table validation tasks using sensor.

  Args:
      table_name: Name of the table to validate
      dataset_name: Name of the dataset
      task_prefix: Prefix for task ID

  Returns:
      BigQueryTableExistenceSensor instance
  """
  return BigQueryTableExistenceSensor(
      task_id=f"{task_prefix}_{table_name}_exists",
      project_id=DP_PROJECT,
      dataset_id=dataset_name,
      table_id=table_name,
      poke_interval=30,  # Check every 30 seconds
      timeout=600,  # Timeout after 10 minutes
      mode="reschedule",  # Release worker slot between checks
      impersonation_chain=[DP_PROCESSING_SERVICE_ACCOUNT],
  )


# DAG Definition
# NOTE(review): start_date is computed at parse time; with
# schedule_interval=None the DAG is only triggered manually. Consider a
# fixed date if a schedule is ever added.
yesterday = datetime.datetime.now() - datetime.timedelta(days=1)

# Drop empty entries so an unset alert_email_list yields [] rather than [""]
alert_emails = [
    address.strip()
    for address in Variable.get("alert_email_list", default_var="").split(",")
    if address.strip()
]

default_args = {
    "owner": "data-platform-team",
    "start_date": yesterday,
    "depends_on_past": False,
    "email": alert_emails,
    "email_on_failure": True,
    "email_on_retry": False,
    "retries": 2,
    "retry_delay": datetime.timedelta(minutes=5),
    "sla": datetime.timedelta(hours=1),
}

with models.DAG(
    "gcs2bq_table_create",
    default_args=default_args,
    schedule_interval=None,
    catchup=False,
    max_active_runs=1,
    tags=["bigquery", "table-creation", "data-platform"],
    doc_md=__doc__,
    dagrun_timeout=datetime.timedelta(hours=2),
) as dag:
  # Start and end markers. `end` is the only leaf task, so it must not use
  # "all_done": that would let it succeed after an upstream failure and the
  # whole run would be reported as successful.
  start = empty.EmptyOperator(task_id="start", trigger_rule="all_success")
  end = empty.EmptyOperator(task_id="end", trigger_rule="all_success")

  # Create landing tables
  with TaskGroup("create_landing_tables",
                 tooltip="Create all landing layer tables") as landing_group:
    landing_tasks = [
        create_bq_table_task(
            table_name=table,
            dataset_name=LAND_BQ_DATASET,
            schema_path="schemas/landing",
            task_prefix="land",
        ) for table in LANDING_TABLES
    ]

  # Create curated tables
  with TaskGroup("create_curated_tables",
                 tooltip="Create all curated layer tables") as curated_group:
    curated_tasks = [
        create_bq_table_task(
            table_name=table,
            dataset_name=CURATED_BQ_DATASET,
            schema_path="schemas/curated",
            task_prefix="curated",
        ) for table in CURATED_TABLES
    ]

  # Validate all tables exist
  with TaskGroup(
      "validate_tables",
      tooltip="Validate all tables were created") as validation_group:
    # Create validation tasks for landing tables
    landing_validations = [
        create_table_validation_task(
            table_name=table,
            dataset_name=LAND_BQ_DATASET,
            task_prefix="validate_landing",
        ) for table in LANDING_TABLES
    ]

    # Create validation tasks for curated tables
    curated_validations = [
        create_table_validation_task(
            table_name=table,
            dataset_name=CURATED_BQ_DATASET,
            task_prefix="validate_curated",
        ) for table in CURATED_TABLES
    ]

  # Create exposure view: a pass-through view over the curated table
  exposure_view = BigQueryCreateTableOperator(
      task_id="exposure_view_create",
      project_id=DP_PROJECT,
      dataset_id=EXPOSURE_BQ_DATASET,
      table_id="customer_purchases",
      table_resource={
          "view": {
              "query":
                  f"SELECT * FROM `{DP_PROJECT}.{CURATED_BQ_DATASET}.customer_purchases`",
              "useLegacySql":
                  False,
          },
      },
      if_exists="log",
      impersonation_chain=[DP_PROCESSING_SERVICE_ACCOUNT],
  )

  # Validate exposure view exists
  validate_exposure_view = create_table_validation_task(
      table_name="customer_purchases",
      dataset_name=EXPOSURE_BQ_DATASET,
      task_prefix="validate_exposure",
  )

  # Define dependencies
  start >> [landing_group, curated_group]
  [landing_group, curated_group] >> validation_group
  validation_group >> exposure_view
  exposure_view >> validate_exposure_view >> end
# Script to export the theLook eCommerce tables from the BigQuery public
# dataset to a GCS bucket as CSV files (one folder per table).
# Usage: ./get_thelook_data.sh gs://your-bucket-name

# Exit on error, on use of an unset variable, and on a failed pipeline stage
set -euo pipefail

# Check if argument is provided
if [ $# -eq 0 ]; then
  echo "Error: No GCS bucket provided" >&2
  echo "Usage: $0 gs://your-bucket-name" >&2
  exit 1
fi

GCS_BUCKET="$1"

# Validate that the bucket starts with gs://
if [[ ! "$GCS_BUCKET" =~ ^gs:// ]]; then
  echo "Error: GCS bucket must start with gs://" >&2
  echo "Example: gs://your-bucket-name" >&2
  exit 1
fi

# Check if bq command is available
if ! command -v bq &>/dev/null; then
  echo "Error: bq command not found. Please install Google Cloud SDK." >&2
  exit 1
fi

# Remove trailing slash if present
GCS_BUCKET=${GCS_BUCKET%/}

# Source project and dataset
SOURCE_PROJECT="bigquery-public-data"
SOURCE_DATASET="thelook_ecommerce"

# Tables to export
TABLES=("users" "orders" "order_items" "products")

echo "Starting export from ${SOURCE_PROJECT}.${SOURCE_DATASET} to $GCS_BUCKET"
echo "================================================"

# Export each table
for table in "${TABLES[@]}"; do
  echo -n "Exporting $table..."

  # Create destination path; the wildcard lets bq shard large exports
  DESTINATION="${GCS_BUCKET}/data/${table}/${table}_*.csv"

  # Execute bq extract command (header row included in every file)
  if bq extract \
    --destination_format CSV \
    --field_delimiter=',' \
    --print_header=true \
    "${SOURCE_PROJECT}:${SOURCE_DATASET}.${table}" \
    "${DESTINATION}"; then
    echo " SUCCESS"
  else
    echo " FAILED"
    echo "Error: Failed to export $table" >&2
    exit 1
  fi
done

echo "================================================"
echo "All tables exported successfully!"
echo ""
echo "Exported tables:"
for table in "${TABLES[@]}"; do
  echo "  - ${GCS_BUCKET}/data/${table}/"
done
"TIMESTAMP" + }, + { + "mode": "NULLABLE", + "name": "order_delivered_at", + "type": "TIMESTAMP" + }, + { + "mode": "NULLABLE", + "name": "num_of_item", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "order_item_id", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "product_id", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "inventory_item_id", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "order_item_status", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "order_item_created_at", + "type": "TIMESTAMP" + }, + { + "mode": "NULLABLE", + "name": "order_item_shipped_at", + "type": "TIMESTAMP" + }, + { + "mode": "NULLABLE", + "name": "order_item_delivered_at", + "type": "TIMESTAMP" + }, + { + "mode": "NULLABLE", + "name": "order_item_returned_at", + "type": "TIMESTAMP" + }, + { + "mode": "NULLABLE", + "name": "sale_price", + "type": "FLOAT" + }, + { + "mode": "NULLABLE", + "name": "cost", + "type": "FLOAT" + }, + { + "mode": "NULLABLE", + "name": "category", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "name", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "brand", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "retail_price", + "type": "FLOAT" + }, + { + "mode": "NULLABLE", + "name": "department", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "sku", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "distribution_center_id", + "type": "INTEGER" + } +] diff --git a/fast/stages/3-data-platform-dev/demo/data/schemas/landing/order_items.json b/fast/stages/3-data-platform-dev/demo/data/schemas/landing/order_items.json new file mode 100644 index 000000000..9b0d6829a --- /dev/null +++ b/fast/stages/3-data-platform-dev/demo/data/schemas/landing/order_items.json @@ -0,0 +1,57 @@ +[ + { + "mode": "NULLABLE", + "name": "id", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "order_id", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + 
"name": "user_id", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "product_id", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "inventory_item_id", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "status", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "created_at", + "type": "TIMESTAMP" + }, + { + "mode": "NULLABLE", + "name": "shipped_at", + "type": "TIMESTAMP" + }, + { + "mode": "NULLABLE", + "name": "delivered_at", + "type": "TIMESTAMP" + }, + { + "mode": "NULLABLE", + "name": "returned_at", + "type": "TIMESTAMP" + }, + { + "mode": "NULLABLE", + "name": "sale_price", + "type": "FLOAT" + } +] diff --git a/fast/stages/3-data-platform-dev/demo/data/schemas/landing/orders.json b/fast/stages/3-data-platform-dev/demo/data/schemas/landing/orders.json new file mode 100644 index 000000000..bb872ca55 --- /dev/null +++ b/fast/stages/3-data-platform-dev/demo/data/schemas/landing/orders.json @@ -0,0 +1,47 @@ +[ + { + "mode": "NULLABLE", + "name": "order_id", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "user_id", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "status", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "gender", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "created_at", + "type": "TIMESTAMP" + }, + { + "mode": "NULLABLE", + "name": "returned_at", + "type": "TIMESTAMP" + }, + { + "mode": "NULLABLE", + "name": "shipped_at", + "type": "TIMESTAMP" + }, + { + "mode": "NULLABLE", + "name": "delivered_at", + "type": "TIMESTAMP" + }, + { + "mode": "NULLABLE", + "name": "num_of_item", + "type": "INTEGER" + } +] diff --git a/fast/stages/3-data-platform-dev/demo/data/schemas/landing/products.json b/fast/stages/3-data-platform-dev/demo/data/schemas/landing/products.json new file mode 100644 index 000000000..da9182209 --- /dev/null +++ b/fast/stages/3-data-platform-dev/demo/data/schemas/landing/products.json @@ -0,0 +1,47 @@ +[ + { + "mode": "NULLABLE", + "name": 
"id", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "cost", + "type": "FLOAT" + }, + { + "mode": "NULLABLE", + "name": "category", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "name", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "brand", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "retail_price", + "type": "FLOAT" + }, + { + "mode": "NULLABLE", + "name": "department", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "sku", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "distribution_center_id", + "type": "INTEGER" + } +] diff --git a/fast/stages/3-data-platform-dev/demo/data/schemas/landing/users.json b/fast/stages/3-data-platform-dev/demo/data/schemas/landing/users.json new file mode 100644 index 000000000..d4a86c28d --- /dev/null +++ b/fast/stages/3-data-platform-dev/demo/data/schemas/landing/users.json @@ -0,0 +1,81 @@ +[ + { + "mode": "NULLABLE", + "name": "id", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "first_name", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "last_name", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "email", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "age", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "gender", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "state", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "street_address", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "postal_code", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "city", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "country", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "latitude", + "type": "FLOAT" + }, + { + "mode": "NULLABLE", + "name": "longitude", + "type": "FLOAT" + }, + { + "mode": "NULLABLE", + "name": "traffic_source", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "created_at", + "type": "TIMESTAMP" + 
}, + { + "name": "user_geom", + "type": "GEOGRAPHY" + } +] diff --git a/fast/stages/3-data-platform-dev/demo/diagram.png b/fast/stages/3-data-platform-dev/demo/diagram.png new file mode 100644 index 000000000..5f4921df3 Binary files /dev/null and b/fast/stages/3-data-platform-dev/demo/diagram.png differ diff --git a/fast/stages/3-data-platform-dev/demo/main.tf b/fast/stages/3-data-platform-dev/demo/main.tf new file mode 100644 index 000000000..79bd45918 --- /dev/null +++ b/fast/stages/3-data-platform-dev/demo/main.tf @@ -0,0 +1,48 @@ +/** + * Copyright 2025 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +module "land-cs-0" { + source = "../../../../modules/gcs" + project_id = var.project_id + prefix = var.prefix + name = "lnd-cs-0" + encryption_key = try(var.encryption_keys[var.location].storage, null) + location = var.location + storage_class = "REGIONAL" + force_destroy = true +} + +module "land-bq-0" { + source = "../../../../modules/bigquery-dataset" + project_id = var.project_id + id = "${replace(var.prefix, "-", "_")}_lnd_bq_0" + encryption_key = try(var.encryption_keys[var.location].bigquery, null) + location = var.location +} + +module "cur-bq-0" { + source = "../../../../modules/bigquery-dataset" + project_id = var.project_id + id = "${replace(var.prefix, "-", "_")}_cur_bq_0" + encryption_key = try(var.encryption_keys[var.location].bigquery, null) + location = var.location + authorized_datasets = [ + { + project_id = var.project_id, + dataset_id = var.authorized_dataset_on_curated + } + ] +} diff --git a/fast/stages/3-data-platform-dev/demo/outputs.tf b/fast/stages/3-data-platform-dev/demo/outputs.tf new file mode 100644 index 000000000..e40a5e5e1 --- /dev/null +++ b/fast/stages/3-data-platform-dev/demo/outputs.tf @@ -0,0 +1,54 @@ +/** + * Copyright 2025 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +output "composer_environment_name" { + description = "The name of the Composer environment."
+ value = var.composer_config.environment_name +} + +output "composer_project_id" { + description = "The project ID where the Composer environment is located." + value = var.composer_config.project_id +} + +output "dp_processing_service_account" { + description = "Service account for data processing." + value = var.dp_processing_service_account +} + +output "landing_gcs_bucket" { + description = "The name of the landing GCS bucket." + value = module.land-cs-0.name +} + +output "location" { + description = "The location/region used for resources." + value = var.location +} + +# Renders composer/variables.json from its template; both paths are anchored to path.module so the result does not depend on the working directory. +resource "local_file" "composer_variables" { + content = templatefile("${path.module}/composer/variables.tf.tpl", { + dp_project = var.project_id + location = var.location + dp_processing_service_account = var.dp_processing_service_account + land_gcs = module.land-cs-0.bucket.name + land_bq_dataset = module.land-bq-0.dataset_id + curated_bq_dataset = module.cur-bq-0.dataset_id + exposure_bq_dataset = var.authorized_dataset_on_curated + }) + filename = "${path.module}/composer/variables.json" +} diff --git a/fast/stages/3-data-platform-dev/demo/providers.tf b/fast/stages/3-data-platform-dev/demo/providers.tf new file mode 100644 index 000000000..dd56a321b --- /dev/null +++ b/fast/stages/3-data-platform-dev/demo/providers.tf @@ -0,0 +1,24 @@ +/** + * Copyright 2025 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +provider "google" { + impersonate_service_account = var.impersonate_service_account +} +provider "google-beta" { + impersonate_service_account = var.impersonate_service_account +} + +# end providers.tf for data-product diff --git a/fast/stages/3-data-platform-dev/demo/terraform.tfvars.sample b/fast/stages/3-data-platform-dev/demo/terraform.tfvars.sample new file mode 100644 index 000000000..f405bf3e1 --- /dev/null +++ b/fast/stages/3-data-platform-dev/demo/terraform.tfvars.sample @@ -0,0 +1,10 @@ +authorized_dataset_on_curated = "" +composer_config = { + environment_name = "