diff --git a/fast/project-templates/secops-anonymization-pipeline/README.md b/fast/project-templates/secops-anonymization-pipeline/README.md
index 92c0ca02b..8be973103 100644
--- a/fast/project-templates/secops-anonymization-pipeline/README.md
+++ b/fast/project-templates/secops-anonymization-pipeline/README.md
@@ -22,13 +22,15 @@ The following diagram illustrates the high-level design of the solution, which c
The use case is a SecOps deployment composed of 2 tenants (one for production and one for development/testing). There might be the need to export production data from the prod tenant and import them back in DEV (possibly anonymizing it) for rules and/or parser development, that is why this pipeline might be convenient for speeding up the data migration process.
+The solution is based on a custom Python script responsible for implementing the aforementioned logic. The script leverages the new [SecOps API Wrapper](https://github.com/google/secops-wrapper) available also in [PyPi](https://pypi.org/project/secops/).
+
### Pipeline Steps
-- **SecOps Export**: Triggered via the corresponding TRIGGER-EXPORT action. Call SecOps Export API to trigger raw logs export on a GCS bucket based on either all the log types or one o more of them for a specific time frame. By default, the export will be for the previous day, otherwise the following parameters can be specified to change the time frame:
+- **SecOps Export**: Triggered via the corresponding TRIGGER-EXPORT action. Call [SecOps Export API](https://cloud.google.com/chronicle/docs/reference/rest/v1alpha/projects.locations.instances.dataExports) to trigger raw logs export on a GCS bucket based on either all the log types or one o more of them for a specific time frame. By default, the export will be for the previous day, otherwise the following parameters can be specified to change the time frame:
* `EXPORT_DATE` date for the export (format %Y-%m-%d)
* `EXPORT_START_DATETIME` and `EXPORT_END_DATETIME` start and end datetime for the export (format %Y-%m-%dT%H:%M:%SZ). This is useful for verbose log source with GB/TB of raw logs ingested on a daily basis
- **Anonymize Data**: Triggered via the corresponding ANONYMIZE-DATA action. Split the exported CSV files to one or more CSV files where the size of each file is less than 60MB (which is the maximum file size supported by DLP). It also renames those files in .log for better handling by the DLP Job. It will then trigger an asynchronous DLP job to anonymize data.
-- **Import Data**: Triggered via the corresponding IMPORT-DATA action. Import the exported raw logs (or anonymized ones according to the pipeline configuration) data into the target SecOps tenant leveraging the [Ingestion API](https://cloud.google.com/chronicle/docs/reference/ingestion-api).
+- **Import Data**: Triggered via the corresponding IMPORT-DATA action. Import the exported raw logs (or anonymized ones according to the pipeline configuration) data into the target SecOps tenant leveraging the new [SecOps Ingestion API](https://cloud.google.com/chronicle/docs/reference/rest/v1alpha/projects.locations.instances.logTypes.logs/import).
### Limitations
@@ -56,11 +58,11 @@ information (for more precise configuration see the Variables section):
* GCP Project ID for SecOps anonymization pipeline deployment
* SecOps tenants information:
- * GCP projects of SecOps tenants
- * customer ID
- * deployment region for both the tenants (must be the same)
- * SA credentials with export permissions on source tenant
- * SA credentials with ingestion API grants on target tenant
+ * GCP projects of both source and target SecOps tenants
+ * SecOps customer IDs for both source and target SecOps tenants
+ * SecOps deployment region for both the tenants (must be the same)
+ * SecOps Forwarder ID for target tenant (this is mandatory for new ingestion APIs and requires at least an empty collector forwarder to be setup in target tenant)
+ * **Grant Pipeline SA Chronicle API Editor role on both source and target tenant** (this might be restricred to data export permissions on source and import logs permissions on target tenant)
#### Step 2: Prepare the variables
@@ -92,19 +94,21 @@ terraform apply
#### Step 5: Test solution
Test the solution triggering an export from the Cloud Scheduler page, after few hours (accoding to the size of the export) logs should be available on secops-export bucket. Please check for any issue during export using the corresponding APIs and the export ID.
+
## Variables
| name | description | type | required | default |
|---|---|:---:|:---:|:---:|
| [prefix](variables.tf#L59) | Prefix used for resource names. | string | ✓ | |
-| [project_id](variables.tf#L69) | Project id, references existing project if `project_create` is null. | string | ✓ | |
-| [secops_config](variables.tf#L86) | SecOps config. | object({…}) | ✓ | |
+| [project_id](variables.tf#L78) | Project id that references existing project. | string | ✓ | |
+| [secops_config](variables.tf#L95) | SecOps config. | object({…}) | ✓ | |
| [anonymization_scheduler](variables.tf#L17) | Schedule for triggering export, anonymization and import of data. | object({…}) | | {…} |
| [cloud_function_config](variables.tf#L31) | Optional Cloud Function configuration. | object({…}) | | {} |
| [dlp_config](variables.tf#L49) | Data Loss prevention configuration. | object({…}) | | null |
-| [regions](variables.tf#L74) | Regions: primary for all resources and secondary for clouds scheduler since the latter is available in few regions. | object({…}) | | {…} |
-| [skip_anonymization](variables.tf#L103) | Whether to skip anonymization step and just import data exported from source tenant. | bool | | false |
+| [project_create_config](variables.tf#L69) | Create project instead of using an existing one. | object({…}) | | null |
+| [regions](variables.tf#L83) | Regions: primary for all resources and secondary for clouds scheduler since the latter is available in few regions. | object({…}) | | {…} |
+| [skip_anonymization](variables.tf#L111) | Whether to skip anonymization step and just import data exported from source tenant. | bool | | false |
## Outputs
@@ -121,22 +125,25 @@ module "test" {
region = "europe"
alpha_apis_region = "eu"
source_tenant = {
- gcp_project = "SOURCE_PROJECT_ID"
- export_sa_key_base64 = "dGVzdAo="
+ gcp_project = "SOURCE_PROJECT_ID"
+ customer_id = "xxx-xxxxxx-xxxxx"
}
target_tenant = {
- gcp_project = "TARGET_PROJECT_ID"
- customer_id = "xxx-xxxxxx-xxxxx"
- ingestion_sa_key_base64 = "dGVzdAo="
+ gcp_project = "TARGET_PROJECT_ID"
+ customer_id = "xxx-xxxxxx-xxxxx"
+ forwarder_id = "xxxxxxxxx"
}
}
skip_anonymization = false
prefix = "pre"
project_id = "gcp-project-id"
+ project_create_config = {
+ billing_account = "12345-ABCDE-12345"
+ }
regions = {
primary = "europe-west1"
secondary = "europe-west1"
}
}
-# tftest modules=8 resources=54
+# tftest modules=7 resources=49
```
diff --git a/fast/project-templates/secops-anonymization-pipeline/main.tf b/fast/project-templates/secops-anonymization-pipeline/main.tf
index 39afb864d..fc39cc27f 100644
--- a/fast/project-templates/secops-anonymization-pipeline/main.tf
+++ b/fast/project-templates/secops-anonymization-pipeline/main.tf
@@ -20,13 +20,14 @@ locals {
deidentify_template_id = google_data_loss_prevention_deidentify_template.dlp_deidentify_template.0.id
inspect_template_id = google_data_loss_prevention_inspect_template.dlp_inspect_template.0.id
} : var.dlp_config
- secops_anonymization_export_secret_id = "secops-export-secret-json"
- secops_anonymization_import_secret_id = "secops-import-secret-json"
}
module "project" {
- source = "../../../modules/project"
- name = var.project_id
+ source = "../../../modules/project"
+ name = var.project_id
+ billing_account = try(var.project_create_config.billing_account, null)
+ parent = try(var.project_create_config.parent, null)
+ project_reuse = var.project_create_config != null ? null : {}
services = concat([
"secretmanager.googleapis.com",
"run.googleapis.com",
@@ -41,6 +42,7 @@ module "project" {
"roles/dlp.reader" = [module.function.service_account_iam_email]
"roles/dlp.jobsEditor" = [module.function.service_account_iam_email]
"roles/serviceusage.serviceUsageConsumer" = [module.function.service_account_iam_email]
+ "roles/chronicle.editor" = [module.function.service_account_iam_email]
}
iam_bindings_additive = {
function-log-writer = {
@@ -50,45 +52,6 @@ module "project" {
}
}
-module "secrets" {
- source = "../../../modules/secret-manager"
- project_id = module.project.project_id
- secrets = {
- (local.secops_anonymization_export_secret_id) = {
- locations = [var.regions.primary]
- }
- (local.secops_anonymization_import_secret_id) = {
- locations = [var.regions.primary]
- }
- }
- versions = {
- (local.secops_anonymization_export_secret_id) = {
- latest = {
- enabled = true,
- data = base64decode(var.secops_config.source_tenant.export_sa_key_base64)
- }
- }
- (local.secops_anonymization_import_secret_id) = {
- latest = {
- enabled = true,
- data = base64decode(var.secops_config.target_tenant.ingestion_sa_key_base64)
- }
- }
- }
- iam = {
- (local.secops_anonymization_export_secret_id) = {
- "roles/secretmanager.secretAccessor" = [
- "serviceAccount:${module.function.service_account_email}"
- ]
- }
- (local.secops_anonymization_import_secret_id) = {
- "roles/secretmanager.secretAccessor" = [
- "serviceAccount:${module.function.service_account_email}"
- ]
- }
- }
-}
-
module "export-bucket" {
source = "../../../modules/gcs"
project_id = module.project.project_id
@@ -177,15 +140,16 @@ module "function" {
path = "${path.module}/source"
}
environment_variables = merge({
- GCP_PROJECT = module.project.project_id
- SKIP_ANONYMIZATION = var.skip_anonymization
- SECOPS_SOURCE_SA_KEY_SECRET_PATH = "/app/secrets/source/latest"
- SECOPS_TARGET_SA_KEY_SECRET_PATH = "/app/secrets/target/latest"
- SECOPS_TARGET_CUSTOMER_ID = var.secops_config.target_tenant.customer_id
- SECOPS_REGION = var.secops_config.region
- SECOPS_ALPHA_APIS_REGION = var.secops_config.alpha_apis_region
- SECOPS_EXPORT_BUCKET = module.export-bucket.name
- LOG_EXECUTION_ID = "true"
+ GCP_PROJECT = module.project.project_id
+ SKIP_ANONYMIZATION = var.skip_anonymization
+ SECOPS_SOURCE_PROJECT = var.secops_config.source_tenant.gcp_project
+ SECOPS_TARGET_PROJECT = var.secops_config.target_tenant.gcp_project
+ SECOPS_SOURCE_CUSTOMER_ID = var.secops_config.source_tenant.customer_id
+ SECOPS_TARGET_CUSTOMER_ID = var.secops_config.target_tenant.customer_id
+ SECOPS_TARGET_FORWARDER_ID = var.secops_config.target_tenant.forwarder_id
+ SECOPS_REGION = var.secops_config.region
+ SECOPS_EXPORT_BUCKET = module.export-bucket.name
+ LOG_EXECUTION_ID = "true"
}, var.skip_anonymization ? {} : {
SECOPS_OUTPUT_BUCKET = module.anonymized-bucket.0.name
DLP_DEIDENTIFY_TEMPLATE_ID = local.dlp_config.deidentify_template_id
@@ -202,24 +166,7 @@ module "function" {
"serviceAccount:${module.scheduler-sa.email}"
]
}
- secrets = {
- "/app/secrets/source" = {
- is_volume = true
- project_id = module.project.number
- secret = local.secops_anonymization_export_secret_id
- versions = [
- "latest:latest"
- ]
- }
- "/app/secrets/target" = {
- is_volume = true
- project_id = module.project.number
- secret = local.secops_anonymization_import_secret_id
- versions = [
- "latest:latest"
- ]
- }
- }
+ secrets = {}
vpc_connector = (
var.cloud_function_config.vpc_connector == null
? {}
diff --git a/fast/project-templates/secops-anonymization-pipeline/source/main.py b/fast/project-templates/secops-anonymization-pipeline/source/main.py
index b02f16e50..d7949cef4 100644
--- a/fast/project-templates/secops-anonymization-pipeline/source/main.py
+++ b/fast/project-templates/secops-anonymization-pipeline/source/main.py
@@ -17,16 +17,13 @@ import json
import os
import click
import logging
-import sys
import google.cloud.logging
-from google.auth.transport.requests import AuthorizedSession
-from google.oauth2 import service_account
-from shared.secops import SecOpsUtils
from jinja2 import Template
from shared import utils
from google.cloud import dlp_v2
from google.cloud import storage
-from datetime import date, timedelta
+from datetime import date, timedelta, datetime
+from secops import SecOpsClient
client = google.cloud.logging.Client()
client.setup_logging()
@@ -37,79 +34,53 @@ logging.basicConfig(
format='[%(levelname)-8s] - %(asctime)s - %(message)s')
logging.root.setLevel(logging.DEBUG)
-SCOPES = [
- "https://www.googleapis.com/auth/chronicle-backstory",
- "https://www.googleapis.com/auth/malachite-ingestion"
-]
-
-# Threshold value in bytes for ingesting the logs to the SecOps.
-# SecOps Ingestion API allows the maximum 1MB of payload and we kept 0.5MB as a buffer.
-SIZE_THRESHOLD_BYTES = 950000
-
SECOPS_REGION = os.environ.get("SECOPS_REGION")
-SECOPS_ALPHA_APIS_REGION = os.environ.get("SECOPS_ALPHA_APIS_REGION")
GCP_PROJECT_ID = os.environ.get("GCP_PROJECT")
SECOPS_EXPORT_BUCKET = os.environ.get("SECOPS_EXPORT_BUCKET")
SECOPS_OUTPUT_BUCKET = os.environ.get("SECOPS_OUTPUT_BUCKET")
-SECOPS_SOURCE_SA_KEY_SECRET_PATH = os.environ.get(
- "SECOPS_SOURCE_SA_KEY_SECRET_PATH")
-SECOPS_TARGET_SA_KEY_SECRET_PATH = os.environ.get(
- "SECOPS_TARGET_SA_KEY_SECRET_PATH")
+SECOPS_SOURCE_PROJECT = os.environ.get("SECOPS_SOURCE_PROJECT")
+SECOPS_TARGET_PROJECT = os.environ.get("SECOPS_TARGET_PROJECT")
+SECOPS_SOURCE_CUSTOMER_ID = os.environ.get("SECOPS_SOURCE_CUSTOMER_ID")
SECOPS_TARGET_CUSTOMER_ID = os.environ.get("SECOPS_TARGET_CUSTOMER_ID")
-
-SKIP_ANONYMIZATION = False if (os.environ.get(
- "SKIP_ANONYMIZATION", "false").lower() == "false") else True
+SECOPS_TARGET_FORWARDER_ID = os.environ.get("SECOPS_TARGET_FORWARDER_ID")
+SKIP_ANONYMIZATION = False if (os.environ.get("SKIP_ANONYMIZATION", "false").lower() == "false") else True
DLP_DEIDENTIFY_TEMPLATE_ID = os.environ.get("DLP_DEIDENTIFY_TEMPLATE_ID")
DLP_INSPECT_TEMPLATE_ID = os.environ.get("DLP_INSPECT_TEMPLATE_ID")
DLP_REGION = os.environ.get("DLP_REGION")
-INGESTION_API_URL = F"https://{SECOPS_REGION}-malachiteingestion-pa.googleapis.com"
-URI_UNSTRUCTURED = f"{INGESTION_API_URL}/v2/unstructuredlogentries:batchCreate"
-
def import_logs(export_date):
+ client = SecOpsClient()
+ chronicle = client.chronicle(customer_id=SECOPS_TARGET_CUSTOMER_ID, project_id=SECOPS_TARGET_PROJECT, region=SECOPS_REGION)
+
storage_client = storage.Client()
BUCKET = SECOPS_OUTPUT_BUCKET if not SKIP_ANONYMIZATION else SECOPS_EXPORT_BUCKET
bucket = storage_client.bucket(BUCKET)
export_ids = utils.get_secops_export_folders_for_date(BUCKET, export_date)
- backstory_credentials = service_account.Credentials.from_service_account_file(
- SECOPS_TARGET_SA_KEY_SECRET_PATH, scopes=SCOPES)
- authed_session = AuthorizedSession(backstory_credentials)
for export_id in export_ids:
for folder in utils.list_anonymized_folders(BUCKET, export_id):
log_type = folder.split("-")[0]
for log_file in utils.list_log_files(BUCKET, f"{export_id}/{folder}"):
- blob = bucket.blob(log_file) # Directly get the blob object
- with blob.open("r") as f:
- cur_entries = []
- body = {
- "customer_id": SECOPS_TARGET_CUSTOMER_ID,
- "log_type": log_type,
- "entries": cur_entries
- }
- size_of_empty_payload = sys.getsizeof(json.dumps(body))
- for line in f:
- next_entries = cur_entries + [{"logText": line.rstrip('\n')}]
- if size_of_empty_payload + sys.getsizeof(
- json.dumps(next_entries)) >= SIZE_THRESHOLD_BYTES:
- body["entries"] = cur_entries
- LOGGER.debug(body)
- LOGGER.debug(sys.getsizeof(json.dumps(body)))
- response = authed_session.post(URI_UNSTRUCTURED, json=body)
- LOGGER.debug(response)
- cur_entries = [{"logText": line.rstrip('\n')}]
- else:
- cur_entries.append({"logText": line.rstrip('\n')})
+ try:
+ blob = bucket.blob(log_file) # Directly get the blob object
+ with blob.open("r") as f:
+ logs = []
+ for line in f:
+ logs.append(line.rstrip('\n'))
+ if len(logs) == 1000:
+ response = chronicle.ingest_log(log_message=logs, log_type=log_type, forwarder_id=SECOPS_TARGET_FORWARDER_ID)
+ LOGGER.debug(response)
+ logs = []
- # Send any remaining entries
- if cur_entries:
- body["entries"] = cur_entries
- LOGGER.debug(sys.getsizeof(json.dumps(body)))
- LOGGER.debug(body)
- response = authed_session.post(URI_UNSTRUCTURED, json=body)
- LOGGER.debug(response)
+ # Send any remaining entries
+ if len(logs) > 0:
+ response = chronicle.ingest_log(log_message=logs, log_type=log_type, forwarder_id=SECOPS_TARGET_FORWARDER_ID)
+ LOGGER.debug(response)
+ except Exception as e:
+ LOGGER.error(f"Error during log ingestion': {e}")
+ raise SystemExit(f'Error during log ingestion: {e}')
# delete both export and anonymized buckets after ingesting logs
utils.delete_folder(BUCKET, export_id)
@@ -120,7 +91,7 @@ def import_logs(export_date):
def trigger_export(export_date: str, export_start_datetime: str,
- export_end_datetime: str, log_types: list):
+ export_end_datetime: str, log_types: str):
"""
Trigger secops export using Data Export API for a specific date
:param secops_source_sa_key_secret_path:
@@ -133,36 +104,35 @@ def trigger_export(export_date: str, export_start_datetime: str,
:param date: datetime (as string) with DD-MM-YYYY format
:return:
"""
- backstory_credentials = service_account.Credentials.from_service_account_file(
- SECOPS_SOURCE_SA_KEY_SECRET_PATH, scopes=SCOPES)
- secops_utils = SecOpsUtils(backstory_credentials)
+
+ client = SecOpsClient()
+ chronicle = client.chronicle(customer_id=SECOPS_SOURCE_CUSTOMER_ID, project_id=SECOPS_SOURCE_PROJECT, region=SECOPS_REGION)
export_ids = []
+
+ if export_start_datetime and export_end_datetime:
+ start_time, end_time = datetime.strptime(export_start_datetime, "%Y-%m-%dT%H:%M:%SZ"), datetime.strptime(export_end_datetime, "%Y-%m-%dT%H:%M:%SZ")
+ else:
+ start_time, end_time = utils.format_date_time_range(date_input=export_date)
+ gcs_bucket = f"projects/{GCP_PROJECT_ID}/buckets/{SECOPS_EXPORT_BUCKET}"
+
try:
- if log_types is None:
- export_response = secops_utils.create_data_export(
- project=GCP_PROJECT_ID, export_date=export_date,
- export_start_datetime=export_start_datetime,
- export_end_datetime=export_end_datetime)
+ if log_types is None or log_types == "":
+ export_response = chronicle.create_data_export(start_time=start_time, end_time=end_time, gcs_bucket=gcs_bucket, export_all_logs=True)
LOGGER.info(export_response)
- export_ids.append(export_response["dataExportId"])
- LOGGER.info(
- f"Triggered export with ID: {export_response['dataExportId']}")
+ export_id = export_response["dataExportStatus"]["name"].split("/")[-1]
+ export_ids.append(export_id)
+ LOGGER.info(f"Triggered export with ID: {export_id}")
else:
- for log_type in log_types:
- export_response = secops_utils.create_data_export(
- project=GCP_PROJECT_ID, export_date=export_date,
- export_start_datetime=export_start_datetime,
- export_end_datetime=export_end_datetime, log_type=log_type)
- LOGGER.info(export_response)
- export_ids.append(export_response["dataExportId"])
- LOGGER.info(
- f"Triggered export with ID: {export_response['dataExportId']}")
+ for log_type in log_types.split(","):
+ export_response = chronicle.create_data_export(start_time=start_time, end_time=end_time, gcs_bucket=gcs_bucket, log_type=log_type)
+ export_id = export_response["dataExportStatus"]["name"].split("/")[-1]
+ export_ids.append(export_id)
+ LOGGER.info(f"Triggered export with ID: {export_id}")
except Exception as e:
LOGGER.error(f"Error during export': {e}")
raise SystemExit(f'Error during secops export: {e}')
- LOGGER.info(f"Export IDs: {export_response['dataExportId']}")
return export_ids
@@ -172,19 +142,21 @@ def anonymize_data(export_date):
:param export_date: date for which data should be anonymized
:return:
"""
- backstory_credentials = service_account.Credentials.from_service_account_file(
- SECOPS_SOURCE_SA_KEY_SECRET_PATH, scopes=SCOPES)
- secops_utils = SecOpsUtils(backstory_credentials)
- export_ids = utils.get_secops_export_folders_for_date(SECOPS_EXPORT_BUCKET,
- export_date=export_date)
+
+ client = SecOpsClient()
+ chronicle = client.chronicle(customer_id=SECOPS_SOURCE_CUSTOMER_ID, project_id=SECOPS_SOURCE_PROJECT, region=SECOPS_REGION)
+ export_ids = utils.get_secops_export_folders_for_date(SECOPS_EXPORT_BUCKET, export_date=export_date)
export_finished = True
for export_id in export_ids:
- export = secops_utils.get_data_export(export_id=export_id)
- export_state = export["dataExportStatus"]["stage"]
- LOGGER.info(f"Export status: {export_state}.")
- if export_state != "FINISHED_SUCCESS":
+ export = chronicle.get_data_export(data_export_id=export_id)
+ LOGGER.info(f"Export response: {export}.")
+ if "dataExportStatus"in export and export["dataExportStatus"]["stage"] == "FINISHED_SUCCESS":
+ export_state = export["dataExportStatus"]["stage"]
+ LOGGER.info(f"Export status: {export_state}.")
+ else:
export_finished = False
+ break
if export_finished:
for export_id in export_ids:
@@ -209,10 +181,13 @@ def anonymize_data(export_date):
"inspect_job": dlp_job
}
- dlp_client = dlp_v2.DlpServiceClient(
- client_options={'quota_project_id': GCP_PROJECT_ID})
- response = dlp_client.create_dlp_job(request=job_request)
- LOGGER.info(response)
+ try:
+ dlp_client = dlp_v2.DlpServiceClient(client_options={'quota_project_id': GCP_PROJECT_ID})
+ response = dlp_client.create_dlp_job(request=job_request)
+ LOGGER.info(response)
+ except Exception as e:
+ LOGGER.error(f"Error during export': {e}")
+ raise SystemExit(f'Error during secops export: {e}')
else:
LOGGER.error("Export is not finished yet, please try again later.")
@@ -259,21 +234,13 @@ def main(request):
@click.command()
-@click.option('--export-date', '-d', required=False, type=str,
- help='Date for secops export and anonymization.')
-@click.option('--export-start-datetime', '-d', required=False, type=str,
- help='Start datetime for secops export and anonymization.')
-@click.option('--export-end-datetime', '-d', required=False, type=str,
- help='End datetime for secops export and anonymization.')
+@click.option('--export-date', '-d', required=False, type=str, help='Date for secops export and anonymization.')
+@click.option('--export-start-datetime', '-d', required=False, type=str, help='Start datetime for secops export and anonymization.')
+@click.option('--export-end-datetime', '-d', required=False, type=str, help='End datetime for secops export and anonymization.')
@click.option('--log-type', type=str, multiple=True)
-@click.option(
- '--action',
- type=click.Choice(['TRIGGER-EXPORT', 'ANONYMIZE-DATA',
- 'IMPORT-DATA']), required=True)
-@click.option('--debug', is_flag=True, default=False,
- help='Turn on debug logging.')
-def main_cli(export_date, export_start_datetime, export_end_datetime,
- log_type: list, action: str, debug=False):
+@click.option('--action', type=click.Choice(['TRIGGER-EXPORT', 'ANONYMIZE-DATA', 'IMPORT-DATA']), required=True)
+@click.option('--debug', is_flag=True, default=False, help='Turn on debug logging.')
+def main_cli(export_date, export_start_datetime, export_end_datetime, log_type: list, action: str, debug=False):
"""
CLI entry point.
:param date: date for secops export and anonymization
@@ -286,7 +253,7 @@ def main_cli(export_date, export_start_datetime, export_end_datetime,
trigger_export(export_date=export_date,
export_start_datetime=export_start_datetime,
export_end_datetime=export_end_datetime,
- log_types=log_type)
+ log_types=','.join(log_type))
case "ANONYMIZE-DATA":
anonymize_data(export_date=export_date)
case "IMPORT-DATA":
diff --git a/fast/project-templates/secops-anonymization-pipeline/source/requirements.txt b/fast/project-templates/secops-anonymization-pipeline/source/requirements.txt
index a61d7ec92..c82eef8f5 100644
--- a/fast/project-templates/secops-anonymization-pipeline/source/requirements.txt
+++ b/fast/project-templates/secops-anonymization-pipeline/source/requirements.txt
@@ -23,3 +23,4 @@ google-cloud-storage
click==8.1.3
google-cloud-dlp
google-cloud-logging
+secops
\ No newline at end of file
diff --git a/fast/project-templates/secops-anonymization-pipeline/source/shared/secops.py b/fast/project-templates/secops-anonymization-pipeline/source/shared/secops.py
deleted file mode 100644
index 21df22859..000000000
--- a/fast/project-templates/secops-anonymization-pipeline/source/shared/secops.py
+++ /dev/null
@@ -1,111 +0,0 @@
-# Copyright 2025 Google LLC
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-import google.auth
-import logging
-import requests
-import os
-from . import utils
-from google.auth.transport.requests import AuthorizedSession
-"""SecOps utility functions."""
-
-LOGGER = logging.getLogger("secops")
-SECOPS_REGION = os.environ.get("SECOPS_REGION")
-SECOPS_EXPORT_BUCKET = os.environ.get("SECOPS_EXPORT_BUCKET")
-SECOPS_OUTPUT_BUCKET = os.environ.get("SECOPS_OUTPUT_BUCKET")
-
-
-class SecOpsUtils:
-
- def __init__(self, credentials=None):
- self.BACKSTORY_API_URL = f"https://{SECOPS_REGION}-backstory.googleapis.com/v1/tools/dataexport"
- self.INGESTION_API_URL = F"https://{SECOPS_REGION}-malachiteingestion-pa.googleapis.com"
- self.HTTP = AuthorizedSession(credentials=credentials if credentials
- is not None else google.auth.default()[0])
-
- def create_data_export(self, project, export_date, export_start_datetime,
- export_end_datetime, log_type: str = None):
- """
- Trigger Chronicle data export for the given date and log types.
-
- :param export_start_datetime:
- :param export_date:
- :param project:
- :param session: auth session for API call
- :param date: date for which data will be exported
- :return: Chronicle Data export response.
- """
- if export_start_datetime and export_end_datetime:
- start_time, end_time = export_start_datetime, export_end_datetime
- else:
- start_time, end_time = utils.format_date_time_range(
- date_input=export_date)
- gcs_bucket = f"projects/{project}/buckets/{SECOPS_EXPORT_BUCKET}"
-
- body = {
- "startTime": start_time,
- "endTime": end_time,
- "logType": "ALL_TYPES" if log_type is None else log_type,
- "gcsBucket": gcs_bucket,
- }
-
- response = self.HTTP.post(self.BACKSTORY_API_URL, json=body)
- response.raise_for_status()
- print(f"Data export created successfully.")
- return response.json()
-
- def get_data_export(self, export_id: str) -> str:
- """
- Get Chronicle data export information.
-
- :param export_id: ID of Chronicle export to get information from
- :return: Data Export status
- :raises requests.exceptions.HTTPError: If the API request fails.
- """
- try:
- response = self.HTTP.get(f"{self.BACKSTORY_API_URL}/{export_id}")
- response.raise_for_status(
- ) # Raise HTTPError for bad responses (4xx or 5xx)
- print(
- f"Data export for '{export_id}' retrieved, content is {response.json()}"
- )
- return response.json()
- except requests.exceptions.HTTPError as e:
- print(f"Error fetching data export '{export_id}': {e}")
- # You can choose to handle the error in a more specific way here,
- # like retrying the request, logging the error, or raising a custom exception.
- raise # Re-raise the exception to be handled by the caller
-
- def list_log_types(self, date):
- start_date, end_date = utils.format_date_time_range(date)
- params = {
- "startTime": start_date,
- "endTime": end_date,
- }
- response = self.HTTP.get(f"{self.BACKSTORY_API_URL}/listavailablelogtypes")
- response.raise_for_status()
- if response.status_code == 200:
- logging.info(f"Log types for date: {date} is {response.json()}")
- log_types = response.json()["availableLogTypes"]
- else:
- error_message = response.json().get("error",
- {}).get("message", "Unknown error")
- status_code = response.status_code
- logging.error(
- f"Error listing log types on {date} (Status code: {status_code}) Error message: {error_message}"
- )
- raise Exception("Error while listing log types.")
-
- return log_types
diff --git a/fast/project-templates/secops-anonymization-pipeline/source/shared/utils.py b/fast/project-templates/secops-anonymization-pipeline/source/shared/utils.py
index 8da1bf1b9..e14f9efcc 100644
--- a/fast/project-templates/secops-anonymization-pipeline/source/shared/utils.py
+++ b/fast/project-templates/secops-anonymization-pipeline/source/shared/utils.py
@@ -17,7 +17,7 @@ import os
import logging
import math
import csv
-from google.cloud import secretmanager, storage, exceptions
+from google.cloud import storage
from datetime import datetime, timedelta, timezone, time
LOGGER = logging.getLogger('secops')
@@ -25,22 +25,6 @@ LOGGER = logging.getLogger('secops')
MAX_FILE_SIZE = 61440000 # Max size supported by DLP
-def get_value_from_secret_manager(resource_path: str) -> str:
- """Retrieve the value of the secret from the Google Cloud Secret Manager.
-
- Args:
- resource_path (str): Path of the secret with version included. Ex.:
- "projects//secrets//versions/1",
- "projects//secrets//versions/latest"
-
- Returns:
- str: Payload for secret.
- """
- client = secretmanager.SecretManagerServiceClient()
- response = client.access_secret_version(name=resource_path)
- return response.payload.data.decode("UTF-8")
-
-
def format_date_time_range(date_input):
"""
Creates datetime objects for the beginning and end of the input date
@@ -56,15 +40,10 @@ def format_date_time_range(date_input):
"""
date_obj = datetime.strptime(date_input, "%Y-%m-%d")
- start_of_day = datetime.combine(date_obj.date(), time.min,
- tzinfo=timezone.utc)
+ start_of_day = datetime.combine(date_obj.date(), time.min,tzinfo=timezone.utc)
end_of_day = start_of_day + timedelta(days=1, seconds=-1)
- # Format both datetime objects
- formatted_start = start_of_day.strftime("%Y-%m-%dT%H:%M:%SZ")
- formatted_end = end_of_day.strftime("%Y-%m-%dT%H:%M:%SZ")
-
- return formatted_start, formatted_end
+ return start_of_day, end_of_day
def list_anonymized_folders(bucket_name, folder_name):
@@ -94,16 +73,10 @@ def delete_folder(bucket_name, folder_name):
bucket_name: The name of the bucket.
folder_name: The name of the folder to delete.
"""
-
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
-
- # List all blobs with the given prefix (folder name)
blobs = list(bucket.list_blobs(prefix=folder_name))
-
- # Delete the blobs in parallel
bucket.delete_blobs(blobs)
-
print(f"Folder {folder_name} deleted from bucket {bucket_name}")
@@ -210,9 +183,9 @@ def get_secops_export_folders_for_date(bucket_name, export_date):
export_ids = []
for blob in storage_client.list_blobs(bucket_name):
- if blob.time_created.strftime(
- "%Y-%m-%d") == export_date and blob.name.split(
- '/')[0] not in export_ids:
+ if "_$folder$" in blob.name:
+ continue
+ if blob.time_created.strftime("%Y-%m-%d") == export_date and blob.name.split('/')[0] not in export_ids:
export_ids.append(blob.name.split('/')[0])
return export_ids
diff --git a/fast/project-templates/secops-anonymization-pipeline/variables.tf b/fast/project-templates/secops-anonymization-pipeline/variables.tf
index 587baaedc..56d8c323c 100644
--- a/fast/project-templates/secops-anonymization-pipeline/variables.tf
+++ b/fast/project-templates/secops-anonymization-pipeline/variables.tf
@@ -66,8 +66,17 @@ variable "prefix" {
}
}
+variable "project_create_config" {
+ description = "Create project instead of using an existing one."
+ type = object({
+ billing_account = string
+ parent = optional(string)
+ })
+ default = null
+}
+
variable "project_id" {
- description = "Project id, references existing project if `project_create` is null."
+ description = "Project id that references existing project."
type = string
}
@@ -86,16 +95,15 @@ variable "regions" {
variable "secops_config" {
description = "SecOps config."
type = object({
- region = string
- alpha_apis_region = string
+ region = string
source_tenant = object({
- gcp_project = string
- export_sa_key_base64 = string
+ customer_id = string
+ gcp_project = string
})
target_tenant = object({
- gcp_project = string
- customer_id = string
- ingestion_sa_key_base64 = string
+ gcp_project = string
+ customer_id = string
+ forwarder_id = string
})
})
}