Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
66318c9
feat: enhance validation for restart annotations in Kafka connectors
rlanhellas Sep 1, 2025
5deae11
feat: enhance validation for restart annotations in Kafka connectors
rlanhellas Sep 1, 2025
85c3896
feat: improving connector restart args
rlanhellas Sep 9, 2025
09837fa
tests: connector restart with args
rlanhellas Sep 9, 2025
b66a848
tests: connector restart with args
rlanhellas Sep 9, 2025
df9f746
docs: connector restart with args
rlanhellas Sep 9, 2025
ae3bc01
Update Annotations.java
rlanhellas Sep 10, 2025
dce944f
Update KafkaMirrorMaker2AssemblyOperator.java
rlanhellas Sep 10, 2025
26f6ac2
Update cluster-operator/src/test/java/io/strimzi/operator/cluster/ope…
rlanhellas Sep 10, 2025
98ac310
feat: removing regex to improve readability in the code
rlanhellas Sep 10, 2025
a382a96
test: improving tests with invalid args
rlanhellas Sep 10, 2025
44d9ea4
Update KafkaConnectAssemblyOperator.java
rlanhellas Sep 10, 2025
0eccf8b
test: adding test cases for validate annotation values in restart kaf…
rlanhellas Sep 13, 2025
c0a4804
test: adding test cases for validate annotation values in restart kaf…
rlanhellas Sep 13, 2025
87aa2e2
Update KafkaConnectAssemblyOperator.java
rlanhellas Sep 13, 2025
8bafcf5
Update AbstractConnectOperator.java
rlanhellas Sep 13, 2025
90a7725
Merge branch 'main' into issue-11484
rlanhellas Sep 15, 2025
fd2bdca
test: moving tests to restart class
rlanhellas Sep 15, 2025
ccdb38e
Update CHANGELOG.md
rlanhellas Sep 15, 2025
5f7e4ac
Update cluster-operator/src/main/java/io/strimzi/operator/cluster/ope…
rlanhellas Sep 15, 2025
827a81a
Update KafkaMirrorMaker2AssemblyOperator.java
rlanhellas Sep 16, 2025
7ffac47
Update KafkaMirrorMaker2AssemblyOperator.java
rlanhellas Sep 16, 2025
6b6c85d
Update CHANGELOG.md
rlanhellas Sep 16, 2025
0fd307c
Update proc-manual-restart-mirrormaker2-connector.adoc
rlanhellas Sep 16, 2025
62ffbc6
Update proc-manual-restart-connector.adoc
rlanhellas Sep 16, 2025
36bd179
test: moving tests to parameterized tests
rlanhellas Sep 19, 2025
2451683
Update KafkaConnectAssemblyOperatorConnectorRestartTest.java
rlanhellas Sep 22, 2025
7f94597
Update cluster-operator/src/test/java/io/strimzi/operator/cluster/ope…
rlanhellas Sep 22, 2025
1f3c544
feat: adding anyBoolean() to restart tests
rlanhellas Sep 22, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@
* Extend the EntityOperator, Cruise Control and KafkaExporter deployment to support PDB via the template section in the CR spec.
* Added support for [KIP-1073](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1073:+Return+fenced+brokers+in+DescribeCluster+response)
to get the list of the registered brokers by using the Kafka Admin API. It replaces the usage of the `.status.registeredNodeIds` field in Kafka.
* Added support for [KIP-745](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181308623) in Kafka Connector and Mirror Maker 2, allowing the usage of
`includeTasks` and `onlyFailed` arguments in Kafka connectors restart.
* Update OAuth library to 0.17.0.
* Additional OAuth configuration options have been added for 'oauth' authentication on the listener and the client.
On the listener `clientGrantType` has been added.
On the client `grantType` has been added.

### Major changes, deprecations and removals

* Fix RBAC naming for `KafkaMirrorMaker2` to avoid `RoleBinding` collisions when a `KafkaConnect` with the same name exists in the same namespace. `KafkaMirrorMaker2` now uses dedicated `RoleBinding` names.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.api.model.DefaultKubernetesResourceList;
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.LocalObjectReference;
import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.api.model.ServiceAccount;
Expand Down Expand Up @@ -131,6 +132,15 @@ public abstract class AbstractConnectOperator<C extends KubernetesClient, T exte
protected final SharedEnvironmentProvider sharedEnvironmentProvider;
protected final int port;

/**
* This optional argument can be used to include tasks in the restart connector operation.
* */
protected static final String STRIMZI_IO_RESTART_INCLUDE_TASKS_ARG = "includeTasks";

/**
* This optional argument can be used to restart connector only failed tasks.
**/
protected static final String STRIMZI_IO_RESTART_ONLY_FAILED_ARG = "onlyFailed";
/**
* Constructor
*
Expand Down Expand Up @@ -746,8 +756,18 @@ private static int nextAutoRestartBackOffIntervalInMinutes(int restartCount)
@SuppressWarnings({ "rawtypes" })
private Future<List<Condition>> maybeRestartConnector(Reconciliation reconciliation, String host, KafkaConnectApi apiClient, String connectorName, CustomResource resource, List<Condition> conditions) {
if (hasRestartAnnotation(resource, connectorName)) {
LOGGER.debugCr(reconciliation, "Restarting connector {}", connectorName);
return VertxUtil.completableFutureToVertxFuture(apiClient.restart(host, port, connectorName, false, false))

if (!restartAnnotationIsValid(resource, connectorName)) {
LOGGER.warnCr(reconciliation, "Invalid annotation format");
conditions.add(StatusUtils.buildWarningCondition("RestartConnector", "Invalid annotation format"));
return Future.succeededFuture(conditions);
}

boolean restartIncludeTasks = restartAnnotationHasIncludeTasksArg(resource);
boolean restartOnlyFailedTasks = restartAnnotationHasOnlyFailedTasksArg(resource);
LOGGER.infoCr(reconciliation, "Restarting connector {}, IncludeTasks {}, OnlyFailedTasks {}", connectorName, restartIncludeTasks, restartOnlyFailedTasks);

return VertxUtil.completableFutureToVertxFuture(apiClient.restart(host, port, connectorName, restartIncludeTasks, restartOnlyFailedTasks))
.compose(ignored -> removeRestartAnnotation(reconciliation, resource)
.compose(v -> Future.succeededFuture(conditions)),
throwable -> {
Expand Down Expand Up @@ -1056,8 +1076,35 @@ private Future<ConnectorStatusAndConditions> updateConnectorTopics(Reconciliatio
*
* @return True if the provided resource has the restart annotation. False otherwise.
*/
@SuppressWarnings({ "rawtypes" })
abstract boolean hasRestartAnnotation(CustomResource resource, String connectorName);
abstract boolean hasRestartAnnotation(HasMetadata resource, String connectorName);

/**
* Checks if restart annotation value is valid
*
* @param resource Resource instance to check
* @param connectorName Connector name of the connector to check
*
* @return True if the provided resource has valid restart annotation. False otherwise.
* */
abstract boolean restartAnnotationIsValid(HasMetadata resource, String connectorName);

/**
* Checks whether the provided resource instance (a KafkaConnector or KafkaMirrorMaker2) has argument includeTasks in restart annotation.
*
* @param resource Resource instance to check
*
* @return True if the provided resource has argument includeTasks in restart annotation. False otherwise.
*/
abstract boolean restartAnnotationHasIncludeTasksArg(HasMetadata resource);

/**
* Checks whether the provided resource instance (a KafkaConnector or KafkaMirrorMaker2) has argument onlyFailedTasks in restart annotation.
*
* @param resource Resource instance to check
*
* @return True if the provided resource has argument onlyFailedTasks in restart annotation. False otherwise.
*/
abstract boolean restartAnnotationHasOnlyFailedTasksArg(HasMetadata resource);

/**
* Returns the ID of the connector task to be restarted from the (a KafkaConnector or KafkaMirrorMaker2) custom resource.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
*/
package io.strimzi.operator.cluster.operator.assembly;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.LabelSelector;
import io.fabric8.kubernetes.api.model.LabelSelectorBuilder;
import io.fabric8.kubernetes.api.model.LabelSelectorRequirement;
Expand Down Expand Up @@ -80,6 +81,7 @@ public class KafkaConnectAssemblyOperator extends AbstractConnectOperator<Kubern
private final CrdOperator<KubernetesClient, KafkaConnector, KafkaConnectorList> connectorOperator;
private final ConnectBuildOperator connectBuildOperator;


/**
* Constructor
*
Expand Down Expand Up @@ -610,10 +612,80 @@ private Future<Void> maybeUpdateConnectorStatus(Reconciliation reconciliation, K
*
* @return True if the KafkaConnector resource has the strimzi.io/restart annotation. False otherwise.
*/
@Override
protected boolean hasRestartAnnotation(HasMetadata resource, String connectorName) {
String annoValue = Annotations.stringAnnotation(resource, ANNO_STRIMZI_IO_RESTART, null);
return annoValue != null && !annoValue.isBlank() && !annoValue.contains(Boolean.FALSE.toString());
}

/**
* Checks if restart annotation value has valid combination of values. These are valid and not valid combinations:
* strimzi.io/restart=includeTasks,onlyFailed # restart with args: includeTasks=true and onlyFailed=true
* strimzi.io/restart=includeTasks # restart with args: includeTasks=true and onlyFailed=false
* strimzi.io/restart=onlyFailed # restart with args: includeTasks=false and onlyFailed=true
* strimzi.io/restart=true # restart with args: includeTasks=false and onlyFailed=false
* strimzi.io/restart=false,includeTasks,onlyFailed # do not restart, fail and log error because you can't set args and boolean value together
* strimzi.io/restart=true,includeTasks,onlyFailed # do not restart, fail and log error because you can't set args and boolean value together
* strimzi.io/restart=includeTasks,wrongArg # do not restart, fail and log error because wrongArg is not supported
*
* @param resource Resource instance to check
* @param connectorName Connector name of the connector to check
*
* @return True if the provided resource has valid restart annotation. False otherwise.
* */
protected boolean restartAnnotationIsValid(HasMetadata resource, String connectorName) {
String restartValue = Annotations.stringAnnotation(resource, ANNO_STRIMZI_IO_RESTART, "");
String[] values = restartValue.split(",");
boolean hasBooleanValue = false;
boolean hasCustomArgsValue = false;

for (String value : values) {
if (Boolean.TRUE.toString().equalsIgnoreCase(value.trim())) {
hasBooleanValue = true;
} else if (STRIMZI_IO_RESTART_INCLUDE_TASKS_ARG.equalsIgnoreCase(value.trim()) || STRIMZI_IO_RESTART_ONLY_FAILED_ARG.equalsIgnoreCase(value.trim())) {
hasCustomArgsValue = true;
} else {
// unsupported value
return false;
}
}

if (!hasBooleanValue && !hasCustomArgsValue) {
// needs at least one valid value, could be boolean (true/false) or custom args
return false;
} else if (hasBooleanValue && hasCustomArgsValue) {
// cannot mix boolean value with custom args
return false;
}

return true;
Comment thread
ppatierno marked this conversation as resolved.
}

/**
* Checks whether the provided resource instance (a KafkaConnector or KafkaMirrorMaker2) has argument includeTasks in restart annotation.
*
* @param resource Resource instance to check
*
* @return True if the provided resource has argument includeTasks in restart annotation. False otherwise.
*/
@Override
protected boolean restartAnnotationHasIncludeTasksArg(HasMetadata resource) {
return Annotations.stringAnnotation(resource, ANNO_STRIMZI_IO_RESTART, "")
.contains(STRIMZI_IO_RESTART_INCLUDE_TASKS_ARG);
}

/**
* Checks whether the provided resource instance (a KafkaConnector or KafkaMirrorMaker2) has argument onlyFailedTasks in restart annotation.
*
* @param resource Resource instance to check
*
* @return True if the provided resource has argument onlyFailedTasks in restart annotation. False otherwise.
*/
@SuppressWarnings({ "rawtypes" })
@Override
protected boolean hasRestartAnnotation(CustomResource resource, String connectorName) {
return Annotations.booleanAnnotation(resource, ANNO_STRIMZI_IO_RESTART, false);
protected boolean restartAnnotationHasOnlyFailedTasksArg(HasMetadata resource) {
return Annotations.stringAnnotation(resource, ANNO_STRIMZI_IO_RESTART, "")
.contains(STRIMZI_IO_RESTART_ONLY_FAILED_ARG);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
*/
package io.strimzi.operator.cluster.operator.assembly;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.rbac.ClusterRoleBinding;
import io.fabric8.kubernetes.client.CustomResource;
import io.fabric8.kubernetes.client.KubernetesClient;
Expand Down Expand Up @@ -307,11 +308,79 @@ private Future<Void> maybeUpdateMirrorMaker2Status(Reconciliation reconciliation
*
* @return True if the provided resource instance has the strimzi.io/restart-connector annotation. False otherwise.
*/
@SuppressWarnings({ "rawtypes" })
@Override
protected boolean hasRestartAnnotation(CustomResource resource, String connectorName) {
protected boolean hasRestartAnnotation(HasMetadata resource, String connectorName) {
String restartAnnotationConnectorName = Annotations.stringAnnotation(resource, ANNO_STRIMZI_IO_RESTART_CONNECTOR, null);
return connectorName.equals(restartAnnotationConnectorName);
return restartAnnotationConnectorName != null && restartAnnotationConnectorName.contains(connectorName);
}

/**
* Checks if restart annotation value has valid combination of values. These are valid and not valid combinations:
* strimzi.io/restart-connector=mirrormaker_connector_name:includeTasks,onlyFailed # restart with args: includeTasks=true and onlyFailed=true
* strimzi.io/restart-connector=mirrormaker_connector_name:includeTasks # restart with args: includeTasks=true and onlyFailed=false
* strimzi.io/restart-connector=mirrormaker_connector_name:onlyFailed # restart with args: includeTasks=false and onlyFailed=true
* strimzi.io/restart-connector=mirrormaker_connector_name # restart with args: includeTasks=false and onlyFailed=false
* strimzi.io/restart-connector=includeTasks,onlyFailed # do not restart, fail and log error because connector name is required
*
* @param resource Resource instance to check
*
* @return True if the provided resource has valid restart annotation. False otherwise.
* */
protected boolean restartAnnotationIsValid(HasMetadata resource, String connectorName) {
String restartValue = Annotations.stringAnnotation(resource, ANNO_STRIMZI_IO_RESTART_CONNECTOR, "");
String[] values = restartValue.split(":");

// invalid format, more than one ':' character
if (values.length != 1 && values.length != 2) {
return false;
}

// check if connector name is present and valid
if (!values[0].equalsIgnoreCase(connectorName)) {
return false;
}

// we expect that second item in array contains a list of arguments to be used
if (values.length == 2) {
String[] argValues = values[1].split(",");

for (String arg : argValues) {
if (!STRIMZI_IO_RESTART_INCLUDE_TASKS_ARG.equalsIgnoreCase(arg.trim()) && !STRIMZI_IO_RESTART_ONLY_FAILED_ARG.equalsIgnoreCase(arg.trim())) {
// invalid argument found
return false;
}
}
}

return true;
}

/**
* Checks whether the provided resource instance (a KafkaConnector or KafkaMirrorMaker2) has argument includeTasks in restart annotation.
*
* @param resource Resource instance to check
*
* @return True if the provided resource has argument includeTasks in restart annotation. False otherwise.
*/
@Override
protected boolean restartAnnotationHasIncludeTasksArg(HasMetadata resource) {
return Annotations.stringAnnotation(resource, ANNO_STRIMZI_IO_RESTART_CONNECTOR, "")
.contains(STRIMZI_IO_RESTART_INCLUDE_TASKS_ARG);
}


/**
* Checks whether the provided resource instance (a KafkaConnector or KafkaMirrorMaker2) has argument onlyFailedTasks in restart annotation.
*
* @param resource Resource instance to check
*
* @return True if the provided resource has argument onlyFailedTasks in restart annotation. False otherwise.
*/
@SuppressWarnings({ "rawtypes" })
@Override
protected boolean restartAnnotationHasOnlyFailedTasksArg(HasMetadata resource) {
return Annotations.stringAnnotation(resource, ANNO_STRIMZI_IO_RESTART_CONNECTOR, "")
.contains(STRIMZI_IO_RESTART_ONLY_FAILED_ARG);
}

/**
Expand Down
Loading
Loading