diff --git a/examples/data-solutions/data-platform-foundations/demo/datapipeline.py b/examples/data-solutions/data-platform-foundations/demo/datapipeline.py
index fd633ebd8..eb3de1824 100644
--- a/examples/data-solutions/data-platform-foundations/demo/datapipeline.py
+++ b/examples/data-solutions/data-platform-foundations/demo/datapipeline.py
@@ -25,11 +25,11 @@ import os
 from airflow import models
 from airflow.contrib.operators.dataflow_operator import DataflowTemplateOperator
 from airflow.operators import dummy
-from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
+from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator, BigQueryUpsertTableOperator
 
 # --------------------------------------------------------------------------------
-# Set variables
-# ------------------------------------------------------------
+# Set variables - Needed for the DEMO
+# --------------------------------------------------------------------------------
 BQ_LOCATION = os.environ.get("BQ_LOCATION")
 DTL_L0_PRJ = os.environ.get("DTL_L0_PRJ")
 DTL_L0_BQ_DATASET = os.environ.get("DTL_L0_BQ_DATASET")
@@ -114,6 +114,47 @@ with models.DAG(
     trigger_rule='all_success'
   )
 
+  # Upsert the "customers" table in the L0 dataset with an explicit schema.
+  upsert_table_customers = BigQueryUpsertTableOperator(
+    task_id="upsert_table_customers",
+    project_id=DTL_L0_PRJ,
+    dataset_id=DTL_L0_BQ_DATASET,
+    impersonation_chain=[TRF_SA_DF],
+    table_resource={
+      "tableReference": {"tableId": "customers"},
+      "schema": {
+        # The BigQuery Table resource expects the column list under "fields".
+        "fields": [
+          { "mode": "REQUIRED", "name": "id", "type": "INTEGER", "description": "ID" },
+          # TODO(review): optionally add "policyTags": { "names": [] } to the "name" field.
+          { "mode": "REQUIRED", "name": "name", "type": "STRING", "description": "Name" },
+          { "mode": "REQUIRED", "name": "surname", "type": "STRING", "description": "Surname" },
+          { "mode": "REQUIRED", "name": "timestamp", "type": "TIMESTAMP", "description": "Timestamp" }
+        ]
+      },
+    },
+  )
+
+  # Upsert the "purchases" table, using the same identity as the customers upsert.
+  upsert_table_purchases = BigQueryUpsertTableOperator(
+    task_id="upsert_table_purchases",
+    project_id=DTL_L0_PRJ,
+    dataset_id=DTL_L0_BQ_DATASET,
+    impersonation_chain=[TRF_SA_DF],
+    table_resource={
+      "tableReference": {"tableId": "purchases"},
+      "schema": {
+        "fields": [
+          { "mode": "REQUIRED", "name": "id", "type": "INTEGER", "description": "ID" },
+          { "mode": "REQUIRED", "name": "customer_id", "type": "INTEGER", "description": "Customer ID" },
+          { "mode": "REQUIRED", "name": "item", "type": "STRING", "description": "Item Name" },
+          { "mode": "REQUIRED", "name": "price", "type": "FLOAT", "description": "Item Price" },
+          { "mode": "REQUIRED", "name": "timestamp", "type": "TIMESTAMP", "description": "Timestamp" }
+        ]
+      },
+    },
+  )
+
   customers_import = DataflowTemplateOperator(
     task_id="dataflow_customer_import",
     template="gs://dataflow-templates/latest/GCS_Text_to_BigQuery",
@@ -200,5 +241,10 @@ with models.DAG(
     },
     impersonation_chain=[TRF_SA_BQ]
   )
 
-  start >> [customers_import, purchases_import] >> join_customer_purchase >> l2_customer_purchase >> end
+  # Upsert each table before its import task runs; the join waits on both
+  # imports. A list cannot be chained directly into another list, hence the fan-out.
+  start >> [upsert_table_customers, upsert_table_purchases]
+  upsert_table_customers >> customers_import
+  upsert_table_purchases >> purchases_import
+  [customers_import, purchases_import] >> join_customer_purchase >> l2_customer_purchase >> end