From 1809552f8a69c45876307cebcf012f9946c91fe4 Mon Sep 17 00:00:00 2001 From: simonebruzzechesse <60114646+simonebruzzechesse@users.noreply.github.com> Date: Sat, 29 Mar 2025 19:09:37 +0100 Subject: [PATCH] Improve SecOps Anonymization pipeline (#2988) * update secops anonymization pipeline with new chronicle APIs * improvements to doc for secops anonymization pipeline --------- Co-authored-by: Ludovico Magnocavallo --- .../secops-anonymization-pipeline/README.md | 38 +- .../secops-anonymization-pipeline/main.tf | 87 +---- .../source/main.py | 124 +++---- .../source/shared/secops.py | 329 +++++++++++++++++- .../source/shared/utils.py | 2 + .../variables.tf | 24 +- 6 files changed, 431 insertions(+), 173 deletions(-) diff --git a/fast/project-templates/secops-anonymization-pipeline/README.md b/fast/project-templates/secops-anonymization-pipeline/README.md index 92c0ca02b..93ff571c7 100644 --- a/fast/project-templates/secops-anonymization-pipeline/README.md +++ b/fast/project-templates/secops-anonymization-pipeline/README.md @@ -24,11 +24,11 @@ The use case is a SecOps deployment composed of 2 tenants (one for production an ### Pipeline Steps -- **SecOps Export**: Triggered via the corresponding TRIGGER-EXPORT action. Call SecOps Export API to trigger raw logs export on a GCS bucket based on either all the log types or one o more of them for a specific time frame. By default, the export will be for the previous day, otherwise the following parameters can be specified to change the time frame: +- **SecOps Export**: Triggered via the corresponding TRIGGER-EXPORT action. Call [SecOps Export API](https://cloud.google.com/chronicle/docs/reference/rest/v1alpha/projects.locations.instances.dataExports) to trigger raw logs export on a GCS bucket based on either all the log types or one o more of them for a specific time frame. By default, the export will be for the previous day, otherwise the following parameters can be specified to change the time frame: * `EXPORT_DATE` date for the export (format %Y-%m-%d) * `EXPORT_START_DATETIME` and `EXPORT_END_DATETIME` start and end datetime for the export (format %Y-%m-%dT%H:%M:%SZ). This is useful for verbose log source with GB/TB of raw logs ingested on a daily basis - **Anonymize Data**: Triggered via the corresponding ANONYMIZE-DATA action. Split the exported CSV files to one or more CSV files where the size of each file is less than 60MB (which is the maximum file size supported by DLP). It also renames those files in .log for better handling by the DLP Job. It will then trigger an asynchronous DLP job to anonymize data. -- **Import Data**: Triggered via the corresponding IMPORT-DATA action. Import the exported raw logs (or anonymized ones according to the pipeline configuration) data into the target SecOps tenant leveraging the [Ingestion API](https://cloud.google.com/chronicle/docs/reference/ingestion-api). +- **Import Data**: Triggered via the corresponding IMPORT-DATA action. Import the exported raw logs (or anonymized ones according to the pipeline configuration) data into the target SecOps tenant leveraging the new [SecOps Ingestion API](https://cloud.google.com/chronicle/docs/reference/rest/v1alpha/projects.locations.instances.logTypes.logs/import). ### Limitations @@ -56,11 +56,11 @@ information (for more precise configuration see the Variables section): * GCP Project ID for SecOps anonymization pipeline deployment * SecOps tenants information: - * GCP projects of SecOps tenants - * customer ID - * deployment region for both the tenants (must be the same) - * SA credentials with export permissions on source tenant - * SA credentials with ingestion API grants on target tenant + * GCP projects of both source and target SecOps tenants + * SecOps customer IDs for both source and target SecOps tenants + * SecOps deployment region for both the tenants (must be the same) + * SecOps Forwarder ID for target tenant (this is mandatory for new ingestion APIs and requires at least an empty collector forwarder to be setup in target tenant) + * **Grant Pipeline SA Chronicle API Editor role on both source and target tenant** (this might be restricred to data export permissions on source and import logs permissions on target tenant) #### Step 2: Prepare the variables @@ -98,13 +98,14 @@ Test the solution triggering an export from the Cloud Scheduler page, after few | name | description | type | required | default | |---|---|:---:|:---:|:---:| | [prefix](variables.tf#L59) | Prefix used for resource names. | string | ✓ | | -| [project_id](variables.tf#L69) | Project id, references existing project if `project_create` is null. | string | ✓ | | -| [secops_config](variables.tf#L86) | SecOps config. | object({…}) | ✓ | | +| [project_id](variables.tf#L78) | Project id that references existing project. | string | ✓ | | +| [secops_config](variables.tf#L95) | SecOps config. | object({…}) | ✓ | | | [anonymization_scheduler](variables.tf#L17) | Schedule for triggering export, anonymization and import of data. | object({…}) | | {…} | | [cloud_function_config](variables.tf#L31) | Optional Cloud Function configuration. | object({…}) | | {} | | [dlp_config](variables.tf#L49) | Data Loss prevention configuration. | object({…}) | | null | -| [regions](variables.tf#L74) | Regions: primary for all resources and secondary for clouds scheduler since the latter is available in few regions. | object({…}) | | {…} | -| [skip_anonymization](variables.tf#L103) | Whether to skip anonymization step and just import data exported from source tenant. | bool | | false | +| [project_create_config](variables.tf#L69) | Create project instead of using an existing one. | object({…}) | | null | +| [regions](variables.tf#L83) | Regions: primary for all resources and secondary for clouds scheduler since the latter is available in few regions. | object({…}) | | {…} | +| [skip_anonymization](variables.tf#L111) | Whether to skip anonymization step and just import data exported from source tenant. | bool | | false | ## Outputs @@ -121,22 +122,25 @@ module "test" { region = "europe" alpha_apis_region = "eu" source_tenant = { - gcp_project = "SOURCE_PROJECT_ID" - export_sa_key_base64 = "dGVzdAo=" + gcp_project = "SOURCE_PROJECT_ID" + customer_id = "xxx-xxxxxx-xxxxx" } target_tenant = { - gcp_project = "TARGET_PROJECT_ID" - customer_id = "xxx-xxxxxx-xxxxx" - ingestion_sa_key_base64 = "dGVzdAo=" + gcp_project = "TARGET_PROJECT_ID" + customer_id = "xxx-xxxxxx-xxxxx" + forwarder_id = "xxxxxxxxx" } } skip_anonymization = false prefix = "pre" project_id = "gcp-project-id" + project_create_config = { + billing_account = "12345-ABCDE-12345" + } regions = { primary = "europe-west1" secondary = "europe-west1" } } -# tftest modules=8 resources=54 +# tftest modules=7 resources=49 ``` diff --git a/fast/project-templates/secops-anonymization-pipeline/main.tf b/fast/project-templates/secops-anonymization-pipeline/main.tf index 39afb864d..fc39cc27f 100644 --- a/fast/project-templates/secops-anonymization-pipeline/main.tf +++ b/fast/project-templates/secops-anonymization-pipeline/main.tf @@ -20,13 +20,14 @@ locals { deidentify_template_id = google_data_loss_prevention_deidentify_template.dlp_deidentify_template.0.id inspect_template_id = google_data_loss_prevention_inspect_template.dlp_inspect_template.0.id } : var.dlp_config - secops_anonymization_export_secret_id = "secops-export-secret-json" - secops_anonymization_import_secret_id = "secops-import-secret-json" } module "project" { - source = "../../../modules/project" - name = var.project_id + source = "../../../modules/project" + name = var.project_id + billing_account = try(var.project_create_config.billing_account, null) + parent = try(var.project_create_config.parent, null) + project_reuse = var.project_create_config != null ? null : {} services = concat([ "secretmanager.googleapis.com", "run.googleapis.com", @@ -41,6 +42,7 @@ module "project" { "roles/dlp.reader" = [module.function.service_account_iam_email] "roles/dlp.jobsEditor" = [module.function.service_account_iam_email] "roles/serviceusage.serviceUsageConsumer" = [module.function.service_account_iam_email] + "roles/chronicle.editor" = [module.function.service_account_iam_email] } iam_bindings_additive = { function-log-writer = { @@ -50,45 +52,6 @@ module "project" { } } -module "secrets" { - source = "../../../modules/secret-manager" - project_id = module.project.project_id - secrets = { - (local.secops_anonymization_export_secret_id) = { - locations = [var.regions.primary] - } - (local.secops_anonymization_import_secret_id) = { - locations = [var.regions.primary] - } - } - versions = { - (local.secops_anonymization_export_secret_id) = { - latest = { - enabled = true, - data = base64decode(var.secops_config.source_tenant.export_sa_key_base64) - } - } - (local.secops_anonymization_import_secret_id) = { - latest = { - enabled = true, - data = base64decode(var.secops_config.target_tenant.ingestion_sa_key_base64) - } - } - } - iam = { - (local.secops_anonymization_export_secret_id) = { - "roles/secretmanager.secretAccessor" = [ - "serviceAccount:${module.function.service_account_email}" - ] - } - (local.secops_anonymization_import_secret_id) = { - "roles/secretmanager.secretAccessor" = [ - "serviceAccount:${module.function.service_account_email}" - ] - } - } -} - module "export-bucket" { source = "../../../modules/gcs" project_id = module.project.project_id @@ -177,15 +140,16 @@ module "function" { path = "${path.module}/source" } environment_variables = merge({ - GCP_PROJECT = module.project.project_id - SKIP_ANONYMIZATION = var.skip_anonymization - SECOPS_SOURCE_SA_KEY_SECRET_PATH = "/app/secrets/source/latest" - SECOPS_TARGET_SA_KEY_SECRET_PATH = "/app/secrets/target/latest" - SECOPS_TARGET_CUSTOMER_ID = var.secops_config.target_tenant.customer_id - SECOPS_REGION = var.secops_config.region - SECOPS_ALPHA_APIS_REGION = var.secops_config.alpha_apis_region - SECOPS_EXPORT_BUCKET = module.export-bucket.name - LOG_EXECUTION_ID = "true" + GCP_PROJECT = module.project.project_id + SKIP_ANONYMIZATION = var.skip_anonymization + SECOPS_SOURCE_PROJECT = var.secops_config.source_tenant.gcp_project + SECOPS_TARGET_PROJECT = var.secops_config.target_tenant.gcp_project + SECOPS_SOURCE_CUSTOMER_ID = var.secops_config.source_tenant.customer_id + SECOPS_TARGET_CUSTOMER_ID = var.secops_config.target_tenant.customer_id + SECOPS_TARGET_FORWARDER_ID = var.secops_config.target_tenant.forwarder_id + SECOPS_REGION = var.secops_config.region + SECOPS_EXPORT_BUCKET = module.export-bucket.name + LOG_EXECUTION_ID = "true" }, var.skip_anonymization ? {} : { SECOPS_OUTPUT_BUCKET = module.anonymized-bucket.0.name DLP_DEIDENTIFY_TEMPLATE_ID = local.dlp_config.deidentify_template_id @@ -202,24 +166,7 @@ module "function" { "serviceAccount:${module.scheduler-sa.email}" ] } - secrets = { - "/app/secrets/source" = { - is_volume = true - project_id = module.project.number - secret = local.secops_anonymization_export_secret_id - versions = [ - "latest:latest" - ] - } - "/app/secrets/target" = { - is_volume = true - project_id = module.project.number - secret = local.secops_anonymization_import_secret_id - versions = [ - "latest:latest" - ] - } - } + secrets = {} vpc_connector = ( var.cloud_function_config.vpc_connector == null ? {} diff --git a/fast/project-templates/secops-anonymization-pipeline/source/main.py b/fast/project-templates/secops-anonymization-pipeline/source/main.py index b02f16e50..350edfd69 100644 --- a/fast/project-templates/secops-anonymization-pipeline/source/main.py +++ b/fast/project-templates/secops-anonymization-pipeline/source/main.py @@ -21,12 +21,12 @@ import sys import google.cloud.logging from google.auth.transport.requests import AuthorizedSession from google.oauth2 import service_account -from shared.secops import SecOpsUtils from jinja2 import Template from shared import utils from google.cloud import dlp_v2 from google.cloud import storage from datetime import date, timedelta +from shared import secops client = google.cloud.logging.Client() client.setup_logging() @@ -42,20 +42,15 @@ SCOPES = [ "https://www.googleapis.com/auth/malachite-ingestion" ] -# Threshold value in bytes for ingesting the logs to the SecOps. -# SecOps Ingestion API allows the maximum 1MB of payload and we kept 0.5MB as a buffer. -SIZE_THRESHOLD_BYTES = 950000 - SECOPS_REGION = os.environ.get("SECOPS_REGION") -SECOPS_ALPHA_APIS_REGION = os.environ.get("SECOPS_ALPHA_APIS_REGION") GCP_PROJECT_ID = os.environ.get("GCP_PROJECT") SECOPS_EXPORT_BUCKET = os.environ.get("SECOPS_EXPORT_BUCKET") SECOPS_OUTPUT_BUCKET = os.environ.get("SECOPS_OUTPUT_BUCKET") -SECOPS_SOURCE_SA_KEY_SECRET_PATH = os.environ.get( - "SECOPS_SOURCE_SA_KEY_SECRET_PATH") -SECOPS_TARGET_SA_KEY_SECRET_PATH = os.environ.get( - "SECOPS_TARGET_SA_KEY_SECRET_PATH") +SECOPS_SOURCE_PROJECT = os.environ.get("SECOPS_SOURCE_PROJECT") +SECOPS_TARGET_PROJECT = os.environ.get("SECOPS_TARGET_PROJECT") +SECOPS_SOURCE_CUSTOMER_ID = os.environ.get("SECOPS_SOURCE_CUSTOMER_ID") SECOPS_TARGET_CUSTOMER_ID = os.environ.get("SECOPS_TARGET_CUSTOMER_ID") +SECOPS_TARGET_FORWARDER_ID = os.environ.get("SECOPS_TARGET_FORWARDER_ID") SKIP_ANONYMIZATION = False if (os.environ.get( "SKIP_ANONYMIZATION", "false").lower() == "false") else True @@ -63,18 +58,23 @@ DLP_DEIDENTIFY_TEMPLATE_ID = os.environ.get("DLP_DEIDENTIFY_TEMPLATE_ID") DLP_INSPECT_TEMPLATE_ID = os.environ.get("DLP_INSPECT_TEMPLATE_ID") DLP_REGION = os.environ.get("DLP_REGION") -INGESTION_API_URL = F"https://{SECOPS_REGION}-malachiteingestion-pa.googleapis.com" -URI_UNSTRUCTURED = f"{INGESTION_API_URL}/v2/unstructuredlogentries:batchCreate" - def import_logs(export_date): + # Initialize with default credentials - will automatically use the service account + # assigned to your Google Cloud resource + client = secops.SecOpsClient() + + # Initialize Chronicle client + chronicle = client.chronicle( + customer_id=SECOPS_TARGET_CUSTOMER_ID, # Your Chronicle instance ID + project_id=SECOPS_TARGET_PROJECT, # Your GCP project ID + region=SECOPS_REGION # Chronicle API region + ) + storage_client = storage.Client() BUCKET = SECOPS_OUTPUT_BUCKET if not SKIP_ANONYMIZATION else SECOPS_EXPORT_BUCKET bucket = storage_client.bucket(BUCKET) export_ids = utils.get_secops_export_folders_for_date(BUCKET, export_date) - backstory_credentials = service_account.Credentials.from_service_account_file( - SECOPS_TARGET_SA_KEY_SECRET_PATH, scopes=SCOPES) - authed_session = AuthorizedSession(backstory_credentials) for export_id in export_ids: for folder in utils.list_anonymized_folders(BUCKET, export_id): @@ -83,33 +83,18 @@ def import_logs(export_date): for log_file in utils.list_log_files(BUCKET, f"{export_id}/{folder}"): blob = bucket.blob(log_file) # Directly get the blob object with blob.open("r") as f: - cur_entries = [] - body = { - "customer_id": SECOPS_TARGET_CUSTOMER_ID, - "log_type": log_type, - "entries": cur_entries - } - size_of_empty_payload = sys.getsizeof(json.dumps(body)) + logs = [] for line in f: - next_entries = cur_entries + [{"logText": line.rstrip('\n')}] - if size_of_empty_payload + sys.getsizeof( - json.dumps(next_entries)) >= SIZE_THRESHOLD_BYTES: - body["entries"] = cur_entries - LOGGER.debug(body) - LOGGER.debug(sys.getsizeof(json.dumps(body))) - response = authed_session.post(URI_UNSTRUCTURED, json=body) + logs.append(line.rstrip('\n')) + if len(logs) == 1000: + response = chronicle.ingest_logs(logs=logs, log_type=log_type, forwarder_id=SECOPS_TARGET_FORWARDER_ID) LOGGER.debug(response) - cur_entries = [{"logText": line.rstrip('\n')}] - else: - cur_entries.append({"logText": line.rstrip('\n')}) + logs = [] # Send any remaining entries - if cur_entries: - body["entries"] = cur_entries - LOGGER.debug(sys.getsizeof(json.dumps(body))) - LOGGER.debug(body) - response = authed_session.post(URI_UNSTRUCTURED, json=body) - LOGGER.debug(response) + if len(logs) > 0: + response = chronicle.ingest_logs(logs=logs, log_type=log_type, forwarder_id=SECOPS_TARGET_FORWARDER_ID) + LOGGER.debug(response) # delete both export and anonymized buckets after ingesting logs utils.delete_folder(BUCKET, export_id) @@ -120,7 +105,7 @@ def import_logs(export_date): def trigger_export(export_date: str, export_start_datetime: str, - export_end_datetime: str, log_types: list): + export_end_datetime: str, log_types: str): """ Trigger secops export using Data Export API for a specific date :param secops_source_sa_key_secret_path: @@ -133,36 +118,43 @@ def trigger_export(export_date: str, export_start_datetime: str, :param date: datetime (as string) with DD-MM-YYYY format :return: """ - backstory_credentials = service_account.Credentials.from_service_account_file( - SECOPS_SOURCE_SA_KEY_SECRET_PATH, scopes=SCOPES) - secops_utils = SecOpsUtils(backstory_credentials) + + + # Initialize with default credentials - will automatically use the service account + # assigned to your Google Cloud resource + client = secops.SecOpsClient() + + # Initialize Chronicle client + chronicle = client.chronicle( + customer_id=SECOPS_SOURCE_CUSTOMER_ID, # Your Chronicle instance ID + project_id=SECOPS_SOURCE_PROJECT, # Your GCP project ID + region=SECOPS_REGION # Chronicle API region + ) export_ids = [] try: if log_types is None: - export_response = secops_utils.create_data_export( + export_response = chronicle.create_data_export( project=GCP_PROJECT_ID, export_date=export_date, export_start_datetime=export_start_datetime, export_end_datetime=export_end_datetime) LOGGER.info(export_response) - export_ids.append(export_response["dataExportId"]) - LOGGER.info( - f"Triggered export with ID: {export_response['dataExportId']}") + export_id = export_response["dataExportStatus"]["name"].split("/")[-1] + export_ids.append(export_id) + LOGGER.info(f"Triggered export with ID: {export_id}") else: - for log_type in log_types: - export_response = secops_utils.create_data_export( + for log_type in log_types.split(","): + export_response = chronicle.create_data_export( project=GCP_PROJECT_ID, export_date=export_date, export_start_datetime=export_start_datetime, export_end_datetime=export_end_datetime, log_type=log_type) - LOGGER.info(export_response) - export_ids.append(export_response["dataExportId"]) - LOGGER.info( - f"Triggered export with ID: {export_response['dataExportId']}") + export_id = export_response["dataExportStatus"]["name"].split("/")[-1] + export_ids.append(export_id) + LOGGER.info(f"Triggered export with ID: {export_id}") except Exception as e: LOGGER.error(f"Error during export': {e}") raise SystemExit(f'Error during secops export: {e}') - LOGGER.info(f"Export IDs: {export_response['dataExportId']}") return export_ids @@ -172,19 +164,29 @@ def anonymize_data(export_date): :param export_date: date for which data should be anonymized :return: """ - backstory_credentials = service_account.Credentials.from_service_account_file( - SECOPS_SOURCE_SA_KEY_SECRET_PATH, scopes=SCOPES) - secops_utils = SecOpsUtils(backstory_credentials) + # Initialize with default credentials - will automatically use the service account + # assigned to your Google Cloud resource + client = secops.SecOpsClient() + + # Initialize Chronicle client + chronicle = client.chronicle( + customer_id=SECOPS_SOURCE_CUSTOMER_ID, # Your Chronicle instance ID + project_id=SECOPS_SOURCE_PROJECT, # Your GCP project ID + region=SECOPS_REGION # Chronicle API region + ) export_ids = utils.get_secops_export_folders_for_date(SECOPS_EXPORT_BUCKET, export_date=export_date) export_finished = True for export_id in export_ids: - export = secops_utils.get_data_export(export_id=export_id) - export_state = export["dataExportStatus"]["stage"] - LOGGER.info(f"Export status: {export_state}.") - if export_state != "FINISHED_SUCCESS": + export = chronicle.get_data_export(name=export_id) + LOGGER.info(f"Export response: {export}.") + if "dataExportStatus"in export and export["dataExportStatus"]["stage"] == "FINISHED_SUCCESS": + export_state = export["dataExportStatus"]["stage"] + LOGGER.info(f"Export status: {export_state}.") + else: export_finished = False + break if export_finished: for export_id in export_ids: diff --git a/fast/project-templates/secops-anonymization-pipeline/source/shared/secops.py b/fast/project-templates/secops-anonymization-pipeline/source/shared/secops.py index 21df22859..1cfad468f 100644 --- a/fast/project-templates/secops-anonymization-pipeline/source/shared/secops.py +++ b/fast/project-templates/secops-anonymization-pipeline/source/shared/secops.py @@ -12,6 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. # +import base64 +import uuid +from datetime import datetime +from typing import Optional, Any, List, Dict import google.auth import logging @@ -19,6 +23,12 @@ import requests import os from . import utils from google.auth.transport.requests import AuthorizedSession +from google.auth.transport import requests as google_auth_requests +from google.auth.credentials import Credentials +from google.oauth2 import service_account +import google.auth +import google.auth.transport.requests + """SecOps utility functions.""" LOGGER = logging.getLogger("secops") @@ -27,16 +37,141 @@ SECOPS_EXPORT_BUCKET = os.environ.get("SECOPS_EXPORT_BUCKET") SECOPS_OUTPUT_BUCKET = os.environ.get("SECOPS_OUTPUT_BUCKET") -class SecOpsUtils: - def __init__(self, credentials=None): - self.BACKSTORY_API_URL = f"https://{SECOPS_REGION}-backstory.googleapis.com/v1/tools/dataexport" - self.INGESTION_API_URL = F"https://{SECOPS_REGION}-malachiteingestion-pa.googleapis.com" - self.HTTP = AuthorizedSession(credentials=credentials if credentials - is not None else google.auth.default()[0]) +# Define default scopes needed for Chronicle API +CHRONICLE_SCOPES = [ + "https://www.googleapis.com/auth/cloud-platform" +] - def create_data_export(self, project, export_date, export_start_datetime, - export_end_datetime, log_type: str = None): +class SecOpsAuth: + """Handles authentication for the Google SecOps SDK.""" + + def __init__( + self, + credentials: Optional[Credentials] = None, + service_account_path: Optional[str] = None, + service_account_info: Optional[Dict[str, Any]] = None, + scopes: Optional[List[str]] = None + ): + """Initialize authentication for SecOps. + + Args: + credentials: Optional pre-existing Google Auth credentials + service_account_path: Optional path to service account JSON key file + service_account_info: Optional service account JSON key data as dict + scopes: Optional list of OAuth scopes to request + """ + self.scopes = scopes or CHRONICLE_SCOPES + self.credentials = self._get_credentials( + credentials, + service_account_path, + service_account_info + ) + self._session = None + + def _get_credentials( + self, + credentials: Optional[Credentials], + service_account_path: Optional[str], + service_account_info: Optional[Dict[str, Any]] + ) -> Credentials: + """Get credentials from various sources.""" + try: + if credentials: + return credentials.with_scopes(self.scopes) + + if service_account_info: + return service_account.Credentials.from_service_account_info( + service_account_info, + scopes=self.scopes + ) + + if service_account_path: + return service_account.Credentials.from_service_account_file( + service_account_path, + scopes=self.scopes + ) + + # Try to get default credentials + credentials, project = google.auth.default(scopes=self.scopes) + return credentials + except Exception as e: + raise Exception(f"Failed to get credentials: {str(e)}") + + @property + def session(self): + """Get an authorized session using the credentials. + + Returns: + Authorized session for API requests + """ + if self._session is None: + self._session = google.auth.transport.requests.AuthorizedSession( + self.credentials + ) + return self._session + + +class ChronicleClient: + """Client for the Chronicle API.""" + + def __init__( + self, + project_id: str, + customer_id: str, + region: str = "us", + auth: Optional[Any] = None, + session: Optional[Any] = None, + extra_scopes: Optional[List[str]] = None, + credentials: Optional[Any] = None, + ): + """Initialize ChronicleClient. + + Args: + project_id: Google Cloud project ID + customer_id: Chronicle customer ID + region: Chronicle region, typically "us" or "eu" + auth: Authentication object + session: Custom session object + extra_scopes: Additional OAuth scopes + credentials: Credentials object + """ + self.project_id = project_id + self.customer_id = customer_id + self.region = region + + # Format the instance ID to match the expected format + self.instance_id = f"projects/{project_id}/locations/{region}/instances/{customer_id}" + + # Set up the base URL + self.base_url = f"https://{self.region}-chronicle.googleapis.com/v1alpha" + + # Create a session with authentication + if session: + self._session = session + else: + + if auth is None: + auth = SecOpsAuth( + scopes=[ + "https://www.googleapis.com/auth/cloud-platform", + "https://www.googleapis.com/auth/chronicle-backstory", + ] + (extra_scopes or []), + credentials=credentials, + ) + + self._session = auth.session + + @property + def session(self) -> google_auth_requests.AuthorizedSession: + """Get an authenticated session. + + Returns: + Authorized session for API requests + """ + return self._session + + def create_data_export(self, project, export_date, export_start_datetime, export_end_datetime, log_type: str = None): """ Trigger Chronicle data export for the given date and log types. @@ -51,21 +186,181 @@ class SecOpsUtils: start_time, end_time = export_start_datetime, export_end_datetime else: start_time, end_time = utils.format_date_time_range( - date_input=export_date) + date_input=export_date) gcs_bucket = f"projects/{project}/buckets/{SECOPS_EXPORT_BUCKET}" - body = { - "startTime": start_time, - "endTime": end_time, - "logType": "ALL_TYPES" if log_type is None else log_type, - "gcsBucket": gcs_bucket, - } + # Construct the import URL + url = f"{self.base_url}/{self.instance_id}/dataExports" - response = self.HTTP.post(self.BACKSTORY_API_URL, json=body) - response.raise_for_status() + # Generate a unique ID for this log entry + log_id = str(uuid.uuid4()) + + # Construct the request payload + payload = { + "name": log_id, + "start_time": start_time, + "end_time": end_time, + "log_type": "" if log_type is None else f"projects/{self.project_id}/locations/{self.region}/instances/{self.customer_id}/logTypes/{log_type}", + "gcs_bucket": gcs_bucket + #"export_all_logs": "true" if log_type is None else "false" + } + print(f"Payload: {payload}") + + # Send the request + response = self.session.post(url, json=payload) print(f"Data export created successfully.") return response.json() + + def get_data_export(self, name: str): + """ + Trigger Chronicle data export for the given date and log types. + + :param name: name of data export request + :return: Chronicle Data export response. + """ + + # Construct the import URL + url = f"{self.base_url}/{self.instance_id}/dataExports/{name}" + + # Send the request + response = self.session.get(url) + print(f"Data export created successfully.") + return response.json() + + def ingest_logs( + self, + log_type: str, + logs: list, + log_entry_time: Optional[datetime] = None, + collection_time: Optional[datetime] = None, + forwarder_id: Optional[str] = None, + force_log_type: bool = False + ) -> Dict[str, Any]: + """Ingest a log into Chronicle. + + Args: + self: ChronicleClient instance + log_type: Chronicle log type (e.g., "OKTA", "WINDOWS", etc.) + log_message: The raw log message to ingest + log_entry_time: The time the log entry was created (defaults to current time) + collection_time: The time the log was collected (defaults to current time) + forwarder_id: ID of the forwarder to use (creates or uses default if None) + force_log_type: Whether to force using the log type even if not in the valid list + + Returns: + Dictionary containing the operation details for the ingestion + + Raises: + ValueError: If the log type is invalid or timestamps are invalid + APIError: If the API request fails + """ + # Validate log type + # if not is_valid_log_type(log_type) and not force_log_type: + # raise ValueError(f"Invalid log type: {log_type}. Use force_log_type=True to override.") + + # Get current time as default for log_entry_time and collection_time + now = datetime.now() + + # If log_entry_time is not provided, use current time + if log_entry_time is None: + log_entry_time = now + + # If collection_time is not provided, use current time + if collection_time is None: + collection_time = now + + # Validate that collection_time is not before log_entry_time + if collection_time < log_entry_time: + raise ValueError("Collection time must be same or after log entry time") + + # Format timestamps for API + log_entry_time_str = log_entry_time.strftime("%Y-%m-%dT%H:%M:%S.%fZ") + collection_time_str = collection_time.strftime("%Y-%m-%dT%H:%M:%S.%fZ") + + # If forwarder_id is not provided, get or create default forwarder + # if forwarder_id is None: + # forwarder = get_or_create_forwarder(client) + # forwarder_id = extract_forwarder_id(forwarder["name"]) + + # Construct the full forwarder resource name if needed + if '/' not in forwarder_id: + forwarder_resource = f"{self.instance_id}/forwarders/{forwarder_id}" + else: + forwarder_resource = forwarder_id + + # Construct the import URL + url = f"{self.base_url}/{self.instance_id}/logTypes/{log_type}/logs:import" + + # Generate a unique ID for this log entry + log_id = str(uuid.uuid4()) + + # Construct the request payload + payload = { + "inline_source": { + "logs": [ + { + "name": f"{self.instance_id}/logTypes/{log_type}/logs/{log_id}", + "data": base64.b64encode(log.encode('utf-8')).decode('utf-8'), + "log_entry_time": log_entry_time_str, + "collection_time": collection_time_str + } for log in logs + ], + "forwarder": forwarder_resource + } + } + + # Send the request + response = self.session.post(url, json=payload) + + # Check for errors + if response.status_code != 200: + raise Exception(f"Failed to ingest log: {response.text}") + + return response.json() + +class SecOpsClient: + """Main client class for interacting with Google SecOps.""" + + def __init__( + self, + credentials: Optional[Credentials] = None, + service_account_path: Optional[str] = None, + service_account_info: Optional[Dict[str, Any]] = None + ): + """Initialize the SecOps client. + + Args: + credentials: Optional pre-existing Google Auth credentials + service_account_path: Optional path to service account JSON key file + service_account_info: Optional service account JSON key data as dict + """ + self.auth = SecOpsAuth( + credentials=credentials, + service_account_path=service_account_path, + service_account_info=service_account_info + ) + self._chronicle = None + + def chronicle(self, customer_id: str, project_id: str, region: str = "us") -> ChronicleClient: + """Get Chronicle API client. + + Args: + customer_id: Chronicle customer ID + project_id: GCP project ID + region: Chronicle API region (default: "us") + + Returns: + ChronicleClient instance + """ + return ChronicleClient( + customer_id=customer_id, + project_id=project_id, + region=region, + auth=self.auth + ) + + def get_data_export(self, export_id: str) -> str: """ Get Chronicle data export information. diff --git a/fast/project-templates/secops-anonymization-pipeline/source/shared/utils.py b/fast/project-templates/secops-anonymization-pipeline/source/shared/utils.py index 8da1bf1b9..e966c83b5 100644 --- a/fast/project-templates/secops-anonymization-pipeline/source/shared/utils.py +++ b/fast/project-templates/secops-anonymization-pipeline/source/shared/utils.py @@ -210,6 +210,8 @@ def get_secops_export_folders_for_date(bucket_name, export_date): export_ids = [] for blob in storage_client.list_blobs(bucket_name): + if "_$folder$" in blob.name: + continue if blob.time_created.strftime( "%Y-%m-%d") == export_date and blob.name.split( '/')[0] not in export_ids: diff --git a/fast/project-templates/secops-anonymization-pipeline/variables.tf b/fast/project-templates/secops-anonymization-pipeline/variables.tf index 587baaedc..56d8c323c 100644 --- a/fast/project-templates/secops-anonymization-pipeline/variables.tf +++ b/fast/project-templates/secops-anonymization-pipeline/variables.tf @@ -66,8 +66,17 @@ variable "prefix" { } } +variable "project_create_config" { + description = "Create project instead of using an existing one." + type = object({ + billing_account = string + parent = optional(string) + }) + default = null +} + variable "project_id" { - description = "Project id, references existing project if `project_create` is null." + description = "Project id that references existing project." type = string } @@ -86,16 +95,15 @@ variable "regions" { variable "secops_config" { description = "SecOps config." type = object({ - region = string - alpha_apis_region = string + region = string source_tenant = object({ - gcp_project = string - export_sa_key_base64 = string + customer_id = string + gcp_project = string }) target_tenant = object({ - gcp_project = string - customer_id = string - ingestion_sa_key_base64 = string + gcp_project = string + customer_id = string + forwarder_id = string }) }) }