diff --git a/analytics/kafka/README.md b/analytics/kafka/README.md
new file mode 100644
index 000000000..7b8f16505
--- /dev/null
+++ b/analytics/kafka/README.md
@@ -0,0 +1,168 @@
+# Kafka on EKS Auto Mode (Strimzi)
+
+A workshop-ready Apache Kafka cluster running on the Spark-on-EKS Auto Mode cluster, deployed via the [Strimzi](https://strimzi.io/) operator in **KRaft mode** (no ZooKeeper).
+
+The shipped configuration runs **3 controllers + 3 brokers** with rack-aware placement across availability zones, JMX Prometheus metrics, Cruise Control, and the Kafka Exporter.
+
+## Why this folder?
+
+The workshop's Spark labs already use the EKS Auto Mode cluster created by `analytics/terraform/spark-k8s-operator/`. This folder adds an interactive Kafka lab on top of that same cluster, with no additional infrastructure required. You install the operator, deploy the Kafka custom resources, and run a smoke test.
+
+## Prerequisites
+
+- The Spark-on-EKS workshop cluster is already running (you've completed the bootstrap that the workshop's CloudFormation (CFN) template runs on your IDE).
+- `kubectl` is configured against the cluster:
+  ```sh
+  kubectl get nodes
+  ```
+- The default `gp3` StorageClass exists (the workshop's Terraform already creates it):
+  ```sh
+  kubectl get storageclass
+  ```
+  You should see `gp3 (default)` with provisioner `ebs.csi.eks.amazonaws.com`.
+
+## Layout
+
+```
+analytics/kafka/
+├── README.md                 # this file
+├── install-strimzi.sh        # installs the latest Strimzi operator
+├── kafka-cluster.yaml        # Kafka CR + KafkaNodePools + JMX ConfigMap
+├── deploy-kafka.sh           # applies the cluster and waits for Ready
+├── cleanup.sh                # tears everything down
+└── examples/
+    ├── hello-test-topic.yaml # 3-partition, 3-replica test topic
+    └── kafka-rebalance.yaml  # Cruise Control rebalance demo
+```
+
+## Step 1: Install the Strimzi operator
+
+The Strimzi Cluster Operator is the controller that watches for `Kafka`, `KafkaNodePool`, `KafkaTopic`, and other Strimzi CRs and provisions the corresponding workloads.
+
+```sh
+./install-strimzi.sh
+```
+
+This creates the `kafka` namespace and then creates the resources in the latest Strimzi install bundle, fetched directly from `https://strimzi.io/install/latest?namespace=kafka`. The script waits for `deploy/strimzi-cluster-operator` to be Ready (typically under a minute).
+
+> **Note:** the script uses `kubectl create` rather than `kubectl apply` because the Strimzi install bundle contains CRDs so large that the `last-applied-configuration` annotation written by `kubectl apply` can exceed Kubernetes' annotation size limit.
+
+## Step 2: Deploy the Kafka cluster
+
+```sh
+./deploy-kafka.sh
+```
+
+This applies `kafka-cluster.yaml`, which contains:
+
+- A `Kafka` resource named `cluster` (KRaft mode, Kafka 4.0.0, plain + TLS internal listeners, rack-aware)
+- A `KafkaNodePool` named `controller` with **3 controller replicas**, 100 Gi gp3 each
+- A `KafkaNodePool` named `broker` with **3 broker replicas**, 100 Gi gp3 each
+- A `ConfigMap` with the JMX Prometheus exporter rules
+- The `entityOperator` (for `KafkaTopic` / `KafkaUser` reconciliation), `cruiseControl`, and `kafkaExporter` all enabled
+
+Each Kafka node (broker or controller) requests 6 CPU / 58 Gi memory (limit 8 CPU / 64 Gi), because `spec.kafka.resources` applies to both node pools. On Auto Mode, this typically pulls **6 × `r5a.4xlarge` instances** (one per broker/controller, enforced by the hostname pod anti-affinity) spread over 3 availability zones thanks to the `rack` and `topologySpreadConstraints` settings. The first deploy takes around 3 minutes once Karpenter has provisioned nodes.
+
+When the deploy completes, you should see:
+
+```console
+NAME                             READY   METADATA STATE   WARNINGS
+kafka.kafka.strimzi.io/cluster   True
+NAME                                        DESIRED REPLICAS   ROLES            NODEIDS
+kafkanodepool.kafka.strimzi.io/broker       3                  ["broker"]       [0,1,2]
+kafkanodepool.kafka.strimzi.io/controller   3                  ["controller"]   [3,4,5]
+
+cluster-broker-0            1/1   Running
+cluster-broker-1            1/1   Running
+cluster-broker-2            1/1   Running
+cluster-controller-3        1/1   Running
+cluster-controller-4        1/1   Running
+cluster-controller-5        1/1   Running
+cluster-cruise-control-*    1/1   Running
+cluster-entity-operator-*   2/2   Running
+cluster-kafka-exporter-*    1/1   Running
+```
+
+Bootstrap servers in the cluster:
+
+- Plain: `cluster-kafka-bootstrap.kafka.svc:9092`
+- TLS: `cluster-kafka-bootstrap.kafka.svc:9093`
+
+## Step 3: Smoke test
+
+Create a topic:
+
+```sh
+kubectl apply -f examples/hello-test-topic.yaml
+kubectl get kafkatopic -n kafka
+```
+
+Produce 3 messages (the `quay.io/strimzi/kafka` image ships with `kafka-console-*` scripts):
+
+```sh
+kubectl run kafka-producer-test -n kafka --rm -i --restart=Never \
+  --image=quay.io/strimzi/kafka:latest-kafka-4.0.0 \
+  -- bash -c 'echo -e "msg-1\nmsg-2\nmsg-3" | /opt/kafka/bin/kafka-console-producer.sh \
+    --bootstrap-server cluster-kafka-bootstrap:9092 --topic hello-test'
+```
+
+Consume them back:
+
+```sh
+kubectl run kafka-consumer-test -n kafka --rm -i --restart=Never \
+  --image=quay.io/strimzi/kafka:latest-kafka-4.0.0 \
+  -- bash -c '/opt/kafka/bin/kafka-console-consumer.sh \
+    --bootstrap-server cluster-kafka-bootstrap:9092 --topic hello-test \
+    --from-beginning --timeout-ms 10000'
+```
+
+Expected output:
+
+```console
+msg-1
+msg-2
+msg-3
+```
+
+## Optional: Cruise Control rebalance
+
+The `Kafka` CR enables Cruise Control, which can rebalance partitions across brokers. To trigger an analysis:
+
+```sh
+kubectl apply -f examples/kafka-rebalance.yaml
+kubectl get kafkarebalance rebalance -n kafka -w
+```
+
+The CR transitions through `PendingProposal` → `ProposalReady`. Once it reaches `ProposalReady`, you can approve it:
+
+```sh
+kubectl annotate kafkarebalance rebalance -n kafka \
+  strimzi.io/rebalance=approve --overwrite
+```
+
+It then transitions to `Rebalancing` → `Ready`.
+
+## Step 4: Tear down
+
+When you're done with the lab:
+
+```sh
+./cleanup.sh
+```
+
+This removes all Kafka topics/users, the Kafka CR, the KafkaNodePools, the broker/controller PVCs (all broker and controller data is deleted), the Strimzi operator, and the `kafka` namespace.
+
+## What you learned
+
+- How to install the Strimzi operator on an existing EKS Auto Mode cluster.
+- How to declare a KRaft-mode Kafka cluster with `KafkaNodePool` (the post-StatefulSet abstraction Strimzi uses for elastic broker pools).
+- How rack awareness + topology spread constraints + pod anti-affinity combine to spread brokers across AZs.
+- How to produce/consume from inside the cluster.
+- How Cruise Control fits into a Strimzi-managed Kafka cluster.
+
+## Customizing
+
+- **Storage size**: change `spec.storage.volumes[0].size` in each `KafkaNodePool` (default 100 Gi).
+- **Broker/controller count**: change `spec.replicas` on the `broker` and `controller` node pools (defaults 3 + 3). Keep the controller count odd so the KRaft quorum always has a clear majority.
+- **Resource request/limits**: edit `spec.kafka.resources` on the `Kafka` CR. Smaller dev sizes (e.g. 2 CPU / 8 Gi) work fine for hello-world testing and let Karpenter consolidate onto smaller instances.
+- **Storage class**: the manifest uses `class: gp3`. To experiment with NVMe-backed local storage, look at the [EC2 Instance Store CSI driver](https://docs.aws.amazon.com/eks/latest/userguide/lis-csi.html), but note that it does **not** work with Auto Mode today.
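
The README's advice to keep the controller count odd follows from majority-quorum arithmetic. The sketch below is not part of this change; `tolerated_failures` is a hypothetical helper name, and it assumes every controller replica is a voter in the KRaft quorum.

```shell
# Hypothetical helper (not in the repo): how many controller failures a
# quorum of N voters can survive while a majority is still available.
tolerated_failures() {
  voters="$1"
  # A majority needs floor(N/2) + 1 voters, so N - (floor(N/2) + 1) may fail.
  echo $(( voters - (voters / 2 + 1) ))
}

for n in 3 4 5; do
  echo "controllers=${n} tolerated_failures=$(tolerated_failures "${n}")"
done
```

Under these assumptions, going from 3 to 4 controllers adds a node without raising the number of failures the quorum can absorb, which is why odd counts are the usual choice.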
diff --git a/analytics/kafka/cleanup.sh b/analytics/kafka/cleanup.sh
new file mode 100755
index 000000000..c640f9800
--- /dev/null
+++ b/analytics/kafka/cleanup.sh
@@ -0,0 +1,32 @@
+#!/usr/bin/env bash
+set -euo pipefail
+
+NAMESPACE=kafka
+
+echo "Deleting KafkaTopics, KafkaUsers, the Kafka CR, and KafkaNodePools..."
+kubectl delete kafkatopic --all -n "${NAMESPACE}" --ignore-not-found
+kubectl delete kafkauser --all -n "${NAMESPACE}" --ignore-not-found
+kubectl delete kafka cluster -n "${NAMESPACE}" --ignore-not-found
+kubectl delete kafkanodepool --all -n "${NAMESPACE}" --ignore-not-found
+
+echo "Deleting kafka-metrics ConfigMap..."
+kubectl delete configmap kafka-metrics -n "${NAMESPACE}" --ignore-not-found
+
+echo ""
+echo "Waiting up to 3 minutes for broker/controller pods to terminate..."
+kubectl wait --for=delete pod -l strimzi.io/cluster=cluster -n "${NAMESPACE}" --timeout=180s || true
+
+echo ""
+echo "Deleting persistent volume claims (broker + controller data)..."
+kubectl delete pvc -l strimzi.io/cluster=cluster -n "${NAMESPACE}" --ignore-not-found
+
+echo ""
+echo "Uninstalling Strimzi operator..."
+kubectl delete -f "https://strimzi.io/install/latest?namespace=${NAMESPACE}" -n "${NAMESPACE}" --ignore-not-found
+
+echo ""
+echo "Deleting namespace ${NAMESPACE}..."
+kubectl delete namespace "${NAMESPACE}" --ignore-not-found
+
+echo ""
+echo "Cleanup complete."
diff --git a/analytics/kafka/deploy-kafka.sh b/analytics/kafka/deploy-kafka.sh
new file mode 100755
index 000000000..e3b2651d8
--- /dev/null
+++ b/analytics/kafka/deploy-kafka.sh
@@ -0,0 +1,20 @@
+#!/usr/bin/env bash
+set -euo pipefail
+
+SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
+NAMESPACE=kafka
+
+echo "Applying Kafka cluster + KafkaNodePools (controller, broker)..."
+kubectl apply -f "${SCRIPT_DIR}/kafka-cluster.yaml"
+
+echo ""
+echo "Waiting for the Kafka cluster to become Ready (3-5 minutes typical)..."
+kubectl wait --for=condition=Ready kafka/cluster -n "${NAMESPACE}" --timeout=600s
+
+echo ""
+kubectl get kafka,kafkanodepool -n "${NAMESPACE}"
+echo ""
+kubectl get pods -n "${NAMESPACE}"
+echo ""
+echo "Bootstrap servers:"
+kubectl get kafka cluster -n "${NAMESPACE}" -o jsonpath='{range .status.listeners[*]} {.name}: {.bootstrapServers}{"\n"}{end}'
diff --git a/analytics/kafka/examples/hello-test-topic.yaml b/analytics/kafka/examples/hello-test-topic.yaml
new file mode 100644
index 000000000..bb85bfd1b
--- /dev/null
+++ b/analytics/kafka/examples/hello-test-topic.yaml
@@ -0,0 +1,12 @@
+apiVersion: kafka.strimzi.io/v1beta2
+kind: KafkaTopic
+metadata:
+  name: hello-test
+  namespace: kafka
+  labels:
+    strimzi.io/cluster: cluster
+spec:
+  partitions: 3
+  replicas: 3
+  config:
+    retention.ms: 3600000
diff --git a/analytics/kafka/examples/kafka-rebalance.yaml b/analytics/kafka/examples/kafka-rebalance.yaml
new file mode 100644
index 000000000..8e0c0c571
--- /dev/null
+++ b/analytics/kafka/examples/kafka-rebalance.yaml
@@ -0,0 +1,25 @@
+apiVersion: kafka.strimzi.io/v1beta2
+kind: KafkaRebalance
+metadata:
+  name: rebalance
+  namespace: kafka
+  labels:
+    strimzi.io/cluster: cluster
+spec:
+  goals:
+    - RackAwareGoal
+    - ReplicaCapacityGoal
+    - DiskCapacityGoal
+    - NetworkInboundCapacityGoal
+    - NetworkOutboundCapacityGoal
+    - CpuCapacityGoal
+    - ReplicaDistributionGoal
+    - PotentialNwOutGoal
+    - DiskUsageDistributionGoal
+    - NetworkInboundUsageDistributionGoal
+    - NetworkOutboundUsageDistributionGoal
+    - CpuUsageDistributionGoal
+    - TopicReplicaDistributionGoal
+    - LeaderReplicaDistributionGoal
+    - LeaderBytesInDistributionGoal
+    - PreferredLeaderElectionGoal
diff --git a/analytics/kafka/install-strimzi.sh b/analytics/kafka/install-strimzi.sh
new file mode 100755
index 000000000..fae1384f8
--- /dev/null
+++ b/analytics/kafka/install-strimzi.sh
@@ -0,0 +1,24 @@
+#!/usr/bin/env bash
+set -euo pipefail
+
+# Install latest Strimzi Cluster Operator into the kafka namespace
+# Reference: https://strimzi.io/quickstarts/
+
+NAMESPACE=kafka
+
+echo "Creating namespace: ${NAMESPACE}"
+kubectl create namespace "${NAMESPACE}" --dry-run=client -o yaml | kubectl apply -f -
+
+echo "Installing latest Strimzi operator into ${NAMESPACE}..."
+# 'kubectl create' is intentional: the install bundle contains CRDs that
+# can fail with 'kubectl apply' due to Kubernetes annotation size limits.
+kubectl create -f "https://strimzi.io/install/latest?namespace=${NAMESPACE}" -n "${NAMESPACE}"
+
+echo ""
+echo "Waiting for the Strimzi cluster operator to become Ready..."
+kubectl -n "${NAMESPACE}" rollout status deploy/strimzi-cluster-operator --timeout=180s
+
+echo ""
+kubectl get pods -n "${NAMESPACE}"
+echo ""
+echo "Strimzi operator is ready. Next: run ./deploy-kafka.sh"
diff --git a/analytics/kafka/kafka-cluster.yaml b/analytics/kafka/kafka-cluster.yaml
new file mode 100644
index 000000000..ab6035b45
--- /dev/null
+++ b/analytics/kafka/kafka-cluster.yaml
@@ -0,0 +1,158 @@
+apiVersion: kafka.strimzi.io/v1beta2
+kind: Kafka
+metadata:
+  name: cluster
+  namespace: kafka
+  annotations:
+    strimzi.io/kraft: enabled
+    strimzi.io/node-pools: enabled
+spec:
+  kafka:
+    version: 4.0.0
+    metadataVersion: 4.0-IV3
+    listeners:
+      - name: plain
+        port: 9092
+        type: internal
+        tls: false
+      - name: tls
+        port: 9093
+        type: internal
+        tls: true
+    config:
+      offsets.topic.replication.factor: 3
+      transaction.state.log.replication.factor: 3
+      transaction.state.log.min.isr: 2
+      default.replication.factor: 3
+      min.insync.replicas: 2
+    resources:
+      requests:
+        memory: 58Gi
+        cpu: "6"
+      limits:
+        memory: 64Gi
+        cpu: "8"
+    jvmOptions:
+      "-Xmx": "6g"
+      "-Xms": "4g"
+    template:
+      pod:
+        affinity:
+          podAntiAffinity:
+            requiredDuringSchedulingIgnoredDuringExecution:
+              - labelSelector:
+                  matchExpressions:
+                    - key: "strimzi.io/pool-name"
+                      operator: In
+                      values:
+                        - broker
+                        - controller
+                topologyKey: "kubernetes.io/hostname"
+        topologySpreadConstraints:
+          - labelSelector:
+              matchExpressions:
+                - key: "strimzi.io/pool-name"
+                  operator: In
+                  values:
+                    - broker
+                    - controller
+            maxSkew: 1
+            topologyKey: topology.kubernetes.io/zone
+            whenUnsatisfiable: ScheduleAnyway
+    metricsConfig:
+      type: jmxPrometheusExporter
+      valueFrom:
+        configMapKeyRef:
+          name: kafka-metrics
+          key: kafka-metrics-config.yml
+    rack:
+      topologyKey: topology.kubernetes.io/zone
+  entityOperator:
+    topicOperator: {}
+    userOperator: {}
+  cruiseControl: {}
+  kafkaExporter:
+    topicRegex: ".*"
+    groupRegex: ".*"
+---
+kind: ConfigMap
+apiVersion: v1
+metadata:
+  name: kafka-metrics
+  namespace: kafka
+  labels:
+    app: strimzi
+data:
+  kafka-metrics-config.yml: |
+    lowercaseOutputName: true
+    rules:
+      - pattern: kafka.server<>Value
+        name: kafka_server_$1_$2
+        type: GAUGE
+        labels:
+          clientId: "$3"
+          topic: "$4"
+          partition: "$5"
+      - pattern: kafka.server<>Value
+        name: kafka_server_$1_$2
+        type: GAUGE
+        labels:
+          clientId: "$3"
+          broker: "$4:$5"
+      - pattern: kafka.(\w+)<>Count
+        name: kafka_$1_$2_$3_total
+        type: COUNTER
+      - pattern: kafka.(\w+)<>Value
+        name: kafka_$1_$2_$3
+        type: GAUGE
+      - pattern: kafka.(\w+)<>Count
+        name: kafka_$1_$2_$3_count
+        type: COUNTER
+  cruise-control-config.yml: |
+    lowercaseOutputName: true
+    rules:
+      - pattern: kafka.cruisecontrol<>(\w+)
+        name: kafka_cruisecontrol_$1_$2
+        type: GAUGE
+---
+apiVersion: kafka.strimzi.io/v1beta2
+kind: KafkaNodePool
+metadata:
+  name: controller
+  namespace: kafka
+  labels:
+    strimzi.io/cluster: cluster
+spec:
+  replicas: 3
+  roles:
+    - controller
+  storage:
+    type: jbod
+    volumes:
+      - id: 0
+        type: persistent-claim
+        size: 100Gi
+        class: gp3
+        kraftMetadata: shared
+        deleteClaim: true
+---
+apiVersion: kafka.strimzi.io/v1beta2
+kind: KafkaNodePool
+metadata:
+  name: broker
+  namespace: kafka
+  labels:
+    strimzi.io/cluster: cluster
+spec:
+  replicas: 3
+  roles:
+    - broker
+  storage:
+    type: jbod
+    volumes:
+      - id: 0
+        type: persistent-claim
+        size: 100Gi
+        class: gp3
+        kraftMetadata: shared
+        deleteClaim: true
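
The `config` block in `kafka-cluster.yaml` pairs `default.replication.factor: 3` with `min.insync.replicas: 2`. The sketch below illustrates the availability arithmetic this implies. It is not part of this change: `acks_all_status` is a hypothetical helper, and it assumes producers use `acks=all`, that each partition's replicas sit on distinct brokers, and that every surviving replica is in sync.

```shell
# Hypothetical helper (not in the repo).
# Prints "writable" if a partition with RF replicas still has at least
# MIN_ISR replicas left after DOWN of its replica brokers fail,
# otherwise prints "rejected".
acks_all_status() {
  rf="$1"; min_isr="$2"; down="$3"
  if [ $(( rf - down )) -ge "${min_isr}" ]; then
    echo "writable"
  else
    echo "rejected"
  fi
}

# Replication factor 3, min.insync.replicas 2, with 0, 1 and 2 brokers down.
acks_all_status 3 2 0
acks_all_status 3 2 1
acks_all_status 3 2 2
```

Under these assumptions the shipped settings keep `acks=all` writes flowing with one broker unavailable, while a second simultaneous failure causes the broker to reject them until a replica rejoins the in-sync set.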