diff --git a/data-solutions/gcs-to-bq/README.md b/data-solutions/gcs-to-bq/README.md new file mode 100644 index 000000000..f1a13578f --- /dev/null +++ b/data-solutions/gcs-to-bq/README.md @@ -0,0 +1,119 @@ +# GCE and GCS CMEK via centralized Cloud KMS + +This example creates the infrastructure needed to run a [Cloud Dataflow](https://cloud.google.com/dataflow) pipeline to import data from [GCS](https://cloud.google.com/storage) to [Bigquery](https://cloud.google.com/bigquery). + +The solution will use: + - internal IP for GCE and Dataflow instances + - CMEK encription for GCS bucket, GCE instances, DataFlow instances and BigQuery Tables + - Cloud NAT to let resource comunicate to internet to run updates and packages installation + +The example is designed to match real-world use cases with a minimum amount of resources. It can be used as a starting point for more complex scenarios. + +This is the high level diagram: + +![GCS to Biquery High-level diagram](diagram.png "GCS to Biquery High-level diagram") + +## Managed resources and services + +This sample creates several distinct groups of resources: + +- projects + - Cloud KMS project + - Service Project configured for GCE instances, GCS buckets, Dataflow instances and BigQuery tables +- networking + - VPC network + - One subnet + - Firewall rules for [SSH access via IAP](https://cloud.google.com/iap/docs/using-tcp-forwarding) and open communication within the VPC +- IAM + - One service account for GGE instances + - One service account for Dataflow instances + - One service account for Bigquery tables +- KMS + - One contintent key ring (example: 'Europe') + - One crypto key (Procection level: softwere) for Cloud Engine + - One crypto key (Protection level: softwere) for Cloud Storage + - One regional key ring ('example: 'europe-west1') + - One crypto key (Protection level: softwere) for Cloud Dataflow +- GCE + - One instance encrypted with a CMEK Cryptokey hosted in Cloud KMS +- GCS + - One bucket encrypted with a CMEK Cryptokey hosted in Cloud KMS +- BQ + - One dataset encrypted with a CMEK Cryptokey hosted in Cloud KMS + - Two tables encrypted with a CMEK Cryptokey hosted in Cloud KMS + + +## Variables + +| name | description | type | required | default | +|---|---|:---: |:---:|:---:| +| billing_account | Billing account id used as default for new projects. | string | ✓ | | +| project_kms_name | Name for the new KMS Project. | string | ✓ | | +| project_service_name | Name for the new Service Project. | string | ✓ | | +| root_node | The resource name of the parent Folder or Organization. Must be of the form folders/folder_id or organizations/org_id. | string | ✓ | | +| *location* | The location where resources will be deployed. | string | | europe | +| *region* | The region where resources will be deployed. | string | | europe-west1 | +| *ssh_source_ranges* | IP CIDR ranges that will be allowed to connect via SSH to the onprem instance. | list(string) | | ["0.0.0.0/0"] | +| *vpc_ip_cidr_range* | Ip range used in the subnet deployef in the Service Project. | string | | 10.0.0.0/20 | +| *vpc_name* | Name of the VPC created in the Service Project. | string | | local | +| *vpc_subnet_name* | Name of the subnet created in the Service Project. | string | | subnet | +| *zone* | The zone where resources will be deployed. | string | | europe-west1-b | + +## Outputs + +| name | description | sensitive | +|---|---|:---:| +| bq_tables | Bigquery Tables. | | +| buckets | GCS Bucket Cloud KMS crypto keys. | | +| projects | Project ids. | | +| vm | GCE VMs. | | + + +## Test your environment +You can now connect to the GCE instance with the following command: + +```hcl + gcloud compute ssh vm-example-1 +``` + +You can run now the simple pipeline you can find [here](./script/data_ingestion/). Once you have installed required packages and copied a file into the GCS bucket, you can trigger the pipeline using internal ips with a command simila to: + +```hcl +python data_ingestion.py \ +--runner=DataflowRunner \ +--max_num_workers=10 \ +--autoscaling_algorithm=THROUGHPUT_BASED \ +--region=### REGION ### \ +--staging_location=gs://### TEMP BUCKET NAME ###/ \ +--temp_location=gs://### TEMP BUCKET NAME ###/ \ +--project=### PROJECT ID ### \ +--input=gs://### DATA BUCKET NAME###/### FILE NAME ###.csv \ +--output=### DATASET NAME ###.### TABLE NAME ### \ +--service_account_email=### SERVICE ACCOUNT EMAIL ### \ +--network=### NETWORK NAME ### \ +--subnetwork=### SUBNET NAME ### \ +--dataflow_kms_key=### CRYPTOKEY ID ### \ +--no_use_public_ips +``` + +for example: + +```hcl +python data_ingestion.py \ +--runner=DataflowRunner \ +--max_num_workers=10 \ +--autoscaling_algorithm=THROUGHPUT_BASED \ +--region=europe-west1 \ +--staging_location=gs://lc-001-eu-df-tmplocation/ \ +--temp_location=gs://lc-001-eu-df-tmplocation/ \ +--project=lcaggio-demo-001 \ +--input=gs://lc-001-eu-data/person.csv \ +--output=bq_dataset.df_import \ +--service_account_email=df-test@lcaggio-aa-demo-001.iam.gserviceaccount.com \ +--network=local \ +--subnetwork=regions/europe-west1/subnetworks/subnet \ +--dataflow_kms_key=projects/lcaggio-demo-kms/locations/europe-west1/keyRings/my-keyring-regional/cryptoKeys/key-df \ +--no_use_public_ips +``` + +You can check data imported into Google BigQuery from the Google Cloud Console UI. \ No newline at end of file diff --git a/data-solutions/gcs-to-bq/backend.tf.sample b/data-solutions/gcs-to-bq/backend.tf.sample new file mode 100644 index 000000000..a540c7cd1 --- /dev/null +++ b/data-solutions/gcs-to-bq/backend.tf.sample @@ -0,0 +1,20 @@ +# Copyright 2020 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +terraform { + backend "gcs" { + bucket = "" + } +} diff --git a/data-solutions/gcs-to-bq/diagram.png b/data-solutions/gcs-to-bq/diagram.png new file mode 100644 index 000000000..2b1cb1988 Binary files /dev/null and b/data-solutions/gcs-to-bq/diagram.png differ diff --git a/data-solutions/gcs-to-bq/main.tf b/data-solutions/gcs-to-bq/main.tf new file mode 100644 index 000000000..2ffc352a8 --- /dev/null +++ b/data-solutions/gcs-to-bq/main.tf @@ -0,0 +1,341 @@ +# Copyright 2020 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +locals { + vm-startup-script = join("\n", [ + "#! /bin/bash", + "apt-get update && apt-get install -y bash-completion git python3-venv gcc build-essential python-dev" + ]) +} + +############################################################################### +# Projects - Centralized # +############################################################################### + +module "project-service" { + source = "../../modules/project" + name = var.project_service_name + parent = var.root_node + billing_account = var.billing_account + services = [ + "compute.googleapis.com", + "servicenetworking.googleapis.com", + "storage-component.googleapis.com", + "bigquery.googleapis.com", + "bigquerystorage.googleapis.com", + "bigqueryreservation.googleapis.com", + "dataflow.googleapis.com", + "cloudkms.googleapis.com", + ] + oslogin = true +} + +module "project-kms" { + source = "../../modules/project" + name = var.project_kms_name + parent = var.root_node + billing_account = var.billing_account + services = [ + "cloudkms.googleapis.com", + ] +} + +############################################################################### +# Project Service Accounts # +############################################################################### + +module "service-account-bq" { + source = "../../modules/iam-service-accounts" + project_id = module.project-service.project_id + names = ["bq-test"] + iam_project_roles = { + (module.project-service.project_id) = [ + "roles/logging.logWriter", + "roles/monitoring.metricWriter", + "roles/bigquery.admin" + ] + } +} + +module "service-account-gce" { + source = "../../modules/iam-service-accounts" + project_id = module.project-service.project_id + names = ["gce-test"] + iam_project_roles = { + (module.project-service.project_id) = [ + "roles/logging.logWriter", + "roles/monitoring.metricWriter", + "roles/dataflow.admin", + "roles/iam.serviceAccountUser", + "roles/bigquery.dataOwner", + "roles/bigquery.jobUser" # Needed to import data using 'bq' command + ] + } +} + +module "service-account-df" { + source = "../../modules/iam-service-accounts" + project_id = module.project-service.project_id + names = ["df-test"] + iam_project_roles = { + (module.project-service.project_id) = [ + "roles/dataflow.worker", + "roles/bigquery.dataOwner", + "roles/bigquery.metadataViewer", + "roles/storage.objectViewer", + "roles/bigquery.jobUser" + ] + } +} + +data "google_bigquery_default_service_account" "bq_sa" { + project = module.project-service.project_id +} + +data "google_storage_project_service_account" "gcs_account" { + project = module.project-service.project_id +} + +############################################################################### +# KMS # +############################################################################### + +module "kms" { + source = "../../modules/kms" + project_id = module.project-kms.project_id + keyring = { + name = "my-keyring", + location = var.location + } + keys = { key-gce = null, key-gcs = null, key-bq = null } + key_iam_roles = { + key-gce = ["roles/cloudkms.cryptoKeyEncrypterDecrypter"] + key-gcs = ["roles/cloudkms.cryptoKeyEncrypterDecrypter"] + key-bq = ["roles/cloudkms.cryptoKeyEncrypterDecrypter"] + } + key_iam_members = { + key-gce = { + "roles/cloudkms.cryptoKeyEncrypterDecrypter" = [ + "serviceAccount:${module.project-service.service_accounts.robots.compute}", + ] + }, + key-gcs = { + "roles/cloudkms.cryptoKeyEncrypterDecrypter" = [ + #"serviceAccount:${module.project-service.service_accounts.robots.storage}", + "serviceAccount:${data.google_storage_project_service_account.gcs_account.email_address}" + ] + }, + key-bq = { + "roles/cloudkms.cryptoKeyEncrypterDecrypter" = [ + # TODO: Find a better place to store BQ service account + #"serviceAccount:${module.project-service.service_accounts.default.bq}", + "serviceAccount:${data.google_bigquery_default_service_account.bq_sa.email}", + ] + }, + } +} + +module "kms-regional" { + source = "../../modules/kms" + project_id = module.project-kms.project_id + keyring = { + name = "my-keyring-regional", + location = var.region + } + keys = { key-df = null } + key_iam_roles = { + key-df = ["roles/cloudkms.cryptoKeyEncrypterDecrypter"] + } + key_iam_members = { + key-df = { + "roles/cloudkms.cryptoKeyEncrypterDecrypter" = [ + "serviceAccount:${module.project-service.service_accounts.robots.dataflow}", + "serviceAccount:${module.project-service.service_accounts.robots.compute}", + ] + } + } +} + +############################################################################### +# Networking # +############################################################################### + +module "vpc" { + source = "../../modules/net-vpc" + project_id = module.project-service.project_id + name = var.vpc_name + subnets = [ + { + ip_cidr_range = var.vpc_ip_cidr_range + name = var.vpc_subnet_name + region = var.region + secondary_ip_range = {} + } + ] +} + +module "vpc-firewall" { + source = "../../modules/net-vpc-firewall" + project_id = module.project-service.project_id + network = module.vpc.name + admin_ranges_enabled = true + admin_ranges = [var.vpc_ip_cidr_range] +} + +module "nat" { + source = "../../modules/net-cloudnat" + project_id = module.project-service.project_id + region = var.region + name = "default" + router_network = module.vpc.name +} + +############################################################################### +# GCE # +############################################################################### + +module "vm_example" { + source = "../../modules/compute-vm" + project_id = module.project-service.project_id + region = var.region + zone = var.zone + name = "vm-example" + network_interfaces = [{ + network = module.vpc.self_link, + subnetwork = module.vpc.subnet_self_links["${var.region}/${var.vpc_subnet_name}"], + nat = false, + addresses = null + }] + attached_disks = [ + { + name = "attacheddisk" + size = 10 + image = null + options = { + auto_delete = true + mode = null + source = null + type = null + } + } + ] + instance_count = 1 + boot_disk = { + image = "projects/debian-cloud/global/images/family/debian-10" + type = "pd-ssd" + size = 10 + encrypt_disk = true + } + encryption = { + encrypt_boot = true + disk_encryption_key_raw = null + kms_key_self_link = module.kms.key_self_links.key-gce + } + metadata = { startup-script = local.vm-startup-script } + service_account = module.service-account-gce.email + service_account_scopes = ["https://www.googleapis.com/auth/cloud-platform"] + tags = ["ssh"] +} + +############################################################################### +# GCS # +############################################################################### + +module "kms-gcs" { + source = "../../modules/gcs" + project_id = module.project-service.project_id + prefix = module.project-service.project_id + names = ["data", "df-tmplocation"] + iam_roles = { + data = ["roles/storage.admin","roles/storage.objectViewer"], + df-tmplocation = ["roles/storage.admin"] + } + iam_members = { + data = { + "roles/storage.admin" = [ + "serviceAccount:${module.service-account-gce.email}", + ], + "roles/storage.viewer" = [ + "serviceAccount:${module.service-account-df.email}", + ], + }, + df-tmplocation = { + "roles/storage.admin" = [ + "serviceAccount:${module.service-account-gce.email}", + "serviceAccount:${module.service-account-df.email}", + ] + } + } + encryption_keys = { + data = module.kms.keys.key-gcs.self_link, + df-tmplocation = module.kms.keys.key-gcs.self_link, + } + force_destroy = { + data = true, + df-tmplocation = true, + } +} + +############################################################################### +# BQ # +############################################################################### + +module "bigquery-dataset" { + source = "../../modules/bigquery-dataset" + project_id = module.project-service.project_id + id = "bq_dataset" + access_roles = { + reader-group = { role = "READER", type = "domain" } + owner = { role = "OWNER", type = "user_by_email" } + } + access_identities = { + reader-group = "caggioland.com" + owner = module.service-account-bq.email + } + encryption_key = module.kms.keys.key-bq.self_link + tables = { + bq_import = { + friendly_name = "BQ import" + labels = {} + options = null + partitioning = { + field = null + range = null # use start/end/interval for range + time = null + } + schema = file("schema_bq_import.json") + options = { + clustering = null + expiration_time = null + encryption_key = module.kms.keys.key-bq.self_link + } + }, + df_import = { + friendly_name = "Dataflow import" + labels = {} + options = null + partitioning = { + field = null + range = null # use start/end/interval for range + time = null + } + schema = file("schema_df_import.json") + options = { + clustering = null + expiration_time = null + encryption_key = module.kms.keys.key-bq.self_link + } + } + } +} diff --git a/data-solutions/gcs-to-bq/outputs.tf b/data-solutions/gcs-to-bq/outputs.tf new file mode 100644 index 000000000..9dc4de31c --- /dev/null +++ b/data-solutions/gcs-to-bq/outputs.tf @@ -0,0 +1,42 @@ +# Copyright 2020 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +output "bq_tables" { + description = "Bigquery Tables." + value = module.bigquery-dataset.table_ids +} + +output "buckets" { + description = "GCS Bucket Cloud KMS crypto keys." + value = { + for bucket in module.kms-gcs.buckets : + bucket.name => bucket.url + } +} + +output "projects" { + description = "Project ids." + value = { + service-project = module.project-service.project_id + kms-project = module.project-kms.project_id + } +} + +output "vm" { + description = "GCE VMs." + value = { + for instance in module.vm_example.instances : + instance.name => instance.network_interface.0.network_ip + } +} diff --git a/data-solutions/gcs-to-bq/schema_bq_import.json b/data-solutions/gcs-to-bq/schema_bq_import.json new file mode 100644 index 000000000..e26fa3d23 --- /dev/null +++ b/data-solutions/gcs-to-bq/schema_bq_import.json @@ -0,0 +1,14 @@ +[ + { + "name": "name", + "type": "STRING" + }, + { + "name": "surname", + "type": "STRING" + }, + { + "name": "age", + "type": "NUMERIC" + } +] \ No newline at end of file diff --git a/data-solutions/gcs-to-bq/schema_df_import.json b/data-solutions/gcs-to-bq/schema_df_import.json new file mode 100644 index 000000000..516df6fa2 --- /dev/null +++ b/data-solutions/gcs-to-bq/schema_df_import.json @@ -0,0 +1,18 @@ +[ + { + "name": "name", + "type": "STRING" + }, + { + "name": "surname", + "type": "STRING" + }, + { + "name": "age", + "type": "NUMERIC" + }, + { + "name": "_TIMESTAMP", + "type": "TIMESTAMP" + } +] \ No newline at end of file diff --git a/data-solutions/gcs-to-bq/scripts/data_ingestion/README.md b/data-solutions/gcs-to-bq/scripts/data_ingestion/README.md new file mode 100644 index 000000000..1f1bac716 --- /dev/null +++ b/data-solutions/gcs-to-bq/scripts/data_ingestion/README.md @@ -0,0 +1,81 @@ +# Ingest CSV files from GCS into Bigquery + +In this example we create a Python [Apache Beam](https://beam.apache.org/) pipeline running on [Google Cloud Dataflow](https://cloud.google.com/dataflow/) to import CSV files into BigQuery adding a timestamp to each row. Below the architecture used: + +![Apache Beam pipeline to import CSV from GCS into BQ](diagram.png) + +The architecture uses: +* [Google Cloud Storage]() to store CSV source files +* [Google Cloud Dataflow](https://cloud.google.com/dataflow/) to read files from Google Cloud Storage, Transform data base on the structure of the file and import the data into Google BigQuery +* [Google BigQuery](https://cloud.google.com/bigquery/) to store data in a Data Lake. + +You can use this script as a starting point to import your files into Google BigQuery. You'll probably need to adapt the script logic to your requirements. + +## 1. Prerequisites + - Up and running GCP project with enabled billing account + - gcloud installed and initiated to your project + - Google Cloud Dataflow API enabled + - Google Cloud Storage Bucket containing the file to import (CSV format) containings name, surnames and age. Example: `Mario,Rossi,30`. + - Google Cloud Storage Bucket for temp and staging Google Dataflow files + - Google BigQuery dataset + - [Python](https://www.python.org/) >= 3.7 and python-dev module + - gcc + - Google Cloud [Application Default Credentials](https://cloud.google.com/sdk/gcloud/reference/auth/application-default/login) + +## 2. Create virtual environment +Create a new virtual environment (recommended) and install requirements: + +``` +virtualenv env +source ./env/bin/activate +pip install -r requirements.txt +``` + +## 4. Upload files into Google Cloud Storage +Upload files to be imported into Google Bigquery in a Google Cloud Storage Bucket. You can use `gsutil` using a command like: +``` +gsutil cp [LOCAL_OBJECT_LOCATION] gs://[DESTINATION_BUCKET_NAME]/ +``` + +Files need to be in CSV format,For example: +``` +Enrico,Bianchi,20 +Mario,Rossi,30 +``` + +You can use the [person_details_generator](../person_details_generator/) script if you want to create random person details. + +## 5. Run pipeline +You can check parameters accepted by the `data_ingestion.py` script with the following command: +``` +python pipelines/data_ingestion --help +``` + +You can run the pipeline locally with the following command: +``` +python data_ingestion.py \ +--runner=DirectRunner \ +--project=###PUT HERE PROJECT ID### \ +--input=###PUT HERE THE FILE TO IMPORT. EXAMPLE: gs://bucket_name/person.csv ### \ +--output=###PUT HERE BQ DATASET.TABLE### +``` + +or you can run the pipeline on Google Dataflow using the following command: + +``` +python pipelines/data_ingestion_configurable.py \ +--runner=DataflowRunner \ +--max_num_workers=100 \ +--autoscaling_algorithm=THROUGHPUT_BASED \ +--region=###PUT HERE REGION### \ +--staging_location=###PUT HERE GCS STAGING LOCATION### \ +--temp_location=###PUT HERE GCS TMP LOCATION###\ +--project=###PUT HERE PROJECT ID### \ +--input-bucket=###PUT HERE GCS BUCKET NAME### \ +--input-path=###PUT HERE INPUT FOLDER### \ +--input-files=###PUT HERE FILE NAMES### \ +--bq-dataset=###PUT HERE BQ DATASET NAME### +``` + +## 6. Check results +You can check data imported into Google BigQuery from the Google Cloud Console UI. \ No newline at end of file diff --git a/data-solutions/gcs-to-bq/scripts/data_ingestion/REQUIREMENTS.txt b/data-solutions/gcs-to-bq/scripts/data_ingestion/REQUIREMENTS.txt new file mode 100644 index 000000000..ce9b3d903 --- /dev/null +++ b/data-solutions/gcs-to-bq/scripts/data_ingestion/REQUIREMENTS.txt @@ -0,0 +1,2 @@ +wheel +apache-beam diff --git a/data-solutions/gcs-to-bq/scripts/data_ingestion/data_ingestion.py b/data-solutions/gcs-to-bq/scripts/data_ingestion/data_ingestion.py new file mode 100644 index 000000000..54c89cd42 --- /dev/null +++ b/data-solutions/gcs-to-bq/scripts/data_ingestion/data_ingestion.py @@ -0,0 +1,133 @@ +# Copyright 2020 Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""`data_ingestion.py` is a Dataflow pipeline which reads a CSV file and +writes its contents to a BigQuery table adding a timestamp. +""" + +import argparse +import logging +import re + +import apache_beam as beam +from apache_beam.options.pipeline_options import PipelineOptions + + +class DataIngestion: + """A helper class which contains the logic to translate the file into + a format BigQuery will accept.""" + + def parse_method(self, string_input): + """Translate CSV row to dictionary. + Args: + string_input: A comma separated list of values in the form of + name,surname + Example string_input: lorenzo,caggioni + Returns: + A dict mapping BigQuery column names as keys + example output: + { + 'name': 'mario', + 'surname': 'rossi', + 'age': 30 + } + """ + # Strip out carriage return, newline and quote characters. + values = re.split(",", re.sub('\r\n', '', re.sub('"', '', + string_input))) + row = dict( + zip(('name', 'surname', 'age'), + values)) + return row + + +class InjectTimestamp(beam.DoFn): + """A class which add a timestamp for each row. + Args: + element: A dictionary mapping BigQuery column names + Example: + { + 'name': 'mario', + 'surname': 'rossi', + 'age': 30 + } + Returns: + The input dictionary with a timestamp value added + Example: + { + 'name': 'mario', + 'surname': 'rossi', + 'age': 30 + '_TIMESTAMP': 1545730073 + } + """ + + def process(self, element): + import time + element['_TIMESTAMP'] = int(time.mktime(time.gmtime())) + return [element] + + +def run(argv=None): + """The main function which creates the pipeline and runs it.""" + + parser = argparse.ArgumentParser() + + parser.add_argument( + '--input', + dest='input', + required=False, + help='Input file to read. This can be a local file or ' + 'a file in a Google Storage Bucket.') + + parser.add_argument( + '--output', + dest='output', + required=False, + help='Output BQ table to write results to.') + + # Parse arguments from the command line. + known_args, pipeline_args = parser.parse_known_args(argv) + + # DataIngestion is a class we built in this script to hold the logic for + # transforming the file into a BigQuery table. + data_ingestion = DataIngestion() + + # Initiate the pipeline using the pipeline arguments + p = beam.Pipeline(options=PipelineOptions(pipeline_args)) + + (p + # Read the file. This is the source of the pipeline. + | 'Read from a File' >> beam.io.ReadFromText(known_args.input) + # Translates CSV row to a dictionary object consumable by BigQuery. + | 'String To BigQuery Row' >> + beam.Map(lambda s: data_ingestion.parse_method(s)) + # Add the timestamp on each row + | 'Inject Timestamp - ' >> beam.ParDo(InjectTimestamp()) + # Write data to Bigquery + | 'Write to BigQuery' >> beam.io.Write( + beam.io.BigQuerySink( + # BigQuery table name. + known_args.output, + # Bigquery table schema + schema='name:STRING,surname:STRING,age:NUMERIC,_TIMESTAMP:TIMESTAMP', + # Creates the table in BigQuery if it does not yet exist. + create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER, + # Deletes all data in the BigQuery table before writing. + write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))) + p.run().wait_until_finish() + + +if __name__ == '__main__': + logging.getLogger().setLevel(logging.INFO) + run() diff --git a/data-solutions/gcs-to-bq/scripts/data_ingestion/diagram.png b/data-solutions/gcs-to-bq/scripts/data_ingestion/diagram.png new file mode 100644 index 000000000..22e4518f7 Binary files /dev/null and b/data-solutions/gcs-to-bq/scripts/data_ingestion/diagram.png differ diff --git a/data-solutions/gcs-to-bq/scripts/person_details_generator/README.md b/data-solutions/gcs-to-bq/scripts/person_details_generator/README.md new file mode 100644 index 000000000..43832cdff --- /dev/null +++ b/data-solutions/gcs-to-bq/scripts/person_details_generator/README.md @@ -0,0 +1,17 @@ +# Create random Person PII data + +In this example you can find a Python script to generate Person PII data in a CSV file format. + +To know how to use the script run: + +```hcl +python3 person_details_generator.py --help +``` + +## Example +To create a file 'person.csv' with 10000 of random person details data you can run: +```hcl +python3 person_details_generator.py \ +--count 10000 \ +--output person.csv +``` \ No newline at end of file diff --git a/data-solutions/gcs-to-bq/scripts/person_details_generator/REQUIREMENTS.txt b/data-solutions/gcs-to-bq/scripts/person_details_generator/REQUIREMENTS.txt new file mode 100644 index 000000000..b98f6609d --- /dev/null +++ b/data-solutions/gcs-to-bq/scripts/person_details_generator/REQUIREMENTS.txt @@ -0,0 +1 @@ +click \ No newline at end of file diff --git a/data-solutions/gcs-to-bq/scripts/person_details_generator/person_details_generator.py b/data-solutions/gcs-to-bq/scripts/person_details_generator/person_details_generator.py new file mode 100644 index 000000000..366954df5 --- /dev/null +++ b/data-solutions/gcs-to-bq/scripts/person_details_generator/person_details_generator.py @@ -0,0 +1,43 @@ +# Copyright 2020 Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""`person_details_generator.py` is a Python script to generate random +Person PII (name, surname and age) having as input an array of +names and an array of surnames. +""" + +import click +import logging +import random + + +@click.command() +@click.option("--count", default=100, help="Number of generated names.") +@click.option("--output", default=False, help="Name of the output file. Content will be overwriten. If not defined, standard output will be used.") +@click.option("--first_names", default="Lorenzo,Giacomo,Chiara,Miriam", help="String of Names, comma separated. Default 'Lorenzo,Giacomo,Chiara,Miriam'") +@click.option("--last_names", default="Rossi, Bianchi,Brambilla,Caggioni", help="String of Names, comma separated. Default 'Rossi,Bianchi,Brambilla,Caggioni'") +def main(count, output, first_names, last_names): + generated_names = "".join(random.choice(first_names.split(',')) + "," + + random.choice(last_names.split(',')) + "," + + str(random.randint(1, 100)) + "\n" for _ in range(count))[:-1] + if output: + f = open(output, "w") + f.write(generated_names) + f.close() + else: + print(generated_names) + + +if __name__ == '__main__': + logging.getLogger().setLevel(logging.INFO) + main() diff --git a/data-solutions/gcs-to-bq/variables.tf b/data-solutions/gcs-to-bq/variables.tf new file mode 100644 index 000000000..fb53dd77e --- /dev/null +++ b/data-solutions/gcs-to-bq/variables.tf @@ -0,0 +1,76 @@ +# Copyright 2020 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +variable "billing_account" { + description = "Billing account id used as default for new projects." + type = string +} + +variable "location" { + description = "The location where resources will be deployed." + type = string + default = "europe" +} + +variable "project_service_name" { + description = "Name for the new Service Project." + type = string +} + +variable "project_kms_name" { + description = "Name for the new KMS Project." + type = string +} + +variable "region" { + description = "The region where resources will be deployed." + type = string + default = "europe-west1" +} + +variable "root_node" { + description = "The resource name of the parent Folder or Organization. Must be of the form folders/folder_id or organizations/org_id." + type = string +} + +variable "vpc_name" { + description = "Name of the VPC created in the Service Project." + type = string + default = "local" +} + +variable "vpc_subnet_name" { + description = "Name of the subnet created in the Service Project." + type = string + default = "subnet" +} + +variable "vpc_ip_cidr_range" { + description = "Ip range used in the subnet deployef in the Service Project." + type = string + default = "10.0.0.0/20" +} + +variable "zone" { + description = "The zone where resources will be deployed." + type = string + default = "europe-west1-b" +} + +variable "ssh_source_ranges" { + description = "IP CIDR ranges that will be allowed to connect via SSH to the onprem instance." + type = list(string) + default = ["0.0.0.0/0"] +} \ No newline at end of file diff --git a/data-solutions/gcs-to-bq/versions.tf b/data-solutions/gcs-to-bq/versions.tf new file mode 100644 index 000000000..057095c0f --- /dev/null +++ b/data-solutions/gcs-to-bq/versions.tf @@ -0,0 +1,17 @@ +# Copyright 2020 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +terraform { + required_version = ">= 0.12.6" +} diff --git a/modules/project/service_accounts.tf b/modules/project/service_accounts.tf index d0ccee655..64d5566e0 100644 --- a/modules/project/service_accounts.tf +++ b/modules/project/service_accounts.tf @@ -17,6 +17,8 @@ locals { service_account_cloud_services = "${google_project.project.number}@cloudservices.gserviceaccount.com" service_accounts_default = { + # TODO: Find a better place to store BQ service account + bq = "bq-${google_project.project.number}@bigquery-encryption.iam.gserviceaccount.com" compute = "${google_project.project.number}-compute@developer.gserviceaccount.com" gae = "${google_project.project.project_id}@appspot.gserviceaccount.com" } @@ -26,6 +28,7 @@ locals { compute = "compute-system" container-engine = "container-engine-robot" containerregistry = "containerregistry" + dataflow = "dataflow-service-producer-prod" dataproc = "dataproc-accounts" gae-flex = "gae-api-prod" gcf = "gcf-admin-robot"