Skip to content

Commit 85c3896

Browse files
committed
feat: improving connector restart args
Signed-off-by: Ronaldo Lanhellas <ronaldo.lanhellas@gmail.com>
1 parent 5deae11 commit 85c3896

4 files changed

Lines changed: 43 additions & 37 deletions

File tree

cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/AbstractConnectOperator.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import com.fasterxml.jackson.databind.ObjectMapper;
88
import io.fabric8.kubernetes.api.model.ConfigMap;
99
import io.fabric8.kubernetes.api.model.DefaultKubernetesResourceList;
10+
import io.fabric8.kubernetes.api.model.HasMetadata;
1011
import io.fabric8.kubernetes.api.model.LocalObjectReference;
1112
import io.fabric8.kubernetes.api.model.ObjectMeta;
1213
import io.fabric8.kubernetes.api.model.ServiceAccount;
@@ -131,6 +132,17 @@ public abstract class AbstractConnectOperator<C extends KubernetesClient, T exte
131132
protected final SharedEnvironmentProvider sharedEnvironmentProvider;
132133
protected final int port;
133134

135+
/**
136+
* This optional argument can be used to include tasks in the restart connector operation.
137+
* */
138+
protected static final String STRIMZI_IO_RESTART_INCLUDE_TASKS_ARG = "includeTasks";
139+
140+
/**
141+
* This optional argument can be used to restart connector only failed tasks.
142+
* */
143+
protected static final String STRIMZI_IO_RESTART_ONLY_FAILED_ARG = "onlyFailed";
144+
145+
134146
/**
135147
* Constructor
136148
*
@@ -746,14 +758,17 @@ private static int nextAutoRestartBackOffIntervalInMinutes(int restartCount)
746758
@SuppressWarnings({ "rawtypes" })
747759
private Future<List<Condition>> maybeRestartConnector(Reconciliation reconciliation, String host, KafkaConnectApi apiClient, String connectorName, CustomResource resource, List<Condition> conditions) {
748760
if (hasRestartAnnotation(resource, connectorName)) {
761+
749762
if (!restartAnnotationIsValid(resource)) {
750763
LOGGER.warnCr(reconciliation, "Invalid annotation format");
751764
conditions.add(StatusUtils.buildWarningCondition("RestartConnector", "Invalid annotation format"));
752765
return Future.succeededFuture(conditions);
753766
}
767+
754768
boolean restartIncludeTasks = restartAnnotationHasIncludeTasksArg(resource);
755769
boolean restartOnlyFailedTasks = restartAnnotationHasOnlyFailedTasksArg(resource);
756770
LOGGER.infoCr(reconciliation, "Restarting connector {}", connectorName);
771+
757772
return VertxUtil.completableFutureToVertxFuture(apiClient.restart(host, port, connectorName, restartIncludeTasks, restartOnlyFailedTasks))
758773
.compose(ignored -> removeRestartAnnotation(reconciliation, resource)
759774
.compose(v -> Future.succeededFuture(conditions)),
@@ -1094,7 +1109,7 @@ private Future<ConnectorStatusAndConditions> updateConnectorTopics(Reconciliatio
10941109
* @return True if the provided resource has argument onlyFailedTasks in restart annotation. False otherwise.
10951110
*/
10961111
@SuppressWarnings({ "rawtypes" })
1097-
abstract boolean restartAnnotationHasOnlyFailedTasksArg(CustomResource resource);
1112+
abstract boolean restartAnnotationHasOnlyFailedTasksArg(HasMetadata resource);
10981113

10991114
/**
11001115
* Returns the ID of the connector task to be restarted from the (a KafkaConnector or KafkaMirrorMaker2) custom resource.

cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaConnectAssemblyOperator.java

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
*/
55
package io.strimzi.operator.cluster.operator.assembly;
66

7+
import io.fabric8.kubernetes.api.model.HasMetadata;
78
import io.fabric8.kubernetes.api.model.LabelSelector;
89
import io.fabric8.kubernetes.api.model.LabelSelectorBuilder;
910
import io.fabric8.kubernetes.api.model.LabelSelectorRequirement;
@@ -62,14 +63,12 @@
6263
import java.util.Set;
6364
import java.util.concurrent.atomic.AtomicReference;
6465
import java.util.function.Function;
66+
import java.util.regex.Pattern;
6567
import java.util.stream.Collectors;
6668

6769
import static io.strimzi.api.ResourceAnnotations.ANNO_STRIMZI_IO_CONNECTOR_OFFSETS;
6870
import static io.strimzi.api.ResourceAnnotations.ANNO_STRIMZI_IO_RESTART_TASK;
6971
import static io.strimzi.operator.common.Annotations.ANNO_STRIMZI_IO_RESTART;
70-
import static io.strimzi.operator.common.Annotations.ANNO_STRIMZI_IO_RESTART_ARGS_PATTERN;
71-
import static io.strimzi.operator.common.Annotations.ANNO_STRIMZI_IO_RESTART_INCLUDE_TASKS_ARG;
72-
import static io.strimzi.operator.common.Annotations.ANNO_STRIMZI_IO_RESTART_ONLY_FAILED_ARG;
7372

7473
/**
7574
* <p>Assembly operator for a "Kafka Connect" assembly, which manages:</p>
@@ -83,6 +82,13 @@ public class KafkaConnectAssemblyOperator extends AbstractConnectOperator<Kubern
8382
private final CrdOperator<KubernetesClient, KafkaConnector, KafkaConnectorList> connectorOperator;
8483
private final ConnectBuildOperator connectBuildOperator;
8584

85+
/**
86+
* Pattern for validation of restart connector annotation value.
87+
* */
88+
private static final Pattern STRIMZI_IO_RESTART_ARGS_PATTERN = Pattern.compile("^includeTasks," +
89+
"onlyFailed$|^onlyFailed,includeTasks$|^includeTasks$|^onlyFailed$|^true$");
90+
91+
8692
/**
8793
* Constructor
8894
*
@@ -636,7 +642,7 @@ protected boolean hasRestartAnnotation(CustomResource resource, String connector
636642
@SuppressWarnings({ "rawtypes" })
637643
protected boolean restartAnnotationIsValid(CustomResource resource) {
638644
String restartValue = Annotations.stringAnnotation(resource, ANNO_STRIMZI_IO_RESTART, "");
639-
return ANNO_STRIMZI_IO_RESTART_ARGS_PATTERN.matcher(restartValue).matches();
645+
return STRIMZI_IO_RESTART_ARGS_PATTERN.matcher(restartValue).matches();
640646
}
641647

642648
/**
@@ -650,7 +656,7 @@ protected boolean restartAnnotationIsValid(CustomResource resource) {
650656
@Override
651657
protected boolean restartAnnotationHasIncludeTasksArg(CustomResource resource) {
652658
return Annotations.stringAnnotation(resource, ANNO_STRIMZI_IO_RESTART, "")
653-
.contains(ANNO_STRIMZI_IO_RESTART_INCLUDE_TASKS_ARG);
659+
.contains(STRIMZI_IO_RESTART_INCLUDE_TASKS_ARG);
654660
}
655661

656662

@@ -663,9 +669,9 @@ protected boolean restartAnnotationHasIncludeTasksArg(CustomResource resource) {
663669
*/
664670
@SuppressWarnings({ "rawtypes" })
665671
@Override
666-
protected boolean restartAnnotationHasOnlyFailedTasksArg(CustomResource resource) {
672+
protected boolean restartAnnotationHasOnlyFailedTasksArg(HasMetadata resource) {
667673
return Annotations.stringAnnotation(resource, ANNO_STRIMZI_IO_RESTART, "")
668-
.contains(ANNO_STRIMZI_IO_RESTART_ONLY_FAILED_ARG);
674+
.contains(STRIMZI_IO_RESTART_ONLY_FAILED_ARG);
669675
}
670676

671677
/**

cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaMirrorMaker2AssemblyOperator.java

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
*/
55
package io.strimzi.operator.cluster.operator.assembly;
66

7+
import io.fabric8.kubernetes.api.model.HasMetadata;
78
import io.fabric8.kubernetes.api.model.rbac.ClusterRoleBinding;
89
import io.fabric8.kubernetes.client.CustomResource;
910
import io.fabric8.kubernetes.client.KubernetesClient;
@@ -42,18 +43,16 @@
4243
import java.util.concurrent.atomic.AtomicReference;
4344
import java.util.function.Function;
4445
import java.util.regex.Matcher;
46+
import java.util.regex.Pattern;
4547
import java.util.stream.Collectors;
4648

4749
import static io.strimzi.api.ResourceAnnotations.ANNO_STRIMZI_IO_CONNECTOR_OFFSETS;
4850
import static io.strimzi.api.ResourceAnnotations.ANNO_STRIMZI_IO_MIRRORMAKER_CONNECTOR;
4951
import static io.strimzi.operator.common.Annotations.ANNO_STRIMZI_IO_RESTART_CONNECTOR;
50-
import static io.strimzi.operator.common.Annotations.ANNO_STRIMZI_IO_RESTART_CONNECTOR_MM2_ARGS_PATTERN;
5152
import static io.strimzi.operator.common.Annotations.ANNO_STRIMZI_IO_RESTART_CONNECTOR_TASK;
5253
import static io.strimzi.operator.common.Annotations.ANNO_STRIMZI_IO_RESTART_CONNECTOR_TASK_PATTERN;
5354
import static io.strimzi.operator.common.Annotations.ANNO_STRIMZI_IO_RESTART_CONNECTOR_TASK_PATTERN_CONNECTOR;
5455
import static io.strimzi.operator.common.Annotations.ANNO_STRIMZI_IO_RESTART_CONNECTOR_TASK_PATTERN_TASK;
55-
import static io.strimzi.operator.common.Annotations.ANNO_STRIMZI_IO_RESTART_INCLUDE_TASKS_ARG;
56-
import static io.strimzi.operator.common.Annotations.ANNO_STRIMZI_IO_RESTART_ONLY_FAILED_ARG;
5756
import static java.util.Collections.emptyMap;
5857

5958
/**
@@ -66,6 +65,14 @@
6665
public class KafkaMirrorMaker2AssemblyOperator extends AbstractConnectOperator<KubernetesClient, KafkaMirrorMaker2, KafkaMirrorMaker2List, KafkaMirrorMaker2Spec, KafkaMirrorMaker2Status> {
6766
private static final ReconciliationLogger LOGGER = ReconciliationLogger.create(KafkaMirrorMaker2AssemblyOperator.class.getName());
6867

68+
/**
69+
* Pattern for validation of MirrorMaker2 restart connector annotation value.
70+
* */
71+
private static final Pattern STRIMZI_IO_RESTART_CONNECTOR_MM2_ARGS_PATTERN = Pattern
72+
.compile("^([a-zA-Z0-9-_]+):includeTasks,onlyFailed$|^([a-zA-Z0-9-_]+):onlyFailed,includeTasks$" +
73+
"|^([a-zA-Z0-9-_]+):includeTasks$|^([a-zA-Z0-9-_]+):onlyFailed$|^([a-zA-Z0-9-_]+)$");
74+
75+
6976
/**
7077
* Constructor
7178
*
@@ -332,7 +339,7 @@ protected boolean hasRestartAnnotation(CustomResource resource, String connector
332339
@SuppressWarnings({ "rawtypes" })
333340
protected boolean restartAnnotationIsValid(CustomResource resource) {
334341
String restartValue = Annotations.stringAnnotation(resource, ANNO_STRIMZI_IO_RESTART_CONNECTOR, "");
335-
return ANNO_STRIMZI_IO_RESTART_CONNECTOR_MM2_ARGS_PATTERN.matcher(restartValue).matches();
342+
return STRIMZI_IO_RESTART_CONNECTOR_MM2_ARGS_PATTERN.matcher(restartValue).matches();
336343
}
337344

338345
/**
@@ -346,7 +353,7 @@ protected boolean restartAnnotationIsValid(CustomResource resource) {
346353
@Override
347354
protected boolean restartAnnotationHasIncludeTasksArg(CustomResource resource) {
348355
return Annotations.stringAnnotation(resource, ANNO_STRIMZI_IO_RESTART_CONNECTOR, "")
349-
.contains(ANNO_STRIMZI_IO_RESTART_INCLUDE_TASKS_ARG);
356+
.contains(STRIMZI_IO_RESTART_INCLUDE_TASKS_ARG);
350357
}
351358

352359

@@ -359,9 +366,9 @@ protected boolean restartAnnotationHasIncludeTasksArg(CustomResource resource) {
359366
*/
360367
@SuppressWarnings({ "rawtypes" })
361368
@Override
362-
protected boolean restartAnnotationHasOnlyFailedTasksArg(CustomResource resource) {
369+
protected boolean restartAnnotationHasOnlyFailedTasksArg(HasMetadata resource) {
363370
return Annotations.stringAnnotation(resource, ANNO_STRIMZI_IO_RESTART_CONNECTOR, "")
364-
.contains(ANNO_STRIMZI_IO_RESTART_ONLY_FAILED_ARG);
371+
.contains(STRIMZI_IO_RESTART_ONLY_FAILED_ARG);
365372
}
366373

367374
/**

operator-common/src/main/java/io/strimzi/operator/common/Annotations.java

Lines changed: 0 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -61,28 +61,6 @@ public class Annotations extends ResourceAnnotations {
6161
ANNO_STRIMZI_IO_RESTART_CONNECTOR_TASK_PATTERN_TASK +
6262
">\\d+)$");
6363

64-
/**
65-
* Pattern for validation of restart connector annotation value.
66-
* */
67-
public static final Pattern ANNO_STRIMZI_IO_RESTART_ARGS_PATTERN = Pattern.compile("^includeTasks," +
68-
"onlyFailed$|^onlyFailed,includeTasks$|^includeTasks$|^onlyFailed$|^true$");
69-
70-
/**
71-
* Pattern for validation of MirrorMaker2 restart connector annotation value.
72-
* */
73-
public static final Pattern ANNO_STRIMZI_IO_RESTART_CONNECTOR_MM2_ARGS_PATTERN = Pattern
74-
.compile("^([a-zA-Z0-9-_]+):includeTasks,onlyFailed$|^([a-zA-Z0-9-_]+):onlyFailed,includeTasks$" +
75-
"|^([a-zA-Z0-9-_]+):includeTasks$|^([a-zA-Z0-9-_]+):onlyFailed$|^([a-zA-Z0-9-_]+)$");
76-
77-
/**
78-
* This optional argument can be used to include tasks in the restart connector operation.
79-
* */
80-
public static final String ANNO_STRIMZI_IO_RESTART_INCLUDE_TASKS_ARG = "includeTasks";
81-
82-
/**
83-
* This optional argument can be used to restart connector only failed tasks.
84-
* */
85-
public static final String ANNO_STRIMZI_IO_RESTART_ONLY_FAILED_ARG = "onlyFailed";
8664

8765
/**
8866
* Annotation on PVCs storing the original configuration. It is used to revert any illegal changes.

0 commit comments

Comments
 (0)