New SecOps anonymization pipeline (#2794)

* new secops anonymization pipeline

Co-authored-by: Ludovico Magnocavallo <ludomagno@google.com>
This commit is contained in:
simonebruzzechesse
2025-02-17 19:23:19 +01:00
committed by GitHub
parent 1a4b298cc9
commit b0021cc0f1
16 changed files with 1610 additions and 1 deletions

View File

@@ -128,6 +128,28 @@ jobs:
with:
MODULE: Blueprint Examples
examples-project-templates:
runs-on: ubuntu-latest
needs: setup-tf-providers
steps:
- uses: actions/checkout@v4
- name: Call composite action fabric-tests
uses: ./.github/actions/fabric-tests
with:
PYTHON_VERSION: ${{ env.PYTHON_VERSION }}
TERRAFORM_VERSION: ${{ env.DEFAULT_TERRAFORM_VERSION }}
TERRAFORM_FLAVOUR: ${{ env.DEFAULT_TERRAFORM_FLAVOUR }}
- name: Run tests on documentation examples
run: pytest -vv -n4 --tb=line --junit-xml=test-results-raw.xml -k fast/project-templates/ tests/examples
- name: Create report
uses: ./.github/actions/post-fabric-tests
if: always()
with:
MODULE: Project Templates Examples
examples-modules:
runs-on: ubuntu-latest
needs:

View File

@@ -8,7 +8,6 @@ This repository provides a collection of Terraform blueprints designed to automa
<br clear="left">
## SecOps GKE Forwarder
<a href="./secops-gke-forwarder/" title="SecOps GKE Forwarder"><img src="./secops-gke-forwarder/images/diagram.png" align="left" width="280px"></a> This [blueprint](./secops-gke-forwarder/) is a modular and scalable solution for setting up a SecOps forwarder on Google Kubernetes Engine (GKE). This forwarder is designed to handle multi-tenant data ingestion, ensuring secure and efficient log forwarding to your SecOps SIEM instances.

View File

@@ -0,0 +1,142 @@
# SecOps Anonymization Pipeline
This application template offers a comprehensive and adaptable solution for constructing a SecOps pipeline for exporting raw data from a SecOps tenant, optionally anonymize this data and then import data back in a different SecOps tenant. The pipeline is built on top of various Google Cloud products.
## Prerequisites
The [`project.yaml`](./project.yaml) file describes the project-level configuration needed in terms of API activation and IAM bindings.
If you are deploying this inside a FAST-enabled organization, the file can be lightly edited to match your configuration, and then used directly in the [project factory](../../stages/2-project-factory/).
This Terraform can of course be deployed using any pre-existing project. In that case use the YAML file to determine the configuration you need to set on the project:
- enable the APIs listed under `services`
- grant the permissions listed under `iam` to the principal running Terraform, either machine (service account) or human
### High level architecture
The following diagram illustrates the high-level design of the solution, which can be adapted to specific requirements via variables and/or simple terraform and Python code customizations:
![SecOps Anonymization Pipeline](./images/diagram.png)
The use case is a SecOps deployment composed of 2 tenants (one for production and one for development/testing). There might be the need to export production data from the prod tenant and import them back in DEV (possibly anonymizing it) for rules and/or parser development, that is why this pipeline might be convenient for speeding up the data migration process.
### Pipeline Steps
- **SecOps Export**: Triggered via the corresponding TRIGGER-EXPORT action. Call SecOps Export API to trigger raw logs export on a GCS bucket based on either all the log types or one o more of them for a specific time frame. By default, the export will be for the previous day, otherwise the following parameters can be specified to change the time frame:
* `EXPORT_DATE` date for the export (format %Y-%m-%d)
* `EXPORT_START_DATETIME` and `EXPORT_END_DATETIME` start and end datetime for the export (format %Y-%m-%dT%H:%M:%SZ). This is useful for verbose log source with GB/TB of raw logs ingested on a daily basis
- **Anonymize Data**: Triggered via the corresponding ANONYMIZE-DATA action. Split the exported CSV files to one or more CSV files where the size of each file is less than 60MB (which is the maximum file size supported by DLP). It also renames those files in .log for better handling by the DLP Job. It will then trigger an asynchronous DLP job to anonymize data.
- **Import Data**: Triggered via the corresponding IMPORT-DATA action. Import the exported raw logs (or anonymized ones according to the pipeline configuration) data into the target SecOps tenant leveraging the [Ingestion API](https://cloud.google.com/chronicle/docs/reference/ingestion-api).
### Limitations
- The pipeline can be schedule to run on a daily basis or on-demand, being all asynchronous tasks the anonymization and/or import logs should be triggered after the export is completed successfully
- This pipeline is built for migrating few logs between tenants, lack of multi-threading and limitations on the Cloud Function memory result in the function being able to process at most order of MB of raw logs data (no GB)
- Currently, SecOps export API supports 3 concurrent export requests for each tenant, due to each export request being associated to eithe all log types or a specific log type this result in no more than 3 log types exported within the same export request.
### Deployment
#### Step 0: Cloning the repository
If you want to deploy from your Cloud Shell, click on the image below, sign in
if required and when the prompt appears, click on “confirm”.
[![Open Cloudshell](./images/cloud-shell-button.png)](https://shell.cloud.google.com/cloudshell/editor?cloudshell_git_repo=https%3A%2F%2Fgithub.com%2FGoogleCloudPlatform%2Fcloud-foundation-fabric&cloudshell_workspace=blueprints%2Fthird-party-solutions%2Fwordpress%2Fcloudrun)
Otherwise, in your console of choice:
```bash
git clone REPO_URL
```
Before you deploy the architecture, you will need at least the following
information (for more precise configuration see the Variables section):
* GCP Project ID for SecOps anonymization pipeline deployment
* SecOps tenants information:
* GCP projects of SecOps tenants
* customer ID
* deployment region for both the tenants (must be the same)
* SA credentials with export permissions on source tenant
* SA credentials with ingestion API grants on target tenant
#### Step 2: Prepare the variables
Once you have the required information, head back to your cloned repository.
Make sure youre in the directory of this tutorial (where this README is in).
Configure the Terraform variables in your `terraform.tfvars` file.
See sample TF variables in README.md file as starting point - just
copy them to a new `terraform.tfvars` file and update accordingly.
See the variables documentation below for more information.
#### Step 3: Prepare the DLP template
When anonymization is required (variable `skip_anonymization` is false) a Data Loss prevention API configuration is required for the corresponding DLP job.
By default, the blueprint will provision a very basic DLP inspect and de-identify template for identifying and masking with just a sample value common PII information such as email addresses, person names, IP addresses and so on, more information available on the corresponding TF script in [dlp.tf](./dlp.tf).
In general a more advanced configuration is required for custom info types or a better de-identification template based on multiple anonymized values for the same info type (more email addresses or IP addresses to guarantee differentiation), in that case you can build your own DLP templates and pass them to the anonymization pipeline leveraging the `dlp_config` variable.
#### Step 4: Deploy resources
Initialize your Terraform environment and deploy the resources:
```shell
terraform init
terraform apply
```
#### Step 5: Test solution
Test the solution triggering an export from the Cloud Scheduler page, after few hours (accoding to the size of the export) logs should be available on secops-export bucket. Please check for any issue during export using the corresponding APIs and the export ID.
<!-- BEGIN TFDOC -->
## Variables
| name | description | type | required | default |
|---|---|:---:|:---:|:---:|
| [prefix](variables.tf#L59) | Prefix used for resource names. | <code>string</code> | ✓ | |
| [project_id](variables.tf#L69) | Project id, references existing project if `project_create` is null. | <code>string</code> | ✓ | |
| [secops_config](variables.tf#L86) | SecOps config. | <code title="object&#40;&#123;&#10; region &#61; string&#10; alpha_apis_region &#61; string&#10; source_tenant &#61; object&#40;&#123;&#10; gcp_project &#61; string&#10; export_sa_key_base64 &#61; string&#10; &#125;&#41;&#10; target_tenant &#61; object&#40;&#123;&#10; gcp_project &#61; string&#10; customer_id &#61; string&#10; ingestion_sa_key_base64 &#61; string&#10; &#125;&#41;&#10;&#125;&#41;">object&#40;&#123;&#8230;&#125;&#41;</code> | ✓ | |
| [anonymization_scheduler](variables.tf#L17) | Schedule for triggering export, anonymization and import of data. | <code title="object&#40;&#123;&#10; trigger-export &#61; string&#10; anonymize-data &#61; string&#10; import-data &#61; string&#10;&#125;&#41;">object&#40;&#123;&#8230;&#125;&#41;</code> | | <code title="&#123;&#10; trigger-export &#61; &#34;0 8 29 2 &#42;&#34;&#10; anonymize-data &#61; &#34;0 12 29 2 &#42;&#34;&#10; import-data &#61; &#34;0 13 29 2 &#42;&#34;&#10;&#125;">&#123;&#8230;&#125;</code> |
| [cloud_function_config](variables.tf#L31) | Optional Cloud Function configuration. | <code title="object&#40;&#123;&#10; build_worker_pool_id &#61; optional&#40;string&#41;&#10; build_sa &#61; optional&#40;string&#41;&#10; debug &#61; optional&#40;bool, false&#41;&#10; cpu &#61; optional&#40;number, 1&#41;&#10; memory_mb &#61; optional&#40;number, 2048&#41;&#10; timeout_seconds &#61; optional&#40;number, 3600&#41;&#10; vpc_connector &#61; optional&#40;object&#40;&#123;&#10; name &#61; string&#10; egress_settings &#61; optional&#40;string, &#34;ALL_TRAFFIC&#34;&#41;&#10; &#125;&#41;&#41;&#10;&#125;&#41;">object&#40;&#123;&#8230;&#125;&#41;</code> | | <code>&#123;&#125;</code> |
| [dlp_config](variables.tf#L49) | Data Loss prevention configuration. | <code title="object&#40;&#123;&#10; region &#61; string&#10; deidentify_template_id &#61; string&#10; inspect_template_id &#61; string&#10;&#125;&#41;">object&#40;&#123;&#8230;&#125;&#41;</code> | | <code>null</code> |
| [regions](variables.tf#L74) | Regions: primary for all resources and secondary for clouds scheduler since the latter is available in few regions. | <code title="object&#40;&#123;&#10; primary &#61; string&#10; secondary &#61; string&#10;&#125;&#41;">object&#40;&#123;&#8230;&#125;&#41;</code> | | <code title="&#123;&#10; primary &#61; &#34;europe-west1&#34;&#10; secondary &#61; &#34;europe-west1&#34;&#10;&#125;">&#123;&#8230;&#125;</code> |
| [skip_anonymization](variables.tf#L103) | Whether to skip anonymization step and just import data exported from source tenant. | <code>bool</code> | | <code>false</code> |
## Outputs
| name | description | sensitive |
|---|---|:---:|
| [function_sa](outputs.tf#L17) | Chronicle Anonymization function service account. | |
<!-- END TFDOC -->
## Test
```hcl
module "test" {
source = "./fabric/fast/project-templates/secops-anonymization-pipeline"
secops_config = {
region = "europe"
alpha_apis_region = "eu"
source_tenant = {
gcp_project = "SOURCE_PROJECT_ID"
export_sa_key_base64 = "dGVzdAo="
}
target_tenant = {
gcp_project = "TARGET_PROJECT_ID"
customer_id = "xxx-xxxxxx-xxxxx"
ingestion_sa_key_base64 = "dGVzdAo="
}
}
skip_anonymization = false
prefix = "pre"
project_id = "gcp-project-id"
regions = {
primary = "europe-west1"
secondary = "europe-west1"
}
}
# tftest modules=8 resources=54
```

View File

@@ -0,0 +1,283 @@
/**
* Copyright 2025 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
resource "google_data_loss_prevention_deidentify_template" "dlp_deidentify_template" {
count = var.dlp_config == null ? 1 : 0
parent = "projects/${module.project.project_id}/locations/${var.regions.primary}"
description = "SecOps Anonymization pipeline deidentify template."
display_name = "sample_deidentify_config_template"
deidentify_config {
info_type_transformations {
transformations {
info_types {
name = "PHONE_NUMBER"
}
primitive_transformation {
replace_config {
new_value {
integer_value = 3333333333
}
}
}
}
transformations {
info_types {
name = "AGE"
}
primitive_transformation {
replace_config {
new_value {
integer_value = 10
}
}
}
}
transformations {
info_types {
name = "EMAIL_ADDRESS"
}
primitive_transformation {
replace_config {
new_value {
string_value = "john.doe@fakedomain.com"
}
}
}
}
transformations {
info_types {
name = "LAST_NAME"
}
primitive_transformation {
replace_config {
new_value {
string_value = "doe"
}
}
}
}
transformations {
info_types {
name = "PERSON_NAME"
}
primitive_transformation {
replace_config {
new_value {
string_value = "john"
}
}
}
}
transformations {
info_types {
name = "DATE_OF_BIRTH"
}
primitive_transformation {
replace_config {
new_value {
date_value {
year = 1990
month = 1
day = 1
}
}
}
}
}
transformations {
info_types {
name = "CREDIT_CARD_NUMBER"
}
primitive_transformation {
replace_config {
new_value {
string_value = "1234567812345678"
}
}
}
}
transformations {
info_types {
name = "CREDIT_CARD_TRACK_NUMBER"
}
primitive_transformation {
replace_config {
new_value {
string_value = "1234567812345678"
}
}
}
}
transformations {
info_types {
name = "ETHNIC_GROUP"
}
primitive_transformation {
replace_config {
new_value {
string_value = "None"
}
}
}
}
transformations {
info_types {
name = "GENDER"
}
primitive_transformation {
replace_config {
new_value {
string_value = "Gender"
}
}
}
}
transformations {
info_types {
name = "IBAN_CODE"
}
primitive_transformation {
replace_config {
new_value {
string_value = "2131312312312312"
}
}
}
}
transformations {
info_types {
name = "PASSPORT"
}
primitive_transformation {
replace_config {
new_value {
string_value = "2131312312312312"
}
}
}
}
transformations {
info_types {
name = "STREET_ADDRESS"
}
primitive_transformation {
replace_config {
new_value {
string_value = "street address"
}
}
}
}
transformations {
info_types {
name = "SWIFT_CODE"
}
primitive_transformation {
replace_config {
new_value {
string_value = "2131312312312312"
}
}
}
}
transformations {
info_types {
name = "VEHICLE_IDENTIFICATION_NUMBER"
}
primitive_transformation {
replace_config {
new_value {
string_value = "2131312312312312"
}
}
}
}
}
}
}
resource "google_data_loss_prevention_inspect_template" "dlp_inspect_template" {
count = var.dlp_config == null ? 1 : 0
parent = "projects/${module.project.project_id}/locations/${var.regions.primary}"
description = "Data Loss prevention sample inspect config."
display_name = "sample_inspect_config_template"
inspect_config {
info_types {
name = "ADVERTISING_ID"
}
info_types {
name = "AGE"
}
info_types {
name = "CREDIT_CARD_NUMBER"
}
info_types {
name = "CREDIT_CARD_TRACK_NUMBER"
}
info_types {
name = "EMAIL_ADDRESS"
}
info_types {
name = "DATE_OF_BIRTH"
}
info_types {
name = "ETHNIC_GROUP"
}
info_types {
name = "GENDER"
}
info_types {
name = "IBAN_CODE"
}
info_types {
name = "PASSPORT"
}
info_types {
name = "PERSON_NAME"
}
info_types {
name = "FIRST_NAME"
}
info_types {
name = "LAST_NAME"
}
info_types {
name = "PHONE_NUMBER"
}
info_types {
name = "STREET_ADDRESS"
}
info_types {
name = "SWIFT_CODE"
}
info_types {
name = "VEHICLE_IDENTIFICATION_NUMBER"
}
min_likelihood = "POSSIBLE"
}
}

Binary file not shown.

After

Width:  |  Height:  |  Size: 10 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 90 KiB

View File

@@ -0,0 +1,284 @@
/**
* Copyright 2025 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
locals {
dlp_config = var.dlp_config == null ? {
region = var.regions.primary
deidentify_template_id = google_data_loss_prevention_deidentify_template.dlp_deidentify_template.0.id
inspect_template_id = google_data_loss_prevention_inspect_template.dlp_inspect_template.0.id
} : var.dlp_config
secops_anonymization_export_secret_id = "secops-export-secret-json"
secops_anonymization_import_secret_id = "secops-import-secret-json"
}
module "project" {
source = "../../../modules/project"
name = var.project_id
services = concat([
"secretmanager.googleapis.com",
"run.googleapis.com",
"cloudscheduler.googleapis.com",
"cloudbuild.googleapis.com",
"cloudresourcemanager.googleapis.com",
"vpcaccess.googleapis.com",
"dlp.googleapis.com",
"vpcaccess.googleapis.com"
])
iam = {
"roles/dlp.reader" = [module.function.service_account_iam_email]
"roles/dlp.jobsEditor" = [module.function.service_account_iam_email]
"roles/serviceusage.serviceUsageConsumer" = [module.function.service_account_iam_email]
}
iam_bindings_additive = {
function-log-writer = {
member = module.function.service_account_iam_email
role = "roles/logging.logWriter"
}
}
}
module "secrets" {
source = "../../../modules/secret-manager"
project_id = module.project.project_id
secrets = {
(local.secops_anonymization_export_secret_id) = {
locations = [var.regions.primary]
}
(local.secops_anonymization_import_secret_id) = {
locations = [var.regions.primary]
}
}
versions = {
(local.secops_anonymization_export_secret_id) = {
latest = {
enabled = true,
data = base64decode(var.secops_config.source_tenant.export_sa_key_base64)
}
}
(local.secops_anonymization_import_secret_id) = {
latest = {
enabled = true,
data = base64decode(var.secops_config.target_tenant.ingestion_sa_key_base64)
}
}
}
iam = {
(local.secops_anonymization_export_secret_id) = {
"roles/secretmanager.secretAccessor" = [
"serviceAccount:${module.function.service_account_email}"
]
}
(local.secops_anonymization_import_secret_id) = {
"roles/secretmanager.secretAccessor" = [
"serviceAccount:${module.function.service_account_email}"
]
}
}
}
module "export-bucket" {
source = "../../../modules/gcs"
project_id = module.project.project_id
name = "secops-export"
prefix = var.prefix
location = var.regions.primary
storage_class = "REGIONAL"
versioning = true
lifecycle_rules = {
delete = {
action = {
type = "Delete"
}
condition = {
age = 7
}
}
}
iam = {
"roles/storage.legacyBucketReader" = [
"user:malachite-data-export-batch@prod.google.com",
module.function.service_account_iam_email
]
"roles/storage.objectAdmin" = [
"user:malachite-data-export-batch@prod.google.com",
module.function.service_account_iam_email
]
"roles/storage.objectViewer" = [module.function.service_account_iam_email]
}
}
module "anonymized-bucket" {
count = var.skip_anonymization ? 0 : 1
source = "../../../modules/gcs"
project_id = module.project.project_id
name = "anonymized-data"
prefix = var.prefix
location = var.regions.primary
storage_class = "REGIONAL"
versioning = true
lifecycle_rules = {
delete = {
action = {
type = "Delete"
}
condition = {
age = 7
}
}
}
iam_bindings_additive = {
storage-legacy-reader-function = {
role = "roles/storage.legacyBucketReader"
member = module.function.service_account_iam_email
}
storage-legacy-reader-dlp = {
role = "roles/storage.legacyBucketReader"
member = "serviceAccount:service-${module.project.number}@dlp-api.iam.gserviceaccount.com"
}
storage-object-admin-dlp = {
role = "roles/storage.objectAdmin"
member = "serviceAccount:service-${module.project.number}@dlp-api.iam.gserviceaccount.com"
}
storage-object-admin-function = {
role = "roles/storage.objectAdmin"
member = module.function.service_account_iam_email
}
}
}
module "function" {
source = "../../../modules/cloud-function-v2"
project_id = module.project.project_id
region = var.regions.primary
prefix = var.prefix
name = "secops-anonymization"
bucket_name = "${var.project_id}-anonymization"
service_account_create = true
ingress_settings = "ALLOW_INTERNAL_AND_GCLB"
build_worker_pool = var.cloud_function_config.build_worker_pool_id
build_service_account = var.cloud_function_config.build_sa != null ? var.cloud_function_config.build_sa : module.cloudbuild-sa.0.id
bucket_config = {
lifecycle_delete_age_days = 1
}
bundle_config = {
path = "${path.module}/source"
}
environment_variables = merge({
GCP_PROJECT = module.project.project_id
SKIP_ANONYMIZATION = var.skip_anonymization
SECOPS_SOURCE_SA_KEY_SECRET_PATH = "/app/secrets/source/latest"
SECOPS_TARGET_SA_KEY_SECRET_PATH = "/app/secrets/target/latest"
SECOPS_TARGET_CUSTOMER_ID = var.secops_config.target_tenant.customer_id
SECOPS_REGION = var.secops_config.region
SECOPS_ALPHA_APIS_REGION = var.secops_config.alpha_apis_region
SECOPS_EXPORT_BUCKET = module.export-bucket.name
LOG_EXECUTION_ID = "true"
}, var.skip_anonymization ? {} : {
SECOPS_OUTPUT_BUCKET = module.anonymized-bucket.0.name
DLP_DEIDENTIFY_TEMPLATE_ID = local.dlp_config.deidentify_template_id
DLP_INSPECT_TEMPLATE_ID = local.dlp_config.inspect_template_id
DLP_REGION = local.dlp_config.region
})
function_config = {
cpu = var.cloud_function_config.cpu
memory_mb = var.cloud_function_config.memory_mb
timeout_seconds = var.cloud_function_config.timeout_seconds
}
iam = {
"roles/run.invoker" = [
"serviceAccount:${module.scheduler-sa.email}"
]
}
secrets = {
"/app/secrets/source" = {
is_volume = true
project_id = module.project.number
secret = local.secops_anonymization_export_secret_id
versions = [
"latest:latest"
]
}
"/app/secrets/target" = {
is_volume = true
project_id = module.project.number
secret = local.secops_anonymization_import_secret_id
versions = [
"latest:latest"
]
}
}
vpc_connector = (
var.cloud_function_config.vpc_connector == null
? null
: {
create = false
name = var.cloud_function_config.vpc_connector.name
egress_settings = var.cloud_function_config.vpc_connector.egress_settings
}
)
}
module "cloudbuild-sa" {
count = var.cloud_function_config.build_sa == null ? 1 : 0
source = "../../../modules/iam-service-account"
project_id = module.project.project_id
name = "cloudbuild"
iam_project_roles = {
(module.project.project_id) = [
"roles/logging.logWriter",
"roles/monitoring.metricWriter",
"roles/artifactregistry.writer",
"roles/storage.objectAdmin"
]
}
}
module "scheduler-sa" {
source = "../../../modules/iam-service-account"
project_id = module.project.project_id
name = "secops-anonymization-scheduler"
}
resource "google_cloud_scheduler_job" "anonymization_jobs" {
for_each = { for k, v in var.anonymization_scheduler : k => v if !(var.skip_anonymization && k == "anonymize-data") }
project = module.project.project_id
name = "secops_${each.key}"
description = "Trigger SecOps anonymization function."
schedule = each.value
time_zone = "Etc/UTC"
attempt_deadline = "320s"
region = var.regions.secondary
retry_config {
retry_count = 1
}
http_target {
http_method = "POST"
uri = module.function.uri
body = base64encode(jsonencode({
ACTION = upper(each.key)
}))
headers = { "Content-Type" : "application/json" }
oidc_token {
service_account_email = module.scheduler-sa.email
audience = module.function.uri
}
}
lifecycle {
ignore_changes = [
http_target
]
}
}

View File

@@ -0,0 +1,20 @@
/**
* Copyright 2025 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
output "function_sa" {
description = "Chronicle Anonymization function service account."
value = try(module.function.service_account_email, null)
}

View File

@@ -0,0 +1,52 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# yaml-language-server: $schema=../../stages/2-project-factory/schemas/project.schema.json
# TODO: edit and uncomment the following line to create the project in a folder
# parent: shared
name: secops-anonym-0
services:
- "secretmanager.googleapis.com"
- "run.googleapis.com"
- "cloudscheduler.googleapis.com"
- "cloudbuild.googleapis.com"
- "cloudresourcemanager.googleapis.com"
- "vpcaccess.googleapis.com"
- "dlp.googleapis.com"
- "vpcaccess.googleapis.com"
automation:
# TODO: edit the automation project and optionally edit resource names
project: pf-automation-0
service_accounts:
rw:
description: Read/write automation service account for apt registries.
buckets:
tf-state:
description: Terraform state bucket for apt registries.
iam:
roles/storage.objectCreator:
- rw
roles/storage.objectViewer:
- rw
iam:
roles/viewer:
- rw
roles/owner:
- rw
# TODO: add instance service accounts that need access to the registries
# roles/artifactregistry.writer:
# - serviceAccount:foo@bar

View File

@@ -0,0 +1,31 @@
{
"actions": [
{
"deidentify": {
"file_types_to_transform": [
"TEXT_FILE",
"IMAGE",
"CSV",
"TSV"
],
"transformation_details_storage_config": {},
"transformation_config": {
"deidentify_template": "{{ deidentify_template_id }}",
"structured_deidentify_template": "",
"image_redact_template": ""
},
"cloud_storage_output": "gs://{{output_bucket}}/"
}
}
],
"inspect_template_name": "{{ inspect_template_id }}",
"storage_config": {
"cloud_storage_options": {
"file_set": {
"url": "gs://{{ export_bucket }}/{{export_id}}/**"
},
"file_types": ["TEXT_FILE", "CSV", "TSV", "EXCEL", "AVRO"],
"files_limit_percent": 100
}
}
}

View File

@@ -0,0 +1,301 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import binascii
import json
import os
import click
import logging
import sys
import google.cloud.logging
from google.auth.transport.requests import AuthorizedSession
from google.oauth2 import service_account
from shared.secops import SecOpsUtils
from jinja2 import Template
from shared import utils
from google.cloud import dlp_v2
from google.cloud import storage
from datetime import date, timedelta
client = google.cloud.logging.Client()
client.setup_logging()
LOGGER = logging.getLogger('secops')
logging.basicConfig(
level=logging.DEBUG if os.environ.get('DEBUG') else logging.INFO,
format='[%(levelname)-8s] - %(asctime)s - %(message)s')
logging.root.setLevel(logging.DEBUG)
SCOPES = [
"https://www.googleapis.com/auth/chronicle-backstory",
"https://www.googleapis.com/auth/malachite-ingestion"
]
# Threshold value in bytes for ingesting the logs to the SecOps.
# SecOps Ingestion API allows the maximum 1MB of payload and we kept 0.5MB as a buffer.
SIZE_THRESHOLD_BYTES = 950000
SECOPS_REGION = os.environ.get("SECOPS_REGION")
SECOPS_ALPHA_APIS_REGION = os.environ.get("SECOPS_ALPHA_APIS_REGION")
GCP_PROJECT_ID = os.environ.get("GCP_PROJECT")
SECOPS_EXPORT_BUCKET = os.environ.get("SECOPS_EXPORT_BUCKET")
SECOPS_OUTPUT_BUCKET = os.environ.get("SECOPS_OUTPUT_BUCKET")
SECOPS_SOURCE_SA_KEY_SECRET_PATH = os.environ.get(
"SECOPS_SOURCE_SA_KEY_SECRET_PATH")
SECOPS_TARGET_SA_KEY_SECRET_PATH = os.environ.get(
"SECOPS_TARGET_SA_KEY_SECRET_PATH")
SECOPS_TARGET_CUSTOMER_ID = os.environ.get("SECOPS_TARGET_CUSTOMER_ID")
SKIP_ANONYMIZATION = False if (os.environ.get(
"SKIP_ANONYMIZATION", "false").lower() == "false") else True
DLP_DEIDENTIFY_TEMPLATE_ID = os.environ.get("DLP_DEIDENTIFY_TEMPLATE_ID")
DLP_INSPECT_TEMPLATE_ID = os.environ.get("DLP_INSPECT_TEMPLATE_ID")
DLP_REGION = os.environ.get("DLP_REGION")
INGESTION_API_URL = F"https://{SECOPS_REGION}-malachiteingestion-pa.googleapis.com"
URI_UNSTRUCTURED = f"{INGESTION_API_URL}/v2/unstructuredlogentries:batchCreate"
def import_logs(export_date):
storage_client = storage.Client()
BUCKET = SECOPS_OUTPUT_BUCKET if not SKIP_ANONYMIZATION else SECOPS_EXPORT_BUCKET
bucket = storage_client.bucket(BUCKET)
export_ids = utils.get_secops_export_folders_for_date(BUCKET, export_date)
backstory_credentials = service_account.Credentials.from_service_account_file(
SECOPS_TARGET_SA_KEY_SECRET_PATH, scopes=SCOPES)
authed_session = AuthorizedSession(backstory_credentials)
for export_id in export_ids:
for folder in utils.list_anonymized_folders(BUCKET, export_id):
log_type = folder.split("-")[0]
for log_file in utils.list_log_files(BUCKET, f"{export_id}/{folder}"):
blob = bucket.blob(log_file) # Directly get the blob object
with blob.open("r") as f:
cur_entries = []
body = {
"customer_id": SECOPS_TARGET_CUSTOMER_ID,
"log_type": log_type,
"entries": cur_entries
}
size_of_empty_payload = sys.getsizeof(json.dumps(body))
for line in f:
next_entries = cur_entries + [{"logText": line.rstrip('\n')}]
if size_of_empty_payload + sys.getsizeof(
json.dumps(next_entries)) >= SIZE_THRESHOLD_BYTES:
body["entries"] = cur_entries
LOGGER.debug(body)
LOGGER.debug(sys.getsizeof(json.dumps(body)))
response = authed_session.post(URI_UNSTRUCTURED, json=body)
LOGGER.debug(response)
cur_entries = [{"logText": line.rstrip('\n')}]
else:
cur_entries.append({"logText": line.rstrip('\n')})
# Send any remaining entries
if cur_entries:
body["entries"] = cur_entries
LOGGER.debug(sys.getsizeof(json.dumps(body)))
LOGGER.debug(body)
response = authed_session.post(URI_UNSTRUCTURED, json=body)
LOGGER.debug(response)
# delete both export and anonymized buckets after ingesting logs
utils.delete_folder(BUCKET, export_id)
if not SKIP_ANONYMIZATION:
utils.delete_folder(SECOPS_EXPORT_BUCKET, export_id)
LOGGER.info("Finished importing data.")
def trigger_export(export_date: str, export_start_datetime: str,
export_end_datetime: str, log_types: list):
"""
Trigger secops export using Data Export API for a specific date
:param secops_source_sa_key_secret_path:
:param secops_export_bucket:
:param secops_target_project_id:
:param log_types:
:param export_end_datetime:
:param export_start_datetime:
:param export_date:
:param date: datetime (as string) with DD-MM-YYYY format
:return:
"""
backstory_credentials = service_account.Credentials.from_service_account_file(
SECOPS_SOURCE_SA_KEY_SECRET_PATH, scopes=SCOPES)
secops_utils = SecOpsUtils(backstory_credentials)
export_ids = []
try:
if log_types is None:
export_response = secops_utils.create_data_export(
project=GCP_PROJECT_ID, export_date=export_date,
export_start_datetime=export_start_datetime,
export_end_datetime=export_end_datetime)
LOGGER.info(export_response)
export_ids.append(export_response["dataExportId"])
LOGGER.info(
f"Triggered export with ID: {export_response['dataExportId']}")
else:
for log_type in log_types:
export_response = secops_utils.create_data_export(
project=GCP_PROJECT_ID, export_date=export_date,
export_start_datetime=export_start_datetime,
export_end_datetime=export_end_datetime, log_type=log_type)
LOGGER.info(export_response)
export_ids.append(export_response["dataExportId"])
LOGGER.info(
f"Triggered export with ID: {export_response['dataExportId']}")
except Exception as e:
LOGGER.error(f"Error during export': {e}")
raise SystemExit(f'Error during secops export: {e}')
LOGGER.info(f"Export IDs: {export_response['dataExportId']}")
return export_ids
def anonymize_data(export_date):
"""
Trigger DLP Job and setup secops feeds to ingest data from output bucket.
:param export_date: date for which data should be anonymized
:return:
"""
backstory_credentials = service_account.Credentials.from_service_account_file(
SECOPS_SOURCE_SA_KEY_SECRET_PATH, scopes=SCOPES)
secops_utils = SecOpsUtils(backstory_credentials)
export_ids = utils.get_secops_export_folders_for_date(SECOPS_EXPORT_BUCKET,
export_date=export_date)
export_finished = True
for export_id in export_ids:
export = secops_utils.get_data_export(export_id=export_id)
export_state = export["dataExportStatus"]["stage"]
LOGGER.info(f"Export status: {export_state}.")
if export_state != "FINISHED_SUCCESS":
export_finished = False
if export_finished:
for export_id in export_ids:
utils.split_and_rename_csv_to_log_files(SECOPS_EXPORT_BUCKET, export_id)
with open("dlp_job_template.json.tpl", "r") as template_file:
content = template_file.read()
template = Template(content)
rendered_str = template.render({
"export_bucket": SECOPS_EXPORT_BUCKET,
"output_bucket": SECOPS_OUTPUT_BUCKET,
"deidentify_template_id": DLP_DEIDENTIFY_TEMPLATE_ID,
"inspect_template_id": DLP_INSPECT_TEMPLATE_ID,
"export_id": export_id
})
LOGGER.info(f"Filled template: {rendered_str}")
dlp_job = json.loads(rendered_str)
LOGGER.info(dlp_job)
job_request = {
"parent": f"projects/{GCP_PROJECT_ID}/locations/{DLP_REGION}",
"inspect_job": dlp_job
}
dlp_client = dlp_v2.DlpServiceClient(
client_options={'quota_project_id': GCP_PROJECT_ID})
response = dlp_client.create_dlp_job(request=job_request)
LOGGER.info(response)
else:
LOGGER.error("Export is not finished yet, please try again later.")
LOGGER.info("Triggered all DLP jobs successfully.")
def main(request):
"""
Entry point for Cloud Function triggered by HTTP request.
:param request: payload of HTTP request triggering cloud function
:return:
"""
debug = os.environ.get('DEBUG')
logging.basicConfig(level=logging.INFO)
LOGGER.info('processing http payload')
try:
payload = json.loads(request.data)
except (binascii.Error, json.JSONDecodeError) as e:
raise SystemExit(f'Invalid payload: {e.args[0]}.')
if "EXPORT_DATE" in payload:
export_date = payload.get('EXPORT_DATE')
else:
export_date = date.today().strftime("%Y-%m-%d")
action = payload.get('ACTION')
export_start_datetime = payload.get('EXPORT_START_DATETIME', None)
export_end_datetime = payload.get('EXPORT_END_DATETIME', None)
log_types = payload.get('LOG_TYPES', None)
match action:
case "TRIGGER-EXPORT":
trigger_export(export_date=export_date,
export_start_datetime=export_start_datetime,
export_end_datetime=export_end_datetime,
log_types=log_types)
case "ANONYMIZE-DATA":
anonymize_data(export_date=export_date)
case "IMPORT-DATA":
import_logs(export_date=export_date)
case _:
return "Action must be either 'TRIGGER-EXPORT', 'ANONYMIZE-DATA' or 'IMPORT-DATA'"
return "Success."
@click.command()
@click.option('--export-date', '-d', required=False, type=str,
help='Date for secops export and anonymization.')
@click.option('--export-start-datetime', '-d', required=False, type=str,
help='Start datetime for secops export and anonymization.')
@click.option('--export-end-datetime', '-d', required=False, type=str,
help='End datetime for secops export and anonymization.')
@click.option('--log-type', type=str, multiple=True)
@click.option(
'--action',
type=click.Choice(['TRIGGER-EXPORT', 'ANONYMIZE-DATA',
'IMPORT-DATA']), required=True)
@click.option('--debug', is_flag=True, default=False,
help='Turn on debug logging.')
def main_cli(export_date, export_start_datetime, export_end_datetime,
log_type: list, action: str, debug=False):
"""
CLI entry point.
:param date: date for secops export and anonymization
:param debug: whether to enable debug logs
:return:
"""
logging.basicConfig(level=logging.INFO if not debug else logging.DEBUG)
match action:
case "TRIGGER-EXPORT":
trigger_export(export_date=export_date,
export_start_datetime=export_start_datetime,
export_end_datetime=export_end_datetime,
log_types=log_type)
case "ANONYMIZE-DATA":
anonymize_data(export_date=export_date)
case "IMPORT-DATA":
import_logs(export_date=export_date)
case _:
return "Action must be either 'TRIGGER-EXPORT', 'ANONYMIZE-DATA' or 'IMPORT-DATA'"
return "Success."
if __name__ == '__main__':
main_cli()

View File

@@ -0,0 +1,25 @@
# coding=utf-8
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
google-cloud-pubsub
requests==2.27.1
jwt==1.3.1
google-auth
google-cloud-secret-manager
jinja2
google-cloud-storage
click==8.1.3
google-cloud-dlp
google-cloud-logging

View File

@@ -0,0 +1,14 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Shared module for SecOps Anonymization Pipeline."""

View File

@@ -0,0 +1,111 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import google.auth
import logging
import requests
import os
from . import utils
from google.auth.transport.requests import AuthorizedSession
"""SecOps utility functions."""
LOGGER = logging.getLogger("secops")
SECOPS_REGION = os.environ.get("SECOPS_REGION")
SECOPS_EXPORT_BUCKET = os.environ.get("SECOPS_EXPORT_BUCKET")
SECOPS_OUTPUT_BUCKET = os.environ.get("SECOPS_OUTPUT_BUCKET")
class SecOpsUtils:
def __init__(self, credentials=None):
self.BACKSTORY_API_URL = f"https://{SECOPS_REGION}-backstory.googleapis.com/v1/tools/dataexport"
self.INGESTION_API_URL = F"https://{SECOPS_REGION}-malachiteingestion-pa.googleapis.com"
self.HTTP = AuthorizedSession(credentials=credentials if credentials
is not None else google.auth.default()[0])
def create_data_export(self, project, export_date, export_start_datetime,
export_end_datetime, log_type: str = None):
"""
Trigger Chronicle data export for the given date and log types.
:param export_start_datetime:
:param export_date:
:param project:
:param session: auth session for API call
:param date: date for which data will be exported
:return: Chronicle Data export response.
"""
if export_start_datetime and export_end_datetime:
start_time, end_time = export_start_datetime, export_end_datetime
else:
start_time, end_time = utils.format_date_time_range(
date_input=export_date)
gcs_bucket = f"projects/{project}/buckets/{SECOPS_EXPORT_BUCKET}"
body = {
"startTime": start_time,
"endTime": end_time,
"logType": "ALL_TYPES" if log_type is None else log_type,
"gcsBucket": gcs_bucket,
}
response = self.HTTP.post(self.BACKSTORY_API_URL, json=body)
response.raise_for_status()
print(f"Data export created successfully.")
return response.json()
def get_data_export(self, export_id: str) -> str:
"""
Get Chronicle data export information.
:param export_id: ID of Chronicle export to get information from
:return: Data Export status
:raises requests.exceptions.HTTPError: If the API request fails.
"""
try:
response = self.HTTP.get(f"{self.BACKSTORY_API_URL}/{export_id}")
response.raise_for_status(
) # Raise HTTPError for bad responses (4xx or 5xx)
print(
f"Data export for '{export_id}' retrieved, content is {response.json()}"
)
return response.json()
except requests.exceptions.HTTPError as e:
print(f"Error fetching data export '{export_id}': {e}")
# You can choose to handle the error in a more specific way here,
# like retrying the request, logging the error, or raising a custom exception.
raise # Re-raise the exception to be handled by the caller
def list_log_types(self, date):
start_date, end_date = utils.format_date_time_range(date)
params = {
"startTime": start_date,
"endTime": end_date,
}
response = self.HTTP.get(f"{self.BACKSTORY_API_URL}/listavailablelogtypes")
response.raise_for_status()
if response.status_code == 200:
logging.info(f"Log types for date: {date} is {response.json()}")
log_types = response.json()["availableLogTypes"]
else:
error_message = response.json().get("error",
{}).get("message", "Unknown error")
status_code = response.status_code
logging.error(
f"Error listing log types on {date} (Status code: {status_code}) Error message: {error_message}"
)
raise Exception("Error while listing log types.")
return log_types

View File

@@ -0,0 +1,218 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
import logging
import math
import csv
from google.cloud import secretmanager, storage, exceptions
from datetime import datetime, timedelta, timezone, time
LOGGER = logging.getLogger('secops')
"""Utility functions required for ingestion scripts."""
MAX_FILE_SIZE = 61440000 # Max size supported by DLP
def get_value_from_secret_manager(resource_path: str) -> str:
"""Retrieve the value of the secret from the Google Cloud Secret Manager.
Args:
resource_path (str): Path of the secret with version included. Ex.:
"projects/<project_id>/secrets/<secret_name>/versions/1",
"projects/<project_id>/secrets/<secret_name>/versions/latest"
Returns:
str: Payload for secret.
"""
client = secretmanager.SecretManagerServiceClient()
response = client.access_secret_version(name=resource_path)
return response.payload.data.decode("UTF-8")
def format_date_time_range(date_input):
"""
Creates datetime objects for the beginning and end of the input date
and formats them.
Args:
date_input: A string representing the date (e.g., "2024-06-10").
Returns:
A tuple containing two formatted strings:
- Start of day: "YYYY-MM-DDTHH:MM:SSZ"
- End of day: "YYYY-MM-DDTHH:MM:SSZ"
"""
date_obj = datetime.strptime(date_input, "%Y-%m-%d")
start_of_day = datetime.combine(date_obj.date(), time.min,
tzinfo=timezone.utc)
end_of_day = start_of_day + timedelta(days=1, seconds=-1)
# Format both datetime objects
formatted_start = start_of_day.strftime("%Y-%m-%dT%H:%M:%SZ")
formatted_end = end_of_day.strftime("%Y-%m-%dT%H:%M:%SZ")
return formatted_start, formatted_end
def list_anonymized_folders(bucket_name, folder_name):
"""Lists all folders (prefixes) within a specified folder in a GCS bucket.
Args:
bucket_name: Name of the GCS bucket.
folder_name: Name of the folder (prefix) to search within.
Returns:
A list of folder names (prefixes) found.
"""
folders = []
storage_client = storage.Client()
for blob in storage_client.list_blobs(bucket_name, prefix=f"{folder_name}/"):
folder_name = blob.name.split('/')[1]
if not folder_name in folders:
folders.append(folder_name)
return folders
def delete_folder(bucket_name, folder_name):
"""Deletes a folder from a Google Cloud Storage bucket.
Args:
bucket_name: The name of the bucket.
folder_name: The name of the folder to delete.
"""
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
# List all blobs with the given prefix (folder name)
blobs = list(bucket.list_blobs(prefix=folder_name))
# Delete the blobs in parallel
bucket.delete_blobs(blobs)
print(f"Folder {folder_name} deleted from bucket {bucket_name}")
def list_log_files(bucket_name, folder_name):
"""Lists all folders (prefixes) within a specified folder in a GCS bucket.
Args:
bucket_name: Name of the GCS bucket.
folder_name: Name of the folder (prefix) to search within.
Returns:
A list of folder names (prefixes) found.
"""
storage_client = storage.Client()
csv_files = []
for blob in storage_client.list_blobs(bucket_name, prefix=f"{folder_name}/"):
if blob.name.endswith(".log") or blob.name.endswith(".csv"):
csv_files.append(blob.name)
return csv_files
def split_csv(bucket_name, blob_name, file_size):
"""Splits a CSV file into smaller chunks and uploads them back to the bucket.
Args:
bucket_name: The name of the GCS bucket.
blob_name: The name of the CSV blob in the bucket.
max_file_size: The maximum size of each chunk in bytes.
"""
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(blob_name)
# Download the blob to a local file
temp_file = '/tmp/temp.csv'
blob.download_to_filename(temp_file)
file = open(temp_file, encoding="utf8")
numline = sum(1 for row in csv.reader(file))
# Read the CSV file in chunks
chunk_number = math.ceil(numline * MAX_FILE_SIZE / file_size)
index = 0
lines = []
with open(temp_file, 'r', encoding="utf8") as f_in:
reader = csv.reader(f_in, delimiter='\n')
for line in reader:
lines.append(line[0] + "\n")
if len(lines) == chunk_number:
chunk_filename = f'{blob_name.split(".")[0]}_{index}.log'
chunk_path = f'/tmp/temp-{index}.csv'
with open(chunk_path, 'w') as fout:
fout.writelines(lines)
chunk_blob = bucket.blob(f'{chunk_filename}')
chunk_blob.upload_from_filename(chunk_path)
print(f'Uploaded {chunk_filename} to {bucket_name}')
os.remove(chunk_path) # Remove the local chunk file
index += 1
lines = []
chunk_filename = f'{blob_name.split(".")[0]}_{index}.log'
chunk_path = f'/tmp/temp-{index}.csv'
with open(chunk_path, 'w') as fout:
fout.writelines(lines)
chunk_blob = bucket.blob(f'{chunk_filename}')
chunk_blob.upload_from_filename(chunk_path)
print(f'Uploaded {chunk_filename} to {bucket_name}')
os.remove(chunk_path) # Remove the local chunk file
index += 1
lines = []
# Remove the temporary file
os.remove(temp_file)
# remove old log file
blob = bucket.blob(blob_name)
blob.delete()
def split_and_rename_csv_to_log_files(bucket_name, folder_name):
"""Renames all .csv files to .log files within a GCS bucket folder (and subfolders).
Args:
bucket_name (str): Name of the GCS bucket.
folder_prefix (str): Prefix of the folder within the bucket to process.
"""
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blobs = storage_client.list_blobs(bucket, prefix=f"{folder_name}/")
for blob in blobs:
if blob.name.endswith(".csv") and blob.size >= MAX_FILE_SIZE:
split_csv(bucket_name, blob.name, blob.size)
elif blob.name.endswith(".csv"):
new_name = blob.name.replace(".csv", ".log")
bucket.rename_blob(blob, new_name)
def get_secops_export_folders_for_date(bucket_name, export_date):
storage_client = storage.Client()
export_ids = []
for blob in storage_client.list_blobs(bucket_name):
if blob.time_created.strftime(
"%Y-%m-%d") == export_date and blob.name.split(
'/')[0] not in export_ids:
export_ids.append(blob.name.split('/')[0])
return export_ids

View File

@@ -0,0 +1,107 @@
/**
* Copyright 2025 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
variable "anonymization_scheduler" {
description = "Schedule for triggering export, anonymization and import of data."
type = object({
trigger-export = string
anonymize-data = string
import-data = string
})
default = {
trigger-export = "0 8 29 2 *"
anonymize-data = "0 12 29 2 *"
import-data = "0 13 29 2 *"
}
}
variable "cloud_function_config" {
description = "Optional Cloud Function configuration."
type = object({
build_worker_pool_id = optional(string)
build_sa = optional(string)
debug = optional(bool, false)
cpu = optional(number, 1)
memory_mb = optional(number, 2048)
timeout_seconds = optional(number, 3600)
vpc_connector = optional(object({
name = string
egress_settings = optional(string, "ALL_TRAFFIC")
}))
})
default = {}
nullable = false
}
variable "dlp_config" {
description = "Data Loss prevention configuration."
type = object({
region = string
deidentify_template_id = string
inspect_template_id = string
})
default = null
}
variable "prefix" {
description = "Prefix used for resource names."
type = string
nullable = false
validation {
condition = var.prefix != ""
error_message = "Prefix cannot be empty."
}
}
variable "project_id" {
description = "Project id, references existing project if `project_create` is null."
type = string
}
variable "regions" {
description = "Regions: primary for all resources and secondary for clouds scheduler since the latter is available in few regions."
type = object({
primary = string
secondary = string
})
default = {
primary = "europe-west1"
secondary = "europe-west1"
}
}
variable "secops_config" {
description = "SecOps config."
type = object({
region = string
alpha_apis_region = string
source_tenant = object({
gcp_project = string
export_sa_key_base64 = string
})
target_tenant = object({
gcp_project = string
customer_id = string
ingestion_sa_key_base64 = string
})
})
}
variable "skip_anonymization" {
description = "Whether to skip anonymization step and just import data exported from source tenant."
type = bool
default = false
}