From e008fde9bb982311a87bdd66d6a4ae1f52239216 Mon Sep 17 00:00:00 2001 From: Lorenzo Caggioni Date: Sun, 3 Apr 2022 22:03:35 +0200 Subject: [PATCH] Update READMEs and separate demo pipelines --- .../data-platform-foundations/README.md | 13 +- .../data-platform-foundations/demo/README.md | 31 +- .../demo/datapipeline.py | 114 +------ .../demo/datapipeline_dc_tags.py | 322 ++++++++++++++++++ .../demo/delete_table.py | 146 ++++++++ fast/stages/03-data-platform/dev/README.md | 6 + 6 files changed, 510 insertions(+), 122 deletions(-) create mode 100644 examples/data-solutions/data-platform-foundations/demo/datapipeline_dc_tags.py create mode 100644 examples/data-solutions/data-platform-foundations/demo/delete_table.py diff --git a/examples/data-solutions/data-platform-foundations/README.md b/examples/data-solutions/data-platform-foundations/README.md index feaaa02bb..99100f17b 100644 --- a/examples/data-solutions/data-platform-foundations/README.md +++ b/examples/data-solutions/data-platform-foundations/README.md @@ -220,17 +220,10 @@ To do this, you need to remove IAM binging at project-level for the `data-analys ## Demo pipeline -The application layer is out of scope of this script, but as a demo, it is provided with a Cloud Composer DAG to mode data from the `landing` area to the `DataLake L2` dataset. +The application layer is out of scope of this script. As a demo purpuse only, several Cloud Composer DAGs are provided. Demos will import data from the `landing` area to the `DataLake L2` dataset suing different features. -Just follow the commands you find in the `demo_commands` Terraform output, go in the Cloud Composer UI and run the `data_pipeline_dag`. +You can find examples in the `[demo](./demo)` folder. -Description of commands: - -- 01: copy sample data to a `landing` Cloud Storage bucket impersonating the `load` service account. -- 02: copy sample data structure definition in the `orchestration` Cloud Storage bucket impersonating the `orchestration` service account. -- 03: copy the Cloud Composer DAG to the Cloud Composer Storage bucket impersonating the `orchestration` service account. -- 04: Open the Cloud Composer Airflow UI and run the imported DAG. -- 05: Run the BigQuery query to see results. ## Variables @@ -268,8 +261,6 @@ Description of commands: Features to add in future releases: -- Add support for Column level access on BigQuery -- Add example templates for Data Catalog - Add example on how to use Cloud Data Loss Prevention - Add solution to handle Tables, Views, and Authorized Views lifecycle - Add solution to handle Metadata lifecycle diff --git a/examples/data-solutions/data-platform-foundations/demo/README.md b/examples/data-solutions/data-platform-foundations/demo/README.md index 2cac9a9a3..5347b2cff 100644 --- a/examples/data-solutions/data-platform-foundations/demo/README.md +++ b/examples/data-solutions/data-platform-foundations/demo/README.md @@ -1,3 +1,32 @@ # Data ingestion Demo -In this folder you can find an example to ingest data on the `data platform` instantiated in [here](../). See details in the [README.m](../#demo-pipeline) to run the demo. \ No newline at end of file +In this folder, you can find an example to ingest data on the `data platform` instantiated [here](../). + +The example is not intended to be a production-ready code. + +## Demo use case +The demo imports purchase data generated by a store. + +## Input files +Data are uploaded to the `landing` GCS bucket. File structure: + - `customers.csv`: Comma separate value with customer information in the following format: Customer ID, Name, Surname, Registration Timestamp + - `purchases.csv`: Comma separate value with customer information in the following format: Item ID, Customer ID, Item, Item price, Purchase Timestamp + +## Data processing pipelines +Different data pipelines are provided to highlight different features and patterns. For the purpose of the example, a single pipeline handle all data lifecycles. When adapting them to your real use case, you may want to evaluate the option to handle each functional step on a separate pipeline or a dedicated tool. For example, you may want to use `Dataform` to handle data schemas lifecycle. + +Below you can find a description of each example: + - Simple import data: [`datapipeline.py`](./datapipeline.py) is a simple pipeline to import provided data from the `landing` Google Cloud Storage bucket to the Data Hub L2 layer joining `customers` and `purchases` tables into `customerpurchase` table. + - Import data with Policy Tags: [`datapipeline_dc_tags.py`](./datapipeline.py) imports provided data from `landing` bucket to the Data Hub L2 layer protecting sensitive data using Data Catalog policy Tags. + - Delete tables: [`delete_table.py`](./delete_table.py) deletes BigQuery tables created by import pipelines. + +## Runnin the demo +To run demo examples, please follow the following steps: + +- 01: copy sample data to the `landing` Cloud Storage bucket impersonating the `load` service account. +- 02: copy sample data structure definition in the `orchestration` Cloud Storage bucket impersonating the `orchestration` service account. +- 03: copy the Cloud Composer DAG to the Cloud Composer Storage bucket impersonating the `orchestration` service account. +- 04: Open the Cloud Composer Airflow UI and run the imported DAG. +- 05: Run the BigQuery query to see results. + +You can find pre-computed commands in the `demo_commands` output variable of the deployed terraform [data pipeline](../). diff --git a/examples/data-solutions/data-platform-foundations/demo/datapipeline.py b/examples/data-solutions/data-platform-foundations/demo/datapipeline.py index 6a1a06be2..1f748c088 100644 --- a/examples/data-solutions/data-platform-foundations/demo/datapipeline.py +++ b/examples/data-solutions/data-platform-foundations/demo/datapipeline.py @@ -116,115 +116,8 @@ with models.DAG( trigger_rule='all_success' ) - with TaskGroup('upsert_table') as upsert_table: - upsert_table_customers = BigQueryUpsertTableOperator( - task_id="upsert_table_customers", - project_id=DTL_L0_PRJ, - dataset_id=DTL_L0_BQ_DATASET, - impersonation_chain=[TRF_SA_DF], - table_resource={ - "tableReference": {"tableId": "customers"}, - }, - ) - - upsert_table_purchases = BigQueryUpsertTableOperator( - task_id="upsert_table_purchases", - project_id=DTL_L0_PRJ, - dataset_id=DTL_L0_BQ_DATASET, - impersonation_chain=[TRF_SA_BQ], - table_resource={ - "tableReference": {"tableId": "purchases"} - }, - ) - - upsert_table_customer_purchase_l1 = BigQueryUpsertTableOperator( - task_id="upsert_table_customer_purchase_l1", - project_id=DTL_L1_PRJ, - dataset_id=DTL_L1_BQ_DATASET, - impersonation_chain=[TRF_SA_BQ], - table_resource={ - "tableReference": {"tableId": "customer_purchase"} - }, - ) - - upsert_table_customer_purchase_l2 = BigQueryUpsertTableOperator( - task_id="upsert_table_customer_purchase_l2", - project_id=DTL_L2_PRJ, - dataset_id=DTL_L2_BQ_DATASET, - impersonation_chain=[TRF_SA_BQ], - table_resource={ - "tableReference": {"tableId": "customer_purchase"} - }, - ) - - with TaskGroup('update_schema_table') as update_schema_table: - update_table_schema_customers = BigQueryUpdateTableSchemaOperator( - task_id="update_table_schema_customers", - project_id=DTL_L0_PRJ, - dataset_id=DTL_L0_BQ_DATASET, - table_id="customers", - impersonation_chain=[TRF_SA_BQ], - include_policy_tags=True, - schema_fields_updates=[ - { "mode": "REQUIRED", "name": "id", "type": "INTEGER", "description": "ID" }, - { "mode": "REQUIRED", "name": "name", "type": "STRING", "description": "Name", "policyTags": { "names": [DATA_CAT_TAGS.get('2_Private', None)]}}, - { "mode": "REQUIRED", "name": "surname", "type": "STRING", "description": "Surname", "policyTags": { "names": [DATA_CAT_TAGS.get('2_Private', None)]} }, - { "mode": "REQUIRED", "name": "timestamp", "type": "TIMESTAMP", "description": "Timestamp" } - ] - ) - - update_table_schema_customers = BigQueryUpdateTableSchemaOperator( - task_id="update_table_schema_purchases", - project_id=DTL_L0_PRJ, - dataset_id=DTL_L0_BQ_DATASET, - table_id="purchases", - impersonation_chain=[TRF_SA_BQ], - include_policy_tags=True, - schema_fields_updates=[ - { "mode": "REQUIRED", "name": "id", "type": "INTEGER", "description": "ID" }, - { "mode": "REQUIRED", "name": "customer_id", "type": "INTEGER", "description": "ID" }, - { "mode": "REQUIRED", "name": "item", "type": "STRING", "description": "Item Name" }, - { "mode": "REQUIRED", "name": "price", "type": "FLOAT", "description": "Item Price" }, - { "mode": "REQUIRED", "name": "timestamp", "type": "TIMESTAMP", "description": "Timestamp" } - ] - ) - - update_table_schema_customer_purchase_l1 = BigQueryUpdateTableSchemaOperator( - task_id="update_table_schema_customer_purchase_l1", - project_id=DTL_L1_PRJ, - dataset_id=DTL_L1_BQ_DATASET, - table_id="customer_purchase", - impersonation_chain=[TRF_SA_BQ], - include_policy_tags=True, - schema_fields_updates=[ - { "mode": "REQUIRED", "name": "customer_id", "type": "INTEGER", "description": "ID" }, - { "mode": "REQUIRED", "name": "purchase_id", "type": "INTEGER", "description": "ID" }, - { "mode": "REQUIRED", "name": "name", "type": "STRING", "description": "Name", "policyTags": { "names": [DATA_CAT_TAGS.get('2_Private', None)]}}, - { "mode": "REQUIRED", "name": "surname", "type": "STRING", "description": "Surname", "policyTags": { "names": [DATA_CAT_TAGS.get('2_Private', None)]} }, - { "mode": "REQUIRED", "name": "item", "type": "STRING", "description": "Item Name" }, - { "mode": "REQUIRED", "name": "price", "type": "FLOAT", "description": "Item Price" }, - { "mode": "REQUIRED", "name": "timestamp", "type": "TIMESTAMP", "description": "Timestamp" } - ] - ) - - update_table_schema_customer_purchase_l2 = BigQueryUpdateTableSchemaOperator( - task_id="update_table_schema_customer_purchase_l2", - project_id=DTL_L2_PRJ, - dataset_id=DTL_L2_BQ_DATASET, - table_id="customer_purchase", - impersonation_chain=[TRF_SA_BQ], - include_policy_tags=True, - schema_fields_updates=[ - { "mode": "REQUIRED", "name": "customer_id", "type": "INTEGER", "description": "ID" }, - { "mode": "REQUIRED", "name": "purchase_id", "type": "INTEGER", "description": "ID" }, - { "mode": "REQUIRED", "name": "name", "type": "STRING", "description": "Name", "policyTags": { "names": [DATA_CAT_TAGS.get('2_Private', None)]}}, - { "mode": "REQUIRED", "name": "surname", "type": "STRING", "description": "Surname", "policyTags": { "names": [DATA_CAT_TAGS.get('2_Private', None)]} }, - { "mode": "REQUIRED", "name": "item", "type": "STRING", "description": "Item Name" }, - { "mode": "REQUIRED", "name": "price", "type": "FLOAT", "description": "Item Price" }, - { "mode": "REQUIRED", "name": "timestamp", "type": "TIMESTAMP", "description": "Timestamp" } - ] - ) - + # Bigquery Tables automatically created for demo porpuse. + # Consider a dedicated pipeline or tool for a real life scenario. customers_import = DataflowTemplatedJobStartOperator( task_id="dataflow_customers_import", template="gs://dataflow-templates/latest/GCS_Text_to_BigQuery", @@ -315,4 +208,5 @@ with models.DAG( }, impersonation_chain=[TRF_SA_BQ] ) - start >> upsert_table >> update_schema_table >> [customers_import, purchases_import] >> join_customer_purchase >> l2_customer_purchase >> end + + start >> [customers_import, purchases_import] >> join_customer_purchase >> l2_customer_purchase >> end \ No newline at end of file diff --git a/examples/data-solutions/data-platform-foundations/demo/datapipeline_dc_tags.py b/examples/data-solutions/data-platform-foundations/demo/datapipeline_dc_tags.py new file mode 100644 index 000000000..2fb88c9e5 --- /dev/null +++ b/examples/data-solutions/data-platform-foundations/demo/datapipeline_dc_tags.py @@ -0,0 +1,322 @@ +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# -------------------------------------------------------------------------------- +# Load The Dependencies +# -------------------------------------------------------------------------------- + +import csv +import datetime +import io +import json +import logging +import os + +from airflow import models +from airflow.providers.google.cloud.operators.dataflow import DataflowTemplatedJobStartOperator +from airflow.operators import dummy +from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator, BigQueryUpsertTableOperator, BigQueryUpdateTableSchemaOperator +from airflow.utils.task_group import TaskGroup + +# -------------------------------------------------------------------------------- +# Set variables - Needed for the DEMO +# -------------------------------------------------------------------------------- +BQ_LOCATION = os.environ.get("BQ_LOCATION") +DATA_CAT_TAGS = json.loads(os.environ.get("DATA_CAT_TAGS")) +DTL_L0_PRJ = os.environ.get("DTL_L0_PRJ") +DTL_L0_BQ_DATASET = os.environ.get("DTL_L0_BQ_DATASET") +DTL_L0_GCS = os.environ.get("DTL_L0_GCS") +DTL_L1_PRJ = os.environ.get("DTL_L1_PRJ") +DTL_L1_BQ_DATASET = os.environ.get("DTL_L1_BQ_DATASET") +DTL_L1_GCS = os.environ.get("DTL_L1_GCS") +DTL_L2_PRJ = os.environ.get("DTL_L2_PRJ") +DTL_L2_BQ_DATASET = os.environ.get("DTL_L2_BQ_DATASET") +DTL_L2_GCS = os.environ.get("DTL_L2_GCS") +DTL_PLG_PRJ = os.environ.get("DTL_PLG_PRJ") +DTL_PLG_BQ_DATASET = os.environ.get("DTL_PLG_BQ_DATASET") +DTL_PLG_GCS = os.environ.get("DTL_PLG_GCS") +GCP_REGION = os.environ.get("GCP_REGION") +LND_PRJ = os.environ.get("LND_PRJ") +LND_BQ = os.environ.get("LND_BQ") +LND_GCS = os.environ.get("LND_GCS") +LND_PS = os.environ.get("LND_PS") +LOD_PRJ = os.environ.get("LOD_PRJ") +LOD_GCS_STAGING = os.environ.get("LOD_GCS_STAGING") +LOD_NET_VPC = os.environ.get("LOD_NET_VPC") +LOD_NET_SUBNET = os.environ.get("LOD_NET_SUBNET") +LOD_SA_DF = os.environ.get("LOD_SA_DF") +ORC_PRJ = os.environ.get("ORC_PRJ") +ORC_GCS = os.environ.get("ORC_GCS") +TRF_PRJ = os.environ.get("TRF_PRJ") +TRF_GCS_STAGING = os.environ.get("TRF_GCS_STAGING") +TRF_NET_VPC = os.environ.get("TRF_NET_VPC") +TRF_NET_SUBNET = os.environ.get("TRF_NET_SUBNET") +TRF_SA_DF = os.environ.get("TRF_SA_DF") +TRF_SA_BQ = os.environ.get("TRF_SA_BQ") +DF_KMS_KEY = os.environ.get("DF_KMS_KEY", "") +DF_REGION = os.environ.get("GCP_REGION") +DF_ZONE = os.environ.get("GCP_REGION") + "-b" + +# -------------------------------------------------------------------------------- +# Set default arguments +# -------------------------------------------------------------------------------- + +# If you are running Airflow in more than one time zone +# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html +# for best practices +yesterday = datetime.datetime.now() - datetime.timedelta(days=1) + +default_args = { + 'owner': 'airflow', + 'start_date': yesterday, + 'depends_on_past': False, + 'email': [''], + 'email_on_failure': False, + 'email_on_retry': False, + 'retries': 1, + 'retry_delay': datetime.timedelta(minutes=5), + 'dataflow_default_options': { + 'location': DF_REGION, + 'zone': DF_ZONE, + 'stagingLocation': LOD_GCS_STAGING, + 'tempLocation': LOD_GCS_STAGING + "/tmp", + 'serviceAccountEmail': LOD_SA_DF, + 'subnetwork': LOD_NET_SUBNET, + 'ipConfiguration': "WORKER_IP_PRIVATE", + 'kmsKeyName' : DF_KMS_KEY + }, +} + +# -------------------------------------------------------------------------------- +# Main DAG +# -------------------------------------------------------------------------------- + +with models.DAG( + 'data_pipeline_dc_tags_dag', + default_args=default_args, + schedule_interval=None) as dag: + start = dummy.DummyOperator( + task_id='start', + trigger_rule='all_success' + ) + + end = dummy.DummyOperator( + task_id='end', + trigger_rule='all_success' + ) + + # Bigquery Tables created here for demo porpuse. + # Consider a dedicated pipeline or tool for a real life scenario. + with TaskGroup('upsert_table') as upsert_table: + upsert_table_customers = BigQueryUpsertTableOperator( + task_id="upsert_table_customers", + project_id=DTL_L0_PRJ, + dataset_id=DTL_L0_BQ_DATASET, + impersonation_chain=[TRF_SA_DF], + table_resource={ + "tableReference": {"tableId": "customers"}, + }, + ) + + upsert_table_purchases = BigQueryUpsertTableOperator( + task_id="upsert_table_purchases", + project_id=DTL_L0_PRJ, + dataset_id=DTL_L0_BQ_DATASET, + impersonation_chain=[TRF_SA_BQ], + table_resource={ + "tableReference": {"tableId": "purchases"} + }, + ) + + upsert_table_customer_purchase_l1 = BigQueryUpsertTableOperator( + task_id="upsert_table_customer_purchase_l1", + project_id=DTL_L1_PRJ, + dataset_id=DTL_L1_BQ_DATASET, + impersonation_chain=[TRF_SA_BQ], + table_resource={ + "tableReference": {"tableId": "customer_purchase"} + }, + ) + + upsert_table_customer_purchase_l2 = BigQueryUpsertTableOperator( + task_id="upsert_table_customer_purchase_l2", + project_id=DTL_L2_PRJ, + dataset_id=DTL_L2_BQ_DATASET, + impersonation_chain=[TRF_SA_BQ], + table_resource={ + "tableReference": {"tableId": "customer_purchase"} + }, + ) + + # Bigquery Tables schema defined here for demo porpuse. + # Consider a dedicated pipeline or tool for a real life scenario. + with TaskGroup('update_schema_table') as update_schema_table: + update_table_schema_customers = BigQueryUpdateTableSchemaOperator( + task_id="update_table_schema_customers", + project_id=DTL_L0_PRJ, + dataset_id=DTL_L0_BQ_DATASET, + table_id="customers", + impersonation_chain=[TRF_SA_BQ], + include_policy_tags=True, + schema_fields_updates=[ + { "mode": "REQUIRED", "name": "id", "type": "INTEGER", "description": "ID" }, + { "mode": "REQUIRED", "name": "name", "type": "STRING", "description": "Name", "policyTags": { "names": [DATA_CAT_TAGS.get('2_Private', None)]}}, + { "mode": "REQUIRED", "name": "surname", "type": "STRING", "description": "Surname", "policyTags": { "names": [DATA_CAT_TAGS.get('2_Private', None)]} }, + { "mode": "REQUIRED", "name": "timestamp", "type": "TIMESTAMP", "description": "Timestamp" } + ] + ) + + update_table_schema_customers = BigQueryUpdateTableSchemaOperator( + task_id="update_table_schema_purchases", + project_id=DTL_L0_PRJ, + dataset_id=DTL_L0_BQ_DATASET, + table_id="purchases", + impersonation_chain=[TRF_SA_BQ], + include_policy_tags=True, + schema_fields_updates=[ + { "mode": "REQUIRED", "name": "id", "type": "INTEGER", "description": "ID" }, + { "mode": "REQUIRED", "name": "customer_id", "type": "INTEGER", "description": "ID" }, + { "mode": "REQUIRED", "name": "item", "type": "STRING", "description": "Item Name" }, + { "mode": "REQUIRED", "name": "price", "type": "FLOAT", "description": "Item Price" }, + { "mode": "REQUIRED", "name": "timestamp", "type": "TIMESTAMP", "description": "Timestamp" } + ] + ) + + update_table_schema_customer_purchase_l1 = BigQueryUpdateTableSchemaOperator( + task_id="update_table_schema_customer_purchase_l1", + project_id=DTL_L1_PRJ, + dataset_id=DTL_L1_BQ_DATASET, + table_id="customer_purchase", + impersonation_chain=[TRF_SA_BQ], + include_policy_tags=True, + schema_fields_updates=[ + { "mode": "REQUIRED", "name": "customer_id", "type": "INTEGER", "description": "ID" }, + { "mode": "REQUIRED", "name": "purchase_id", "type": "INTEGER", "description": "ID" }, + { "mode": "REQUIRED", "name": "name", "type": "STRING", "description": "Name", "policyTags": { "names": [DATA_CAT_TAGS.get('2_Private', None)]}}, + { "mode": "REQUIRED", "name": "surname", "type": "STRING", "description": "Surname", "policyTags": { "names": [DATA_CAT_TAGS.get('2_Private', None)]} }, + { "mode": "REQUIRED", "name": "item", "type": "STRING", "description": "Item Name" }, + { "mode": "REQUIRED", "name": "price", "type": "FLOAT", "description": "Item Price" }, + { "mode": "REQUIRED", "name": "timestamp", "type": "TIMESTAMP", "description": "Timestamp" } + ] + ) + + update_table_schema_customer_purchase_l2 = BigQueryUpdateTableSchemaOperator( + task_id="update_table_schema_customer_purchase_l2", + project_id=DTL_L2_PRJ, + dataset_id=DTL_L2_BQ_DATASET, + table_id="customer_purchase", + impersonation_chain=[TRF_SA_BQ], + include_policy_tags=True, + schema_fields_updates=[ + { "mode": "REQUIRED", "name": "customer_id", "type": "INTEGER", "description": "ID" }, + { "mode": "REQUIRED", "name": "purchase_id", "type": "INTEGER", "description": "ID" }, + { "mode": "REQUIRED", "name": "name", "type": "STRING", "description": "Name", "policyTags": { "names": [DATA_CAT_TAGS.get('2_Private', None)]}}, + { "mode": "REQUIRED", "name": "surname", "type": "STRING", "description": "Surname", "policyTags": { "names": [DATA_CAT_TAGS.get('2_Private', None)]} }, + { "mode": "REQUIRED", "name": "item", "type": "STRING", "description": "Item Name" }, + { "mode": "REQUIRED", "name": "price", "type": "FLOAT", "description": "Item Price" }, + { "mode": "REQUIRED", "name": "timestamp", "type": "TIMESTAMP", "description": "Timestamp" } + ] + ) + + customers_import = DataflowTemplatedJobStartOperator( + task_id="dataflow_customers_import", + template="gs://dataflow-templates/latest/GCS_Text_to_BigQuery", + project_id=LOD_PRJ, + location=DF_REGION, + parameters={ + "javascriptTextTransformFunctionName": "transform", + "JSONPath": ORC_GCS + "/customers_schema.json", + "javascriptTextTransformGcsPath": ORC_GCS + "/customers_udf.js", + "inputFilePattern": LND_GCS + "/customers.csv", + "outputTable": DTL_L0_PRJ + ":"+DTL_L0_BQ_DATASET+".customers", + "bigQueryLoadingTemporaryDirectory": LOD_GCS_STAGING + "/tmp/bq/", + }, + ) + + purchases_import = DataflowTemplatedJobStartOperator( + task_id="dataflow_purchases_import", + template="gs://dataflow-templates/latest/GCS_Text_to_BigQuery", + project_id=LOD_PRJ, + location=DF_REGION, + parameters={ + "javascriptTextTransformFunctionName": "transform", + "JSONPath": ORC_GCS + "/purchases_schema.json", + "javascriptTextTransformGcsPath": ORC_GCS + "/purchases_udf.js", + "inputFilePattern": LND_GCS + "/purchases.csv", + "outputTable": DTL_L0_PRJ + ":"+DTL_L0_BQ_DATASET+".purchases", + "bigQueryLoadingTemporaryDirectory": LOD_GCS_STAGING + "/tmp/bq/", + }, + ) + + join_customer_purchase = BigQueryInsertJobOperator( + task_id='bq_join_customer_purchase', + gcp_conn_id='bigquery_default', + project_id=TRF_PRJ, + location=BQ_LOCATION, + configuration={ + 'jobType':'QUERY', + 'query':{ + 'query':"""SELECT + c.id as customer_id, + p.id as purchase_id, + c.name as name, + c.surname as surname, + p.item as item, + p.price as price, + p.timestamp as timestamp + FROM `{dtl_0_prj}.{dtl_0_dataset}.customers` c + JOIN `{dtl_0_prj}.{dtl_0_dataset}.purchases` p ON c.id = p.customer_id + """.format(dtl_0_prj=DTL_L0_PRJ, dtl_0_dataset=DTL_L0_BQ_DATASET, ), + 'destinationTable':{ + 'projectId': DTL_L1_PRJ, + 'datasetId': DTL_L1_BQ_DATASET, + 'tableId': 'customer_purchase' + }, + 'writeDisposition':'WRITE_TRUNCATE', + "useLegacySql": False + } + }, + impersonation_chain=[TRF_SA_BQ] + ) + + l2_customer_purchase = BigQueryInsertJobOperator( + task_id='bq_l2_customer_purchase', + gcp_conn_id='bigquery_default', + project_id=TRF_PRJ, + location=BQ_LOCATION, + configuration={ + 'jobType':'QUERY', + 'query':{ + 'query':"""SELECT + customer_id, + purchase_id, + name, + surname, + item, + price, + timestamp + FROM `{dtl_1_prj}.{dtl_1_dataset}.customer_purchase` + """.format(dtl_1_prj=DTL_L1_PRJ, dtl_1_dataset=DTL_L1_BQ_DATASET, ), + 'destinationTable':{ + 'projectId': DTL_L2_PRJ, + 'datasetId': DTL_L2_BQ_DATASET, + 'tableId': 'customer_purchase' + }, + 'writeDisposition':'WRITE_TRUNCATE', + "useLegacySql": False + } + }, + impersonation_chain=[TRF_SA_BQ] + ) + start >> upsert_table >> update_schema_table >> [customers_import, purchases_import] >> join_customer_purchase >> l2_customer_purchase >> end diff --git a/examples/data-solutions/data-platform-foundations/demo/delete_table.py b/examples/data-solutions/data-platform-foundations/demo/delete_table.py new file mode 100644 index 000000000..a2585a68b --- /dev/null +++ b/examples/data-solutions/data-platform-foundations/demo/delete_table.py @@ -0,0 +1,146 @@ +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# -------------------------------------------------------------------------------- +# Load The Dependencies +# -------------------------------------------------------------------------------- + +import csv +import datetime +import io +import json +import logging +import os + +from airflow import models +from airflow.providers.google.cloud.operators.dataflow import DataflowTemplatedJobStartOperator +from airflow.operators import dummy +from airflow.providers.google.cloud.operators.bigquery import BigQueryDeleteTableOperator +from airflow.utils.task_group import TaskGroup + +# -------------------------------------------------------------------------------- +# Set variables - Needed for the DEMO +# -------------------------------------------------------------------------------- +BQ_LOCATION = os.environ.get("BQ_LOCATION") +DATA_CAT_TAGS = json.loads(os.environ.get("DATA_CAT_TAGS")) +DTL_L0_PRJ = os.environ.get("DTL_L0_PRJ") +DTL_L0_BQ_DATASET = os.environ.get("DTL_L0_BQ_DATASET") +DTL_L0_GCS = os.environ.get("DTL_L0_GCS") +DTL_L1_PRJ = os.environ.get("DTL_L1_PRJ") +DTL_L1_BQ_DATASET = os.environ.get("DTL_L1_BQ_DATASET") +DTL_L1_GCS = os.environ.get("DTL_L1_GCS") +DTL_L2_PRJ = os.environ.get("DTL_L2_PRJ") +DTL_L2_BQ_DATASET = os.environ.get("DTL_L2_BQ_DATASET") +DTL_L2_GCS = os.environ.get("DTL_L2_GCS") +DTL_PLG_PRJ = os.environ.get("DTL_PLG_PRJ") +DTL_PLG_BQ_DATASET = os.environ.get("DTL_PLG_BQ_DATASET") +DTL_PLG_GCS = os.environ.get("DTL_PLG_GCS") +GCP_REGION = os.environ.get("GCP_REGION") +LND_PRJ = os.environ.get("LND_PRJ") +LND_BQ = os.environ.get("LND_BQ") +LND_GCS = os.environ.get("LND_GCS") +LND_PS = os.environ.get("LND_PS") +LOD_PRJ = os.environ.get("LOD_PRJ") +LOD_GCS_STAGING = os.environ.get("LOD_GCS_STAGING") +LOD_NET_VPC = os.environ.get("LOD_NET_VPC") +LOD_NET_SUBNET = os.environ.get("LOD_NET_SUBNET") +LOD_SA_DF = os.environ.get("LOD_SA_DF") +ORC_PRJ = os.environ.get("ORC_PRJ") +ORC_GCS = os.environ.get("ORC_GCS") +TRF_PRJ = os.environ.get("TRF_PRJ") +TRF_GCS_STAGING = os.environ.get("TRF_GCS_STAGING") +TRF_NET_VPC = os.environ.get("TRF_NET_VPC") +TRF_NET_SUBNET = os.environ.get("TRF_NET_SUBNET") +TRF_SA_DF = os.environ.get("TRF_SA_DF") +TRF_SA_BQ = os.environ.get("TRF_SA_BQ") +DF_KMS_KEY = os.environ.get("DF_KMS_KEY", "") +DF_REGION = os.environ.get("GCP_REGION") +DF_ZONE = os.environ.get("GCP_REGION") + "-b" + +# -------------------------------------------------------------------------------- +# Set default arguments +# -------------------------------------------------------------------------------- + +# If you are running Airflow in more than one time zone +# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html +# for best practices +yesterday = datetime.datetime.now() - datetime.timedelta(days=1) + +default_args = { + 'owner': 'airflow', + 'start_date': yesterday, + 'depends_on_past': False, + 'email': [''], + 'email_on_failure': False, + 'email_on_retry': False, + 'retries': 1, + 'retry_delay': datetime.timedelta(minutes=5), + 'dataflow_default_options': { + 'location': DF_REGION, + 'zone': DF_ZONE, + 'stagingLocation': LOD_GCS_STAGING, + 'tempLocation': LOD_GCS_STAGING + "/tmp", + 'serviceAccountEmail': LOD_SA_DF, + 'subnetwork': LOD_NET_SUBNET, + 'ipConfiguration': "WORKER_IP_PRIVATE", + 'kmsKeyName' : DF_KMS_KEY + }, +} + +# -------------------------------------------------------------------------------- +# Main DAG +# -------------------------------------------------------------------------------- + +with models.DAG( + 'delete_tables_dag', + default_args=default_args, + schedule_interval=None) as dag: + start = dummy.DummyOperator( + task_id='start', + trigger_rule='all_success' + ) + + end = dummy.DummyOperator( + task_id='end', + trigger_rule='all_success' + ) + + # Bigquery Tables deleted here for demo porpuse. + # Consider a dedicated pipeline or tool for a real life scenario. + with TaskGroup('delete_table') as delte_table: + delete_table_customers = BigQueryDeleteTableOperator( + task_id="delete_table_customers", + deletion_dataset_table=DTL_L0_PRJ+"."+DTL_L0_BQ_DATASET+".customers", + impersonation_chain=[TRF_SA_DF] + ) + + delete_table_purchases = BigQueryDeleteTableOperator( + task_id="delete_table_purchases", + deletion_dataset_table=DTL_L0_PRJ+"."+DTL_L0_BQ_DATASET+".purchases", + impersonation_chain=[TRF_SA_DF] + ) + + delete_table_customer_purchase_l1 = BigQueryDeleteTableOperator( + task_id="delete_table_customer_purchase_l1", + deletion_dataset_table=DTL_L1_PRJ+"."+DTL_L1_BQ_DATASET+".customer_purchase", + impersonation_chain=[TRF_SA_DF] + ) + + delete_table_customer_purchase_l2 = BigQueryDeleteTableOperator( + task_id="delete_table_customer_purchase_l2", + deletion_dataset_table=DTL_L2_PRJ+"."+DTL_L2_BQ_DATASET+".customer_purchase", + impersonation_chain=[TRF_SA_DF] + ) + + start >> delte_table >> end diff --git a/fast/stages/03-data-platform/dev/README.md b/fast/stages/03-data-platform/dev/README.md index 1d833a2a8..5ecc2ad51 100644 --- a/fast/stages/03-data-platform/dev/README.md +++ b/fast/stages/03-data-platform/dev/README.md @@ -129,6 +129,12 @@ terraform init terraform apply ``` +## Demo pipeline + +The application layer is out of scope of this script. As a demo purpuse only, several Cloud Composer DAGs are provided. Demos will import data from the `landing` area to the `DataLake L2` dataset suing different features. + +You can find examples in the `[demo](../../../../examples/data-solutions/data-platform-foundations/demo)` folder. +