From 2052f73e2a4adca591dd008a7a8bba75a6625c35 Mon Sep 17 00:00:00 2001 From: AnujaK Date: Tue, 5 May 2026 11:18:24 +0530 Subject: [PATCH 1/7] Basic TLS/SASL --- event-gateway/.env.example | 7 + event-gateway/docker-compose.dev.yaml | 106 ++++-- event-gateway/docker/kafka/generate-certs.sh | 95 ++++++ .../gateway-runtime/cmd/event-gateway/main.go | 10 +- .../cmd/event-gateway/plugins.go | 26 +- .../gateway-runtime/configs/config.toml | 12 + .../gateway-runtime/internal/config/config.go | 55 +++ .../connectors/brokerdriver/kafka/config.go | 317 ++++++++++++++++++ .../brokerdriver/kafka/config_test.go | 123 +++++++ .../connectors/brokerdriver/kafka/consumer.go | 20 +- .../connectors/brokerdriver/kafka/endpoint.go | 52 ++- .../brokerdriver/kafka/publisher.go | 7 +- .../connectors/brokerdriver/kafka/replayer.go | 127 ++++--- .../connectors/receiver/websub/connector.go | 15 +- .../receiver/websub/consumer_manager.go | 33 +- .../internal/connectors/types.go | 3 + .../internal/runtime/runtime_test.go | 2 +- .../internal/subscription/reconciler.go | 116 ++----- .../internal/subscription/sync.go | 65 +--- .../internal/xdsclient/handler.go | 20 +- 20 files changed, 940 insertions(+), 271 deletions(-) create mode 100644 event-gateway/docker/kafka/generate-certs.sh create mode 100644 event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config.go create mode 100644 event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config_test.go diff --git a/event-gateway/.env.example b/event-gateway/.env.example index e4383b612..61d6b79b2 100644 --- a/event-gateway/.env.example +++ b/event-gateway/.env.example @@ -7,3 +7,10 @@ GATEWAY_REGISTRATION_TOKEN=replace-with-real-gateway-registration-token # Optional: override the control plane host if needed locally. GATEWAY_CONTROLPLANE_HOST=connect.preview-dv.bijira.dev + +# Kafka client credentials for the secured local dev listener. 
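+# These values are for local development only; docker-compose feeds the same
+# username/password to the Kafka broker (as its SASL client user) and to the
+# gateway runtime and kafka-ui clients.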
+KAFKA_CLIENT_USERNAME=egw +KAFKA_CLIENT_PASSWORD=egw-pass +KAFKA_CONTROLLER_PASSWORD=controller-pass +KAFKA_INTER_BROKER_PASSWORD=broker-pass +KAFKA_CERT_PASSWORD=changeit diff --git a/event-gateway/docker-compose.dev.yaml b/event-gateway/docker-compose.dev.yaml index 6a4ed0f71..8d8f1daa6 100644 --- a/event-gateway/docker-compose.dev.yaml +++ b/event-gateway/docker-compose.dev.yaml @@ -69,6 +69,11 @@ services: - "9003:9003" # Metrics environment: - APIP_EGW_KAFKA_BROKERS=kafka:29092 + - APIP_EGW_KAFKA_TLS=true + - APIP_EGW_KAFKA_TLS_CA_FILE=/etc/event-gateway/kafka/ca.crt + - APIP_EGW_KAFKA_SASL_MECHANISM=plain + - APIP_EGW_KAFKA_SASL_USERNAME=${KAFKA_CLIENT_USERNAME:-egw} + - APIP_EGW_KAFKA_SASL_PASSWORD=${KAFKA_CLIENT_PASSWORD:-egw-pass} - APIP_EGW_SERVER_WEBSUB_ENABLED=true - APIP_EGW_SERVER_WEBSUB_HTTP_PORT=8080 - APIP_EGW_SERVER_WEBSUB_HTTPS_PORT=8443 @@ -86,40 +91,97 @@ services: - ../gateway/gateway-controller/listener-certs:/etc/event-gateway/tls:ro - ./gateway-runtime/configs/config.toml:/etc/event-gateway/config.toml:ro - ./gateway-runtime/configs/channels.yaml:/etc/event-gateway/channels.yaml:ro + - kafka-certs:/etc/event-gateway/kafka:ro depends_on: gateway-controller: condition: service_started kafka: - # condition: service_healthy condition: service_started networks: - egw-network + kafka-cert-init: + image: bitnamilegacy/kafka:4.0.0-debian-12-r10 + user: "0:0" + command: ["bash", "/scripts/generate-certs.sh"] + environment: + KAFKA_CERT_PASSWORD: ${KAFKA_CERT_PASSWORD:-changeit} + KAFKA_BROKER_HOST: kafka + volumes: + - ./docker/kafka/generate-certs.sh:/scripts/generate-certs.sh:ro + - kafka-certs:/certs + networks: + - egw-network + kafka: - image: apache/kafka:latest + image: bitnamilegacy/kafka:4.0.0-debian-12-r10 hostname: kafka ports: - - "9092:9092" - "29092:29092" environment: - KAFKA_NODE_ID: 1 - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT - KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092 - KAFKA_PROCESS_ROLES: broker,controller - KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:29093 - KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,CONTROLLER://0.0.0.0:29093,PLAINTEXT_HOST://0.0.0.0:9092 - KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER - KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 - KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 - KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 - KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 - # healthcheck: - # test: ["CMD", "/opt/kafka/bin/kafka-topics.sh", "--bootstrap-server", "kafka:29092", "--list"] - # interval: 2s - # timeout: 3s - # start_period: 5s - # retries: 15 + KAFKA_CFG_NODE_ID: 1 + KAFKA_CFG_PROCESS_ROLES: broker,controller + KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER + KAFKA_CFG_LISTENERS: SASL_SSL://:29092,CONTROLLER://:29093 + KAFKA_CFG_ADVERTISED_LISTENERS: SASL_SSL://kafka:29092 + KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:SASL_PLAINTEXT,SASL_SSL:SASL_SSL + KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 1@kafka:29093 + KAFKA_CFG_SASL_ENABLED_MECHANISMS: PLAIN + KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOL: PLAIN + KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN + KAFKA_CFG_INTER_BROKER_LISTENER_NAME: SASL_SSL + KAFKA_CLIENT_LISTENER_NAME: SASL_SSL + KAFKA_CLIENT_USERS: ${KAFKA_CLIENT_USERNAME:-egw} + KAFKA_CLIENT_PASSWORDS: ${KAFKA_CLIENT_PASSWORD:-egw-pass} + KAFKA_CONTROLLER_USER: controller_user + KAFKA_CONTROLLER_PASSWORD: ${KAFKA_CONTROLLER_PASSWORD:-controller-pass} + 
KAFKA_INTER_BROKER_USER: broker + KAFKA_INTER_BROKER_PASSWORD: ${KAFKA_INTER_BROKER_PASSWORD:-broker-pass} + KAFKA_TLS_TYPE: JKS + KAFKA_TLS_CLIENT_AUTH: none + KAFKA_CERTIFICATE_PASSWORD: ${KAFKA_CERT_PASSWORD:-changeit} + KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR: 1 + KAFKA_CFG_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 + volumes: + - kafka-certs:/opt/bitnami/kafka/config/certs:ro + - kafka-data:/bitnami/kafka + depends_on: + kafka-cert-init: + condition: service_completed_successfully + networks: + - egw-network + + wh-listener: + image: event-gateway/webhook-listener:local + build: + context: webhook-listener + dockerfile: Dockerfile + ports: + - "8090:8090" + networks: + - egw-network + + kafka-ui: + image: provectuslabs/kafka-ui:latest + ports: + - "7080:8080" + environment: + DYNAMIC_CONFIG_ENABLED: "true" + KAFKA_CLUSTERS_0_NAME: local + KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092 + KAFKA_CLUSTERS_0_PROPERTIES_SECURITY_PROTOCOL: SASL_SSL + KAFKA_CLUSTERS_0_PROPERTIES_SASL_MECHANISM: PLAIN + KAFKA_CLUSTERS_0_PROPERTIES_SASL_JAAS_CONFIG: org.apache.kafka.common.security.plain.PlainLoginModule required username="${KAFKA_CLIENT_USERNAME:-egw}" password="${KAFKA_CLIENT_PASSWORD:-egw-pass}"; + KAFKA_CLUSTERS_0_PROPERTIES_SSL_TRUSTSTORE_LOCATION: /etc/kafka-ui/certs/kafka.truststore.jks + KAFKA_CLUSTERS_0_PROPERTIES_SSL_TRUSTSTORE_PASSWORD: ${KAFKA_CERT_PASSWORD:-changeit} + KAFKA_CLUSTERS_0_PROPERTIES_SSL_TRUSTSTORE_TYPE: JKS + volumes: + - kafka-certs:/etc/kafka-ui/certs:ro + depends_on: + kafka: + condition: service_started networks: - egw-network @@ -129,3 +191,5 @@ networks: volumes: controller-data: + kafka-certs: + kafka-data: diff --git a/event-gateway/docker/kafka/generate-certs.sh b/event-gateway/docker/kafka/generate-certs.sh new file mode 100644 index 000000000..740a6943c --- /dev/null +++ b/event-gateway/docker/kafka/generate-certs.sh @@ -0,0 +1,95 @@ +#!/usr/bin/env bash + +set -euo pipefail + +cert_dir="${CERT_DIR:-/certs}" +password="${KAFKA_CERT_PASSWORD:-changeit}" +broker_host="${KAFKA_BROKER_HOST:-kafka}" + +mkdir -p "${cert_dir}" + +if [[ -f "${cert_dir}/kafka.keystore.jks" && -f "${cert_dir}/kafka.truststore.jks" && -f "${cert_dir}/ca.crt" ]]; then + echo "Kafka TLS assets already exist in ${cert_dir}" + exit 0 +fi + +rm -f \ + "${cert_dir}/ca.crt" \ + "${cert_dir}/ca.key" \ + "${cert_dir}/ca.srl" \ + "${cert_dir}/kafka.keystore.jks" \ + "${cert_dir}/kafka.truststore.jks" \ + "${cert_dir}/broker.csr" \ + "${cert_dir}/broker.crt" \ + "${cert_dir}/openssl-san.cnf" + +openssl req \ + -new \ + -x509 \ + -days 3650 \ + -subj "/CN=event-gateway-kafka-dev-ca" \ + -passout "pass:${password}" \ + -keyout "${cert_dir}/ca.key" \ + -out "${cert_dir}/ca.crt" + +keytool \ + -genkeypair \ + -alias "${broker_host}" \ + -keyalg RSA \ + -validity 3650 \ + -storetype JKS \ + -keystore "${cert_dir}/kafka.keystore.jks" \ + -storepass "${password}" \ + -keypass "${password}" \ + -dname "CN=${broker_host}, OU=Event Gateway, O=WSO2, L=Colombo, S=Western, C=LK" \ + -ext "SAN=DNS:${broker_host},DNS:localhost,IP:127.0.0.1" + +keytool \ + -certreq \ + -alias "${broker_host}" \ + -keystore "${cert_dir}/kafka.keystore.jks" \ + -storepass "${password}" \ + -file "${cert_dir}/broker.csr" + +cat > "${cert_dir}/openssl-san.cnf" < 0 { - brokers = parsed - } - case []string: - if len(v) > 0 { - brokers = v - } - } - } + connectionCfg, err := kafka.ResolveConnectionConfig(cfg.Kafka, brokerDriverCfg) + if 
err != nil { + return nil, err } - return kafka.NewBrokerDriver(brokers) + return kafka.NewBrokerDriver(connectionCfg) }) registry.RegisterReceiver("websub", func(ecfg connectors.ReceiverConfig) (connectors.Receiver, error) { @@ -73,7 +56,6 @@ func registerConnectors(registry *connectors.Registry, cfg *config.Config) { DeliveryConcurrency: cfg.WebSub.DeliveryConcurrency, RuntimeID: cfg.RuntimeID, ConsumerGroupPrefix: cfg.Kafka.ConsumerGroupPrefix, - Brokers: cfg.Kafka.Brokers, }) }) diff --git a/event-gateway/gateway-runtime/configs/config.toml b/event-gateway/gateway-runtime/configs/config.toml index d8567f98c..9830bf7e7 100644 --- a/event-gateway/gateway-runtime/configs/config.toml +++ b/event-gateway/gateway-runtime/configs/config.toml @@ -26,6 +26,18 @@ metrics_port = 9003 # Default Kafka brokers. Channels can override with broker-driver.config.brokers. brokers = ["localhost:9092"] consumer_group_prefix = "event-gateway" +tls = false +# Optional PEM CA file for self-signed or private Kafka CAs. +# tls_ca_file = "/etc/event-gateway/kafka/ca.crt" +# Optional client certificate and key for Kafka mTLS. +# tls_cert_file = "/etc/event-gateway/kafka/client.crt" +# tls_key_file = "/etc/event-gateway/kafka/client.key" +# Optional TLS server name override. +# tls_server_name = "kafka" +# Optional SASL settings. Supported mechanisms: plain, scram-sha-256, scram-sha-512. +# sasl_mechanism = "plain" +# sasl_username = "egw" +# sasl_password = "egw-pass" [websub] verification_timeout_seconds = 10 diff --git a/event-gateway/gateway-runtime/internal/config/config.go b/event-gateway/gateway-runtime/internal/config/config.go index 3864038b9..69e417b4b 100644 --- a/event-gateway/gateway-runtime/internal/config/config.go +++ b/event-gateway/gateway-runtime/internal/config/config.go @@ -60,6 +60,10 @@ type KafkaConfig struct { Brokers []string `koanf:"brokers"` ConsumerGroupPrefix string `koanf:"consumer_group_prefix"` TLS bool `koanf:"tls"` + TLSCAFile string `koanf:"tls_ca_file"` + TLSCertFile string `koanf:"tls_cert_file"` + TLSKeyFile string `koanf:"tls_key_file"` + TLSServerName string `koanf:"tls_server_name"` SASLMechanism string `koanf:"sasl_mechanism"` SASLUsername string `koanf:"sasl_username"` SASLPassword string `koanf:"sasl_password"` @@ -277,6 +281,57 @@ func validate(cfg *Config) error { return fmt.Errorf("logging.format must be one of text, json") } + if err := validateKafkaConfig(cfg.Kafka); err != nil { + return err + } + + return nil +} + +func validateKafkaConfig(kafkaCfg KafkaConfig) error { + if len(kafkaCfg.Brokers) == 0 { + return fmt.Errorf("kafka.brokers must contain at least one broker") + } + + if kafkaCfg.TLS { + if strings.TrimSpace(kafkaCfg.TLSCAFile) != "" { + if err := validateReadableFile(kafkaCfg.TLSCAFile, "kafka.tls_ca_file"); err != nil { + return err + } + } + certFile := strings.TrimSpace(kafkaCfg.TLSCertFile) + keyFile := strings.TrimSpace(kafkaCfg.TLSKeyFile) + if certFile != "" || keyFile != "" { + if certFile == "" || keyFile == "" { + return fmt.Errorf("kafka.tls_cert_file and kafka.tls_key_file must be configured together") + } + if err := validateReadableFile(certFile, "kafka.tls_cert_file"); err != nil { + return err + } + if err := validateReadableFile(keyFile, "kafka.tls_key_file"); err != nil { + return err + } + } + } else if strings.TrimSpace(kafkaCfg.TLSCAFile) != "" || strings.TrimSpace(kafkaCfg.TLSCertFile) != "" || strings.TrimSpace(kafkaCfg.TLSKeyFile) != "" || strings.TrimSpace(kafkaCfg.TLSServerName) != "" { + return fmt.Errorf("kafka TLS files or 
server name require kafka.tls=true") + } + + mechanism := strings.ToLower(strings.TrimSpace(kafkaCfg.SASLMechanism)) + switch mechanism { + case "", "plain", "scram-sha-256", "scram-sha-512": + default: + return fmt.Errorf("kafka.sasl_mechanism must be one of plain, scram-sha-256, scram-sha-512") + } + + if mechanism != "" { + if kafkaCfg.SASLUsername == "" { + return fmt.Errorf("kafka.sasl_username is required when kafka.sasl_mechanism is set") + } + if kafkaCfg.SASLPassword == "" { + return fmt.Errorf("kafka.sasl_password is required when kafka.sasl_mechanism is set") + } + } + return nil } diff --git a/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config.go b/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config.go new file mode 100644 index 000000000..914fa6c85 --- /dev/null +++ b/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config.go @@ -0,0 +1,317 @@ +/* + * Copyright (c) 2026, WSO2 LLC. (https://www.wso2.com). + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package kafka + +import ( + "crypto/tls" + "crypto/x509" + "fmt" + "os" + "strings" + + "github.com/twmb/franz-go/pkg/kgo" + "github.com/twmb/franz-go/pkg/sasl" + "github.com/twmb/franz-go/pkg/sasl/plain" + "github.com/twmb/franz-go/pkg/sasl/scram" + "github.com/wso2/api-platform/event-gateway/gateway-runtime/internal/config" +) + +// ConnectionConfig holds the Kafka connection settings used by the driver. +type ConnectionConfig struct { + Brokers []string + TLS bool + TLSCAFile string + TLSCertFile string + TLSKeyFile string + TLSServerName string + SASLMechanism string + SASLUsername string + SASLPassword string +} + +// ResolveConnectionConfig merges global runtime config with per-binding overrides. 
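+// Override keys (brokers, tls, tls_ca_file, tls_cert_file, tls_key_file,
+// tls_server_name, sasl_mechanism, sasl_username, sasl_password) take
+// precedence; keys left unset fall back to the global [kafka] settings.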
+func ResolveConnectionConfig(global config.KafkaConfig, overrides map[string]interface{}) (ConnectionConfig, error) { + cfg := ConnectionConfig{ + Brokers: append([]string(nil), global.Brokers...), + TLS: global.TLS, + TLSCAFile: global.TLSCAFile, + TLSCertFile: global.TLSCertFile, + TLSKeyFile: global.TLSKeyFile, + TLSServerName: global.TLSServerName, + SASLMechanism: global.SASLMechanism, + SASLUsername: global.SASLUsername, + SASLPassword: global.SASLPassword, + } + + if overrides != nil { + if brokers, ok, err := stringSliceOverride(overrides["brokers"]); err != nil { + return ConnectionConfig{}, err + } else if ok { + cfg.Brokers = brokers + } + if v, ok, err := boolOverride(overrides["tls"]); err != nil { + return ConnectionConfig{}, err + } else if ok { + cfg.TLS = v + } + if v, ok, err := stringOverride(overrides["tls_ca_file"]); err != nil { + return ConnectionConfig{}, err + } else if ok { + cfg.TLSCAFile = v + } + if v, ok, err := stringOverride(overrides["tls_cert_file"]); err != nil { + return ConnectionConfig{}, err + } else if ok { + cfg.TLSCertFile = v + } + if v, ok, err := stringOverride(overrides["tls_key_file"]); err != nil { + return ConnectionConfig{}, err + } else if ok { + cfg.TLSKeyFile = v + } + if v, ok, err := stringOverride(overrides["tls_server_name"]); err != nil { + return ConnectionConfig{}, err + } else if ok { + cfg.TLSServerName = v + } + if v, ok, err := stringOverride(overrides["sasl_mechanism"]); err != nil { + return ConnectionConfig{}, err + } else if ok { + cfg.SASLMechanism = v + } + if v, ok, err := stringOverride(overrides["sasl_username"]); err != nil { + return ConnectionConfig{}, err + } else if ok { + cfg.SASLUsername = v + } + if v, ok, err := stringOverride(overrides["sasl_password"]); err != nil { + return ConnectionConfig{}, err + } else if ok { + cfg.SASLPassword = v + } + } + + normalizeConnectionConfig(&cfg) + if err := validateConnectionConfig(cfg); err != nil { + return ConnectionConfig{}, err + } + return cfg, nil +} + +func normalizeConnectionConfig(cfg *ConnectionConfig) { + normalizedBrokers := make([]string, 0, len(cfg.Brokers)) + for _, broker := range cfg.Brokers { + trimmed := strings.TrimSpace(broker) + if trimmed == "" { + continue + } + normalizedBrokers = append(normalizedBrokers, trimmed) + } + cfg.Brokers = normalizedBrokers + cfg.SASLMechanism = strings.ToLower(strings.TrimSpace(cfg.SASLMechanism)) + cfg.TLSCAFile = strings.TrimSpace(cfg.TLSCAFile) + cfg.TLSCertFile = strings.TrimSpace(cfg.TLSCertFile) + cfg.TLSKeyFile = strings.TrimSpace(cfg.TLSKeyFile) + cfg.TLSServerName = strings.TrimSpace(cfg.TLSServerName) +} + +func validateConnectionConfig(cfg ConnectionConfig) error { + if len(cfg.Brokers) == 0 { + return fmt.Errorf("kafka brokers must not be empty") + } + + if !cfg.TLS { + if cfg.TLSCAFile != "" || cfg.TLSCertFile != "" || cfg.TLSKeyFile != "" || cfg.TLSServerName != "" { + return fmt.Errorf("kafka TLS files or server name require kafka.tls=true") + } + } + + if cfg.TLS { + if cfg.TLSCAFile != "" { + if err := validateReadableFile(cfg.TLSCAFile, "kafka.tls_ca_file"); err != nil { + return err + } + } + if cfg.TLSCertFile != "" || cfg.TLSKeyFile != "" { + if cfg.TLSCertFile == "" || cfg.TLSKeyFile == "" { + return fmt.Errorf("kafka.tls_cert_file and kafka.tls_key_file must be configured together") + } + if err := validateReadableFile(cfg.TLSCertFile, "kafka.tls_cert_file"); err != nil { + return err + } + if err := validateReadableFile(cfg.TLSKeyFile, "kafka.tls_key_file"); err != nil { + return err + } + } 
+ } + + switch cfg.SASLMechanism { + case "", "plain", "scram-sha-256", "scram-sha-512": + default: + return fmt.Errorf("unsupported kafka sasl mechanism %q", cfg.SASLMechanism) + } + + if cfg.SASLMechanism != "" { + if cfg.SASLUsername == "" { + return fmt.Errorf("kafka.sasl_username is required when kafka.sasl_mechanism is set") + } + if cfg.SASLPassword == "" { + return fmt.Errorf("kafka.sasl_password is required when kafka.sasl_mechanism is set") + } + } + + return nil +} + +// BuildClientOptions returns franz-go client options for the Kafka connection. +func BuildClientOptions(cfg ConnectionConfig, extraOpts ...kgo.Opt) ([]kgo.Opt, error) { + opts := []kgo.Opt{kgo.SeedBrokers(cfg.Brokers...)} + + if cfg.TLS { + tlsCfg, err := buildTLSConfig(cfg) + if err != nil { + return nil, err + } + opts = append(opts, kgo.DialTLSConfig(tlsCfg)) + } + + if cfg.SASLMechanism != "" { + mechanism, err := buildSASLMechanism(cfg) + if err != nil { + return nil, err + } + opts = append(opts, kgo.SASL(mechanism)) + } + + return append(opts, extraOpts...), nil +} + +func buildTLSConfig(cfg ConnectionConfig) (*tls.Config, error) { + tlsCfg := &tls.Config{ + MinVersion: tls.VersionTLS12, + ServerName: cfg.TLSServerName, + } + + if cfg.TLSCAFile != "" { + caPEM, err := os.ReadFile(cfg.TLSCAFile) + if err != nil { + return nil, fmt.Errorf("failed to read kafka TLS CA file: %w", err) + } + pool := x509.NewCertPool() + if !pool.AppendCertsFromPEM(caPEM) { + return nil, fmt.Errorf("failed to parse kafka TLS CA file %q", cfg.TLSCAFile) + } + tlsCfg.RootCAs = pool + } + + if cfg.TLSCertFile != "" && cfg.TLSKeyFile != "" { + cert, err := tls.LoadX509KeyPair(cfg.TLSCertFile, cfg.TLSKeyFile) + if err != nil { + return nil, fmt.Errorf("failed to load kafka client certificate: %w", err) + } + tlsCfg.Certificates = []tls.Certificate{cert} + } + + return tlsCfg, nil +} + +func buildSASLMechanism(cfg ConnectionConfig) (sasl.Mechanism, error) { + switch cfg.SASLMechanism { + case "plain": + return plain.Auth{ + User: cfg.SASLUsername, + Pass: cfg.SASLPassword, + }.AsMechanism(), nil + case "scram-sha-256": + return scram.Auth{ + User: cfg.SASLUsername, + Pass: cfg.SASLPassword, + }.AsSha256Mechanism(), nil + case "scram-sha-512": + return scram.Auth{ + User: cfg.SASLUsername, + Pass: cfg.SASLPassword, + }.AsSha512Mechanism(), nil + default: + return nil, fmt.Errorf("unsupported kafka sasl mechanism %q", cfg.SASLMechanism) + } +} + +func boolOverride(value interface{}) (bool, bool, error) { + if value == nil { + return false, false, nil + } + v, ok := value.(bool) + if !ok { + return false, false, fmt.Errorf("expected boolean Kafka config override, got %T", value) + } + return v, true, nil +} + +func stringOverride(value interface{}) (string, bool, error) { + if value == nil { + return "", false, nil + } + v, ok := value.(string) + if !ok { + return "", false, fmt.Errorf("expected string Kafka config override, got %T", value) + } + return v, true, nil +} + +func stringSliceOverride(value interface{}) ([]string, bool, error) { + if value == nil { + return nil, false, nil + } + + switch v := value.(type) { + case []string: + return append([]string(nil), v...), true, nil + case []interface{}: + out := make([]string, 0, len(v)) + for _, item := range v { + str, ok := item.(string) + if !ok { + return nil, false, fmt.Errorf("expected string broker entry, got %T", item) + } + out = append(out, str) + } + return out, true, nil + default: + return nil, false, fmt.Errorf("expected string slice Kafka config override, got %T", value) 
+ } +} + +func validateReadableFile(filePath, fieldName string) error { + info, err := os.Stat(filePath) + if err != nil { + if os.IsNotExist(err) { + return fmt.Errorf("%s file %q does not exist", fieldName, filePath) + } + return fmt.Errorf("failed to access %s file %q: %w", fieldName, filePath, err) + } + if info.IsDir() { + return fmt.Errorf("%s path %q must be a file, not a directory", fieldName, filePath) + } + fileHandle, err := os.Open(filePath) + if err != nil { + return fmt.Errorf("%s file %q is not readable: %w", fieldName, filePath, err) + } + return fileHandle.Close() +} diff --git a/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config_test.go b/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config_test.go new file mode 100644 index 000000000..3a46a34f7 --- /dev/null +++ b/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config_test.go @@ -0,0 +1,123 @@ +package kafka + +import ( + "os" + "path/filepath" + "reflect" + "testing" + + runtimeconfig "github.com/wso2/api-platform/event-gateway/gateway-runtime/internal/config" +) + +func TestResolveConnectionConfig_MergesGlobalAndOverrides(t *testing.T) { + tempDir := t.TempDir() + caPath := filepath.Join(tempDir, "ca.crt") + if err := os.WriteFile(caPath, []byte("ca"), 0o644); err != nil { + t.Fatalf("write ca: %v", err) + } + + global := runtimeconfig.KafkaConfig{ + Brokers: []string{"broker-1:9092"}, + TLS: true, + TLSCAFile: caPath, + TLSServerName: "global-kafka", + SASLMechanism: "plain", + SASLUsername: "global-user", + SASLPassword: "global-pass", + } + + resolved, err := ResolveConnectionConfig(global, map[string]interface{}{ + "brokers": []interface{}{"broker-2:9093", " broker-3:9094 "}, + "tls_server_name": "binding-kafka", + "sasl_username": "binding-user", + }) + if err != nil { + t.Fatalf("ResolveConnectionConfig returned error: %v", err) + } + + wantBrokers := []string{"broker-2:9093", "broker-3:9094"} + if !reflect.DeepEqual(resolved.Brokers, wantBrokers) { + t.Fatalf("expected brokers %v, got %v", wantBrokers, resolved.Brokers) + } + if !resolved.TLS { + t.Fatalf("expected TLS to remain enabled") + } + if resolved.TLSCAFile != caPath { + t.Fatalf("expected global CA file to be preserved, got %q", resolved.TLSCAFile) + } + if resolved.TLSServerName != "binding-kafka" { + t.Fatalf("expected binding TLS server name override, got %q", resolved.TLSServerName) + } + if resolved.SASLUsername != "binding-user" { + t.Fatalf("expected binding SASL username override, got %q", resolved.SASLUsername) + } + if resolved.SASLPassword != "global-pass" { + t.Fatalf("expected global SASL password fallback, got %q", resolved.SASLPassword) + } +} + +func TestResolveConnectionConfig_PreservesOpaqueCredentials(t *testing.T) { + resolved, err := ResolveConnectionConfig(runtimeconfig.KafkaConfig{}, map[string]interface{}{ + "brokers": []interface{}{"broker:9092"}, + "sasl_mechanism": "plain", + "sasl_username": " user-with-spaces ", + "sasl_password": " secret-with-spaces ", + }) + if err != nil { + t.Fatalf("ResolveConnectionConfig returned error: %v", err) + } + + if resolved.SASLUsername != " user-with-spaces " { + t.Fatalf("expected username to be preserved verbatim, got %q", resolved.SASLUsername) + } + if resolved.SASLPassword != " secret-with-spaces " { + t.Fatalf("expected password to be preserved verbatim, got %q", resolved.SASLPassword) + } +} + +func TestResolveConnectionConfig_RequiresTLSWhenTLSFilesAreConfigured(t *testing.T) { + _, err := 
ResolveConnectionConfig(runtimeconfig.KafkaConfig{ + Brokers: []string{"broker:9092"}, + TLSCAFile: "/tmp/ca.crt", + }, nil) + if err == nil { + t.Fatalf("expected error when TLS files are set with TLS disabled") + } +} + +func TestResolveConnectionConfig_ValidatesReadableTLSFiles(t *testing.T) { + tempDir := t.TempDir() + caPath := filepath.Join(tempDir, "ca.crt") + if err := os.WriteFile(caPath, []byte("ca"), 0o644); err != nil { + t.Fatalf("write ca: %v", err) + } + + _, err := ResolveConnectionConfig(runtimeconfig.KafkaConfig{ + Brokers: []string{"broker:9092"}, + TLS: true, + TLSCAFile: caPath, + }, nil) + if err != nil { + t.Fatalf("expected readable CA file to validate, got %v", err) + } + + _, err = ResolveConnectionConfig(runtimeconfig.KafkaConfig{ + Brokers: []string{"broker:9092"}, + TLS: true, + TLSCAFile: filepath.Join(tempDir, "missing.crt"), + }, nil) + if err == nil { + t.Fatalf("expected missing CA file to fail validation") + } +} + +func TestResolveConnectionConfig_RequiresSASLCredentials(t *testing.T) { + _, err := ResolveConnectionConfig(runtimeconfig.KafkaConfig{}, map[string]interface{}{ + "brokers": []interface{}{"broker:9092"}, + "sasl_mechanism": "scram-sha-512", + "sasl_username": "user", + }) + if err == nil { + t.Fatalf("expected missing SASL password to fail validation") + } +} diff --git a/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/consumer.go b/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/consumer.go index 7430a8b48..f1ae471b9 100644 --- a/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/consumer.go +++ b/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/consumer.go @@ -38,13 +38,17 @@ type Consumer struct { } // NewConsumer creates a new shared-group Kafka consumer. -func NewConsumer(brokers []string, groupID string, topics []string, handler connectors.MessageHandler) (*Consumer, error) { - client, err := kgo.NewClient( - kgo.SeedBrokers(brokers...), +func NewConsumer(cfg ConnectionConfig, groupID string, topics []string, handler connectors.MessageHandler) (*Consumer, error) { + opts, err := BuildClientOptions( + cfg, kgo.ConsumerGroup(groupID), kgo.ConsumeTopics(topics...), kgo.ConsumeResetOffset(kgo.NewOffset().AtEnd()), ) + if err != nil { + return nil, fmt.Errorf("failed to build kafka consumer options: %w", err) + } + client, err := kgo.NewClient(opts...) if err != nil { return nil, fmt.Errorf("failed to create kafka consumer: %w", err) } @@ -116,14 +120,18 @@ type ManualCommitConsumer struct { } // NewManualCommitConsumer creates a consumer with manual offset commit. -func NewManualCommitConsumer(brokers []string, groupID string, topics []string, handler connectors.MessageHandler) (*ManualCommitConsumer, error) { - client, err := kgo.NewClient( - kgo.SeedBrokers(brokers...), +func NewManualCommitConsumer(cfg ConnectionConfig, groupID string, topics []string, handler connectors.MessageHandler) (*ManualCommitConsumer, error) { + opts, err := BuildClientOptions( + cfg, kgo.ConsumerGroup(groupID), kgo.ConsumeTopics(topics...), kgo.ConsumeResetOffset(kgo.NewOffset().AtEnd()), kgo.DisableAutoCommit(), ) + if err != nil { + return nil, fmt.Errorf("failed to build manual-commit kafka consumer options: %w", err) + } + client, err := kgo.NewClient(opts...) 
if err != nil { return nil, fmt.Errorf("failed to create manual-commit kafka consumer: %w", err) } diff --git a/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/endpoint.go b/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/endpoint.go index 3a30a38ee..0195ea892 100644 --- a/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/endpoint.go +++ b/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/endpoint.go @@ -33,19 +33,28 @@ import ( // It owns a shared publisher and creates consumers on demand. type KafkaBrokerDriver struct { publisher *Publisher - brokers []string + cfg ConnectionConfig admin *kadm.Client adminKgo *kgo.Client } -// NewBrokerDriver creates a Kafka broker-driver backed by the given brokers. -func NewBrokerDriver(brokers []string) (*KafkaBrokerDriver, error) { - pub, err := NewPublisher(brokers) +// NewClient creates a franz-go client using the shared Kafka connection config. +func NewClient(cfg ConnectionConfig, extraOpts ...kgo.Opt) (*kgo.Client, error) { + opts, err := BuildClientOptions(cfg, extraOpts...) + if err != nil { + return nil, err + } + return kgo.NewClient(opts...) +} + +// NewBrokerDriver creates a Kafka broker-driver backed by the given connection config. +func NewBrokerDriver(cfg ConnectionConfig) (*KafkaBrokerDriver, error) { + pub, err := NewPublisher(cfg) if err != nil { return nil, fmt.Errorf("failed to create kafka publisher: %w", err) } - adminKgo, err := kgo.NewClient(kgo.SeedBrokers(brokers...)) + adminKgo, err := NewClient(cfg) if err != nil { pub.Close() return nil, fmt.Errorf("failed to create kafka admin client: %w", err) @@ -53,7 +62,7 @@ func NewBrokerDriver(brokers []string) (*KafkaBrokerDriver, error) { return &KafkaBrokerDriver{ publisher: pub, - brokers: brokers, + cfg: cfg, admin: kadm.NewClient(adminKgo), adminKgo: adminKgo, }, nil @@ -67,7 +76,17 @@ func (e *KafkaBrokerDriver) Publish(ctx context.Context, topic string, msg *conn // Subscribe creates a consumer for the given topics using a shared consumer group. // The returned Receiver must be Start()ed by the caller. func (e *KafkaBrokerDriver) Subscribe(groupID string, topics []string, handler connectors.MessageHandler) (connectors.Receiver, error) { - return NewConsumer(e.brokers, groupID, topics, handler) + return NewConsumer(e.cfg, groupID, topics, handler) +} + +// SubscribeManual creates a consumer with manual offset commits for the given topics. +func (e *KafkaBrokerDriver) SubscribeManual(groupID string, topics []string, handler connectors.MessageHandler) (connectors.Receiver, error) { + return NewManualCommitConsumer(e.cfg, groupID, topics, handler) +} + +// Replay replays all records from the start of a compacted topic until caught up. +func (e *KafkaBrokerDriver) Replay(ctx context.Context, topic string, handler connectors.MessageHandler) error { + return ReplayTopic(ctx, e.cfg, topic, handler) } // TopicExists checks whether a topic exists in the Kafka cluster. @@ -102,6 +121,25 @@ func (e *KafkaBrokerDriver) EnsureTopics(ctx context.Context, topics []string) e return nil } +// EnsureCompactedTopic creates a compacted topic if it does not already exist. 
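+// The topic is created with a single partition, replication factor 1, and
+// cleanup.policy=compact so only the latest record per key is retained.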
+func (e *KafkaBrokerDriver) EnsureCompactedTopic(ctx context.Context, topic string) error { + resp, err := e.admin.CreateTopics(ctx, 1, 1, map[string]*string{ + "cleanup.policy": kadm.StringPtr("compact"), + }, topic) + if err != nil { + return fmt.Errorf("failed to create compacted topic %s: %w", topic, err) + } + for _, t := range resp.Sorted() { + if t.Err != nil { + if isTopicAlreadyExistsErr(t.Err) { + return nil + } + return fmt.Errorf("failed to create compacted topic %s: %w", t.Topic, t.Err) + } + } + return nil +} + // isTopicAlreadyExistsErr checks if the error indicates the topic already exists. func isTopicAlreadyExistsErr(err error) bool { if err == nil { diff --git a/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/publisher.go b/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/publisher.go index 29b85e04a..b6972e7b0 100644 --- a/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/publisher.go +++ b/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/publisher.go @@ -32,8 +32,11 @@ type Publisher struct { } // NewPublisher creates a new Kafka publisher. -func NewPublisher(brokers []string, opts ...kgo.Opt) (*Publisher, error) { - allOpts := append([]kgo.Opt{kgo.SeedBrokers(brokers...)}, opts...) +func NewPublisher(cfg ConnectionConfig, opts ...kgo.Opt) (*Publisher, error) { + allOpts, err := BuildClientOptions(cfg, opts...) + if err != nil { + return nil, fmt.Errorf("failed to build kafka publisher options: %w", err) + } client, err := kgo.NewClient(allOpts...) if err != nil { return nil, fmt.Errorf("failed to create kafka publisher: %w", err) diff --git a/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/replayer.go b/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/replayer.go index 86517c7aa..597570b30 100644 --- a/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/replayer.go +++ b/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/replayer.go @@ -22,79 +22,120 @@ import ( "context" "fmt" "log/slog" - "sync" + "github.com/twmb/franz-go/pkg/kadm" "github.com/twmb/franz-go/pkg/kgo" "github.com/wso2/api-platform/event-gateway/gateway-runtime/internal/connectors" ) -// Replayer consumes from a compacted topic from offset 0, replaying all state. -// Each runtime uses its own consumer identity (NOT shared-group). -type Replayer struct { - client *kgo.Client - handler connectors.MessageHandler - cancel context.CancelFunc - wg sync.WaitGroup +type replayPartitionKey struct { + topic string + partition int32 } -// NewReplayer creates a new replayer for a compacted topic. -func NewReplayer(brokers []string, topic string, handler connectors.MessageHandler) (*Replayer, error) { - client, err := kgo.NewClient( - kgo.SeedBrokers(brokers...), +// ReplayTopic consumes a compacted topic from offset 0 until every partition +// reaches the end offset captured at the start of replay. 
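+// Records appended after the end offsets are captured are skipped, so the
+// replay terminates even while producers keep writing to the topic.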
+func ReplayTopic(ctx context.Context, cfg ConnectionConfig, topic string, handler connectors.MessageHandler) error { + adminClient, err := NewClient(cfg) + if err != nil { + return fmt.Errorf("failed to create kafka replay admin client: %w", err) + } + admin := kadm.NewClient(adminClient) + endOffsets, err := admin.ListEndOffsets(ctx, topic) + adminClient.Close() + if err != nil { + return fmt.Errorf("failed to list replay end offsets for topic %s: %w", topic, err) + } + + targetOffsets := make(map[replayPartitionKey]int64) + completedPartitions := make(map[replayPartitionKey]bool) + var offsetErr error + endOffsets.Each(func(o kadm.ListedOffset) { + if offsetErr != nil { + return + } + if o.Err != nil { + offsetErr = fmt.Errorf("failed to inspect replay offset for topic %s partition %d: %w", o.Topic, o.Partition, o.Err) + return + } + key := replayPartitionKey{topic: o.Topic, partition: o.Partition} + targetOffsets[key] = o.Offset + completedPartitions[key] = o.Offset == 0 + }) + if offsetErr != nil { + return offsetErr + } + if len(targetOffsets) == 0 || replayComplete(completedPartitions) { + return nil + } + + opts, err := BuildClientOptions( + cfg, kgo.ConsumeTopics(topic), kgo.ConsumeResetOffset(kgo.NewOffset().AtStart()), ) if err != nil { - return nil, fmt.Errorf("failed to create kafka replayer: %w", err) + return fmt.Errorf("failed to build kafka replay consumer options: %w", err) } - - return &Replayer{ - client: client, - handler: handler, - }, nil -} - -// Start begins consuming and replaying events. -func (r *Replayer) Start(ctx context.Context) { - ctx, r.cancel = context.WithCancel(ctx) - r.wg.Add(1) - go func() { - defer r.wg.Done() - r.consumeLoop(ctx) - }() -} - -// Stop stops the replayer. -func (r *Replayer) Stop() { - if r.cancel != nil { - r.cancel() + client, err := kgo.NewClient(opts...) 
+ if err != nil { + return fmt.Errorf("failed to create kafka replay consumer: %w", err) } - r.wg.Wait() - r.client.Close() -} - -func (r *Replayer) consumeLoop(ctx context.Context) { + defer client.Close() for { - fetches := r.client.PollFetches(ctx) + fetches := client.PollFetches(ctx) if ctx.Err() != nil { - return + return ctx.Err() } if errs := fetches.Errors(); len(errs) > 0 { for _, e := range errs { slog.Error("Kafka replayer fetch error", "topic", e.Topic, "partition", e.Partition, "error", e.Err) } + return fmt.Errorf("fetch errors during replay for topic %s", topic) } + var handlerErr error fetches.EachRecord(func(record *kgo.Record) { - msg := recordToMessage(record) - if err := r.handler(ctx, msg); err != nil { + if handlerErr != nil { + return + } + partitionKey := replayPartitionKey{ + topic: record.Topic, + partition: record.Partition, + } + if completedPartitions[partitionKey] { + return + } + if err := handler(ctx, recordToMessage(record)); err != nil { slog.Error("Replayer handler error", "topic", record.Topic, + "partition", record.Partition, "offset", record.Offset, "error", err, ) + handlerErr = fmt.Errorf("replay handler failed for topic %s partition %d offset %d: %w", record.Topic, record.Partition, record.Offset, err) + return + } + if record.Offset+1 >= targetOffsets[partitionKey] { + completedPartitions[partitionKey] = true } }) + + if handlerErr != nil { + return handlerErr + } + if replayComplete(completedPartitions) { + return nil + } + } +} + +func replayComplete(completedPartitions map[replayPartitionKey]bool) bool { + for _, done := range completedPartitions { + if !done { + return false + } } + return true } diff --git a/event-gateway/gateway-runtime/internal/connectors/receiver/websub/connector.go b/event-gateway/gateway-runtime/internal/connectors/receiver/websub/connector.go index e4b299a02..888fe49c2 100644 --- a/event-gateway/gateway-runtime/internal/connectors/receiver/websub/connector.go +++ b/event-gateway/gateway-runtime/internal/connectors/receiver/websub/connector.go @@ -40,7 +40,6 @@ type Options struct { DeliveryConcurrency int RuntimeID string ConsumerGroupPrefix string - Brokers []string } // WebSubReceiver is a multi-channel WebSub receiver. @@ -84,7 +83,7 @@ func NewReceiver(cfg connectors.ReceiverConfig, opts Options) (connectors.Receiv // Create consumer manager for per-callback consumers. consumerMgr := NewConsumerManager( - opts.Brokers, + cfg.BrokerDriver, opts.ConsumerGroupPrefix, cfg.Processor, cfg.Channel.Name, @@ -93,12 +92,8 @@ func NewReceiver(cfg connectors.ReceiverConfig, opts Options) (connectors.Receiv // Create sync producer for subscription state. var syncProducer *subscription.SyncProducer - if len(opts.Brokers) > 0 && cfg.Channel.InternalSubTopic != "" { - var err error - syncProducer, err = subscription.NewSyncProducer(opts.Brokers, opts.RuntimeID, cfg.Channel.InternalSubTopic) - if err != nil { - slog.Warn("Failed to create sync producer, subscription sync disabled", "error", err) - } + if cfg.Channel.InternalSubTopic != "" { + syncProducer = subscription.NewSyncProducer(cfg.BrokerDriver, opts.RuntimeID, cfg.Channel.InternalSubTopic) } verificationTimeout := time.Duration(opts.VerificationTimeoutSeconds) * time.Second @@ -190,11 +185,11 @@ func (e *WebSubReceiver) Stop(ctx context.Context) error { // reconcileSubscriptions replays subscriptions from the Kafka sync topic and // restores any active subscriptions whose channel names belong to this receiver. 
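+// Reconciliation is skipped when the channel has no internal subscription
+// sync topic configured.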
func (e *WebSubReceiver) reconcileSubscriptions(ctx context.Context) { - if len(e.opts.Brokers) == 0 { + if e.channel.InternalSubTopic == "" { return } - reconciler := subscription.NewReconciler(e.opts.Brokers, e.store, e.opts.RuntimeID, e.channel.InternalSubTopic) + reconciler := subscription.NewReconciler(e.brokerDriver, e.store, e.opts.RuntimeID, e.channel.InternalSubTopic) // Build a set of channel names this receiver owns. ownedChannels := make(map[string]bool, len(e.channel.Channels)) diff --git a/event-gateway/gateway-runtime/internal/connectors/receiver/websub/consumer_manager.go b/event-gateway/gateway-runtime/internal/connectors/receiver/websub/consumer_manager.go index e04d2194a..f909444aa 100644 --- a/event-gateway/gateway-runtime/internal/connectors/receiver/websub/consumer_manager.go +++ b/event-gateway/gateway-runtime/internal/connectors/receiver/websub/consumer_manager.go @@ -27,7 +27,6 @@ import ( "sync" "github.com/wso2/api-platform/event-gateway/gateway-runtime/internal/connectors" - "github.com/wso2/api-platform/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka" ) // managedConsumer tracks a per-callback consumer and its topic set. @@ -40,31 +39,31 @@ type managedConsumer struct { // Each unique callback URL gets its own consumer group. When a callback's // topic set changes (subscribe/unsubscribe), the consumer is recreated. type ConsumerManager struct { - mu sync.Mutex - consumers map[string]*managedConsumer // callbackURL → managedConsumer - brokers []string - groupPrefix string - processor connectors.MessageProcessor - bindingName string - deliverer *Deliverer - ctx context.Context + mu sync.Mutex + consumers map[string]*managedConsumer // callbackURL → managedConsumer + brokerDriver connectors.BrokerDriver + groupPrefix string + processor connectors.MessageProcessor + bindingName string + deliverer *Deliverer + ctx context.Context } // NewConsumerManager creates a new ConsumerManager. 
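+// The broker driver now carries all connection details (brokers, TLS, SASL),
+// so the manager no longer tracks broker addresses of its own.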
func NewConsumerManager( - brokers []string, + brokerDriver connectors.BrokerDriver, groupPrefix string, processor connectors.MessageProcessor, bindingName string, deliverer *Deliverer, ) *ConsumerManager { return &ConsumerManager{ - consumers: make(map[string]*managedConsumer), - brokers: brokers, - groupPrefix: groupPrefix, - processor: processor, - bindingName: bindingName, - deliverer: deliverer, + consumers: make(map[string]*managedConsumer), + brokerDriver: brokerDriver, + groupPrefix: groupPrefix, + processor: processor, + bindingName: bindingName, + deliverer: deliverer, } } @@ -211,7 +210,7 @@ func (cm *ConsumerManager) createConsumer(groupID string, topics []string, callb return cm.deliverer.Deliver(ctx, callbackURL, secret, processed) } - consumer, err := kafka.NewManualCommitConsumer(cm.brokers, groupID, topics, handler) + consumer, err := cm.brokerDriver.SubscribeManual(groupID, topics, handler) if err != nil { return nil, err } diff --git a/event-gateway/gateway-runtime/internal/connectors/types.go b/event-gateway/gateway-runtime/internal/connectors/types.go index 6b3139835..9655360ad 100644 --- a/event-gateway/gateway-runtime/internal/connectors/types.go +++ b/event-gateway/gateway-runtime/internal/connectors/types.go @@ -53,8 +53,11 @@ type MessageProcessor interface { type BrokerDriver interface { Publish(ctx context.Context, topic string, msg *Message) error Subscribe(groupID string, topics []string, handler MessageHandler) (Receiver, error) + SubscribeManual(groupID string, topics []string, handler MessageHandler) (Receiver, error) + Replay(ctx context.Context, topic string, handler MessageHandler) error TopicExists(ctx context.Context, topic string) (bool, error) EnsureTopics(ctx context.Context, topics []string) error + EnsureCompactedTopic(ctx context.Context, topic string) error DeleteTopics(ctx context.Context, topics []string) error Close() error } diff --git a/event-gateway/gateway-runtime/internal/runtime/runtime_test.go b/event-gateway/gateway-runtime/internal/runtime/runtime_test.go index 5daec54b6..7d45cdb21 100644 --- a/event-gateway/gateway-runtime/internal/runtime/runtime_test.go +++ b/event-gateway/gateway-runtime/internal/runtime/runtime_test.go @@ -157,7 +157,7 @@ func TestWebSubSubscriptionSyncTopic_FallsBackToDerivedTopic(t *testing.T) { rt := &Runtime{cfg: &config.Config{}} got := rt.webSubSubscriptionSyncTopic("repo-watcher", "v1.0") - want := binding.WebSubApiTopicName("repo-watcher", "v1.0", "_subscriptions") + want := binding.WebSubApiSubscriptionTopic("repo-watcher", "v1.0") if got != want { t.Fatalf("webSubSubscriptionSyncTopic() = %q, want %q", got, want) } diff --git a/event-gateway/gateway-runtime/internal/subscription/reconciler.go b/event-gateway/gateway-runtime/internal/subscription/reconciler.go index 87bd93b24..a44c2827c 100644 --- a/event-gateway/gateway-runtime/internal/subscription/reconciler.go +++ b/event-gateway/gateway-runtime/internal/subscription/reconciler.go @@ -24,8 +24,7 @@ import ( "fmt" "log/slog" - "github.com/twmb/franz-go/pkg/kadm" - "github.com/twmb/franz-go/pkg/kgo" + "github.com/wso2/api-platform/event-gateway/gateway-runtime/internal/connectors" ) // SubscriptionCallback is called when a subscription is added/removed during reconciliation. @@ -33,7 +32,7 @@ type SubscriptionCallback func(sub *Subscription, isTombstone bool) // Reconciler rebuilds the in-memory subscription store from a per-API Kafka compacted topic on startup. 
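+// Replay is delegated to the broker driver, so the reconciler no longer
+// opens Kafka clients of its own.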
type Reconciler struct { - brokers []string + driver connectors.BrokerDriver store SubscriptionStore runtimeID string syncTopic string @@ -41,9 +40,9 @@ type Reconciler struct { } // NewReconciler creates a new Reconciler that replays from the given syncTopic. -func NewReconciler(brokers []string, store SubscriptionStore, runtimeID, syncTopic string) *Reconciler { +func NewReconciler(driver connectors.BrokerDriver, store SubscriptionStore, runtimeID, syncTopic string) *Reconciler { return &Reconciler{ - brokers: brokers, + driver: driver, store: store, runtimeID: runtimeID, syncTopic: syncTopic, @@ -60,96 +59,35 @@ func (r *Reconciler) SetCallback(cb SubscriptionCallback) { // Returns when the consumer has caught up to the high watermark. func (r *Reconciler) Reconcile(ctx context.Context) error { slog.Info("Starting subscription reconciliation from Kafka") - - // Get high watermarks to know when we're caught up - adminClient, err := kgo.NewClient(kgo.SeedBrokers(r.brokers...)) - if err != nil { - return fmt.Errorf("failed to create admin client: %w", err) - } - admin := kadm.NewClient(adminClient) - - endOffsets, err := admin.ListEndOffsets(ctx, r.syncTopic) - if err != nil { - adminClient.Close() - return fmt.Errorf("failed to list end offsets: %w", err) - } - adminClient.Close() - - // Calculate total messages to replay - var totalEnd int64 - endOffsets.Each(func(o kadm.ListedOffset) { - totalEnd += o.Offset - }) - - if totalEnd == 0 { - slog.Info("No subscription data to reconcile") - return nil - } - - // Create a consumer from the beginning - client, err := kgo.NewClient( - kgo.SeedBrokers(r.brokers...), - kgo.ConsumeTopics(r.syncTopic), - kgo.ConsumeResetOffset(kgo.NewOffset().AtStart()), - ) - if err != nil { - return fmt.Errorf("failed to create reconciliation consumer: %w", err) - } - defer client.Close() - - var replayed int64 - for { - fetches := client.PollFetches(ctx) - if ctx.Err() != nil { - return ctx.Err() - } - - if errs := fetches.Errors(); len(errs) > 0 { - return fmt.Errorf("fetch errors during reconciliation: %v", errs) - } - - fetches.EachRecord(func(record *kgo.Record) { - if record.Value == nil { - // Tombstone — remove from store - parts := parseSyncKey(string(record.Key)) - if parts != nil { - _ = r.store.Remove(parts[0], parts[1]) - if r.callback != nil { - r.callback(&Subscription{Topic: parts[0], CallbackURL: parts[1]}, true) - } - } - } else { - var sub Subscription - if err := json.Unmarshal(record.Value, &sub); err != nil { - slog.Error("Failed to unmarshal subscription during reconciliation", "error", err) - return - } - _ = r.store.Add(&sub) + replayed := 0 + err := r.driver.Replay(ctx, r.syncTopic, func(_ context.Context, msg *connectors.Message) error { + replayed++ + if msg.Value == nil { + parts := parseSyncKey(string(msg.Key)) + if parts != nil { + _ = r.store.Remove(parts[0], parts[1]) if r.callback != nil { - r.callback(&sub, false) + r.callback(&Subscription{Topic: parts[0], CallbackURL: parts[1]}, true) } } - replayed++ - }) - - // Check if we've caught up to all partitions - caughtUp := true - endOffsets.Each(func(o kadm.ListedOffset) { - if o.Offset > 0 { - // Simplified catch-up check - caughtUp = caughtUp && (replayed >= totalEnd) - } - }) + return nil + } - if caughtUp { - break + var sub Subscription + if err := json.Unmarshal(msg.Value, &sub); err != nil { + slog.Error("Failed to unmarshal subscription during reconciliation", "error", err) + return nil } + _ = r.store.Add(&sub) + if r.callback != nil { + r.callback(&sub, false) + } + 
return nil + }) + if err != nil { + return fmt.Errorf("failed to replay subscriptions: %w", err) } - slog.Info("Subscription reconciliation complete", - "replayed", replayed, - "active_subscriptions", len(r.store.GetActive()), - ) - + slog.Info("Subscription reconciliation complete", "replayed", replayed, "active_subscriptions", len(r.store.GetActive())) return nil } diff --git a/event-gateway/gateway-runtime/internal/subscription/sync.go b/event-gateway/gateway-runtime/internal/subscription/sync.go index 7c8c4193a..eebbca01f 100644 --- a/event-gateway/gateway-runtime/internal/subscription/sync.go +++ b/event-gateway/gateway-runtime/internal/subscription/sync.go @@ -23,61 +23,30 @@ import ( "encoding/json" "fmt" "log/slog" - "strings" "sync" "time" - "github.com/twmb/franz-go/pkg/kadm" "github.com/twmb/franz-go/pkg/kgo" + "github.com/wso2/api-platform/event-gateway/gateway-runtime/internal/connectors" ) // SyncProducer publishes subscription state changes to a per-API sync topic. type SyncProducer struct { - client *kgo.Client + driver connectors.BrokerDriver runtimeID string - brokers []string syncTopic string } // NewSyncProducer creates a new sync producer that writes to the given syncTopic. -func NewSyncProducer(brokers []string, runtimeID, syncTopic string) (*SyncProducer, error) { - client, err := kgo.NewClient( - kgo.SeedBrokers(brokers...), - kgo.DefaultProduceTopic(syncTopic), - ) - if err != nil { - return nil, fmt.Errorf("failed to create sync producer: %w", err) - } - return &SyncProducer{client: client, runtimeID: runtimeID, brokers: brokers, syncTopic: syncTopic}, nil +func NewSyncProducer(driver connectors.BrokerDriver, runtimeID, syncTopic string) *SyncProducer { + return &SyncProducer{driver: driver, runtimeID: runtimeID, syncTopic: syncTopic} } // EnsureSyncTopic creates the per-API subscription sync topic if it // does not already exist. The topic is created with cleanup.policy=compact // so that the latest subscription state per key is retained indefinitely. func (p *SyncProducer) EnsureSyncTopic(ctx context.Context) error { - adminKgo, err := kgo.NewClient(kgo.SeedBrokers(p.brokers...)) - if err != nil { - return fmt.Errorf("failed to create admin client: %w", err) - } - defer adminKgo.Close() - - admin := kadm.NewClient(adminKgo) - topicConfig := map[string]*string{ - "cleanup.policy": kadm.StringPtr("compact"), - } - resp, err := admin.CreateTopics(ctx, 1, 1, topicConfig, p.syncTopic) - if err != nil { - return fmt.Errorf("failed to create sync topic: %w", err) - } - for _, t := range resp.Sorted() { - if t.Err != nil { - errStr := t.Err.Error() - if !(strings.Contains(errStr, "TOPIC_ALREADY_EXISTS") || strings.Contains(errStr, "already exists")) { - return fmt.Errorf("failed to create sync topic %s: %w", t.Topic, t.Err) - } - } - } - return nil + return p.driver.EnsureCompactedTopic(ctx, p.syncTopic) } // PublishSubscription publishes a subscription state change synchronously. 
@@ -101,7 +70,11 @@ func (p *SyncProducer) PublishSubscription(_ context.Context, sub *Subscription) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - if err := p.client.ProduceSync(ctx, record).FirstErr(); err != nil { + if err := p.driver.Publish(ctx, p.syncTopic, &connectors.Message{ + Key: record.Key, + Value: record.Value, + Topic: record.Topic, + }); err != nil { slog.Error("Failed to publish subscription sync", "key", key, "error", err) return fmt.Errorf("failed to publish subscription sync: %w", err) } @@ -112,16 +85,14 @@ func (p *SyncProducer) PublishSubscription(_ context.Context, sub *Subscription) // PublishTombstone publishes a tombstone (deletion) for a subscription synchronously. func (p *SyncProducer) PublishTombstone(_ context.Context, topic, callbackURL string) error { key := syncKey(topic, callbackURL) - record := &kgo.Record{ - Key: []byte(key), - Value: nil, // tombstone - Topic: p.syncTopic, - } - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - if err := p.client.ProduceSync(ctx, record).FirstErr(); err != nil { + if err := p.driver.Publish(ctx, p.syncTopic, &connectors.Message{ + Key: []byte(key), + Value: nil, + Topic: p.syncTopic, + }); err != nil { slog.Error("Failed to publish subscription tombstone", "key", key, "error", err) return fmt.Errorf("failed to publish subscription tombstone: %w", err) } @@ -131,12 +102,6 @@ func (p *SyncProducer) PublishTombstone(_ context.Context, topic, callbackURL st // Close flushes any buffered records and closes the sync producer. func (p *SyncProducer) Close() { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - if err := p.client.Flush(ctx); err != nil { - slog.Warn("Failed to flush sync producer before close", "error", err) - } - p.client.Close() } // SyncConsumer consumes subscription state changes from the sync topic. diff --git a/event-gateway/gateway-runtime/internal/xdsclient/handler.go b/event-gateway/gateway-runtime/internal/xdsclient/handler.go index 0265070be..81e80743f 100644 --- a/event-gateway/gateway-runtime/internal/xdsclient/handler.go +++ b/event-gateway/gateway-runtime/internal/xdsclient/handler.go @@ -42,7 +42,15 @@ type BindingManager interface { // KafkaConfig holds local Kafka broker settings used as defaults. type KafkaConfig struct { - Brokers []string + Brokers []string + TLS bool + TLSCAFile string + TLSCertFile string + TLSKeyFile string + TLSServerName string + SASLMechanism string + SASLUsername string + SASLPassword string } // EventChannelResource represents the decoded EventChannelConfig JSON payload. @@ -258,7 +266,15 @@ func (h *Handler) resolveBrokerDriver(bd BrokerDriverEntry) binding.BrokerDriver if len(cfg) == 0 { // Use the event gateway's own Kafka brokers. 
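+		// The defaults now include the gateway's TLS and SASL settings, so bindings
+		// without an explicit broker-driver config inherit them as well.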
cfg = map[string]interface{}{ - "brokers": h.kafkaConfig.Brokers, + "brokers": h.kafkaConfig.Brokers, + "tls": h.kafkaConfig.TLS, + "tls_ca_file": h.kafkaConfig.TLSCAFile, + "tls_cert_file": h.kafkaConfig.TLSCertFile, + "tls_key_file": h.kafkaConfig.TLSKeyFile, + "tls_server_name": h.kafkaConfig.TLSServerName, + "sasl_mechanism": h.kafkaConfig.SASLMechanism, + "sasl_username": h.kafkaConfig.SASLUsername, + "sasl_password": h.kafkaConfig.SASLPassword, } } From be250280db70716ee6f0516f1766da29d7be2390 Mon Sep 17 00:00:00 2001 From: AnujaK Date: Tue, 5 May 2026 12:10:00 +0530 Subject: [PATCH 2/7] Remove unnecessary code --- .../connectors/receiver/websub/connector.go | 3 - .../internal/subscription/sync.go | 88 ------------------- 2 files changed, 91 deletions(-) diff --git a/event-gateway/gateway-runtime/internal/connectors/receiver/websub/connector.go b/event-gateway/gateway-runtime/internal/connectors/receiver/websub/connector.go index 888fe49c2..526a27c34 100644 --- a/event-gateway/gateway-runtime/internal/connectors/receiver/websub/connector.go +++ b/event-gateway/gateway-runtime/internal/connectors/receiver/websub/connector.go @@ -138,9 +138,6 @@ func (e *WebSubReceiver) Start(ctx context.Context) error { for _, kafkaTopic := range e.channel.Channels { topicsToEnsure = append(topicsToEnsure, kafkaTopic) } - if e.channel.InternalSubTopic != "" { - topicsToEnsure = append(topicsToEnsure, e.channel.InternalSubTopic) - } if len(topicsToEnsure) > 0 { if err := e.brokerDriver.EnsureTopics(ctx, topicsToEnsure); err != nil { diff --git a/event-gateway/gateway-runtime/internal/subscription/sync.go b/event-gateway/gateway-runtime/internal/subscription/sync.go index eebbca01f..38bc09e69 100644 --- a/event-gateway/gateway-runtime/internal/subscription/sync.go +++ b/event-gateway/gateway-runtime/internal/subscription/sync.go @@ -23,7 +23,6 @@ import ( "encoding/json" "fmt" "log/slog" - "sync" "time" "github.com/twmb/franz-go/pkg/kgo" @@ -104,93 +103,6 @@ func (p *SyncProducer) PublishTombstone(_ context.Context, topic, callbackURL st func (p *SyncProducer) Close() { } -// SyncConsumer consumes subscription state changes from the sync topic. -type SyncConsumer struct { - client *kgo.Client - store SubscriptionStore - runtimeID string - cancel context.CancelFunc - wg sync.WaitGroup -} - -// NewSyncConsumer creates a new sync consumer that reads from the given syncTopic. -func NewSyncConsumer(brokers []string, store SubscriptionStore, runtimeID, syncTopic string) (*SyncConsumer, error) { - client, err := kgo.NewClient( - kgo.SeedBrokers(brokers...), - kgo.ConsumeTopics(syncTopic), - kgo.ConsumeResetOffset(kgo.NewOffset().AtStart()), - ) - if err != nil { - return nil, fmt.Errorf("failed to create sync consumer: %w", err) - } - return &SyncConsumer{ - client: client, - store: store, - runtimeID: runtimeID, - }, nil -} - -// Start begins consuming subscription state changes. -func (c *SyncConsumer) Start(ctx context.Context) { - ctx, c.cancel = context.WithCancel(ctx) - c.wg.Add(1) - go func() { - defer c.wg.Done() - c.consumeLoop(ctx) - }() -} - -// Stop stops the sync consumer. 
-func (c *SyncConsumer) Stop() { - if c.cancel != nil { - c.cancel() - } - c.wg.Wait() - c.client.Close() -} - -func (c *SyncConsumer) consumeLoop(ctx context.Context) { - for { - fetches := c.client.PollFetches(ctx) - if ctx.Err() != nil { - return - } - - fetches.EachRecord(func(record *kgo.Record) { - c.processRecord(record) - }) - } -} - -func (c *SyncConsumer) processRecord(record *kgo.Record) { - // Tombstone — remove subscription - if record.Value == nil { - parts := parseSyncKey(string(record.Key)) - if parts == nil { - return - } - if err := c.store.Remove(parts[0], parts[1]); err != nil { - slog.Debug("Failed to remove subscription from sync", "key", string(record.Key), "error", err) - } - return - } - - var sub Subscription - if err := json.Unmarshal(record.Value, &sub); err != nil { - slog.Error("Failed to unmarshal subscription from sync", "error", err) - return - } - - // Skip self-originated messages - if sub.RuntimeID == c.runtimeID { - return - } - - if err := c.store.Add(&sub); err != nil { - slog.Error("Failed to add subscription from sync", "error", err) - } -} - func syncKey(topic, callbackURL string) string { return topic + ":" + callbackURL } From 5b860e9ac41ebba6438e5d46b56b1dc8f8592c2f Mon Sep 17 00:00:00 2001 From: AnujaK Date: Mon, 4 May 2026 14:56:15 +0530 Subject: [PATCH 3/7] Add subscription metadata topic name to the config toml --- .../connectors/brokerdriver/kafka/endpoint.go | 2 +- .../internal/runtime/runtime_test.go | 26 +++++++++++++++++++ 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/endpoint.go b/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/endpoint.go index 0195ea892..ed74bbf15 100644 --- a/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/endpoint.go +++ b/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/endpoint.go @@ -103,7 +103,7 @@ func (e *KafkaBrokerDriver) TopicExists(ctx context.Context, topic string) (bool func (e *KafkaBrokerDriver) EnsureTopics(ctx context.Context, topics []string) error { resp, err := e.admin.CreateTopics(ctx, 1, 1, nil, topics...) 
if err != nil { - return fmt.Errorf("failed to create topics: %w", err) + return fmt.Errorf("failed to create topics: %w", err) } for _, t := range resp.Sorted() { diff --git a/event-gateway/gateway-runtime/internal/runtime/runtime_test.go b/event-gateway/gateway-runtime/internal/runtime/runtime_test.go index 7d45cdb21..8e3b5d061 100644 --- a/event-gateway/gateway-runtime/internal/runtime/runtime_test.go +++ b/event-gateway/gateway-runtime/internal/runtime/runtime_test.go @@ -163,6 +163,32 @@ func TestWebSubSubscriptionSyncTopic_FallsBackToDerivedTopic(t *testing.T) { } } +func TestWebSubSubscriptionSyncTopic_UsesConfigOverrideWhenSet(t *testing.T) { + rt := &Runtime{ + cfg: &config.Config{ + WebSub: config.WebSubConfig{ + SubscriptionsTopicName: "websub.subscriptions", + }, + }, + } + + got := rt.webSubSubscriptionSyncTopic("repo-watcher", "v1.0") + want := binding.WebSubApiTopicName("repo-watcher", "v1.0", "websub.subscriptions") + if got != want { + t.Fatalf("webSubSubscriptionSyncTopic() = %q, want %q", got, want) + } +} + +func TestWebSubSubscriptionSyncTopic_FallsBackToDerivedTopic(t *testing.T) { + rt := &Runtime{cfg: &config.Config{}} + + got := rt.webSubSubscriptionSyncTopic("repo-watcher", "v1.0") + want := binding.WebSubApiTopicName("repo-watcher", "v1.0", "_subscriptions") + if got != want { + t.Fatalf("webSubSubscriptionSyncTopic() = %q, want %q", got, want) + } +} + func TestStartReceiverWithRetry_RetriesUntilSuccess(t *testing.T) { previousInitial := initialReceiverStartBackoff previousMax := maxReceiverStartBackoff From 673141cc5386acf305ca31fa7a384a67b4ba5da9 Mon Sep 17 00:00:00 2001 From: AnujaK Date: Mon, 4 May 2026 15:35:28 +0530 Subject: [PATCH 4/7] Fix topic name combining character issue --- .../connectors/brokerdriver/kafka/endpoint.go | 2 +- .../internal/runtime/runtime_test.go | 24 ------------------- 2 files changed, 1 insertion(+), 25 deletions(-) diff --git a/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/endpoint.go b/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/endpoint.go index ed74bbf15..0195ea892 100644 --- a/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/endpoint.go +++ b/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/endpoint.go @@ -103,7 +103,7 @@ func (e *KafkaBrokerDriver) TopicExists(ctx context.Context, topic string) (bool func (e *KafkaBrokerDriver) EnsureTopics(ctx context.Context, topics []string) error { resp, err := e.admin.CreateTopics(ctx, 1, 1, nil, topics...) 
if err != nil { - return fmt.Errorf("failed to create topics: %w", err) + return fmt.Errorf("failed to create topics: %w", err) } for _, t := range resp.Sorted() { diff --git a/event-gateway/gateway-runtime/internal/runtime/runtime_test.go b/event-gateway/gateway-runtime/internal/runtime/runtime_test.go index 8e3b5d061..2f82dccdb 100644 --- a/event-gateway/gateway-runtime/internal/runtime/runtime_test.go +++ b/event-gateway/gateway-runtime/internal/runtime/runtime_test.go @@ -136,30 +136,6 @@ func TestJoinNormalizedTopic_NormalizesUnsupportedCharacters(t *testing.T) { t.Fatalf("JoinNormalizedTopic() = %q, want %q", got, want) } } - -func TestWebSubSubscriptionSyncTopic_UsesConfigOverrideWhenSet(t *testing.T) { - rt := &Runtime{ - cfg: &config.Config{ - WebSub: config.WebSubConfig{ - SubscriptionsTopicName: "websub.subscriptions", - }, - }, - } - - got := rt.webSubSubscriptionSyncTopic("repo-watcher", "v1.0") - want := binding.WebSubApiTopicName("repo-watcher", "v1.0", "websub.subscriptions") - if got != want { - t.Fatalf("webSubSubscriptionSyncTopic() = %q, want %q", got, want) - } -} - -func TestWebSubSubscriptionSyncTopic_FallsBackToDerivedTopic(t *testing.T) { - rt := &Runtime{cfg: &config.Config{}} - - got := rt.webSubSubscriptionSyncTopic("repo-watcher", "v1.0") - want := binding.WebSubApiSubscriptionTopic("repo-watcher", "v1.0") - if got != want { - t.Fatalf("webSubSubscriptionSyncTopic() = %q, want %q", got, want) } } From 39455eb229f707a337ca56aaf46c1ce22bce8c95 Mon Sep 17 00:00:00 2001 From: AnujaK Date: Tue, 5 May 2026 14:14:43 +0530 Subject: [PATCH 5/7] Fix EnsureCompactedTopic functionality --- event-gateway/docker/kafka/generate-certs.sh | 3 +- .../connectors/brokerdriver/kafka/endpoint.go | 42 +++++++++++++++++++ .../internal/subscription/sync.go | 12 ++---- 3 files changed, 47 insertions(+), 10 deletions(-) diff --git a/event-gateway/docker/kafka/generate-certs.sh b/event-gateway/docker/kafka/generate-certs.sh index 740a6943c..85ca71308 100644 --- a/event-gateway/docker/kafka/generate-certs.sh +++ b/event-gateway/docker/kafka/generate-certs.sh @@ -92,4 +92,5 @@ keytool \ -storepass "${password}" \ -file "${cert_dir}/ca.crt" -chmod 0644 "${cert_dir}/ca.crt" "${cert_dir}/kafka.keystore.jks" "${cert_dir}/kafka.truststore.jks" +chmod 0644 "${cert_dir}/ca.crt" +chmod 0600 "${cert_dir}/kafka.keystore.jks" "${cert_dir}/kafka.truststore.jks" diff --git a/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/endpoint.go b/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/endpoint.go index 0195ea892..ffa77ffeb 100644 --- a/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/endpoint.go +++ b/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/endpoint.go @@ -132,6 +132,9 @@ func (e *KafkaBrokerDriver) EnsureCompactedTopic(ctx context.Context, topic stri for _, t := range resp.Sorted() { if t.Err != nil { if isTopicAlreadyExistsErr(t.Err) { + if err := e.verifyCompactedTopic(ctx, t.Topic); err != nil { + return err + } return nil } return fmt.Errorf("failed to create compacted topic %s: %w", t.Topic, t.Err) @@ -140,6 +143,45 @@ func (e *KafkaBrokerDriver) EnsureCompactedTopic(ctx context.Context, topic stri return nil } +func (e *KafkaBrokerDriver) verifyCompactedTopic(ctx context.Context, topic string) error { + configs, err := e.admin.DescribeTopicConfigs(ctx, topic) + if err != nil { + return fmt.Errorf("failed to describe compacted topic %s config: %w", topic, err) + } + + config, err := configs.On(topic, nil) + if 
err != nil { + return fmt.Errorf("failed to load compacted topic %s config: %w", topic, err) + } + if config.Err != nil { + return fmt.Errorf("failed to describe compacted topic %s config: %w", topic, config.Err) + } + if !hasCleanupPolicy(config, "compact") { + return fmt.Errorf("existing topic %s is not compacted", topic) + } + return nil +} + +func hasCleanupPolicy(config kadm.ResourceConfig, required string) bool { + required = strings.ToLower(strings.TrimSpace(required)) + if required == "" { + return false + } + + for _, entry := range config.Configs { + if entry.Key != "cleanup.policy" { + continue + } + for _, policy := range strings.Split(entry.MaybeValue(), ",") { + if strings.ToLower(strings.TrimSpace(policy)) == required { + return true + } + } + } + + return false +} + // isTopicAlreadyExistsErr checks if the error indicates the topic already exists. func isTopicAlreadyExistsErr(err error) bool { if err == nil { diff --git a/event-gateway/gateway-runtime/internal/subscription/sync.go b/event-gateway/gateway-runtime/internal/subscription/sync.go index 38bc09e69..c8a67c706 100644 --- a/event-gateway/gateway-runtime/internal/subscription/sync.go +++ b/event-gateway/gateway-runtime/internal/subscription/sync.go @@ -25,7 +25,6 @@ import ( "log/slog" "time" - "github.com/twmb/franz-go/pkg/kgo" "github.com/wso2/api-platform/event-gateway/gateway-runtime/internal/connectors" ) @@ -60,19 +59,14 @@ func (p *SyncProducer) PublishSubscription(_ context.Context, sub *Subscription) } key := syncKey(sub.Topic, sub.CallbackURL) - record := &kgo.Record{ - Key: []byte(key), - Value: value, - Topic: p.syncTopic, - } ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() if err := p.driver.Publish(ctx, p.syncTopic, &connectors.Message{ - Key: record.Key, - Value: record.Value, - Topic: record.Topic, + Key: []byte(key), + Value: value, + Topic: p.syncTopic, }); err != nil { slog.Error("Failed to publish subscription sync", "key", key, "error", err) return fmt.Errorf("failed to publish subscription sync: %w", err) From e8f95b8ccea18c7ca2203d2892077bb3097f8e29 Mon Sep 17 00:00:00 2001 From: AnujaK Date: Tue, 5 May 2026 14:55:10 +0530 Subject: [PATCH 6/7] Revert generate cert file permission --- event-gateway/docker/kafka/generate-certs.sh | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/event-gateway/docker/kafka/generate-certs.sh b/event-gateway/docker/kafka/generate-certs.sh index 85ca71308..a62ce4e5d 100644 --- a/event-gateway/docker/kafka/generate-certs.sh +++ b/event-gateway/docker/kafka/generate-certs.sh @@ -92,5 +92,7 @@ keytool \ -storepass "${password}" \ -file "${cert_dir}/ca.crt" +# Local dev: these files are shared read-only with the Kafka, Kafka UI, and runtime +# containers via a Docker volume, so they must remain world-readable here. 
 chmod 0644 "${cert_dir}/ca.crt"
-chmod 0600 "${cert_dir}/kafka.keystore.jks" "${cert_dir}/kafka.truststore.jks"
+chmod 0644 "${cert_dir}/kafka.keystore.jks" "${cert_dir}/kafka.truststore.jks"

From 69c653811ae950b70a9052ae666fee6e65b442c7 Mon Sep 17 00:00:00 2001
From: AnujaK
Date: Tue, 5 May 2026 15:42:18 +0530
Subject: [PATCH 7/7] Fix stray closing braces left in runtime_test.go

---
 event-gateway/gateway-runtime/internal/runtime/runtime_test.go | 2 --
 1 file changed, 2 deletions(-)

diff --git a/event-gateway/gateway-runtime/internal/runtime/runtime_test.go b/event-gateway/gateway-runtime/internal/runtime/runtime_test.go
index 2f82dccdb..5daec54b6 100644
--- a/event-gateway/gateway-runtime/internal/runtime/runtime_test.go
+++ b/event-gateway/gateway-runtime/internal/runtime/runtime_test.go
@@ -136,8 +136,6 @@ func TestJoinNormalizedTopic_NormalizesUnsupportedCharacters(t *testing.T) {
 		t.Fatalf("JoinNormalizedTopic() = %q, want %q", got, want)
 	}
 }
-	}
-}
 
 func TestWebSubSubscriptionSyncTopic_UsesConfigOverrideWhenSet(t *testing.T) {
 	rt := &Runtime{