Skip to content

Commit dce9b79

Browse files
committed
WIP: Add cert-manager support to Kafka Exporter
Signed-off-by: Kate Stanley <11195226+katheris@users.noreply.github.com>
1 parent a1f68e7 commit dce9b79

6 files changed

Lines changed: 624 additions & 15 deletions

File tree

cluster-operator/src/main/java/io/strimzi/operator/cluster/model/CertManagerUtils.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,26 @@ public static Certificate buildCertManagerCertificate(String namespace,
107107
* Used when cert-manager has issued the CA Secret.
108108
*
109109
* @param clusterCa The Cluster CA
110+
* @param certManagerSecret cert-manager Secret containing the Cluster CA
111+
* @param namespace Namespace for the Secret
112+
* @param secretName Name of the Secret
113+
* @param keyCertName Key under which the certificate will be stored in the new Secret
114+
* @param labels Labels
115+
* @param ownerReference Owner reference
116+
* @return Newly built Secret
117+
*/
118+
public static Secret buildTrustedCertificateSecretFromCertManager(ClusterCa clusterCa, Secret certManagerSecret, String namespace,
119+
String secretName, String keyCertName, Labels labels,
120+
OwnerReference ownerReference) {
121+
// TO, UO, KE etc do not track the clientsCa generation annotation on their Secret, so pass null
122+
return buildTrustedCertificateSecretFromCertManager(clusterCa, null, certManagerSecret, namespace, secretName, keyCertName, labels, ownerReference);
123+
}
124+
125+
/**
126+
* Builds a certificate Secret based on the cert-manager provided Secret.
127+
* Used when cert-manager has issued the CA Secret.
128+
*
129+
* @param clusterCa The Cluster CA
110130
* @param clientsCa The Clients CA. If this is not null the Clients CA generation is added
111131
* @param certManagerSecret cert-manager Secret containing the Cluster CA
112132
* @param namespace Namespace for the Secret

cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaExporter.java

Lines changed: 72 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
*/
55
package io.strimzi.operator.cluster.model;
66

7+
import io.fabric8.certmanager.api.model.v1.Certificate;
78
import io.fabric8.kubernetes.api.model.Container;
89
import io.fabric8.kubernetes.api.model.ContainerPort;
910
import io.fabric8.kubernetes.api.model.EnvVar;
@@ -33,14 +34,19 @@
3334
import io.strimzi.operator.cluster.model.securityprofiles.PodSecurityProviderContextImpl;
3435
import io.strimzi.operator.common.Reconciliation;
3536
import io.strimzi.operator.common.Util;
37+
import io.strimzi.operator.common.auth.PemTrustSet;
38+
import io.strimzi.operator.common.model.Ca;
3639
import io.strimzi.plugin.security.profiles.PodSecurityProviderContext;
3740

41+
import java.security.cert.CertificateException;
42+
import java.security.cert.X509Certificate;
3843
import java.util.ArrayList;
3944
import java.util.HashMap;
4045
import java.util.List;
4146
import java.util.Map;
4247

4348
import static io.strimzi.api.kafka.model.common.template.DeploymentStrategy.ROLLING_UPDATE;
49+
import static io.strimzi.operator.common.model.Ca.x509Certificate;
4450

4551
/**
4652
* Kafka Exporter model
@@ -283,9 +289,25 @@ private List<VolumeMount> getVolumeMounts() {
283289
return volumeList;
284290
}
285291

292+
/**
293+
* Creates the Certificate resource for the Kafka Exporter used when cert-manager is issuing certificates
294+
*
295+
* @param clusterCa The CA for cluster certificates
296+
*
297+
* @return List of Certificate resources
298+
*/
299+
public Certificate generateCertificateResource(ClusterCa clusterCa) {
300+
return CertManagerUtils.buildCertManagerCertificate(namespace,
301+
KafkaExporterResources.secretName(cluster),
302+
clusterCa.getCertManagerCert(componentName, Ca.IO_STRIMZI),
303+
labels,
304+
ownerReference);
305+
}
306+
286307
/**
287308
* Generate the Secret containing the Kafka Exporter certificate signed by the cluster CA certificate used for TLS based
288309
* internal communication with Kafka. It also contains the related Kafka Exporter private key.
310+
* Used when Strimzi is issuing certificates.
289311
*
290312
* @param clusterCa The cluster CA.
291313
* @param existingSecret The existing secret with Kafka certificates
@@ -294,11 +316,60 @@ private List<VolumeMount> getVolumeMounts() {
294316
*
295317
* @return The generated Secret.
296318
*/
297-
public Secret generateCertificatesSecret(ClusterCa clusterCa, Secret existingSecret, boolean isMaintenanceTimeWindowsSatisfied) {
319+
public Secret generateCertificatesSecretForStrimziCa(ClusterCa clusterCa, Secret existingSecret, boolean isMaintenanceTimeWindowsSatisfied) {
298320
return CertUtils.buildTrustedCertificateSecret(reconciliation, clusterCa, existingSecret, namespace, KafkaExporterResources.secretName(cluster), componentName,
299321
COMPONENT_TYPE, labels, ownerReference, isMaintenanceTimeWindowsSatisfied);
300322
}
301323

324+
/**
325+
* Generate the Secret containing the Kafka Exporter certificate signed by the cluster CA certificate used for TLS based
326+
* internal communication with Kafka. It also contains the related Kafka Exporter private key.
327+
* Used when cert-manager is issuing certificates.
328+
*
329+
* @param clusterCa The cluster CA.
330+
* @param existingSecret Existing Secret.
331+
* @param certManagerSecret Secret managed by cert-manager, may be null.
332+
* @param pemTrustSet Trust set to use to determine if new certificates are trusted
333+
*
334+
* @return The generated Secret.
335+
*/
336+
public Secret generateCertificatesSecretForCertManagerCA(ClusterCa clusterCa, Secret existingSecret, Secret certManagerSecret, PemTrustSet pemTrustSet) {
337+
Secret newSecret = CertManagerUtils.buildTrustedCertificateSecretFromCertManager(clusterCa, certManagerSecret, namespace, KafkaExporterResources.secretName(cluster),
338+
COMPONENT_TYPE, labels, ownerReference);
339+
if (existingSecret == null) {
340+
return newSecret;
341+
} else if (CertManagerUtils.certManagerCertUpdated(existingSecret, newSecret)) {
342+
if (certManagerSecretNotTrusted(pemTrustSet, existingSecret)) {
343+
LOGGER.infoCr(reconciliation, "New certificate for Kafka Exporter, but not trusted yet so keeping existing certificate Secret.");
344+
return existingSecret;
345+
} else {
346+
LOGGER.infoCr(reconciliation, "New certificate for Kafka Exporter, updating Secret {}/{}", namespace, existingSecret.getMetadata().getName());
347+
return newSecret;
348+
}
349+
} else {
350+
// Certificate has no changed
351+
return existingSecret;
352+
}
353+
}
354+
355+
/**
356+
* Checks if the cert-manager Secret is trusted by the current CA cert
357+
*
358+
* @param certManagerSecret Secret containing cert-manager provided cert
359+
* @return Whether the cert is trusted
360+
*/
361+
private boolean certManagerSecretNotTrusted(PemTrustSet pemTrustSet, Secret certManagerSecret) {
362+
X509Certificate x509CaCert;
363+
X509Certificate certManagerCert;
364+
try {
365+
x509CaCert = x509Certificate(pemTrustSet.trustedCertificatesPemBytes());
366+
certManagerCert = x509Certificate(Util.decodeBytesFromBase64(certManagerSecret.getData().get("tls.crt")));
367+
} catch (CertificateException e) {
368+
throw new RuntimeException(e);
369+
}
370+
return !CertUtils.certIsTrusted(reconciliation, certManagerCert, x509CaCert);
371+
}
372+
302373
/**
303374
* Generates the NetworkPolicies relevant for Kafka Exporter
304375
*

cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/CaReconciler.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -430,7 +430,6 @@ Future<Void> reconcileClusterOperatorSecret(Clock clock, Secret certManagerSecre
430430
if (clusterCaCertManagerType == CertificateManagerType.CERT_MANAGER_IO) {
431431
Secret newCoSecret = CertManagerUtils.buildTrustedCertificateSecretFromCertManager(
432432
clusterCa,
433-
null, //we don't need the Clients CA generation present on this Secret
434433
certManagerSecret,
435434
reconciliation.namespace(),
436435
KafkaResources.clusterOperatorCertsSecretName(reconciliation.name()),

cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaExporterReconciler.java

Lines changed: 42 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,18 @@
77
import io.fabric8.kubernetes.api.model.LocalObjectReference;
88
import io.fabric8.kubernetes.api.model.Secret;
99
import io.fabric8.kubernetes.api.model.apps.Deployment;
10+
import io.strimzi.api.kafka.model.common.CertificateManagerType;
1011
import io.strimzi.api.kafka.model.kafka.Kafka;
1112
import io.strimzi.api.kafka.model.kafka.exporter.KafkaExporterResources;
1213
import io.strimzi.operator.cluster.ClusterOperatorConfig;
14+
import io.strimzi.operator.cluster.model.CertManagerUtils;
1315
import io.strimzi.operator.cluster.model.CertUtils;
1416
import io.strimzi.operator.cluster.model.ClusterCa;
1517
import io.strimzi.operator.cluster.model.ImagePullPolicy;
1618
import io.strimzi.operator.cluster.model.KafkaExporter;
1719
import io.strimzi.operator.cluster.model.KafkaVersion;
1820
import io.strimzi.operator.cluster.operator.resource.ResourceOperatorSupplier;
21+
import io.strimzi.operator.cluster.operator.resource.kubernetes.CertManagerCertificateOperator;
1922
import io.strimzi.operator.cluster.operator.resource.kubernetes.DeploymentOperator;
2023
import io.strimzi.operator.cluster.operator.resource.kubernetes.NetworkPolicyOperator;
2124
import io.strimzi.operator.cluster.operator.resource.kubernetes.PodDisruptionBudgetOperator;
@@ -49,6 +52,7 @@ public class KafkaExporterReconciler {
4952
private final ServiceAccountOperator serviceAccountOperator;
5053
private final NetworkPolicyOperator networkPolicyOperator;
5154
private final PodDisruptionBudgetOperator podDisruptionBudgetOperator;
55+
private final CertManagerCertificateOperator certManagerCertificateOperator;
5256

5357
private String certificateHash = "";
5458

@@ -83,6 +87,7 @@ public KafkaExporterReconciler(
8387
this.serviceAccountOperator = supplier.serviceAccountOperations;
8488
this.networkPolicyOperator = supplier.networkPolicyOperator;
8589
this.podDisruptionBudgetOperator = supplier.podDisruptionBudgetOperator;
90+
this.certManagerCertificateOperator = supplier.certManagerCertificateOperator;
8691
}
8792

8893
/**
@@ -99,7 +104,8 @@ public KafkaExporterReconciler(
99104
*/
100105
public Future<Void> reconcile(boolean isOpenShift, ImagePullPolicy imagePullPolicy, List<LocalObjectReference> imagePullSecrets, Clock clock) {
101106
return serviceAccount()
102-
.compose(i -> certificatesSecret(clock))
107+
.compose(i -> maybeReconcileCertManagerCertificates())
108+
.compose(secret -> certificatesSecret(clock, secret))
103109
.compose(i -> networkPolicy())
104110
.compose(i -> podDisruptionBudget())
105111
.compose(i -> deployment(isOpenShift, imagePullPolicy, imagePullSecrets))
@@ -121,27 +127,51 @@ private Future<Void> serviceAccount() {
121127
).mapEmpty();
122128
}
123129

130+
/**
131+
* Manages the Certificate object that is used when cert-manager is the Certificate issuer
132+
*
133+
* @return Completes when the Certificate object was successfully created, deleted or updated and returns the related Secret
134+
*/
135+
protected Future<Secret> maybeReconcileCertManagerCertificates() {
136+
//TODO handle empty reconciles when kafka exporter not enabled
137+
if (CertificateManagerType.CERT_MANAGER_IO.equals(clusterCa.getType())) {
138+
return certManagerCertificateOperator.reconcile(reconciliation, reconciliation.namespace(), KafkaExporterResources.secretName(reconciliation.name()), kafkaExporter.generateCertificateResource(clusterCa))
139+
.compose(v -> certManagerCertificateOperator.waitForReady(reconciliation, reconciliation.namespace(), KafkaExporterResources.secretName(reconciliation.name())))
140+
.compose(v -> secretOperator.getAsync(reconciliation.namespace(), CertManagerUtils.certManagerSecretName(KafkaExporterResources.secretName(reconciliation.name()))));
141+
} else {
142+
return Future.succeededFuture();
143+
}
144+
}
145+
124146
/**
125147
* Manages the Kafka Exporter Secret with certificates.
126148
*
127-
* @param clock The clock for supplying the reconciler with the time instant of each reconciliation cycle.
128-
* That time is used for checking maintenance windows
149+
* @param clock The clock for supplying the reconciler with the time instant of each reconciliation cycle.
150+
* That time is used for checking maintenance windows
151+
* @param certManagerSecret Secret managed by cert-manager containing the Kafka Exporter certificate, may be null if Strimzi is issuing certificates.
129152
*
130-
* @return Future which completes when the reconciliation is done
153+
* @return Future which completes when the reconciliation is done
131154
*/
132-
private Future<Void> certificatesSecret(Clock clock) {
155+
private Future<Void> certificatesSecret(Clock clock, Secret certManagerSecret) {
133156
if (kafkaExporter != null) {
134157
return secretOperator.getAsync(reconciliation.namespace(), KafkaExporterResources.secretName(reconciliation.name()))
135158
.compose(oldSecret -> {
136-
Secret newSecret = kafkaExporter.generateCertificatesSecret(clusterCa, oldSecret, Util.isMaintenanceTimeWindowsSatisfied(reconciliation, maintenanceWindows, clock.instant()));
137-
138-
return secretOperator
139-
.reconcile(reconciliation, reconciliation.namespace(), KafkaExporterResources.secretName(reconciliation.name()), newSecret)
140-
.compose(i -> {
141-
certificateHash = CertUtils.getCertificateShortThumbprint(newSecret, Ca.SecretEntry.CRT.asKey(KafkaExporter.COMPONENT_TYPE));
159+
Future<Secret> secretFuture;
160+
if (CertificateManagerType.CERT_MANAGER_IO.equals(clusterCa.getType())) {
161+
secretFuture = ReconcilerUtils.clusterCaPemTrustSet(reconciliation, secretOperator)
162+
.map(pemTrustSet -> kafkaExporter.generateCertificatesSecretForCertManagerCA(clusterCa, oldSecret, certManagerSecret, pemTrustSet));
163+
} else {
164+
secretFuture = Future.succeededFuture(kafkaExporter.generateCertificatesSecretForStrimziCa(clusterCa, oldSecret, Util.isMaintenanceTimeWindowsSatisfied(reconciliation, maintenanceWindows, clock.instant())));
165+
}
142166

167+
return secretFuture.compose(secret -> secretOperator.reconcile(reconciliation,
168+
reconciliation.namespace(),
169+
KafkaExporterResources.secretName(reconciliation.name()),
170+
secret)
171+
.compose(result -> {
172+
certificateHash = CertUtils.getCertificateShortThumbprint(secret, Ca.SecretEntry.CRT.asKey(KafkaExporter.COMPONENT_TYPE));
143173
return Future.succeededFuture();
144-
});
174+
}));
145175
});
146176
} else {
147177
return secretOperator

cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/ReconcilerUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ public static Future<TlsPemIdentity> coTlsPemIdentity(Reconciliation reconciliat
137137
*
138138
* @return Future containing the trust set to use for client authentication.
139139
*/
140-
private static Future<PemTrustSet> clusterCaPemTrustSet(Reconciliation reconciliation, SecretOperator secretOperator) {
140+
public static Future<PemTrustSet> clusterCaPemTrustSet(Reconciliation reconciliation, SecretOperator secretOperator) {
141141
return getSecret(secretOperator, reconciliation.namespace(), KafkaResources.clusterCaCertificateSecretName(reconciliation.name()))
142142
.map(PemTrustSet::new);
143143
}

0 commit comments

Comments
 (0)