Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions event-gateway/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,10 @@ GATEWAY_REGISTRATION_TOKEN=replace-with-real-gateway-registration-token

# Optional: override the control plane host if needed locally.
GATEWAY_CONTROLPLANE_HOST=connect.preview-dv.bijira.dev

# Kafka client credentials for the secured local dev listener.
KAFKA_CLIENT_USERNAME=egw
KAFKA_CLIENT_PASSWORD=egw-pass
KAFKA_CONTROLLER_PASSWORD=controller-pass
KAFKA_INTER_BROKER_PASSWORD=broker-pass
KAFKA_CERT_PASSWORD=changeit
106 changes: 85 additions & 21 deletions event-gateway/docker-compose.dev.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ services:
- "9003:9003" # Metrics
environment:
- APIP_EGW_KAFKA_BROKERS=kafka:29092
- APIP_EGW_KAFKA_TLS=true
- APIP_EGW_KAFKA_TLS_CA_FILE=/etc/event-gateway/kafka/ca.crt
- APIP_EGW_KAFKA_SASL_MECHANISM=plain
- APIP_EGW_KAFKA_SASL_USERNAME=${KAFKA_CLIENT_USERNAME:-egw}
- APIP_EGW_KAFKA_SASL_PASSWORD=${KAFKA_CLIENT_PASSWORD:-egw-pass}
- APIP_EGW_SERVER_WEBSUB_ENABLED=true
- APIP_EGW_SERVER_WEBSUB_HTTP_PORT=8080
- APIP_EGW_SERVER_WEBSUB_HTTPS_PORT=8443
Expand All @@ -86,40 +91,97 @@ services:
- ../gateway/gateway-controller/listener-certs:/etc/event-gateway/tls:ro
- ./gateway-runtime/configs/config.toml:/etc/event-gateway/config.toml:ro
- ./gateway-runtime/configs/channels.yaml:/etc/event-gateway/channels.yaml:ro
- kafka-certs:/etc/event-gateway/kafka:ro
depends_on:
gateway-controller:
condition: service_started
kafka:
# condition: service_healthy
condition: service_started
networks:
- egw-network

kafka-cert-init:
image: bitnamilegacy/kafka:4.0.0-debian-12-r10
user: "0:0"
command: ["bash", "/scripts/generate-certs.sh"]
environment:
KAFKA_CERT_PASSWORD: ${KAFKA_CERT_PASSWORD:-changeit}
KAFKA_BROKER_HOST: kafka
volumes:
- ./docker/kafka/generate-certs.sh:/scripts/generate-certs.sh:ro
- kafka-certs:/certs
networks:
- egw-network

kafka:
image: apache/kafka:latest
image: bitnamilegacy/kafka:4.0.0-debian-12-r10
hostname: kafka
ports:
- "9092:9092"
- "29092:29092"
environment:
KAFKA_NODE_ID: 1
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:29093
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,CONTROLLER://0.0.0.0:29093,PLAINTEXT_HOST://0.0.0.0:9092
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
# healthcheck:
# test: ["CMD", "/opt/kafka/bin/kafka-topics.sh", "--bootstrap-server", "kafka:29092", "--list"]
# interval: 2s
# timeout: 3s
# start_period: 5s
# retries: 15
KAFKA_CFG_NODE_ID: 1
KAFKA_CFG_PROCESS_ROLES: broker,controller
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_CFG_LISTENERS: SASL_SSL://:29092,CONTROLLER://:29093
KAFKA_CFG_ADVERTISED_LISTENERS: SASL_SSL://kafka:29092
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 1@kafka:29093
KAFKA_CFG_SASL_ENABLED_MECHANISMS: PLAIN
KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOL: PLAIN
KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN
KAFKA_CFG_INTER_BROKER_LISTENER_NAME: SASL_SSL
KAFKA_CLIENT_LISTENER_NAME: SASL_SSL
KAFKA_CLIENT_USERS: ${KAFKA_CLIENT_USERNAME:-egw}
KAFKA_CLIENT_PASSWORDS: ${KAFKA_CLIENT_PASSWORD:-egw-pass}
KAFKA_CONTROLLER_USER: controller_user
KAFKA_CONTROLLER_PASSWORD: ${KAFKA_CONTROLLER_PASSWORD:-controller-pass}
KAFKA_INTER_BROKER_USER: broker
KAFKA_INTER_BROKER_PASSWORD: ${KAFKA_INTER_BROKER_PASSWORD:-broker-pass}
KAFKA_TLS_TYPE: JKS
KAFKA_TLS_CLIENT_AUTH: none
KAFKA_CERTIFICATE_PASSWORD: ${KAFKA_CERT_PASSWORD:-changeit}
KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_CFG_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
volumes:
- kafka-certs:/opt/bitnami/kafka/config/certs:ro
- kafka-data:/bitnami/kafka
depends_on:
kafka-cert-init:
condition: service_completed_successfully
networks:
- egw-network

wh-listener:
image: event-gateway/webhook-listener:local
build:
context: webhook-listener
dockerfile: Dockerfile
ports:
- "8090:8090"
networks:
- egw-network

kafka-ui:
image: provectuslabs/kafka-ui:latest
ports:
- "7080:8080"
environment:
DYNAMIC_CONFIG_ENABLED: "true"
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
KAFKA_CLUSTERS_0_PROPERTIES_SECURITY_PROTOCOL: SASL_SSL
KAFKA_CLUSTERS_0_PROPERTIES_SASL_MECHANISM: PLAIN
KAFKA_CLUSTERS_0_PROPERTIES_SASL_JAAS_CONFIG: org.apache.kafka.common.security.plain.PlainLoginModule required username="${KAFKA_CLIENT_USERNAME:-egw}" password="${KAFKA_CLIENT_PASSWORD:-egw-pass}";
KAFKA_CLUSTERS_0_PROPERTIES_SSL_TRUSTSTORE_LOCATION: /etc/kafka-ui/certs/kafka.truststore.jks
KAFKA_CLUSTERS_0_PROPERTIES_SSL_TRUSTSTORE_PASSWORD: ${KAFKA_CERT_PASSWORD:-changeit}
KAFKA_CLUSTERS_0_PROPERTIES_SSL_TRUSTSTORE_TYPE: JKS
volumes:
- kafka-certs:/etc/kafka-ui/certs:ro
depends_on:
kafka:
condition: service_started
networks:
- egw-network

Expand All @@ -129,3 +191,5 @@ networks:

volumes:
controller-data:
kafka-certs:
kafka-data:
98 changes: 98 additions & 0 deletions event-gateway/docker/kafka/generate-certs.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
#!/usr/bin/env bash
# Generates a self-signed dev CA plus a broker keystore/truststore for the
# secured local Kafka listener. The output directory is a Docker volume that
# is shared read-only with the Kafka broker, Kafka UI, and gateway runtime
# containers, so only the artifacts those containers need are left behind.
#
# Idempotent: exits early when all consumer-facing artifacts already exist.

set -euo pipefail

cert_dir="${CERT_DIR:-/certs}"
password="${KAFKA_CERT_PASSWORD:-changeit}"
broker_host="${KAFKA_BROKER_HOST:-kafka}"

mkdir -p "${cert_dir}"

# Skip regeneration when the keystore, truststore, and CA cert are all present.
if [[ -f "${cert_dir}/kafka.keystore.jks" && -f "${cert_dir}/kafka.truststore.jks" && -f "${cert_dir}/ca.crt" ]]; then
  echo "Kafka TLS assets already exist in ${cert_dir}"
  exit 0
fi

# Clear any partial output from a previous failed run before regenerating.
rm -f \
  "${cert_dir}/ca.crt" \
  "${cert_dir}/ca.key" \
  "${cert_dir}/ca.srl" \
  "${cert_dir}/kafka.keystore.jks" \
  "${cert_dir}/kafka.truststore.jks" \
  "${cert_dir}/broker.csr" \
  "${cert_dir}/broker.crt" \
  "${cert_dir}/openssl-san.cnf"

# Create the dev CA (explicit RSA key, encrypted with the shared password).
openssl req \
  -new \
  -x509 \
  -newkey rsa:2048 \
  -days 3650 \
  -subj "/CN=event-gateway-kafka-dev-ca" \
  -passout "pass:${password}" \
  -keyout "${cert_dir}/ca.key" \
  -out "${cert_dir}/ca.crt"

# Generate the broker key pair directly inside the JKS keystore.
keytool \
  -genkeypair \
  -alias "${broker_host}" \
  -keyalg RSA \
  -validity 3650 \
  -storetype JKS \
  -keystore "${cert_dir}/kafka.keystore.jks" \
  -storepass "${password}" \
  -keypass "${password}" \
  -dname "CN=${broker_host}, OU=Event Gateway, O=WSO2, L=Colombo, S=Western, C=LK" \
  -ext "SAN=DNS:${broker_host},DNS:localhost,IP:127.0.0.1"

# Export a CSR so the dev CA can sign the broker certificate.
keytool \
  -certreq \
  -alias "${broker_host}" \
  -keystore "${cert_dir}/kafka.keystore.jks" \
  -storepass "${password}" \
  -file "${cert_dir}/broker.csr"

# keytool's SAN extension is not carried through the CSR by openssl signing,
# so re-assert the SANs via an extensions file at signing time.
cat > "${cert_dir}/openssl-san.cnf" <<EOF
[v3_req]
subjectAltName=DNS:${broker_host},DNS:localhost,IP:127.0.0.1
EOF

openssl x509 \
  -req \
  -days 3650 \
  -CA "${cert_dir}/ca.crt" \
  -CAkey "${cert_dir}/ca.key" \
  -CAcreateserial \
  -passin "pass:${password}" \
  -in "${cert_dir}/broker.csr" \
  -out "${cert_dir}/broker.crt" \
  -extfile "${cert_dir}/openssl-san.cnf" \
  -extensions v3_req

# Import the CA first so keytool can validate the broker cert chain next.
keytool \
  -importcert \
  -noprompt \
  -alias CARoot \
  -keystore "${cert_dir}/kafka.keystore.jks" \
  -storepass "${password}" \
  -file "${cert_dir}/ca.crt"

keytool \
  -importcert \
  -noprompt \
  -alias "${broker_host}" \
  -keystore "${cert_dir}/kafka.keystore.jks" \
  -storepass "${password}" \
  -file "${cert_dir}/broker.crt"

keytool \
  -importcert \
  -noprompt \
  -alias CARoot \
  -keystore "${cert_dir}/kafka.truststore.jks" \
  -storepass "${password}" \
  -file "${cert_dir}/ca.crt"

# Signing intermediates are not needed by any container; drop them so the
# shared volume exposes only the CA cert and the JKS stores.
rm -f \
  "${cert_dir}/broker.csr" \
  "${cert_dir}/broker.crt" \
  "${cert_dir}/openssl-san.cnf" \
  "${cert_dir}/ca.srl"

# Keep the (encrypted) CA private key owner-only: no container needs to read
# it, and it should not be exposed through the shared volume.
chmod 0600 "${cert_dir}/ca.key"

# Local dev: these files are shared read-only with the Kafka, Kafka UI, and runtime
# containers via a Docker volume, so they must remain world-readable here.
chmod 0644 "${cert_dir}/ca.crt"
chmod 0644 "${cert_dir}/kafka.keystore.jks" "${cert_dir}/kafka.truststore.jks"
10 changes: 9 additions & 1 deletion event-gateway/gateway-runtime/cmd/event-gateway/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,15 @@ func main() {
"xds_address", cfg.ControlPlane.XDSAddress)

handler := xdsclient.NewHandler(rt, xdsclient.KafkaConfig{
Brokers: cfg.Kafka.Brokers,
Brokers: cfg.Kafka.Brokers,
TLS: cfg.Kafka.TLS,
TLSCAFile: cfg.Kafka.TLSCAFile,
TLSCertFile: cfg.Kafka.TLSCertFile,
TLSKeyFile: cfg.Kafka.TLSKeyFile,
TLSServerName: cfg.Kafka.TLSServerName,
SASLMechanism: cfg.Kafka.SASLMechanism,
SASLUsername: cfg.Kafka.SASLUsername,
SASLPassword: cfg.Kafka.SASLPassword,
})
eventConfigClient := xdsclient.NewClient(
cfg.ControlPlane.XDSAddress,
Expand Down
26 changes: 4 additions & 22 deletions event-gateway/gateway-runtime/cmd/event-gateway/plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,28 +38,11 @@ import (
// 3. Add bindings in channels.yaml — no changes to main.go or runtime needed
func registerConnectors(registry *connectors.Registry, cfg *config.Config) {
registry.RegisterBrokerDriver("kafka", func(brokerDriverCfg map[string]interface{}) (connectors.BrokerDriver, error) {
brokers := cfg.Kafka.Brokers // fallback to global config
if brokerDriverCfg != nil {
if b, ok := brokerDriverCfg["brokers"]; ok {
switch v := b.(type) {
case []interface{}:
parsed := make([]string, 0, len(v))
for _, item := range v {
if s, ok := item.(string); ok {
parsed = append(parsed, s)
}
}
if len(parsed) > 0 {
brokers = parsed
}
case []string:
if len(v) > 0 {
brokers = v
}
}
}
connectionCfg, err := kafka.ResolveConnectionConfig(cfg.Kafka, brokerDriverCfg)
if err != nil {
return nil, err
}
return kafka.NewBrokerDriver(brokers)
return kafka.NewBrokerDriver(connectionCfg)
})

registry.RegisterReceiver("websub", func(ecfg connectors.ReceiverConfig) (connectors.Receiver, error) {
Expand All @@ -73,7 +56,6 @@ func registerConnectors(registry *connectors.Registry, cfg *config.Config) {
DeliveryConcurrency: cfg.WebSub.DeliveryConcurrency,
RuntimeID: cfg.RuntimeID,
ConsumerGroupPrefix: cfg.Kafka.ConsumerGroupPrefix,
Brokers: cfg.Kafka.Brokers,
})
})

Expand Down
12 changes: 12 additions & 0 deletions event-gateway/gateway-runtime/configs/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,18 @@ metrics_port = 9003
# Default Kafka brokers. Channels can override with broker-driver.config.brokers.
brokers = ["localhost:9092"]
consumer_group_prefix = "event-gateway"
tls = false
# Optional PEM CA file for self-signed or private Kafka CAs.
# tls_ca_file = "/etc/event-gateway/kafka/ca.crt"
# Optional client certificate and key for Kafka mTLS.
# tls_cert_file = "/etc/event-gateway/kafka/client.crt"
# tls_key_file = "/etc/event-gateway/kafka/client.key"
# Optional TLS server name override.
# tls_server_name = "kafka"
# Optional SASL settings. Supported mechanisms: plain, scram-sha-256, scram-sha-512.
# sasl_mechanism = "plain"
# sasl_username = "egw"
# sasl_password = "egw-pass"

[websub]
verification_timeout_seconds = 10
Expand Down
55 changes: 55 additions & 0 deletions event-gateway/gateway-runtime/internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ type KafkaConfig struct {
Brokers []string `koanf:"brokers"`
ConsumerGroupPrefix string `koanf:"consumer_group_prefix"`
TLS bool `koanf:"tls"`
TLSCAFile string `koanf:"tls_ca_file"`
TLSCertFile string `koanf:"tls_cert_file"`
TLSKeyFile string `koanf:"tls_key_file"`
TLSServerName string `koanf:"tls_server_name"`
SASLMechanism string `koanf:"sasl_mechanism"`
SASLUsername string `koanf:"sasl_username"`
SASLPassword string `koanf:"sasl_password"`
Expand Down Expand Up @@ -277,6 +281,57 @@ func validate(cfg *Config) error {
return fmt.Errorf("logging.format must be one of text, json")
}

if err := validateKafkaConfig(cfg.Kafka); err != nil {
return err
}

return nil
}

// validateKafkaConfig checks the Kafka section of the loaded configuration.
// It requires at least one broker, rejects TLS-related settings while
// kafka.tls is disabled, verifies that configured TLS files are readable,
// and ensures SASL credentials accompany any configured SASL mechanism.
func validateKafkaConfig(kafkaCfg KafkaConfig) error {
	if len(kafkaCfg.Brokers) == 0 {
		return fmt.Errorf("kafka.brokers must contain at least one broker")
	}

	caConfigured := strings.TrimSpace(kafkaCfg.TLSCAFile) != ""
	tlsCert := strings.TrimSpace(kafkaCfg.TLSCertFile)
	tlsKey := strings.TrimSpace(kafkaCfg.TLSKeyFile)

	switch {
	case kafkaCfg.TLS:
		if caConfigured {
			if err := validateReadableFile(kafkaCfg.TLSCAFile, "kafka.tls_ca_file"); err != nil {
				return err
			}
		}
		if tlsCert == "" && tlsKey == "" {
			break
		}
		// A client certificate is only usable together with its key.
		if tlsCert == "" || tlsKey == "" {
			return fmt.Errorf("kafka.tls_cert_file and kafka.tls_key_file must be configured together")
		}
		if err := validateReadableFile(tlsCert, "kafka.tls_cert_file"); err != nil {
			return err
		}
		if err := validateReadableFile(tlsKey, "kafka.tls_key_file"); err != nil {
			return err
		}
	case caConfigured, tlsCert != "", tlsKey != "", strings.TrimSpace(kafkaCfg.TLSServerName) != "":
		// TLS knobs without kafka.tls=true are almost certainly a mistake.
		return fmt.Errorf("kafka TLS files or server name require kafka.tls=true")
	}

	// Mechanism comparison is case-insensitive; an empty value disables SASL.
	saslMech := strings.ToLower(strings.TrimSpace(kafkaCfg.SASLMechanism))
	if saslMech != "" && saslMech != "plain" && saslMech != "scram-sha-256" && saslMech != "scram-sha-512" {
		return fmt.Errorf("kafka.sasl_mechanism must be one of plain, scram-sha-256, scram-sha-512")
	}

	if saslMech != "" {
		if kafkaCfg.SASLUsername == "" {
			return fmt.Errorf("kafka.sasl_username is required when kafka.sasl_mechanism is set")
		}
		if kafkaCfg.SASLPassword == "" {
			return fmt.Errorf("kafka.sasl_password is required when kafka.sasl_mechanism is set")
		}
	}

	return nil
}

Expand Down
Loading