5656import io .strimzi .api .kafka .model .kafka .Kafka ;
5757import io .strimzi .api .kafka .model .kafka .KafkaAuthorization ;
5858import io .strimzi .api .kafka .model .kafka .KafkaAuthorizationKeycloak ;
59+ import io .strimzi .api .kafka .model .kafka .KafkaAuthorizationOpa ;
5960import io .strimzi .api .kafka .model .kafka .KafkaClusterSpec ;
6061import io .strimzi .api .kafka .model .kafka .KafkaClusterTemplate ;
6162import io .strimzi .api .kafka .model .kafka .KafkaResources ;
6465import io .strimzi .api .kafka .model .kafka .cruisecontrol .CruiseControlResources ;
6566import io .strimzi .api .kafka .model .kafka .exporter .KafkaExporterResources ;
6667import io .strimzi .api .kafka .model .kafka .listener .GenericKafkaListener ;
68+ import io .strimzi .api .kafka .model .kafka .listener .KafkaListenerAuthentication ;
6769import io .strimzi .api .kafka .model .kafka .listener .KafkaListenerAuthenticationCustom ;
6870import io .strimzi .api .kafka .model .kafka .listener .KafkaListenerAuthenticationOAuth ;
71+ import io .strimzi .api .kafka .model .kafka .listener .KafkaListenerAuthenticationTls ;
6972import io .strimzi .api .kafka .model .kafka .listener .KafkaListenerType ;
7073import io .strimzi .api .kafka .model .kafka .quotas .QuotasPlugin ;
7174import io .strimzi .api .kafka .model .kafka .quotas .QuotasPluginStrimzi ;
100103import java .util .ArrayList ;
101104import java .util .Collections ;
102105import java .util .HashMap ;
106+ import java .util .HashSet ;
103107import java .util .LinkedHashSet ;
104108import java .util .List ;
105109import java .util .Map ;
@@ -145,8 +149,6 @@ public class KafkaCluster extends AbstractModel implements SupportsMetrics, Supp
145149
146150 protected static final String ENV_VAR_KAFKA_INIT_EXTERNAL_ADDRESS = "EXTERNAL_ADDRESS" ;
147151 private static final String ENV_VAR_KAFKA_JMX_EXPORTER_ENABLED = "KAFKA_JMX_EXPORTER_ENABLED" ;
148- private static final String ENV_VAR_STRIMZI_OPA_AUTHZ_TRUSTED_CERTS = "STRIMZI_OPA_AUTHZ_TRUSTED_CERTS" ;
149- private static final String ENV_VAR_STRIMZI_KEYCLOAK_AUTHZ_TRUSTED_CERTS = "STRIMZI_KEYCLOAK_AUTHZ_TRUSTED_CERTS" ;
150152 private static final String ENV_VAR_KAFKA_CLUSTER_NAME = "KAFKA_CLUSTER_NAME" ;
151153
152154 // For port names in services, a 'tcp-' prefix is added to support Istio protocol selection
@@ -1479,7 +1481,7 @@ private List<VolumeMount> getVolumeMounts(Storage storage, ContainerTemplate con
14791481 volumeMountList .add (VolumeUtils .createVolumeMount ("custom-" + identifier + "-certs" , "/opt/kafka/certificates/custom-" + identifier + "-certs" ));
14801482 }
14811483
1482- if (ListenersUtils .isListenerWithOAuth (listener )) {
1484+ if (ListenersUtils .isListenerWithOAuth (listener ) && listener . getAuth () instanceof KafkaListenerAuthenticationOAuth oauth && oauth . getTlsTrustedCertificates () != null ) {
14831485 String oauthTrustedCertsSecret = KafkaResources .internalOauthTrustedCertsSecretName (cluster );
14841486 volumeMountList .add (VolumeUtils .createVolumeMount (oauthTrustedCertsSecret , TRUSTED_CERTS_BASE_VOLUME_MOUNT + "/" + oauthTrustedCertsSecret ));
14851487 }
@@ -1678,23 +1680,40 @@ public ClusterRoleBinding generateClusterRoleBinding(String assemblyNamespace) {
16781680 **
16791681 * @return role for the Kafka Cluster
16801682 */
1683+ @ SuppressWarnings ("deprecation" ) // OPA Authorization is deprecated
16811684 public Role generateRole () {
1682- List <String > certSecretNames = new ArrayList <>();
1685+ Set <String > certSecretNames = new HashSet <>();
16831686 certSecretNames .add (KafkaResources .clusterCaCertificateSecretName (cluster ));
1684- certSecretNames .add (KafkaResources .clientsCaCertificateSecretName (cluster ));
1685- certSecretNames .add (KafkaResources .internalAuthzTrustedCertsSecretName (cluster ));
1686- certSecretNames .add (KafkaResources .internalOauthTrustedCertsSecretName (cluster ));
16871687 certSecretNames .addAll (nodes ().stream ().map (NodeRef ::podName ).toList ());
16881688
1689+ for (GenericKafkaListener listener : listeners ) {
1690+ if (listener .isTls ()) {
1691+ if (listener .getConfiguration () != null ) {
1692+ certSecretNames .add (listener .getConfiguration ().getBrokerCertChainAndKey ().getSecretName ());
1693+ }
1694+ }
1695+
1696+ KafkaListenerAuthentication auth = listener .getAuth ();
1697+ if (auth instanceof KafkaListenerAuthenticationOAuth ) {
1698+ certSecretNames .add (KafkaResources .internalOauthTrustedCertsSecretName (cluster ));
1699+ } else if (auth instanceof KafkaListenerAuthenticationTls ) {
1700+ certSecretNames .add (KafkaResources .clientsCaCertificateSecretName (cluster ));
1701+ }
1702+ }
1703+
1704+ if ((authorization instanceof KafkaAuthorizationOpa opa && opa .getTlsTrustedCertificates () != null && !opa .getTlsTrustedCertificates ().isEmpty ())
1705+ || (authorization instanceof KafkaAuthorizationKeycloak kc && kc .getTlsTrustedCertificates () != null && !kc .getTlsTrustedCertificates ().isEmpty ())) {
1706+ certSecretNames .add (KafkaResources .internalAuthzTrustedCertsSecretName (cluster ));
1707+ }
1708+
16891709 List <PolicyRule > rules = List .of (new PolicyRuleBuilder ()
16901710 .withApiGroups ("" )
16911711 .withResources ("secrets" )
16921712 .withVerbs ("get" )
1693- .withResourceNames (certSecretNames )
1713+ .withResourceNames (certSecretNames . stream (). toList () )
16941714 .build ());
16951715
1696- Role role = RbacUtils .createRole (componentName , namespace , rules , labels , ownerReference , null );
1697- return role ;
1716+ return RbacUtils .createRole (componentName , namespace , rules , labels , ownerReference , null );
16981717 }
16991718
17001719 /**
@@ -1715,10 +1734,8 @@ public RoleBinding generateRoleBindingForRole() {
17151734 .withKind ("Role" )
17161735 .build ();
17171736
1718- RoleBinding rb = RbacUtils
1737+ return RbacUtils
17191738 .createRoleBinding (KafkaResources .kafkaRoleBindingName (cluster ), namespace , roleRef , List .of (subject ), labels , ownerReference , null );
1720-
1721- return rb ;
17221739 }
17231740
17241741 /**
0 commit comments