Skip to content

Commit 8985df4

Browse files
committed
Use PEM certificates loaded from secrets for Kafka
Signed-off-by: Gantigmaa Selenge <tina.selenge@gmail.com>
1 parent d3be1f2 commit 8985df4

16 files changed

Lines changed: 1008 additions & 931 deletions

File tree

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@
4242
* Additional OAuth configuration options have been added for 'oauth' authentication on the listener and the client.
4343
On the listener `clientGrantType` has been added.
4444
On the client `grantType` has been added.
45+
* Kafka nodes are now configured with PEM certificates instead of P12/JKS for keystore and truststore.
46+
4547
### Major changes, deprecations and removals
4648

4749
* Fix RBAC naming for `KafkaMirrorMaker2` to avoid `RoleBinding` collisions when a `KafkaConnect` with the same name exists in the same namespace. `KafkaMirrorMaker2` now uses dedicated `RoleBinding` names.

api/src/main/java/io/strimzi/api/kafka/model/kafka/KafkaResources.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,17 @@ public static String clientsCaKeySecretName(String clusterName) {
4949
return clusterName + "-clients-ca";
5050
}
5151

52+
/**
53+
* Get the name of the Kafka role binding given the name of the {@code cluster}.
54+
*
55+
* @param clusterName The cluster name.
56+
*
57+
* @return The name of the Kafka role binding.
58+
*/
59+
public static String kafkaRoleBindingName(String clusterName) {
60+
return kafkaComponentName(clusterName) + "-role";
61+
}
62+
5263
////////
5364
// Kafka methods
5465
////////

cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaBrokerConfigurationBuilder.java

Lines changed: 44 additions & 44 deletions
Large diffs are not rendered by default.

cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaCluster.java

Lines changed: 75 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,16 @@
3232
import io.fabric8.kubernetes.api.model.networking.v1.NetworkPolicyPeer;
3333
import io.fabric8.kubernetes.api.model.policy.v1.PodDisruptionBudget;
3434
import io.fabric8.kubernetes.api.model.rbac.ClusterRoleBinding;
35+
import io.fabric8.kubernetes.api.model.rbac.PolicyRule;
36+
import io.fabric8.kubernetes.api.model.rbac.PolicyRuleBuilder;
37+
import io.fabric8.kubernetes.api.model.rbac.Role;
38+
import io.fabric8.kubernetes.api.model.rbac.RoleBinding;
3539
import io.fabric8.kubernetes.api.model.rbac.RoleRef;
3640
import io.fabric8.kubernetes.api.model.rbac.RoleRefBuilder;
3741
import io.fabric8.kubernetes.api.model.rbac.Subject;
3842
import io.fabric8.kubernetes.api.model.rbac.SubjectBuilder;
3943
import io.fabric8.openshift.api.model.Route;
4044
import io.fabric8.openshift.api.model.RouteBuilder;
41-
import io.strimzi.api.kafka.model.common.CertAndKeySecretSource;
4245
import io.strimzi.api.kafka.model.common.Condition;
4346
import io.strimzi.api.kafka.model.common.Rack;
4447
import io.strimzi.api.kafka.model.common.metrics.JmxPrometheusExporterMetrics;
@@ -63,6 +66,7 @@
6366
import io.strimzi.api.kafka.model.kafka.listener.GenericKafkaListener;
6467
import io.strimzi.api.kafka.model.kafka.listener.KafkaListenerAuthenticationCustom;
6568
import io.strimzi.api.kafka.model.kafka.listener.KafkaListenerAuthenticationOAuth;
69+
import io.strimzi.api.kafka.model.kafka.listener.KafkaListenerAuthenticationTls;
6670
import io.strimzi.api.kafka.model.kafka.listener.KafkaListenerType;
6771
import io.strimzi.api.kafka.model.kafka.quotas.QuotasPlugin;
6872
import io.strimzi.api.kafka.model.kafka.quotas.QuotasPluginStrimzi;
@@ -97,6 +101,7 @@
97101
import java.util.ArrayList;
98102
import java.util.Collections;
99103
import java.util.HashMap;
104+
import java.util.HashSet;
100105
import java.util.LinkedHashSet;
101106
import java.util.List;
102107
import java.util.Map;
@@ -142,6 +147,7 @@ public class KafkaCluster extends AbstractModel implements SupportsMetrics, Supp
142147

143148
protected static final String ENV_VAR_KAFKA_INIT_EXTERNAL_ADDRESS = "EXTERNAL_ADDRESS";
144149
private static final String ENV_VAR_KAFKA_JMX_EXPORTER_ENABLED = "KAFKA_JMX_EXPORTER_ENABLED";
150+
private static final String ENV_VAR_KAFKA_CLUSTER_NAME = "KAFKA_CLUSTER_NAME";
145151
private static final String ENV_VAR_STRIMZI_OPA_AUTHZ_TRUSTED_CERTS = "STRIMZI_OPA_AUTHZ_TRUSTED_CERTS";
146152
private static final String ENV_VAR_STRIMZI_KEYCLOAK_AUTHZ_TRUSTED_CERTS = "STRIMZI_KEYCLOAK_AUTHZ_TRUSTED_CERTS";
147153

@@ -172,12 +178,6 @@ public class KafkaCluster extends AbstractModel implements SupportsMetrics, Supp
172178
public static final int INGRESS_PORT = 443;
173179

174180
protected static final String KAFKA_NAME = "kafka";
175-
protected static final String CLUSTER_CA_CERTS_VOLUME = "cluster-ca";
176-
protected static final String BROKER_CERTS_VOLUME = "broker-certs";
177-
protected static final String CLIENT_CA_CERTS_VOLUME = "client-ca-cert";
178-
protected static final String CLUSTER_CA_CERTS_VOLUME_MOUNT = "/opt/kafka/cluster-ca-certs";
179-
protected static final String BROKER_CERTS_VOLUME_MOUNT = "/opt/kafka/broker-certs";
180-
protected static final String CLIENT_CA_CERTS_VOLUME_MOUNT = "/opt/kafka/client-ca-certs";
181181
protected static final String TRUSTED_CERTS_BASE_VOLUME_MOUNT = "/opt/kafka/certificates";
182182
protected static final String CUSTOM_AUTHN_SECRETS_VOLUME_MOUNT = "/opt/kafka/custom-authn-secrets";
183183
private static final String LOG_AND_METRICS_CONFIG_VOLUME_NAME = "kafka-metrics-and-logging";
@@ -1368,9 +1368,6 @@ private List<Volume> getNonDataVolumes(boolean isOpenShift, NodeRef node, PodTem
13681368
List<Volume> volumeList = new ArrayList<>();
13691369

13701370
volumeList.add(VolumeUtils.createTempDirVolume(templatePod));
1371-
volumeList.add(VolumeUtils.createSecretVolume(CLUSTER_CA_CERTS_VOLUME, AbstractModel.clusterCaCertSecretName(cluster), isOpenShift));
1372-
volumeList.add(VolumeUtils.createSecretVolume(BROKER_CERTS_VOLUME, node.podName(), isOpenShift));
1373-
volumeList.add(VolumeUtils.createSecretVolume(CLIENT_CA_CERTS_VOLUME, KafkaResources.clientsCaCertificateSecretName(cluster), isOpenShift));
13741371
volumeList.add(VolumeUtils.createConfigMapVolume(LOG_AND_METRICS_CONFIG_VOLUME_NAME, node.podName()));
13751372
volumeList.add(VolumeUtils.createEmptyDirVolume("ready-files", "1Ki", "Memory"));
13761373

@@ -1383,25 +1380,6 @@ private List<Volume> getNonDataVolumes(boolean isOpenShift, NodeRef node, PodTem
13831380

13841381
// Listener specific volumes related to their specific authentication or encryption settings
13851382
for (GenericKafkaListener listener : listeners) {
1386-
if (listener.isTls()
1387-
&& listener.getConfiguration() != null
1388-
&& listener.getConfiguration().getBrokerCertChainAndKey() != null) {
1389-
CertAndKeySecretSource secretSource = listener.getConfiguration().getBrokerCertChainAndKey();
1390-
1391-
Map<String, String> items = new HashMap<>(2);
1392-
items.put(secretSource.getKey(), "tls.key");
1393-
items.put(secretSource.getCertificate(), "tls.crt");
1394-
1395-
volumeList.add(
1396-
VolumeUtils.createSecretVolume(
1397-
"custom-" + ListenersUtils.identifier(listener) + "-certs",
1398-
secretSource.getSecretName(),
1399-
items,
1400-
isOpenShift
1401-
)
1402-
);
1403-
}
1404-
14051383
if (ListenersUtils.isListenerWithOAuth(listener)) {
14061384
KafkaListenerAuthenticationOAuth oauth = (KafkaListenerAuthenticationOAuth) listener.getAuth();
14071385
CertUtils.createTrustedCertificatesVolumes(volumeList, oauth.getTlsTrustedCertificates(), isOpenShift, "oauth-" + ListenersUtils.identifier(listener));
@@ -1460,9 +1438,6 @@ private List<Volume> getPodSetVolumes(NodeRef node, Storage storage, PodTemplate
14601438
private List<VolumeMount> getVolumeMounts(Storage storage, ContainerTemplate containerTemplate, boolean isBroker) {
14611439
List<VolumeMount> volumeMountList = new ArrayList<>(VolumeUtils.createVolumeMounts(storage, false));
14621440
volumeMountList.add(VolumeUtils.createTempDirVolumeMount());
1463-
volumeMountList.add(VolumeUtils.createVolumeMount(CLUSTER_CA_CERTS_VOLUME, CLUSTER_CA_CERTS_VOLUME_MOUNT));
1464-
volumeMountList.add(VolumeUtils.createVolumeMount(BROKER_CERTS_VOLUME, BROKER_CERTS_VOLUME_MOUNT));
1465-
volumeMountList.add(VolumeUtils.createVolumeMount(CLIENT_CA_CERTS_VOLUME, CLIENT_CA_CERTS_VOLUME_MOUNT));
14661441
volumeMountList.add(VolumeUtils.createVolumeMount(LOG_AND_METRICS_CONFIG_VOLUME_NAME, LOG_AND_METRICS_CONFIG_VOLUME_MOUNT));
14671442
volumeMountList.add(VolumeUtils.createVolumeMount("ready-files", "/var/opt/kafka"));
14681443

@@ -1477,12 +1452,6 @@ private List<VolumeMount> getVolumeMounts(Storage storage, ContainerTemplate con
14771452
for (GenericKafkaListener listener : listeners) {
14781453
String identifier = ListenersUtils.identifier(listener);
14791454

1480-
if (listener.isTls()
1481-
&& listener.getConfiguration() != null
1482-
&& listener.getConfiguration().getBrokerCertChainAndKey() != null) {
1483-
volumeMountList.add(VolumeUtils.createVolumeMount("custom-" + identifier + "-certs", "/opt/kafka/certificates/custom-" + identifier + "-certs"));
1484-
}
1485-
14861455
if (ListenersUtils.isListenerWithOAuth(listener)) {
14871456
KafkaListenerAuthenticationOAuth oauth = (KafkaListenerAuthenticationOAuth) listener.getAuth();
14881457
CertUtils.createTrustedCertificatesVolumeMounts(volumeMountList, oauth.getTlsTrustedCertificates(), TRUSTED_CERTS_BASE_VOLUME_MOUNT + "/oauth-" + identifier + "-certs/", "oauth-" + identifier);
@@ -1626,6 +1595,7 @@ private List<EnvVar> getEnvVars(KafkaPool pool) {
16261595
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_JMX_EXPORTER_ENABLED,
16271596
String.valueOf(metrics instanceof JmxPrometheusExporterModel)));
16281597
varList.add(ContainerUtils.createEnvVar(ENV_VAR_STRIMZI_KAFKA_GC_LOG_ENABLED, String.valueOf(pool.gcLoggingEnabled)));
1598+
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_CLUSTER_NAME, cluster));
16291599

16301600
JvmOptionUtils.heapOptions(varList, 50, 5L * 1024L * 1024L * 1024L, pool.jvmOptions, pool.resources);
16311601
JvmOptionUtils.jvmPerformanceOptions(varList, pool.jvmOptions);
@@ -1699,6 +1669,61 @@ public ClusterRoleBinding generateClusterRoleBinding(String assemblyNamespace) {
16991669
}
17001670
}
17011671

1672+
/**
1673+
* Creates a Role for reading TLS certificate secrets in the same namespace as the resource.
1674+
* This is used for loading certificates from secrets directly.
1675+
*
1676+
* @return role for the Kafka Cluster
1677+
*/
1678+
public Role generateRole() {
1679+
Set<String> certSecretNames = new HashSet<>();
1680+
certSecretNames.add(KafkaResources.clusterCaCertificateSecretName(cluster));
1681+
certSecretNames.addAll(nodes().stream().map(NodeRef::podName).toList());
1682+
1683+
for (GenericKafkaListener listener : listeners) {
1684+
if (listener.isTls()) {
1685+
if (listener.getConfiguration() != null && listener.getConfiguration().getBrokerCertChainAndKey() != null) {
1686+
certSecretNames.add(listener.getConfiguration().getBrokerCertChainAndKey().getSecretName());
1687+
}
1688+
}
1689+
1690+
if (listener.getAuth() instanceof KafkaListenerAuthenticationTls) {
1691+
certSecretNames.add(KafkaResources.clientsCaCertificateSecretName(cluster));
1692+
}
1693+
}
1694+
1695+
List<PolicyRule> rules = List.of(new PolicyRuleBuilder()
1696+
.withApiGroups("")
1697+
.withResources("secrets")
1698+
.withVerbs("get")
1699+
.withResourceNames(certSecretNames.stream().toList())
1700+
.build());
1701+
1702+
return RbacUtils.createRole(componentName, namespace, rules, labels, ownerReference, null);
1703+
}
1704+
1705+
/**
1706+
* Generates the Role Binding that binds the Kafka service account to the Kafka Role
1707+
*
1708+
* @return Role Binding for the Kafka Cluster
1709+
*/
1710+
public RoleBinding generateRoleBindingForRole() {
1711+
Subject subject = new SubjectBuilder()
1712+
.withKind("ServiceAccount")
1713+
.withName(componentName)
1714+
.withNamespace(namespace)
1715+
.build();
1716+
1717+
RoleRef roleRef = new RoleRefBuilder()
1718+
.withName(componentName)
1719+
.withApiGroup("rbac.authorization.k8s.io")
1720+
.withKind("Role")
1721+
.build();
1722+
1723+
return RbacUtils
1724+
.createRoleBinding(KafkaResources.kafkaRoleBindingName(cluster), namespace, roleRef, List.of(subject), labels, ownerReference, null);
1725+
}
1726+
17021727
/**
17031728
* Generates the NetworkPolicies relevant for Kafka brokers
17041729
*
@@ -1828,7 +1853,6 @@ private String generatePerBrokerConfiguration(NodeRef node, KafkaPool pool, Map<
18281853
.withKRaftMetadataLogDir(VolumeUtils.kraftMetadataPath(pool.storage))
18291854
.withLogDirs(VolumeUtils.createVolumeMounts(pool.storage, false))
18301855
.withListeners(cluster,
1831-
kafkaVersion,
18321856
namespace,
18331857
listeners,
18341858
listenerId -> advertisedHostnames.get(node.nodeId()).get(listenerId),
@@ -1896,6 +1920,18 @@ public List<ConfigMap> generatePerBrokerConfigurationConfigMaps(MetricsAndLoggin
18961920
return configMaps;
18971921
}
18981922

1923+
/**
1924+
* Generates a Secret with the given name and data in the Kafka cluster's namespace
1925+
*
1926+
* @param secretData Secret data
1927+
* @param secretName Secret name
1928+
*
1929+
* @return Secret that is generated
1930+
*/
1931+
public Secret generateSecret(Map<String, String> secretData, String secretName) {
1932+
return ModelUtils.createSecret(secretName, namespace, labels, ownerReference, secretData, Map.of(), Map.of());
1933+
}
1934+
18991935
/**
19001936
* @return Kafka version
19011937
*/

cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaReconciler.java

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@
5959
import io.strimzi.operator.cluster.operator.resource.kubernetes.PodDisruptionBudgetOperator;
6060
import io.strimzi.operator.cluster.operator.resource.kubernetes.PodOperator;
6161
import io.strimzi.operator.cluster.operator.resource.kubernetes.PvcOperator;
62+
import io.strimzi.operator.cluster.operator.resource.kubernetes.RoleBindingOperator;
63+
import io.strimzi.operator.cluster.operator.resource.kubernetes.RoleOperator;
6264
import io.strimzi.operator.cluster.operator.resource.kubernetes.RouteOperator;
6365
import io.strimzi.operator.cluster.operator.resource.kubernetes.SecretOperator;
6466
import io.strimzi.operator.cluster.operator.resource.kubernetes.ServiceAccountOperator;
@@ -143,6 +145,8 @@ public class KafkaReconciler {
143145
private final PodDisruptionBudgetOperator podDisruptionBudgetOperator;
144146
private final PodOperator podOperator;
145147
private final ClusterRoleBindingOperator clusterRoleBindingOperator;
148+
private final RoleOperator roleOperator;
149+
private final RoleBindingOperator roleBindingOperator;
146150
private final RouteOperator routeOperator;
147151
private final IngressOperator ingressOperator;
148152
private final NodeOperator nodeOperator;
@@ -218,6 +222,8 @@ public KafkaReconciler(
218222
this.podDisruptionBudgetOperator = supplier.podDisruptionBudgetOperator;
219223
this.podOperator = supplier.podOperations;
220224
this.clusterRoleBindingOperator = supplier.clusterRoleBindingOperator;
225+
this.roleBindingOperator = supplier.roleBindingOperations;
226+
this.roleOperator = supplier.roleOperations;
221227
this.routeOperator = supplier.routeOperations;
222228
this.ingressOperator = supplier.ingressOperations;
223229
this.nodeOperator = supplier.nodeOperator;
@@ -248,6 +254,8 @@ public Future<Void> reconcile(KafkaStatus kafkaStatus, Clock clock) {
248254
.compose(i -> pvcs(kafkaStatus))
249255
.compose(i -> serviceAccount())
250256
.compose(i -> initClusterRoleBinding())
257+
.compose(i -> kafkaRole())
258+
.compose(i -> kafkaRoleBinding())
251259
.compose(i -> scaleDown())
252260
.compose(i -> updateNodePoolStatuses(kafkaStatus))
253261
.compose(i -> listeners())
@@ -537,6 +545,40 @@ protected Future<Void> initClusterRoleBinding() {
537545
).mapEmpty();
538546
}
539547

548+
/**
549+
* Manages the Kafka Role that grants read access to the TLS certificate secrets
550+
* in the cluster namespace, so that Kafka nodes can load PEM certificates
551+
* directly from those secrets.
552+
*
553+
* @return Completes when the Role was successfully created or updated
554+
*/
555+
protected Future<Void> kafkaRole() {
556+
return roleOperator
557+
.reconcile(
558+
reconciliation,
559+
reconciliation.namespace(),
560+
kafka.getComponentName(),
561+
kafka.generateRole()
562+
).mapEmpty();
563+
}
564+
565+
/**
566+
* Manages the Kafka cluster role binding. When the desired Cluster Role Binding is null, and we get an RBAC error,
567+
* we ignore it. This is to allow users to run the operator only inside a namespace when no features requiring
568+
* Cluster Role Bindings are needed.
569+
*
570+
* @return Completes when the Cluster Role Binding was successfully created or updated
571+
*/
572+
protected Future<Void> kafkaRoleBinding() {
573+
return roleBindingOperator
574+
.reconcile(
575+
reconciliation,
576+
reconciliation.namespace(),
577+
KafkaResources.kafkaRoleBindingName(reconciliation.name()),
578+
kafka.generateRoleBindingForRole())
579+
.mapEmpty();
580+
}
581+
540582
/**
541583
* Scales down the Kafka cluster if needed. Kafka scale-down is done in one go.
542584
*
@@ -878,7 +920,7 @@ private Future<Void> waitForNewNodes() {
878920
}
879921

880922
/**
881-
* Roles the Kafka brokers (if needed).
923+
* Rolls the Kafka brokers (if needed).
882924
*
883925
* @param podSetDiffs Map with the PodSet reconciliation results
884926
*

0 commit comments

Comments
 (0)