Skip to content

Commit 6c17f27

Browse files
authored
Use PEM certificates loaded from secrets for Kafka (#11447)
Signed-off-by: Gantigmaa Selenge <tina.selenge@gmail.com>
1 parent 3b85d2e commit 6c17f27

17 files changed

Lines changed: 1139 additions & 960 deletions

File tree

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
If you want to deploy and run the Heartbeat connector, you can use separate `KafkaConnect` and `KafkaConnector` custom resources.
2828
* The `.spec.build.output.additionalKanikoOptions` field in the `KafkaConnect` custom resource is deprecated and will be removed in the future.
2929
* Use `.spec.build.output.additionalBuildOptions` field instead.
30+
* Kafka nodes are now configured with PEM certificates instead of P12/JKS for keystore and truststore.
3031

3132
## 0.48.0
3233

@@ -50,6 +51,7 @@
5051
* Additional OAuth configuration options have been added for 'oauth' authentication on the listener and the client.
5152
On the listener `clientGrantType` has been added.
5253
On the client `grantType` has been added.
54+
5355
### Major changes, deprecations and removals
5456

5557
* Fix RBAC naming for `KafkaMirrorMaker2` to avoid `RoleBinding` collisions when a `KafkaConnect` with the same name exists in the same namespace. `KafkaMirrorMaker2` now uses dedicated `RoleBinding` names.

api/src/main/java/io/strimzi/api/kafka/model/kafka/KafkaResources.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,17 @@ public static String clientsCaKeySecretName(String clusterName) {
4949
return clusterName + "-clients-ca";
5050
}
5151

52+
/**
53+
* Get the name of the Kafka role binding given the name of the {@code cluster}.
54+
*
55+
* @param clusterName The cluster name.
56+
*
57+
* @return The name of the Kafka role binding.
58+
*/
59+
public static String kafkaRoleBindingName(String clusterName) {
60+
return kafkaComponentName(clusterName) + "-role";
61+
}
62+
5263
////////
5364
// Kafka methods
5465
////////

cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaBrokerConfigurationBuilder.java

Lines changed: 44 additions & 44 deletions
Large diffs are not rendered by default.

cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaCluster.java

Lines changed: 75 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,16 @@
3232
import io.fabric8.kubernetes.api.model.networking.v1.NetworkPolicyPeer;
3333
import io.fabric8.kubernetes.api.model.policy.v1.PodDisruptionBudget;
3434
import io.fabric8.kubernetes.api.model.rbac.ClusterRoleBinding;
35+
import io.fabric8.kubernetes.api.model.rbac.PolicyRule;
36+
import io.fabric8.kubernetes.api.model.rbac.PolicyRuleBuilder;
37+
import io.fabric8.kubernetes.api.model.rbac.Role;
38+
import io.fabric8.kubernetes.api.model.rbac.RoleBinding;
3539
import io.fabric8.kubernetes.api.model.rbac.RoleRef;
3640
import io.fabric8.kubernetes.api.model.rbac.RoleRefBuilder;
3741
import io.fabric8.kubernetes.api.model.rbac.Subject;
3842
import io.fabric8.kubernetes.api.model.rbac.SubjectBuilder;
3943
import io.fabric8.openshift.api.model.Route;
4044
import io.fabric8.openshift.api.model.RouteBuilder;
41-
import io.strimzi.api.kafka.model.common.CertAndKeySecretSource;
4245
import io.strimzi.api.kafka.model.common.Condition;
4346
import io.strimzi.api.kafka.model.common.Rack;
4447
import io.strimzi.api.kafka.model.common.metrics.JmxPrometheusExporterMetrics;
@@ -63,6 +66,7 @@
6366
import io.strimzi.api.kafka.model.kafka.listener.GenericKafkaListener;
6467
import io.strimzi.api.kafka.model.kafka.listener.KafkaListenerAuthenticationCustom;
6568
import io.strimzi.api.kafka.model.kafka.listener.KafkaListenerAuthenticationOAuth;
69+
import io.strimzi.api.kafka.model.kafka.listener.KafkaListenerAuthenticationTls;
6670
import io.strimzi.api.kafka.model.kafka.listener.KafkaListenerType;
6771
import io.strimzi.api.kafka.model.kafka.quotas.QuotasPlugin;
6872
import io.strimzi.api.kafka.model.kafka.quotas.QuotasPluginStrimzi;
@@ -97,6 +101,7 @@
97101
import java.util.ArrayList;
98102
import java.util.Collections;
99103
import java.util.HashMap;
104+
import java.util.HashSet;
100105
import java.util.LinkedHashSet;
101106
import java.util.List;
102107
import java.util.Map;
@@ -142,6 +147,7 @@ public class KafkaCluster extends AbstractModel implements SupportsMetrics, Supp
142147

143148
protected static final String ENV_VAR_KAFKA_INIT_EXTERNAL_ADDRESS = "EXTERNAL_ADDRESS";
144149
private static final String ENV_VAR_KAFKA_JMX_EXPORTER_ENABLED = "KAFKA_JMX_EXPORTER_ENABLED";
150+
private static final String ENV_VAR_KAFKA_CLUSTER_NAME = "KAFKA_CLUSTER_NAME";
145151
private static final String ENV_VAR_STRIMZI_OPA_AUTHZ_TRUSTED_CERTS = "STRIMZI_OPA_AUTHZ_TRUSTED_CERTS";
146152
private static final String ENV_VAR_STRIMZI_KEYCLOAK_AUTHZ_TRUSTED_CERTS = "STRIMZI_KEYCLOAK_AUTHZ_TRUSTED_CERTS";
147153

@@ -172,12 +178,6 @@ public class KafkaCluster extends AbstractModel implements SupportsMetrics, Supp
172178
public static final int INGRESS_PORT = 443;
173179

174180
protected static final String KAFKA_NAME = "kafka";
175-
protected static final String CLUSTER_CA_CERTS_VOLUME = "cluster-ca";
176-
protected static final String BROKER_CERTS_VOLUME = "broker-certs";
177-
protected static final String CLIENT_CA_CERTS_VOLUME = "client-ca-cert";
178-
protected static final String CLUSTER_CA_CERTS_VOLUME_MOUNT = "/opt/kafka/cluster-ca-certs";
179-
protected static final String BROKER_CERTS_VOLUME_MOUNT = "/opt/kafka/broker-certs";
180-
protected static final String CLIENT_CA_CERTS_VOLUME_MOUNT = "/opt/kafka/client-ca-certs";
181181
protected static final String TRUSTED_CERTS_BASE_VOLUME_MOUNT = "/opt/kafka/certificates";
182182
protected static final String CUSTOM_AUTHN_SECRETS_VOLUME_MOUNT = "/opt/kafka/custom-authn-secrets";
183183
private static final String LOG_AND_METRICS_CONFIG_VOLUME_NAME = "kafka-metrics-and-logging";
@@ -1363,9 +1363,6 @@ private List<Volume> getNonDataVolumes(boolean isOpenShift, NodeRef node, PodTem
13631363
List<Volume> volumeList = new ArrayList<>();
13641364

13651365
volumeList.add(VolumeUtils.createTempDirVolume(templatePod));
1366-
volumeList.add(VolumeUtils.createSecretVolume(CLUSTER_CA_CERTS_VOLUME, AbstractModel.clusterCaCertSecretName(cluster), isOpenShift));
1367-
volumeList.add(VolumeUtils.createSecretVolume(BROKER_CERTS_VOLUME, node.podName(), isOpenShift));
1368-
volumeList.add(VolumeUtils.createSecretVolume(CLIENT_CA_CERTS_VOLUME, KafkaResources.clientsCaCertificateSecretName(cluster), isOpenShift));
13691366
volumeList.add(VolumeUtils.createConfigMapVolume(LOG_AND_METRICS_CONFIG_VOLUME_NAME, node.podName()));
13701367
volumeList.add(VolumeUtils.createEmptyDirVolume("ready-files", "1Ki", "Memory"));
13711368

@@ -1378,25 +1375,6 @@ private List<Volume> getNonDataVolumes(boolean isOpenShift, NodeRef node, PodTem
13781375

13791376
// Listener specific volumes related to their specific authentication or encryption settings
13801377
for (GenericKafkaListener listener : listeners) {
1381-
if (listener.isTls()
1382-
&& listener.getConfiguration() != null
1383-
&& listener.getConfiguration().getBrokerCertChainAndKey() != null) {
1384-
CertAndKeySecretSource secretSource = listener.getConfiguration().getBrokerCertChainAndKey();
1385-
1386-
Map<String, String> items = new HashMap<>(2);
1387-
items.put(secretSource.getKey(), "tls.key");
1388-
items.put(secretSource.getCertificate(), "tls.crt");
1389-
1390-
volumeList.add(
1391-
VolumeUtils.createSecretVolume(
1392-
"custom-" + ListenersUtils.identifier(listener) + "-certs",
1393-
secretSource.getSecretName(),
1394-
items,
1395-
isOpenShift
1396-
)
1397-
);
1398-
}
1399-
14001378
if (ListenersUtils.isListenerWithOAuth(listener)) {
14011379
KafkaListenerAuthenticationOAuth oauth = (KafkaListenerAuthenticationOAuth) listener.getAuth();
14021380
CertUtils.createTrustedCertificatesVolumes(volumeList, oauth.getTlsTrustedCertificates(), isOpenShift, "oauth-" + ListenersUtils.identifier(listener));
@@ -1455,9 +1433,6 @@ private List<Volume> getPodSetVolumes(NodeRef node, Storage storage, PodTemplate
14551433
private List<VolumeMount> getVolumeMounts(Storage storage, ContainerTemplate containerTemplate, boolean isBroker) {
14561434
List<VolumeMount> volumeMountList = new ArrayList<>(VolumeUtils.createVolumeMounts(storage, false));
14571435
volumeMountList.add(VolumeUtils.createTempDirVolumeMount());
1458-
volumeMountList.add(VolumeUtils.createVolumeMount(CLUSTER_CA_CERTS_VOLUME, CLUSTER_CA_CERTS_VOLUME_MOUNT));
1459-
volumeMountList.add(VolumeUtils.createVolumeMount(BROKER_CERTS_VOLUME, BROKER_CERTS_VOLUME_MOUNT));
1460-
volumeMountList.add(VolumeUtils.createVolumeMount(CLIENT_CA_CERTS_VOLUME, CLIENT_CA_CERTS_VOLUME_MOUNT));
14611436
volumeMountList.add(VolumeUtils.createVolumeMount(LOG_AND_METRICS_CONFIG_VOLUME_NAME, LOG_AND_METRICS_CONFIG_VOLUME_MOUNT));
14621437
volumeMountList.add(VolumeUtils.createVolumeMount("ready-files", "/var/opt/kafka"));
14631438

@@ -1472,12 +1447,6 @@ private List<VolumeMount> getVolumeMounts(Storage storage, ContainerTemplate con
14721447
for (GenericKafkaListener listener : listeners) {
14731448
String identifier = ListenersUtils.identifier(listener);
14741449

1475-
if (listener.isTls()
1476-
&& listener.getConfiguration() != null
1477-
&& listener.getConfiguration().getBrokerCertChainAndKey() != null) {
1478-
volumeMountList.add(VolumeUtils.createVolumeMount("custom-" + identifier + "-certs", "/opt/kafka/certificates/custom-" + identifier + "-certs"));
1479-
}
1480-
14811450
if (ListenersUtils.isListenerWithOAuth(listener)) {
14821451
KafkaListenerAuthenticationOAuth oauth = (KafkaListenerAuthenticationOAuth) listener.getAuth();
14831452
CertUtils.createTrustedCertificatesVolumeMounts(volumeMountList, oauth.getTlsTrustedCertificates(), TRUSTED_CERTS_BASE_VOLUME_MOUNT + "/oauth-" + identifier + "-certs/", "oauth-" + identifier);
@@ -1621,6 +1590,7 @@ private List<EnvVar> getEnvVars(KafkaPool pool) {
16211590
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_JMX_EXPORTER_ENABLED,
16221591
String.valueOf(metrics instanceof JmxPrometheusExporterModel)));
16231592
varList.add(ContainerUtils.createEnvVar(ENV_VAR_STRIMZI_KAFKA_GC_LOG_ENABLED, String.valueOf(pool.gcLoggingEnabled)));
1593+
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_CLUSTER_NAME, cluster));
16241594

16251595
JvmOptionUtils.heapOptions(varList, 50, 5L * 1024L * 1024L * 1024L, pool.jvmOptions, pool.resources);
16261596
JvmOptionUtils.jvmPerformanceOptions(varList, pool.jvmOptions);
@@ -1694,6 +1664,61 @@ public ClusterRoleBinding generateClusterRoleBinding(String assemblyNamespace) {
16941664
}
16951665
}
16961666

1667+
/**
1668+
* Creates a Role for reading TLS certificate secrets in the same namespace as the resource.
1669+
* This is used for loading certificates from secrets directly.
1670+
*
1671+
* @return role for the Kafka Cluster
1672+
*/
1673+
public Role generateRole() {
1674+
Set<String> certSecretNames = new HashSet<>();
1675+
certSecretNames.add(KafkaResources.clusterCaCertificateSecretName(cluster));
1676+
certSecretNames.addAll(nodes().stream().map(NodeRef::podName).toList());
1677+
1678+
for (GenericKafkaListener listener : listeners) {
1679+
if (listener.isTls()) {
1680+
if (listener.getConfiguration() != null && listener.getConfiguration().getBrokerCertChainAndKey() != null) {
1681+
certSecretNames.add(listener.getConfiguration().getBrokerCertChainAndKey().getSecretName());
1682+
}
1683+
}
1684+
1685+
if (listener.getAuth() instanceof KafkaListenerAuthenticationTls) {
1686+
certSecretNames.add(KafkaResources.clientsCaCertificateSecretName(cluster));
1687+
}
1688+
}
1689+
1690+
List<PolicyRule> rules = List.of(new PolicyRuleBuilder()
1691+
.withApiGroups("")
1692+
.withResources("secrets")
1693+
.withVerbs("get")
1694+
.withResourceNames(certSecretNames.stream().toList())
1695+
.build());
1696+
1697+
return RbacUtils.createRole(componentName, namespace, rules, labels, ownerReference, null);
1698+
}
1699+
1700+
/**
1701+
* Generates the Kafka Cluster Role Binding
1702+
*
1703+
* @return Role Binding for the Kafka Cluster
1704+
*/
1705+
public RoleBinding generateRoleBindingForRole() {
1706+
Subject subject = new SubjectBuilder()
1707+
.withKind("ServiceAccount")
1708+
.withName(componentName)
1709+
.withNamespace(namespace)
1710+
.build();
1711+
1712+
RoleRef roleRef = new RoleRefBuilder()
1713+
.withName(componentName)
1714+
.withApiGroup("rbac.authorization.k8s.io")
1715+
.withKind("Role")
1716+
.build();
1717+
1718+
return RbacUtils
1719+
.createRoleBinding(KafkaResources.kafkaRoleBindingName(cluster), namespace, roleRef, List.of(subject), labels, ownerReference, null);
1720+
}
1721+
16971722
/**
16981723
* Generates the NetworkPolicies relevant for Kafka brokers
16991724
*
@@ -1823,7 +1848,6 @@ private String generatePerBrokerConfiguration(NodeRef node, KafkaPool pool, Map<
18231848
.withKRaftMetadataLogDir(VolumeUtils.kraftMetadataPath(pool.storage))
18241849
.withLogDirs(VolumeUtils.createVolumeMounts(pool.storage, false))
18251850
.withListeners(cluster,
1826-
kafkaVersion,
18271851
namespace,
18281852
listeners,
18291853
listenerId -> advertisedHostnames.get(node.nodeId()).get(listenerId),
@@ -1881,6 +1905,18 @@ public List<ConfigMap> generatePerBrokerConfigurationConfigMaps(MetricsAndLoggin
18811905
return configMaps;
18821906
}
18831907

1908+
/**
1909+
* Generates a Secret with the given name and data in the Kafka Cluster's namespace.
1910+
*
1911+
* @param secretData Secret data
1912+
* @param secretName Secret name
1913+
*
1914+
* @return Secret that is generated
1915+
*/
1916+
public Secret generateSecret(Map<String, String> secretData, String secretName) {
1917+
return ModelUtils.createSecret(secretName, namespace, labels, ownerReference, secretData, Map.of(), Map.of());
1918+
}
1919+
18841920
/**
18851921
* @return Kafka version
18861922
*/

cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaReconciler.java

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@
5959
import io.strimzi.operator.cluster.operator.resource.kubernetes.PodDisruptionBudgetOperator;
6060
import io.strimzi.operator.cluster.operator.resource.kubernetes.PodOperator;
6161
import io.strimzi.operator.cluster.operator.resource.kubernetes.PvcOperator;
62+
import io.strimzi.operator.cluster.operator.resource.kubernetes.RoleBindingOperator;
63+
import io.strimzi.operator.cluster.operator.resource.kubernetes.RoleOperator;
6264
import io.strimzi.operator.cluster.operator.resource.kubernetes.RouteOperator;
6365
import io.strimzi.operator.cluster.operator.resource.kubernetes.SecretOperator;
6466
import io.strimzi.operator.cluster.operator.resource.kubernetes.ServiceAccountOperator;
@@ -143,6 +145,8 @@ public class KafkaReconciler {
143145
private final PodDisruptionBudgetOperator podDisruptionBudgetOperator;
144146
private final PodOperator podOperator;
145147
private final ClusterRoleBindingOperator clusterRoleBindingOperator;
148+
private final RoleOperator roleOperator;
149+
private final RoleBindingOperator roleBindingOperator;
146150
private final RouteOperator routeOperator;
147151
private final IngressOperator ingressOperator;
148152
private final NodeOperator nodeOperator;
@@ -218,6 +222,8 @@ public KafkaReconciler(
218222
this.podDisruptionBudgetOperator = supplier.podDisruptionBudgetOperator;
219223
this.podOperator = supplier.podOperations;
220224
this.clusterRoleBindingOperator = supplier.clusterRoleBindingOperator;
225+
this.roleBindingOperator = supplier.roleBindingOperations;
226+
this.roleOperator = supplier.roleOperations;
221227
this.routeOperator = supplier.routeOperations;
222228
this.ingressOperator = supplier.ingressOperations;
223229
this.nodeOperator = supplier.nodeOperator;
@@ -248,6 +254,8 @@ public Future<Void> reconcile(KafkaStatus kafkaStatus, Clock clock) {
248254
.compose(i -> pvcs(kafkaStatus))
249255
.compose(i -> serviceAccount())
250256
.compose(i -> initClusterRoleBinding())
257+
.compose(i -> kafkaRole())
258+
.compose(i -> kafkaRoleBinding())
251259
.compose(i -> scaleDown())
252260
.compose(i -> updateNodePoolStatuses(kafkaStatus))
253261
.compose(i -> listeners())
@@ -537,6 +545,39 @@ protected Future<Void> initClusterRoleBinding() {
537545
).mapEmpty();
538546
}
539547

548+
/**
549+
* Manages the Kafka role. This Role is always created and lives in
550+
* the same namespace as the Kafka Cluster resource. This is used to load
551+
* certificates from secrets directly.
552+
*
553+
* @return Completes when the Role was successfully created or updated
554+
*/
555+
protected Future<Void> kafkaRole() {
556+
return roleOperator
557+
.reconcile(
558+
reconciliation,
559+
reconciliation.namespace(),
560+
kafka.getComponentName(),
561+
kafka.generateRole()
562+
).mapEmpty();
563+
}
564+
565+
/**
566+
* Manages the Kafka Role Bindings.
567+
* The Role Binding is in the namespace where the Kafka Cluster resource exists.
568+
*
569+
* @return Completes when the Role Binding was successfully created or updated
570+
*/
571+
protected Future<Void> kafkaRoleBinding() {
572+
return roleBindingOperator
573+
.reconcile(
574+
reconciliation,
575+
reconciliation.namespace(),
576+
KafkaResources.kafkaRoleBindingName(reconciliation.name()),
577+
kafka.generateRoleBindingForRole())
578+
.mapEmpty();
579+
}
580+
540581
/**
541582
* Scales down the Kafka cluster if needed. Kafka scale-down is done in one go.
542583
*
@@ -877,7 +918,7 @@ private Future<Void> waitForNewNodes() {
877918
}
878919

879920
/**
880-
* Roles the Kafka brokers (if needed).
921+
* Rolls the Kafka brokers (if needed).
881922
*
882923
* @param podSetDiffs Map with the PodSet reconciliation results
883924
*

0 commit comments

Comments
 (0)