Skip to content

Allow configuring additional options when restarting connectors#11797

Merged
ppatierno merged 29 commits intostrimzi:mainfrom
rlanhellas:issue-11484
Sep 23, 2025
Merged

Allow configuring additional options when restarting connectors#11797
ppatierno merged 29 commits intostrimzi:mainfrom
rlanhellas:issue-11484

Conversation

@rlanhellas
Copy link
Copy Markdown
Contributor

@rlanhellas rlanhellas commented Sep 1, 2025

Type of change

Enhancement

Description

Fixes #11762 #11484
This PR implements this proposal: https://github.com/strimzi/proposals/blob/main/111-add-restart-parameters-to-kafka-connectors.md.

Checklist

  • Write tests
  • Make sure all tests pass
  • Update documentation
  • Check RBAC rights for Kubernetes / OpenShift roles
  • Try your changes from Pod inside your Kubernetes and OpenShift cluster, not just locally
  • Reference relevant issue(s) and close them after merging
  • Update CHANGELOG.md
  • Supply screenshots for visual changes, such as Grafana dashboards

@ppatierno ppatierno added this to the 0.49.0 milestone Sep 1, 2025
@ppatierno
Copy link
Copy Markdown
Member

@rlanhellas there are some checkstyle failures within the Azure pipeline.

Copy link
Copy Markdown
Member

@ppatierno ppatierno left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @rlanhellas! I was wondering we need some tests about the new annotation values to check that also patterns are valid.

if (!restartAnnotationIsValid(resource)) {
String message = "Invalid annotation format";
LOGGER.warnCr(reconciliation, message);
conditions.add(StatusUtils.buildWarningCondition("RestartConnector", message));
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can just put the string in the log and condition even if it's the same. This is anyway what the compiler is going to do. Not sure how much value the message variable can bring here, considering that the two lines using the message are close each other and not spread across the codebase.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

/**
* Pattern for validation of MirrorMaker2 restart connector annotation value.
* */
public static final Pattern ANNO_STRIMZI_IO_RESTART_CONNECTOR_ARGS_PATTERN = Pattern
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if it's just for MirrorMaker 2 maybe we should state it in the name (even just adding MM2 somewhere would be ok for me).

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done, just added MM2

@rlanhellas
Copy link
Copy Markdown
Contributor Author

Thanks @rlanhellas! I was wondering we need some tests about the new annotation values to check that also patterns are valid.

agree, I'm writing tests to this, will update this PR soon.

@rlanhellas
Copy link
Copy Markdown
Contributor Author

@rlanhellas there are some checkstyle failures within the Azure pipeline.

fixed

Copy link
Copy Markdown
Member

@scholzj scholzj left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR. I left some nits. You should also:

  • fix the title of the PR
  • Add it to the CHANGELOG file
  • Add it to the docs

Comment on lines +70 to +92
/**
* Pattern for validation of restart connector annotation value.
* */
public static final Pattern ANNO_STRIMZI_IO_RESTART_ARGS_PATTERN = Pattern.compile("^includeTasks," +
"onlyFailed$|^onlyFailed,includeTasks$|^includeTasks$|^onlyFailed$|^true$");

/**
* Pattern for validation of MirrorMaker2 restart connector annotation value.
* */
public static final Pattern ANNO_STRIMZI_IO_RESTART_CONNECTOR_MM2_ARGS_PATTERN = Pattern
.compile("^([a-zA-Z0-9-_]+):includeTasks,onlyFailed$|^([a-zA-Z0-9-_]+):onlyFailed,includeTasks$" +
"|^([a-zA-Z0-9-_]+):includeTasks$|^([a-zA-Z0-9-_]+):onlyFailed$|^([a-zA-Z0-9-_]+)$");

/**
* This optional argument can be used to include tasks in the restart connector operation.
* */
public static final String ANNO_STRIMZI_IO_RESTART_INCLUDE_TASKS_ARG = "includeTasks";

/**
* This optional argument can be used to restart connector only failed tasks.
* */
public static final String ANNO_STRIMZI_IO_RESTART_ONLY_FAILED_ARG = "onlyFailed";

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As these are not really annotations, can we keep these internally in the corresponding classes?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

*/
@SuppressWarnings({ "rawtypes" })
@Override
protected boolean restartAnnotationHasOnlyFailedTasksArg(CustomResource resource) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These methods can probably use the HasMetadata interface instead of CustomResource? The same applies also to the Connect methods.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Comment on lines +772 to +780
if (!restartAnnotationIsValid(resource)) {
LOGGER.warnCr(reconciliation, "Invalid annotation format");
conditions.add(StatusUtils.buildWarningCondition("RestartConnector", "Invalid annotation format"));
return Future.succeededFuture(conditions);
}
boolean restartIncludeTasks = restartAnnotationHasIncludeTasksArg(resource);
boolean restartOnlyFailedTasks = restartAnnotationHasOnlyFailedTasksArg(resource);
LOGGER.infoCr(reconciliation, "Restarting connector {}", connectorName);
return VertxUtil.completableFutureToVertxFuture(apiClient.restart(host, port, connectorName, restartIncludeTasks, restartOnlyFailedTasks))
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should probably add some spacing (empty lines) for better readability.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Signed-off-by: Ronaldo Lanhellas <ronaldo.lanhellas@gmail.com>
Signed-off-by: Ronaldo Lanhellas <ronaldo.lanhellas@gmail.com>
Signed-off-by: Ronaldo Lanhellas <ronaldo.lanhellas@gmail.com>
@codecov-commenter
Copy link
Copy Markdown

codecov-commenter commented Sep 9, 2025

Codecov Report

❌ Patch coverage is 86.95652% with 6 lines in your changes missing coverage. Please review.
✅ Project coverage is 67.55%. Comparing base (489652d) to head (1f3c544).
⚠️ Report is 16 commits behind head on main.

Files with missing lines Patch % Lines
...or/assembly/KafkaMirrorMaker2AssemblyOperator.java 76.47% 2 Missing and 2 partials ⚠️
...perator/assembly/KafkaConnectAssemblyOperator.java 90.47% 1 Missing and 1 partial ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##               main   #11797      +/-   ##
============================================
+ Coverage     67.51%   67.55%   +0.03%     
- Complexity     7080     7101      +21     
============================================
  Files           574      574              
  Lines         28134    28181      +47     
  Branches       3187     3201      +14     
============================================
+ Hits          18995    19037      +42     
+ Misses         7824     7820       -4     
- Partials       1315     1324       +9     
Files with missing lines Coverage Δ
...ter/operator/assembly/AbstractConnectOperator.java 78.53% <100.00%> (+0.30%) ⬆️
...perator/assembly/KafkaConnectAssemblyOperator.java 88.02% <90.47%> (+0.12%) ⬆️
...or/assembly/KafkaMirrorMaker2AssemblyOperator.java 84.78% <76.47%> (+4.42%) ⬆️

... and 5 files with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

Signed-off-by: Ronaldo Lanhellas <ronaldo.lanhellas@gmail.com>
Signed-off-by: Ronaldo Lanhellas <ronaldo.lanhellas@gmail.com>
Signed-off-by: Ronaldo Lanhellas <ronaldo.lanhellas@gmail.com>
@rlanhellas
Copy link
Copy Markdown
Contributor Author

rlanhellas commented Sep 9, 2025

everything is done @scholzj, @ppatierno , if you can take a look again

Comment on lines +88 to +89
private static final Pattern STRIMZI_IO_RESTART_ARGS_PATTERN = Pattern.compile("^includeTasks," +
"onlyFailed$|^onlyFailed,includeTasks$|^includeTasks$|^onlyFailed$|^true$");
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems pretty hacky. Maybe you can split it by , into a list and check the values or something?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed the validation to if-else statement and removed the regex pattern

Comment on lines +1092 to +1112
abstract boolean restartAnnotationIsValid(CustomResource resource);

/**
* Checks whether the provided resource instance (a KafkaConnector or KafkaMirrorMaker2) has argument includeTasks in restart annotation.
*
* @param resource Resource instance to check
*
* @return True if the provided resource has argument includeTasks in restart annotation. False otherwise.
*/
@SuppressWarnings({ "rawtypes" })
abstract boolean restartAnnotationHasIncludeTasksArg(CustomResource resource);

/**
* Checks whether the provided resource instance (a KafkaConnector or KafkaMirrorMaker2) has argument onlyFailedTasks in restart annotation.
*
* @param resource Resource instance to check
*
* @return True if the provided resource has argument onlyFailedTasks in restart annotation. False otherwise.
*/
@SuppressWarnings({ "rawtypes" })
abstract boolean restartAnnotationHasOnlyFailedTasksArg(HasMetadata resource);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you check here and with the implementations if you can use HasMetadata instead of CustomResource?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I forgot to change the implementations, just did it

Comment on lines +71 to +73
private static final Pattern STRIMZI_IO_RESTART_CONNECTOR_MM2_ARGS_PATTERN = Pattern
.compile("^([a-zA-Z0-9-_]+):includeTasks,onlyFailed$|^([a-zA-Z0-9-_]+):onlyFailed,includeTasks$" +
"|^([a-zA-Z0-9-_]+):includeTasks$|^([a-zA-Z0-9-_]+):onlyFailed$|^([a-zA-Z0-9-_]+)$");
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as in the other expression. Maybe we can try to simplify it a bit? Parse our the comnand part using the Regexp and than split it rather than including all various combiantions? This does not scale very well.


In addition, whatever the way to parse it is, it might be good to have some unit tests for how it parses the various annotations (both here as well as for connect). With some valid and invalid values etc.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed the validation to if-else statement and removed the regex pattern

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Btw, regarding testing invalid args, you can check the tests, I added a test to invalid args

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. It is probably good to test it with invalid value once in the way you have it.

But why don't you just test the methods for validating the annotations / checking the presence of the arguments? You can easily test them with 10 different valid and invalid values without the need for any mocks or anything. That lets you very quickly test all the possibly annotation forms with typos, spaces, etc.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I created a unit test for both kafka connect and MM2 annotation values with many cases to test the maximum of validations we can. Please take a look again.

Comment thread operator-common/src/main/java/io/strimzi/operator/common/Annotations.java Outdated
rlanhellas and others added 5 commits September 9, 2025 21:19
Co-authored-by: Jakub Scholz <www@scholzj.com>
Signed-off-by: Ronaldo Lanhellas <ronaldo.lanhellas@gmail.com>
Co-authored-by: Jakub Scholz <www@scholzj.com>
Signed-off-by: Ronaldo Lanhellas <ronaldo.lanhellas@gmail.com>
…rator/assembly/KafkaMirrorMaker2AssemblyOperatorConnectorRestartTest.java

Co-authored-by: Jakub Scholz <www@scholzj.com>
Signed-off-by: Ronaldo Lanhellas <ronaldo.lanhellas@gmail.com>
Signed-off-by: Ronaldo Lanhellas <ronaldo.lanhellas@gmail.com>
Signed-off-by: Ronaldo Lanhellas <ronaldo.lanhellas@gmail.com>
@rlanhellas rlanhellas requested a review from scholzj September 10, 2025 18:13
* @return True if the provided resource has valid restart annotation. False otherwise.
* */
@SuppressWarnings({ "rawtypes" })
abstract boolean restartAnnotationIsValid(CustomResource resource, String connectorName);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

HasMetadata here and its implementations as well?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Co-authored-by: Jakub Scholz <www@scholzj.com>
Signed-off-by: Ronaldo Lanhellas <ronaldo.lanhellas@gmail.com>
…ka connector and mm2

Signed-off-by: Ronaldo Lanhellas <ronaldo.lanhellas@gmail.com>
…ka connector and mm2

Signed-off-by: Ronaldo Lanhellas <ronaldo.lanhellas@gmail.com>
@rlanhellas rlanhellas requested a review from scholzj September 13, 2025 13:40
@scholzj scholzj changed the title feat: enhance validation for restart annotations in Kafka connectors Allow configuring additional options when restarting connectors Sep 13, 2025
Copy link
Copy Markdown
Member

@scholzj scholzj left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Thanks for the changes. I left few more comments. But it looks mostly good.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I did not realize how complicated this would be. I wonder if we should make the methods static. But maybe we should leave that for some future refactoring.

In any case, I do not think there is any reason to have this in a separate file. I don't think we need so many separate test classes just for a single feature like this. Just move it to cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaConnectAssemblyOperatorConnectorRestartTest.java maybe?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just moved it.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as with the other file. I would move it to cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaMirrorMaker2AssemblyOperatorConnectorRestartTest.java.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done @scholzj

Copy link
Copy Markdown
Member

@scholzj scholzj left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two more nits. LGTM otherwise. Thanks for the work an patience.

Comment thread CHANGELOG.md Outdated
@scholzj scholzj modified the milestones: 0.49.0, 0.48.0 Sep 15, 2025
rlanhellas and others added 2 commits September 15, 2025 17:38
Co-authored-by: Jakub Scholz <www@scholzj.com>
Signed-off-by: Ronaldo Lanhellas <ronaldo.lanhellas@gmail.com>
…rator/assembly/AbstractConnectOperator.java

Co-authored-by: Jakub Scholz <www@scholzj.com>
Signed-off-by: Ronaldo Lanhellas <ronaldo.lanhellas@gmail.com>
Comment thread CHANGELOG.md Outdated
rlanhellas and others added 3 commits September 16, 2025 08:58
Co-authored-by: Paolo Patierno <paolo.patierno@gmail.com>
Signed-off-by: Ronaldo Lanhellas <ronaldo.lanhellas@gmail.com>
Co-authored-by: Paolo Patierno <paolo.patierno@gmail.com>
Signed-off-by: Ronaldo Lanhellas <ronaldo.lanhellas@gmail.com>
Co-authored-by: Paolo Patierno <paolo.patierno@gmail.com>
Signed-off-by: Ronaldo Lanhellas <ronaldo.lanhellas@gmail.com>
Copy link
Copy Markdown
Member

@ppatierno ppatierno left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Thanks!

Copy link
Copy Markdown
Contributor

@PaulRMellor PaulRMellor left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made a suggestion on the docs to include a bit more detail on the options and to format with the step

Comment thread documentation/modules/configuring/proc-manual-restart-connector.adoc Outdated
Comment thread documentation/modules/configuring/proc-manual-restart-mirrormaker2-connector.adoc Outdated
rlanhellas and others added 2 commits September 16, 2025 19:16
Co-authored-by: PaulRMellor <47596553+PaulRMellor@users.noreply.github.com>
Signed-off-by: Ronaldo Lanhellas <ronaldo.lanhellas@gmail.com>
Co-authored-by: PaulRMellor <47596553+PaulRMellor@users.noreply.github.com>
Signed-off-by: Ronaldo Lanhellas <ronaldo.lanhellas@gmail.com>
Copy link
Copy Markdown
Member

@katheris katheris left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for working on this PR @rlanhellas. The code look great, I just had a couple of test related questions which I added


op.maybeCreateOrUpdateConnector(Reconciliation.DUMMY_RECONCILIATION, "my-connect-host", mockConnectApi, "my-connector", connector.getSpec(), kafkaMirrorMaker2)
.onComplete(context.succeeding(r -> context.verify(() -> {
verify(mockConnectApi, never()).restart(any(), anyInt(), any(), eq(false), eq(false));
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here as the Connect tests, I think these arguments matches shouldn't check for eq(false)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Copy link
Copy Markdown
Contributor

@PaulRMellor PaulRMellor left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the updates

@ppatierno
Copy link
Copy Markdown
Member

@rlanhellas do you have any ETA for the changes requested by Kate? The plan was to cut the 0.48.0 branch today and start the release process for RC1 and we would like to have this one included as planned. Thanks!

@rlanhellas
Copy link
Copy Markdown
Contributor Author

My plan is to have it ready tomorrow morning, may we wait until tomorrow to cut the branch ? @ppatierno

@ppatierno
Copy link
Copy Markdown
Member

My plan is to have it ready tomorrow morning, may we wait until tomorrow to cut the branch ? @ppatierno

@rlanhellas yeah sure no worries. I have got a "little" thing I would like to include in 0.48.0 so there is still time for it. But if you can do by tomorrow would be great, so when I am ready, I can just proceed with the release because yours work is already in.

Signed-off-by: Ronaldo Lanhellas <ronaldo.lanhellas@gmail.com>
rlanhellas and others added 3 commits September 22, 2025 07:11
Co-authored-by: Kate Stanley <11195226+katheris@users.noreply.github.com>
Signed-off-by: Ronaldo Lanhellas <ronaldo.lanhellas@gmail.com>
…rator/assembly/KafkaConnectAssemblyOperatorConnectorRestartTest.java

Co-authored-by: Kate Stanley <11195226+katheris@users.noreply.github.com>
Signed-off-by: Ronaldo Lanhellas <ronaldo.lanhellas@gmail.com>
Signed-off-by: Ronaldo Lanhellas <ronaldo.lanhellas@gmail.com>
Copy link
Copy Markdown
Member

@katheris katheris left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for all your work on this @rlanhellas

@ppatierno
Copy link
Copy Markdown
Member

/azp run regression

@azure-pipelines
Copy link
Copy Markdown

Azure Pipelines successfully started running 1 pipeline(s).

@ppatierno ppatierno merged commit 981aa58 into strimzi:main Sep 23, 2025
31 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Restarting connectors and connevtor tasks in Connect and MM2 should be logged at INFO level

6 participants