New Managed Kafka module (#3054)

This commit is contained in:
Julio Castillo
2025-04-24 08:52:03 +02:00
committed by GitHub
parent 9e6d1030d0
commit fdf9191460
12 changed files with 796 additions and 1 deletions

View File

@@ -0,0 +1,35 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Fabric release: v39.0.0
terraform {
required_version = ">= 1.10.2"
required_providers {
google = {
source = "hashicorp/google"
version = ">= 6.28.0, < 7.0.0" # tftest
}
google-beta = {
source = "hashicorp/google-beta"
version = ">= 6.28.0, < 7.0.0" # tftest
}
}
provider_meta "google" {
module_name = "google-pso-tool/cloud-foundation-fabric/modules/dataplex-aspect-types:v39.0.0-tf"
}
provider_meta "google-beta" {
module_name = "google-pso-tool/cloud-foundation-fabric/modules/dataplex-aspect-types:v39.0.0-tf"
}
}

View File

@@ -0,0 +1,35 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Fabric release: v39.0.0
terraform {
required_version = ">= 1.9.0"
required_providers {
google = {
source = "hashicorp/google"
version = ">= 6.28.0, < 7.0.0" # tftest
}
google-beta = {
source = "hashicorp/google-beta"
version = ">= 6.28.0, < 7.0.0" # tftest
}
}
provider_meta "google" {
module_name = "google-pso-tool/cloud-foundation-fabric/modules/dataplex-aspect-types:v39.0.0-tofu"
}
provider_meta "google-beta" {
module_name = "google-pso-tool/cloud-foundation-fabric/modules/dataplex-aspect-types:v39.0.0-tofu"
}
}

View File

@@ -0,0 +1,191 @@
# Managed Kafka Module
This module allows simplified creation and management of Google Cloud Managed Kafka clusters, including topics, Kafka Connect clusters, and connectors.
## TOC
<!-- BEGIN TOC -->
- [TOC](#toc)
- [Simple Cluster Example](#simple-cluster-example)
- [Cluster with Topics](#cluster-with-topics)
- [Cluster with Kafka Connect](#cluster-with-kafka-connect)
- [Variables](#variables)
- [Outputs](#outputs)
<!-- END TOC -->
## Simple Cluster Example
This example creates a basic Managed Kafka cluster.
```hcl
module "kafka-cluster" {
source = "./fabric/modules/managed-kafka"
project_id = var.project_id
location = var.regions.primary
cluster_id = "my-kafka-cluster"
capacity_config = {
vcpu_count = 3
memory_bytes = 3221225472 # 3 GiB
}
subnets = [
var.subnets.primary.id
]
labels = {
environment = "development"
}
}
# tftest modules=1 resources=1 inventory=simple.yaml
```
## Cluster with Topics
This example creates a Managed Kafka cluster along with predefined topics.
```hcl
module "kafka-cluster-with-topics" {
source = "./fabric/modules/managed-kafka"
project_id = var.project_id
location = "europe-west1"
cluster_id = "my-kafka-cluster-topics"
capacity_config = {
vcpu_count = 6
memory_bytes = 6442450944 # 6 GiB
}
subnets = [var.subnets.primary.id]
topics = {
topic-a = {
partition_count = 3
replication_factor = 3
configs = {
"cleanup.policy" = "delete"
}
}
topic-b = {
partition_count = 6
replication_factor = 3
}
}
}
# tftest modules=1 resources=3 inventory=topics.yaml
```
## Cluster with Kafka Connect
This example demonstrates creating a Kafka cluster, a Kafka Connect cluster, and a connector. Note that Connect resources require the `google-beta` provider.
```hcl
module "gcs" {
source = "./fabric/modules/gcs"
project_id = var.project_id
location = var.region
name = "gmk-sink"
prefix = var.prefix
}
module "vpc" {
source = "./fabric/modules/net-vpc"
project_id = var.project_id
name = "vpc"
subnets = [
{
ip_cidr_range = "10.0.0.0/20"
name = "subnet1"
region = var.region
},
{
ip_cidr_range = "10.0.16.0/20"
name = "subnet2"
region = var.region
},
{
ip_cidr_range = "10.0.32.0/20"
name = "subnet3"
region = var.region
},
]
}
module "kafka-cluster-with-connect" {
source = "./fabric/modules/managed-kafka"
project_id = var.project_id
location = var.region
cluster_id = "my-kafka-cluster-connect"
capacity_config = {
vcpu_count = 3
memory_bytes = 3221225472 # 3 GiB
}
subnets = [
module.vpc.subnet_ids["${var.region}/subnet1"]
]
connect_clusters = {
my-connect-cluster = {
vcpu_count = 3
memory_bytes = 3221225472 # 3 GiB
primary_subnet = module.vpc.subnet_ids["${var.region}/subnet1"]
additional_subnets = [
module.vpc.subnet_ids["${var.region}/subnet2"],
module.vpc.subnet_ids["${var.region}/subnet3"]
]
}
}
connect_connectors = {
my-gcs-connector = {
connect_cluster = "my-connect-cluster"
configs = {
"connector.class" = "io.aiven.kafka.connect.gcs.GcsSinkConnector"
"file.name.prefix" = ""
"format.output.type" = "json"
"gcs.bucket.name" = module.gcs.name
"gcs.credentials.default" = "true"
"key.converter" = "org.apache.kafka.connect.storage.StringConverter"
"tasks.max" = "3"
"topics" = "topic1"
"value.converter" = "org.apache.kafka.connect.json.JsonConverter"
"value.converter.schemas.enable" = "false"
}
task_restart_policy = {
minimum_backoff = "60s"
maximum_backoff = "300s"
}
}
}
}
# tftest modules=3 resources=11 inventory=connect.yaml
```
<!-- BEGIN TFDOC -->
## Variables
| name | description | type | required | default |
|---|---|:---:|:---:|:---:|
| [capacity_config](variables.tf#L17) | Capacity configuration for the Kafka cluster. | <code title="object&#40;&#123;&#10; vcpu_count &#61; number&#10; memory_bytes &#61; number&#10;&#125;&#41;">object&#40;&#123;&#8230;&#125;&#41;</code> | ✓ | |
| [cluster_id](variables.tf#L25) | The ID of the Kafka cluster. | <code>string</code> | ✓ | |
| [location](variables.tf#L79) | The GCP region for the Kafka cluster. | <code>string</code> | ✓ | |
| [project_id](variables.tf#L84) | The ID of the project where the Kafka cluster will be created. | <code>string</code> | ✓ | |
| [subnets](variables.tf#L95) | List of VPC subnets for the Kafka cluster network configuration. | <code>list&#40;string&#41;</code> | ✓ | |
| [connect_clusters](variables.tf#L30) | Map of Kafka Connect cluster configurations to create. | <code title="map&#40;object&#40;&#123;&#10; vcpu_count &#61; number&#10; memory_bytes &#61; number&#10; primary_subnet &#61; string&#10; project_id &#61; optional&#40;string&#41;&#10; location &#61; optional&#40;string&#41;&#10; additional_subnets &#61; optional&#40;list&#40;string&#41;&#41;&#10; dns_domain_names &#61; optional&#40;list&#40;string&#41;&#41;&#10; labels &#61; optional&#40;map&#40;string&#41;&#41;&#10;&#125;&#41;&#41;">map&#40;object&#40;&#123;&#8230;&#125;&#41;&#41;</code> | | <code>&#123;&#125;</code> |
| [connect_connectors](variables.tf#L46) | Map of Kafka Connect Connectors to create. | <code title="map&#40;object&#40;&#123;&#10; connect_cluster &#61; string&#10; configs &#61; map&#40;string&#41;&#10; task_restart_policy &#61; optional&#40;object&#40;&#123;&#10; minimum_backoff &#61; optional&#40;string&#41;&#10; maximum_backoff &#61; optional&#40;string&#41;&#10; &#125;&#41;&#41;&#10;&#125;&#41;&#41;">map&#40;object&#40;&#123;&#8230;&#125;&#41;&#41;</code> | | <code>&#123;&#125;</code> |
| [kms_key](variables.tf#L67) | Customer-managed encryption key (CMEK) used for the Kafka cluster. | <code>string</code> | | <code>null</code> |
| [labels](variables.tf#L73) | Labels to apply to the Kafka cluster. | <code>map&#40;string&#41;</code> | | <code>null</code> |
| [rebalance_mode](variables.tf#L89) | Rebalancing mode for the Kafka cluster. | <code>string</code> | | <code>null</code> |
| [topics](variables.tf#L100) | Map of Kafka topics to create within the cluster. | <code title="map&#40;object&#40;&#123;&#10; replication_factor &#61; number&#10; partition_count &#61; number&#10; configs &#61; optional&#40;map&#40;string&#41;&#41;&#10;&#125;&#41;&#41;">map&#40;object&#40;&#123;&#8230;&#125;&#41;&#41;</code> | | <code>&#123;&#125;</code> |
## Outputs
| name | description | sensitive |
|---|---|:---:|
| [connect_cluster_ids](outputs.tf#L17) | Map of Kafka Connect cluster IDs. | |
| [connect_connectors](outputs.tf#L25) | Map of Kafka Connect Connector IDs. | |
| [id](outputs.tf#L33) | The ID of the Managed Kafka cluster. | |
| [topic_ids](outputs.tf#L38) | Map of Kafka topic IDs. | |
<!-- END TFDOC -->

View File

@@ -0,0 +1,95 @@
/**
* Copyright 2025 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
resource "google_managed_kafka_cluster" "cluster" {
project = var.project_id
cluster_id = var.cluster_id
location = var.location
labels = var.labels
capacity_config {
vcpu_count = var.capacity_config.vcpu_count
memory_bytes = var.capacity_config.memory_bytes
}
gcp_config {
access_config {
dynamic "network_configs" {
for_each = var.subnets
iterator = subnet
content {
subnet = subnet.value
}
}
}
kms_key = var.kms_key
}
dynamic "rebalance_config" {
for_each = var.rebalance_mode == null ? [] : [""]
content {
mode = var.rebalance_config.mode
}
}
}
resource "google_managed_kafka_topic" "topics" {
for_each = var.topics
project = var.project_id
topic_id = each.key
cluster = google_managed_kafka_cluster.cluster.cluster_id
location = var.location
partition_count = each.value.partition_count
replication_factor = each.value.replication_factor
configs = each.value.configs
}
resource "google_managed_kafka_connect_cluster" "connect_clusters" {
provider = google-beta
for_each = var.connect_clusters
kafka_cluster = google_managed_kafka_cluster.cluster.id
connect_cluster_id = each.key
project = coalesce(each.value.project_id, var.project_id)
location = coalesce(each.value.location, var.location)
capacity_config {
vcpu_count = each.value.vcpu_count
memory_bytes = each.value.memory_bytes
}
gcp_config {
access_config {
network_configs {
primary_subnet = each.value.primary_subnet
additional_subnets = each.value.additional_subnets
dns_domain_names = each.value.dns_domain_names
}
}
}
labels = each.value.labels
}
resource "google_managed_kafka_connector" "connectors" {
provider = google-beta
for_each = var.connect_connectors
connector_id = each.key
project = google_managed_kafka_connect_cluster.connect_clusters[each.value.connect_cluster].project
connect_cluster = google_managed_kafka_connect_cluster.connect_clusters[each.value.connect_cluster].connect_cluster_id
location = google_managed_kafka_connect_cluster.connect_clusters[each.value.connect_cluster].location
configs = merge(each.value.configs, { name = each.key })
dynamic "task_restart_policy" {
for_each = each.value.task_restart_policy == null ? [] : [""]
content {
minimum_backoff = each.value.task_restart_policy.minimum_backoff
maximum_backoff = each.value.task_restart_policy.maximum_backoff
}
}
}

View File

@@ -0,0 +1,44 @@
/**
* Copyright 2025 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
output "connect_cluster_ids" {
description = "Map of Kafka Connect cluster IDs."
value = {
for k, v in google_managed_kafka_connect_cluster.connect_clusters :
k => v.id
}
}
output "connect_connectors" {
description = "Map of Kafka Connect Connector IDs."
value = {
for k, v in google_managed_kafka_connector.connectors :
k => v.id
}
}
output "id" {
description = "The ID of the Managed Kafka cluster."
value = google_managed_kafka_cluster.cluster.id
}
output "topic_ids" {
description = "Map of Kafka topic IDs."
value = {
for k, v in google_managed_kafka_topic.topics :
k => v.id
}
}

View File

@@ -0,0 +1,109 @@
/**
* Copyright 2025 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
variable "capacity_config" {
description = "Capacity configuration for the Kafka cluster."
type = object({
vcpu_count = number
memory_bytes = number
})
}
variable "cluster_id" {
description = "The ID of the Kafka cluster."
type = string
}
variable "connect_clusters" {
description = "Map of Kafka Connect cluster configurations to create."
type = map(object({
vcpu_count = number
memory_bytes = number
primary_subnet = string
project_id = optional(string)
location = optional(string)
additional_subnets = optional(list(string))
dns_domain_names = optional(list(string))
labels = optional(map(string))
}))
default = {}
nullable = false
}
variable "connect_connectors" {
description = "Map of Kafka Connect Connectors to create."
type = map(object({
connect_cluster = string
configs = map(string)
task_restart_policy = optional(object({
minimum_backoff = optional(string)
maximum_backoff = optional(string)
}))
}))
default = {}
nullable = false
validation {
condition = alltrue([
for k, v in var.connect_connectors :
contains(keys(var.connect_clusters), v.connect_cluster)
])
error_message = "The connect_cluster attribute of all connectors must be one of the keys in the connect_clusters variable."
}
}
variable "kms_key" {
description = "Customer-managed encryption key (CMEK) used for the Kafka cluster."
type = string
default = null
}
variable "labels" {
description = "Labels to apply to the Kafka cluster."
type = map(string)
default = null
}
variable "location" {
description = "The GCP region for the Kafka cluster."
type = string
}
variable "project_id" {
description = "The ID of the project where the Kafka cluster will be created."
type = string
}
variable "rebalance_mode" {
description = "Rebalancing mode for the Kafka cluster."
type = string
default = null
}
variable "subnets" {
description = "List of VPC subnets for the Kafka cluster network configuration."
type = list(string)
}
variable "topics" {
description = "Map of Kafka topics to create within the cluster."
type = map(object({
replication_factor = number
partition_count = number
configs = optional(map(string))
}))
default = {}
nullable = false
}

35
modules/managed-kafka/versions.tf generated Normal file
View File

@@ -0,0 +1,35 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Fabric release: v39.0.0
terraform {
required_version = ">= 1.10.2"
required_providers {
google = {
source = "hashicorp/google"
version = ">= 6.28.0, < 7.0.0" # tftest
}
google-beta = {
source = "hashicorp/google-beta"
version = ">= 6.28.0, < 7.0.0" # tftest
}
}
provider_meta "google" {
module_name = "google-pso-tool/cloud-foundation-fabric/modules/managed-kafka:v39.0.0-tf"
}
provider_meta "google-beta" {
module_name = "google-pso-tool/cloud-foundation-fabric/modules/managed-kafka:v39.0.0-tf"
}
}

35
modules/managed-kafka/versions.tofu generated Normal file
View File

@@ -0,0 +1,35 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Fabric release: v39.0.0
terraform {
required_version = ">= 1.9.0"
required_providers {
google = {
source = "hashicorp/google"
version = ">= 6.28.0, < 7.0.0" # tftest
}
google-beta = {
source = "hashicorp/google-beta"
version = ">= 6.28.0, < 7.0.0" # tftest
}
}
provider_meta "google" {
module_name = "google-pso-tool/cloud-foundation-fabric/modules/managed-kafka:v39.0.0-tofu"
}
provider_meta "google-beta" {
module_name = "google-pso-tool/cloud-foundation-fabric/modules/managed-kafka:v39.0.0-tofu"
}
}