Update pipeline example.

This commit is contained in:
Lorenzo Caggioni
2022-03-31 09:34:34 +02:00
parent 730d6d50b1
commit 14a0fbf336

View File

@@ -25,11 +25,11 @@ import os
from airflow import models
from airflow.contrib.operators.dataflow_operator import DataflowTemplateOperator
from airflow.operators import dummy
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator, BigQueryUpsertTableOperator
# --------------------------------------------------------------------------------
# Set variables
# ------------------------------------------------------------
# Set variables - Needed for the DEMO
# --------------------------------------------------------------------------------
BQ_LOCATION = os.environ.get("BQ_LOCATION")
DTL_L0_PRJ = os.environ.get("DTL_L0_PRJ")
DTL_L0_BQ_DATASET = os.environ.get("DTL_L0_BQ_DATASET")
@@ -114,6 +114,40 @@ with models.DAG(
trigger_rule='all_success'
)
upsert_table_customers = BigQueryUpsertTableOperator(
task_id="upsert_table_customers",
project_id=DTL_L0_PRJ,
dataset_id=DTL_L0_BQ_DATASET,
impersonation_chain=[TRF_SA_DF],
table_resource={
"tableReference": {"tableId": "customers"},
"schema": {
"field": [
{ "mode": "REQUIRED", "name": "id", "type": "INTEGER", "description": "ID" },
{ "mode": "REQUIRED", "name": "name", "type": "STRING", "description": "Name" }, #, "policyTags": { "names": [] } },
{ "mode": "REQUIRED", "name": "surname", "type": "STRING", "description": "Surname" },
{ "mode": "REQUIRED", "name": "timestamp", "type": "TIMESTAMP", "description": "Timestamp" }
]
},
},
)
upsert_table_purchasess = BigQueryUpsertTableOperator(
task_id="upsert_table_purchasess",
project_id=DTL_L0_PRJ,
dataset_id=DTL_L0_BQ_DATASET,
table_resource={
"tableReference": {"tableId": "purchases"},
"schema": [
{ "mode": "REQUIRED", "name": "id", "type": "INTEGER", "description": "ID" },
{ "mode": "REQUIRED", "name": "customer_id", "type": "INTEGER", "description": "ID" },
{ "mode": "REQUIRED", "name": "item", "type": "STRING", "description": "Item Name" },
{ "mode": "REQUIRED", "name": "price", "type": "FLOAT", "description": "Item Price" },
{ "mode": "REQUIRED", "name": "timestamp", "type": "TIMESTAMP", "description": "Timestamp" }
]
},
)
customers_import = DataflowTemplateOperator(
task_id="dataflow_customer_import",
template="gs://dataflow-templates/latest/GCS_Text_to_BigQuery",
@@ -200,5 +234,5 @@ with models.DAG(
},
impersonation_chain=[TRF_SA_BQ]
)
start >> [customers_import, purchases_import] >> join_customer_purchase >> l2_customer_purchase >> end
start >> upsert_table_customers >> end
# start >> [customers_import, purchases_import] >> join_customer_purchase >> l2_customer_purchase >> end