Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
66318c9
feat: enhance validation for restart annotations in Kafka connectors
rlanhellas Sep 1, 2025
5deae11
feat: enhance validation for restart annotations in Kafka connectors
rlanhellas Sep 1, 2025
85c3896
feat: improving connector restart args
rlanhellas Sep 9, 2025
09837fa
tests: connector restart with args
rlanhellas Sep 9, 2025
b66a848
tests: connector restart with args
rlanhellas Sep 9, 2025
df9f746
docs: connector restart with args
rlanhellas Sep 9, 2025
ae3bc01
Update Annotations.java
rlanhellas Sep 10, 2025
dce944f
Update KafkaMirrorMaker2AssemblyOperator.java
rlanhellas Sep 10, 2025
26f6ac2
Update cluster-operator/src/test/java/io/strimzi/operator/cluster/ope…
rlanhellas Sep 10, 2025
98ac310
feat: removing regex to improve readability in the code
rlanhellas Sep 10, 2025
a382a96
test: improving tests with invalid args
rlanhellas Sep 10, 2025
44d9ea4
Update KafkaConnectAssemblyOperator.java
rlanhellas Sep 10, 2025
0eccf8b
test: adding test cases for validate annotation values in restart kaf…
rlanhellas Sep 13, 2025
c0a4804
test: adding test cases for validate annotation values in restart kaf…
rlanhellas Sep 13, 2025
87aa2e2
Update KafkaConnectAssemblyOperator.java
rlanhellas Sep 13, 2025
8bafcf5
Update AbstractConnectOperator.java
rlanhellas Sep 13, 2025
90a7725
Merge branch 'main' into issue-11484
rlanhellas Sep 15, 2025
fd2bdca
test: moving tests to restart class
rlanhellas Sep 15, 2025
ccdb38e
Update CHANGELOG.md
rlanhellas Sep 15, 2025
5f7e4ac
Update cluster-operator/src/main/java/io/strimzi/operator/cluster/ope…
rlanhellas Sep 15, 2025
827a81a
Update KafkaMirrorMaker2AssemblyOperator.java
rlanhellas Sep 16, 2025
7ffac47
Update KafkaMirrorMaker2AssemblyOperator.java
rlanhellas Sep 16, 2025
6b6c85d
Update CHANGELOG.md
rlanhellas Sep 16, 2025
0fd307c
Update proc-manual-restart-mirrormaker2-connector.adoc
rlanhellas Sep 16, 2025
62ffbc6
Update proc-manual-restart-connector.adoc
rlanhellas Sep 16, 2025
36bd179
test: moving tests to parameterized tests
rlanhellas Sep 19, 2025
2451683
Update KafkaConnectAssemblyOperatorConnectorRestartTest.java
rlanhellas Sep 22, 2025
7f94597
Update cluster-operator/src/test/java/io/strimzi/operator/cluster/ope…
rlanhellas Sep 22, 2025
1f3c544
feat: adding anyBoolean() to restart tests
rlanhellas Sep 22, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
* Extend the EntityOperator, Cruise Control and KafkaExporter deployment to support PDB via the template section in the CR spec.
* Added support for [KIP-1073](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1073:+Return+fenced+brokers+in+DescribeCluster+response)
to get the list of the registered brokers by using the Kafka Admin API. It replaces the usage of the `.status.registeredNodeIds` field in Kafka.
* Added support for [KIP-745](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181308623) in Kafka Connector and Mirror Maker 2, allowing the usage of
`includeTasks` and `onlyFailed` arguments in kafka connectors restart.
Comment thread
rlanhellas marked this conversation as resolved.
Outdated

### Major changes, deprecations and removals

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.api.model.DefaultKubernetesResourceList;
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.LocalObjectReference;
import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.api.model.ServiceAccount;
Expand Down Expand Up @@ -131,6 +132,17 @@ public abstract class AbstractConnectOperator<C extends KubernetesClient, T exte
protected final SharedEnvironmentProvider sharedEnvironmentProvider;
protected final int port;

/**
* This optional argument can be used to include tasks in the restart connector operation.
* */
protected static final String STRIMZI_IO_RESTART_INCLUDE_TASKS_ARG = "includeTasks";

/**
* This optional argument can be used to restart connector only failed tasks.
* */
Comment thread
rlanhellas marked this conversation as resolved.
Outdated
protected static final String STRIMZI_IO_RESTART_ONLY_FAILED_ARG = "onlyFailed";


Comment thread
rlanhellas marked this conversation as resolved.
Outdated
/**
* Constructor
*
Expand Down Expand Up @@ -746,8 +758,18 @@ private static int nextAutoRestartBackOffIntervalInMinutes(int restartCount)
@SuppressWarnings({ "rawtypes" })
private Future<List<Condition>> maybeRestartConnector(Reconciliation reconciliation, String host, KafkaConnectApi apiClient, String connectorName, CustomResource resource, List<Condition> conditions) {
if (hasRestartAnnotation(resource, connectorName)) {
LOGGER.debugCr(reconciliation, "Restarting connector {}", connectorName);
return VertxUtil.completableFutureToVertxFuture(apiClient.restart(host, port, connectorName, false, false))

if (!restartAnnotationIsValid(resource)) {
LOGGER.warnCr(reconciliation, "Invalid annotation format");
conditions.add(StatusUtils.buildWarningCondition("RestartConnector", "Invalid annotation format"));
return Future.succeededFuture(conditions);
}

boolean restartIncludeTasks = restartAnnotationHasIncludeTasksArg(resource);
boolean restartOnlyFailedTasks = restartAnnotationHasOnlyFailedTasksArg(resource);
LOGGER.infoCr(reconciliation, "Restarting connector {}, IncludeTasks {}, OnlyFailedTasks {}", connectorName, restartIncludeTasks, restartOnlyFailedTasks);

return VertxUtil.completableFutureToVertxFuture(apiClient.restart(host, port, connectorName, restartIncludeTasks, restartOnlyFailedTasks))
.compose(ignored -> removeRestartAnnotation(reconciliation, resource)
.compose(v -> Future.succeededFuture(conditions)),
throwable -> {
Expand Down Expand Up @@ -1059,6 +1081,36 @@ private Future<ConnectorStatusAndConditions> updateConnectorTopics(Reconciliatio
@SuppressWarnings({ "rawtypes" })
abstract boolean hasRestartAnnotation(CustomResource resource, String connectorName);

/**
* Checks if restart annotation value is valid
*
* @param resource Resource instance to check
*
* @return True if the provided resource has valid restart annotation. False otherwise.
* */
@SuppressWarnings({ "rawtypes" })
abstract boolean restartAnnotationIsValid(CustomResource resource);

/**
* Checks whether the provided resource instance (a KafkaConnector or KafkaMirrorMaker2) has argument includeTasks in restart annotation.
*
* @param resource Resource instance to check
*
* @return True if the provided resource has argument includeTasks in restart annotation. False otherwise.
*/
@SuppressWarnings({ "rawtypes" })
abstract boolean restartAnnotationHasIncludeTasksArg(CustomResource resource);

/**
* Checks whether the provided resource instance (a KafkaConnector or KafkaMirrorMaker2) has argument onlyFailedTasks in restart annotation.
*
* @param resource Resource instance to check
*
* @return True if the provided resource has argument onlyFailedTasks in restart annotation. False otherwise.
*/
@SuppressWarnings({ "rawtypes" })
abstract boolean restartAnnotationHasOnlyFailedTasksArg(HasMetadata resource);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you check here and with the implementations if you can use HasMetadata instead of CustomResource?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I forgot to change the implementations, just did it


/**
* Returns the ID of the connector task to be restarted from the (a KafkaConnector or KafkaMirrorMaker2) custom resource.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
*/
package io.strimzi.operator.cluster.operator.assembly;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.LabelSelector;
import io.fabric8.kubernetes.api.model.LabelSelectorBuilder;
import io.fabric8.kubernetes.api.model.LabelSelectorRequirement;
Expand Down Expand Up @@ -62,6 +63,7 @@
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import static io.strimzi.api.ResourceAnnotations.ANNO_STRIMZI_IO_CONNECTOR_OFFSETS;
Expand All @@ -80,6 +82,13 @@ public class KafkaConnectAssemblyOperator extends AbstractConnectOperator<Kubern
private final CrdOperator<KubernetesClient, KafkaConnector, KafkaConnectorList> connectorOperator;
private final ConnectBuildOperator connectBuildOperator;

/**
* Pattern for validation of restart connector annotation value.
* */
private static final Pattern STRIMZI_IO_RESTART_ARGS_PATTERN = Pattern.compile("^includeTasks," +
"onlyFailed$|^onlyFailed,includeTasks$|^includeTasks$|^onlyFailed$|^true$");
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems pretty hacky. Maybe you can split it by , into a list and check the values or something?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed the validation to if-else statement and removed the regex pattern



/**
* Constructor
*
Expand Down Expand Up @@ -613,7 +622,56 @@ private Future<Void> maybeUpdateConnectorStatus(Reconciliation reconciliation, K
@SuppressWarnings({ "rawtypes" })
@Override
protected boolean hasRestartAnnotation(CustomResource resource, String connectorName) {
return Annotations.booleanAnnotation(resource, ANNO_STRIMZI_IO_RESTART, false);
return Annotations.stringAnnotation(resource, ANNO_STRIMZI_IO_RESTART, null) != null;
}

/**
* Checks if restart annotation value has valid combination of values. These are valid and not valid combinations:
* strimzi.io/restart=includeTasks,onlyFailed # restart with args: includeTasks=true and onlyFailed=true
* strimzi.io/restart=includeTasks # restart with args: includeTasks=true and onlyFailed=false
* strimzi.io/restart=onlyFailed # restart with args: includeTasks=false and onlyFailed=true
* strimzi.io/restart=true # restart with args: includeTasks=false and onlyFailed=false
* strimzi.io/restart=false,includeTasks,onlyFailed # do not restart, fail and log error because you can't set args and boolean value together
* strimzi.io/restart=true,includeTasks,onlyFailed # do not restart, fail and log error because you can't set args and boolean value together
* strimzi.io/restart=includeTasks,wrongArg # do not restart, fail and log error because wrongArg is not supported
*
* @param resource Resource instance to check
*
* @return True if the provided resource has valid restart annotation. False otherwise.
* */
@SuppressWarnings({ "rawtypes" })
protected boolean restartAnnotationIsValid(CustomResource resource) {
String restartValue = Annotations.stringAnnotation(resource, ANNO_STRIMZI_IO_RESTART, "");
return STRIMZI_IO_RESTART_ARGS_PATTERN.matcher(restartValue).matches();
}

/**
* Checks whether the provided resource instance (a KafkaConnector or KafkaMirrorMaker2) has argument includeTasks in restart annotation.
*
* @param resource Resource instance to check
*
* @return True if the provided resource has argument includeTasks in restart annotation. False otherwise.
*/
@SuppressWarnings({ "rawtypes" })
@Override
protected boolean restartAnnotationHasIncludeTasksArg(CustomResource resource) {
return Annotations.stringAnnotation(resource, ANNO_STRIMZI_IO_RESTART, "")
.contains(STRIMZI_IO_RESTART_INCLUDE_TASKS_ARG);
}


Comment thread
rlanhellas marked this conversation as resolved.
Outdated
/**
Comment thread
rlanhellas marked this conversation as resolved.
* Checks whether the provided resource instance (a KafkaConnector or KafkaMirrorMaker2) has argument onlyFailedTasks in restart annotation.
*
* @param resource Resource instance to check
*
* @return True if the provided resource has argument onlyFailedTasks in restart annotation. False otherwise.
*/
@SuppressWarnings({ "rawtypes" })
@Override
protected boolean restartAnnotationHasOnlyFailedTasksArg(HasMetadata resource) {
return Annotations.stringAnnotation(resource, ANNO_STRIMZI_IO_RESTART, "")
.contains(STRIMZI_IO_RESTART_ONLY_FAILED_ARG);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
*/
package io.strimzi.operator.cluster.operator.assembly;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.rbac.ClusterRoleBinding;
import io.fabric8.kubernetes.client.CustomResource;
import io.fabric8.kubernetes.client.KubernetesClient;
Expand Down Expand Up @@ -42,6 +43,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import static io.strimzi.api.ResourceAnnotations.ANNO_STRIMZI_IO_CONNECTOR_OFFSETS;
Expand All @@ -63,6 +65,14 @@
public class KafkaMirrorMaker2AssemblyOperator extends AbstractConnectOperator<KubernetesClient, KafkaMirrorMaker2, KafkaMirrorMaker2List, KafkaMirrorMaker2Spec, KafkaMirrorMaker2Status> {
private static final ReconciliationLogger LOGGER = ReconciliationLogger.create(KafkaMirrorMaker2AssemblyOperator.class.getName());

/**
* Pattern for validation of MirrorMaker2 restart connector annotation value.
* */
private static final Pattern STRIMZI_IO_RESTART_CONNECTOR_MM2_ARGS_PATTERN = Pattern
.compile("^([a-zA-Z0-9-_]+):includeTasks,onlyFailed$|^([a-zA-Z0-9-_]+):onlyFailed,includeTasks$" +
"|^([a-zA-Z0-9-_]+):includeTasks$|^([a-zA-Z0-9-_]+):onlyFailed$|^([a-zA-Z0-9-_]+)$");
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as in the other expression. Maybe we can try to simplify it a bit? Parse our the comnand part using the Regexp and than split it rather than including all various combiantions? This does not scale very well.


In addition, whatever the way to parse it is, it might be good to have some unit tests for how it parses the various annotations (both here as well as for connect). With some valid and invalid values etc.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed the validation to if-else statement and removed the regex pattern

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Btw, regarding testing invalid args, you can check the tests, I added a test to invalid args

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. It is probably good to test it with invalid value once in the way you have it.

But why don't you just test the methods for validating the annotations / checking the presence of the arguments? You can easily test them with 10 different valid and invalid values without the need for any mocks or anything. That lets you very quickly test all the possibly annotation forms with typos, spaces, etc.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I created a unit test for both kafka connect and MM2 annotation values with many cases to test the maximum of validations we can. Please take a look again.



Comment thread
rlanhellas marked this conversation as resolved.
Outdated
/**
* Constructor
*
Expand Down Expand Up @@ -311,7 +321,54 @@ private Future<Void> maybeUpdateMirrorMaker2Status(Reconciliation reconciliation
@Override
protected boolean hasRestartAnnotation(CustomResource resource, String connectorName) {
String restartAnnotationConnectorName = Annotations.stringAnnotation(resource, ANNO_STRIMZI_IO_RESTART_CONNECTOR, null);
return connectorName.equals(restartAnnotationConnectorName);
return restartAnnotationConnectorName != null && restartAnnotationConnectorName.contains(connectorName);
}

/**
* Checks if restart annotation value has valid combination of values. These are valid and not valid combinations:
* strimzi.io/restart-connector=mirrormaker_connector_name:includeTasks,onlyFailed # restart with args: includeTasks=true and onlyFailed=true
* strimzi.io/restart-connector=mirrormaker_connector_name:includeTasks # restart with args: includeTasks=true and onlyFailed=false
* strimzi.io/restart-connector=mirrormaker_connector_name:onlyFailed # restart with args: includeTasks=false and onlyFailed=true
* strimzi.io/restart-connector=mirrormaker_connector_name # restart with args: includeTasks=false and onlyFailed=false
* strimzi.io/restart-connector=includeTasks,onlyFailed # do not restart, fail and log error because connector name is required
*
* @param resource Resource instance to check
*
* @return True if the provided resource has valid restart annotation. False otherwise.
* */
@SuppressWarnings({ "rawtypes" })
protected boolean restartAnnotationIsValid(CustomResource resource) {
String restartValue = Annotations.stringAnnotation(resource, ANNO_STRIMZI_IO_RESTART_CONNECTOR, "");
return STRIMZI_IO_RESTART_CONNECTOR_MM2_ARGS_PATTERN.matcher(restartValue).matches();
}

/**
* Checks whether the provided resource instance (a KafkaConnector or KafkaMirrorMaker2) has argument includeTasks in restart annotation.
*
* @param resource Resource instance to check
*
* @return True if the provided resource has argument includeTasks in restart annotation. False otherwise.
*/
@SuppressWarnings({ "rawtypes" })
@Override
protected boolean restartAnnotationHasIncludeTasksArg(CustomResource resource) {
return Annotations.stringAnnotation(resource, ANNO_STRIMZI_IO_RESTART_CONNECTOR, "")
.contains(STRIMZI_IO_RESTART_INCLUDE_TASKS_ARG);
}


/**
* Checks whether the provided resource instance (a KafkaConnector or KafkaMirrorMaker2) has argument onlyFailedTasks in restart annotation.
*
* @param resource Resource instance to check
*
* @return True if the provided resource has argument onlyFailedTasks in restart annotation. False otherwise.
*/
@SuppressWarnings({ "rawtypes" })
@Override
protected boolean restartAnnotationHasOnlyFailedTasksArg(HasMetadata resource) {
return Annotations.stringAnnotation(resource, ANNO_STRIMZI_IO_RESTART_CONNECTOR, "")
.contains(STRIMZI_IO_RESTART_ONLY_FAILED_ARG);
}

/**
Expand Down
Loading
Loading