Skip to content

Commit 1f8cbb1

Browse files
authored
Stop using the Connector pause field (#12488)
Signed-off-by: Jakub Scholz <www@scholzj.com>
1 parent 29c4e18 commit 1f8cbb1

5 files changed

Lines changed: 2 additions & 210 deletions

File tree

cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaMirrorMaker2Connectors.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,15 +115,13 @@ public List<KafkaConnector> generateConnectorDefinitions() {
115115
KafkaMirrorMaker2ConnectorSpec mm2ConnectorSpec = connectorType.getValue().apply(mirror);
116116

117117
if (mm2ConnectorSpec != null) {
118-
@SuppressWarnings("deprecation") // getPause() is deprecated
119118
KafkaConnector connector = new KafkaConnectorBuilder()
120119
.withNewMetadata()
121120
.withName(mirror.getSource().getAlias() + "->" + target.getAlias() + connectorType.getKey())
122121
.endMetadata()
123122
.withNewSpec()
124123
.withClassName(CONNECTOR_JAVA_PACKAGE + connectorType.getKey())
125124
.withConfig(prepareMirrorMaker2ConnectorConfig(mirror, mm2ConnectorSpec))
126-
.withPause(mm2ConnectorSpec.getPause())
127125
.withState(mm2ConnectorSpec.getState())
128126
.withAutoRestart(mm2ConnectorSpec.getAutoRestart())
129127
.withTasksMax(mm2ConnectorSpec.getTasksMax())

cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/AbstractConnectOperator.java

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -508,18 +508,9 @@ private Future<List<Condition>> updateState(Reconciliation reconciliation, Strin
508508
if (!(path instanceof String state)) {
509509
return Future.failedFuture("JSON response lacked $.connector.state");
510510
} else {
511-
ConnectorState desiredState = connectorSpec.getState();
512-
@SuppressWarnings("deprecation")
513-
Boolean shouldPause = connectorSpec.getPause();
514-
ConnectorState targetState = desiredState != null ? desiredState :
515-
Boolean.TRUE.equals(shouldPause) ? ConnectorState.PAUSED : ConnectorState.RUNNING;
516-
if (desiredState != null && shouldPause != null) {
517-
String message = "Both pause and state are set. Since pause is deprecated, state takes precedence " +
518-
"so the connector will be " + targetState.toValue();
519-
LOGGER.warnCr(reconciliation, message);
520-
conditions.add(StatusUtils.buildWarningCondition("UpdateState", message));
521-
}
511+
ConnectorState targetState = connectorSpec.getState();
522512
Future<Void> future = Future.succeededFuture();
513+
523514
switch (state) {
524515
case "RUNNING" -> {
525516
if (targetState == ConnectorState.PAUSED) {

cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaMirrorMaker2ConnectorsTest.java

Lines changed: 0 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
package io.strimzi.operator.cluster.model;
66

77
import io.strimzi.api.kafka.model.common.CertSecretSourceBuilder;
8-
import io.strimzi.api.kafka.model.common.ConnectorState;
98
import io.strimzi.api.kafka.model.common.metrics.StrimziMetricsReporterBuilder;
109
import io.strimzi.api.kafka.model.connector.KafkaConnector;
1110
import io.strimzi.api.kafka.model.mirrormaker2.KafkaMirrorMaker2;
@@ -31,7 +30,6 @@
3130
import static org.hamcrest.CoreMatchers.nullValue;
3231
import static org.hamcrest.MatcherAssert.assertThat;
3332

34-
@SuppressWarnings("deprecation") // Uses deprecated getPause() field in tests
3533
public class KafkaMirrorMaker2ConnectorsTest {
3634
private static final String PREFIX = "prefix.";
3735

@@ -103,23 +101,20 @@ public void testConnectors() {
103101
assertThat(kc.getMetadata().getName(), is("source->target.MirrorSourceConnector"));
104102
assertThat(kc.getSpec().getClassName(), is("org.apache.kafka.connect.mirror.MirrorSourceConnector"));
105103
assertThat(kc.getSpec().getTasksMax(), is(5));
106-
assertThat(kc.getSpec().getPause(), is(nullValue()));
107104
assertThat(kc.getSpec().getState(), is(nullValue()));
108105
assertThat(kc.getSpec().getConfig(), is(expectedSource));
109106

110107
kc = kcs.stream().filter(k -> k.getMetadata().getName().contains("source->target.MirrorCheckpointConnector")).findFirst().orElseThrow();
111108
assertThat(kc.getMetadata().getName(), is("source->target.MirrorCheckpointConnector"));
112109
assertThat(kc.getSpec().getClassName(), is("org.apache.kafka.connect.mirror.MirrorCheckpointConnector"));
113110
assertThat(kc.getSpec().getTasksMax(), is(3));
114-
assertThat(kc.getSpec().getPause(), is(nullValue()));
115111
assertThat(kc.getSpec().getState(), is(nullValue()));
116112
assertThat(kc.getSpec().getConfig(), is(expectedCheckpoint));
117113

118114
kc = kcs.stream().filter(k -> k.getMetadata().getName().contains("source->target.MirrorHeartbeatConnector")).findFirst().orElseThrow();
119115
assertThat(kc.getMetadata().getName(), is("source->target.MirrorHeartbeatConnector"));
120116
assertThat(kc.getSpec().getClassName(), is("org.apache.kafka.connect.mirror.MirrorHeartbeatConnector"));
121117
assertThat(kc.getSpec().getTasksMax(), is(1));
122-
assertThat(kc.getSpec().getPause(), is(nullValue()));
123118
assertThat(kc.getSpec().getState(), is(nullValue()));
124119
assertThat(kc.getSpec().getConfig(), is(expectedAll));
125120
}
@@ -160,7 +155,6 @@ public void testOverridingSourceAndTargetConfiguration() {
160155
assertThat(kc.getMetadata().getName(), is("source->target.MirrorSourceConnector"));
161156
assertThat(kc.getSpec().getClassName(), is("org.apache.kafka.connect.mirror.MirrorSourceConnector"));
162157
assertThat(kc.getSpec().getTasksMax(), is(nullValue()));
163-
assertThat(kc.getSpec().getPause(), is(nullValue()));
164158
assertThat(kc.getSpec().getState(), is(nullValue()));
165159
assertThat(kc.getSpec().getConfig(), is(expectedSource));
166160
}
@@ -234,47 +228,41 @@ public void testConnectorsWithMultipleSources() {
234228
assertThat(kc.getMetadata().getName(), is("source->target.MirrorSourceConnector"));
235229
assertThat(kc.getSpec().getClassName(), is("org.apache.kafka.connect.mirror.MirrorSourceConnector"));
236230
assertThat(kc.getSpec().getTasksMax(), is(5));
237-
assertThat(kc.getSpec().getPause(), is(nullValue()));
238231
assertThat(kc.getSpec().getState(), is(nullValue()));
239232
assertThat(kc.getSpec().getConfig(), is(expectedSource));
240233

241234
kc = kcs.stream().filter(k -> k.getMetadata().getName().contains("source->target.MirrorCheckpointConnector")).findFirst().orElseThrow();
242235
assertThat(kc.getMetadata().getName(), is("source->target.MirrorCheckpointConnector"));
243236
assertThat(kc.getSpec().getClassName(), is("org.apache.kafka.connect.mirror.MirrorCheckpointConnector"));
244237
assertThat(kc.getSpec().getTasksMax(), is(3));
245-
assertThat(kc.getSpec().getPause(), is(nullValue()));
246238
assertThat(kc.getSpec().getState(), is(nullValue()));
247239
assertThat(kc.getSpec().getConfig(), is(expectedCheckpoint));
248240

249241
kc = kcs.stream().filter(k -> k.getMetadata().getName().contains("source->target.MirrorHeartbeatConnector")).findFirst().orElseThrow();
250242
assertThat(kc.getMetadata().getName(), is("source->target.MirrorHeartbeatConnector"));
251243
assertThat(kc.getSpec().getClassName(), is("org.apache.kafka.connect.mirror.MirrorHeartbeatConnector"));
252244
assertThat(kc.getSpec().getTasksMax(), is(1));
253-
assertThat(kc.getSpec().getPause(), is(nullValue()));
254245
assertThat(kc.getSpec().getState(), is(nullValue()));
255246
assertThat(kc.getSpec().getConfig(), is(expectedAll));
256247

257248
kc = kcs.stream().filter(k -> k.getMetadata().getName().contains("other-source->target.MirrorSourceConnector")).findFirst().orElseThrow();
258249
assertThat(kc.getMetadata().getName(), is("other-source->target.MirrorSourceConnector"));
259250
assertThat(kc.getSpec().getClassName(), is("org.apache.kafka.connect.mirror.MirrorSourceConnector"));
260251
assertThat(kc.getSpec().getTasksMax(), is(15));
261-
assertThat(kc.getSpec().getPause(), is(nullValue()));
262252
assertThat(kc.getSpec().getState(), is(nullValue()));
263253
assertThat(kc.getSpec().getConfig(), is(expectedSource2));
264254

265255
kc = kcs.stream().filter(k -> k.getMetadata().getName().contains("other-source->target.MirrorCheckpointConnector")).findFirst().orElseThrow();
266256
assertThat(kc.getMetadata().getName(), is("other-source->target.MirrorCheckpointConnector"));
267257
assertThat(kc.getSpec().getClassName(), is("org.apache.kafka.connect.mirror.MirrorCheckpointConnector"));
268258
assertThat(kc.getSpec().getTasksMax(), is(13));
269-
assertThat(kc.getSpec().getPause(), is(nullValue()));
270259
assertThat(kc.getSpec().getState(), is(nullValue()));
271260
assertThat(kc.getSpec().getConfig(), is(expectedCheckpoint2));
272261

273262
kc = kcs.stream().filter(k -> k.getMetadata().getName().contains("other-source->target.MirrorHeartbeatConnector")).findFirst().orElseThrow();
274263
assertThat(kc.getMetadata().getName(), is("other-source->target.MirrorHeartbeatConnector"));
275264
assertThat(kc.getSpec().getClassName(), is("org.apache.kafka.connect.mirror.MirrorHeartbeatConnector"));
276265
assertThat(kc.getSpec().getTasksMax(), is(11));
277-
assertThat(kc.getSpec().getPause(), is(nullValue()));
278266
assertThat(kc.getSpec().getState(), is(nullValue()));
279267
assertThat(kc.getSpec().getConfig(), is(expectedAll2));
280268
}
@@ -303,49 +291,6 @@ public void testConnectorsOnlySome() {
303291
assertThat(kc.getSpec().getClassName(), is("org.apache.kafka.connect.mirror.MirrorHeartbeatConnector"));
304292
}
305293

306-
@Test
307-
public void testConnectorsPauseState() {
308-
KafkaMirrorMaker2 kmm2 = new KafkaMirrorMaker2Builder(KMM2)
309-
.editSpec()
310-
.editMirror(0)
311-
.editSourceConnector()
312-
.withState(ConnectorState.PAUSED)
313-
.endSourceConnector()
314-
.editCheckpointConnector()
315-
.withPause(true)
316-
.endCheckpointConnector()
317-
.editHeartbeatConnector()
318-
.withState(ConnectorState.STOPPED)
319-
.withPause(true)
320-
.endHeartbeatConnector()
321-
.endMirror()
322-
.endSpec()
323-
.build();
324-
325-
KafkaMirrorMaker2Connectors connectors = KafkaMirrorMaker2Connectors.fromCrd(Reconciliation.DUMMY_RECONCILIATION, kmm2);
326-
List<KafkaConnector> kcs = connectors.generateConnectorDefinitions();
327-
328-
assertThat(kcs.size(), is(3));
329-
330-
KafkaConnector kc = kcs.stream().filter(k -> k.getMetadata().getName().contains("source->target.MirrorSourceConnector")).findFirst().orElseThrow();
331-
assertThat(kc.getMetadata().getName(), is("source->target.MirrorSourceConnector"));
332-
assertThat(kc.getSpec().getClassName(), is("org.apache.kafka.connect.mirror.MirrorSourceConnector"));
333-
assertThat(kc.getSpec().getPause(), is(nullValue()));
334-
assertThat(kc.getSpec().getState(), is(ConnectorState.PAUSED));
335-
336-
kc = kcs.stream().filter(k -> k.getMetadata().getName().contains("source->target.MirrorCheckpointConnector")).findFirst().orElseThrow();
337-
assertThat(kc.getMetadata().getName(), is("source->target.MirrorCheckpointConnector"));
338-
assertThat(kc.getSpec().getClassName(), is("org.apache.kafka.connect.mirror.MirrorCheckpointConnector"));
339-
assertThat(kc.getSpec().getPause(), is(true));
340-
assertThat(kc.getSpec().getState(), is(nullValue()));
341-
342-
kc = kcs.stream().filter(k -> k.getMetadata().getName().contains("source->target.MirrorHeartbeatConnector")).findFirst().orElseThrow();
343-
assertThat(kc.getMetadata().getName(), is("source->target.MirrorHeartbeatConnector"));
344-
assertThat(kc.getSpec().getClassName(), is("org.apache.kafka.connect.mirror.MirrorHeartbeatConnector"));
345-
assertThat(kc.getSpec().getPause(), is(true));
346-
assertThat(kc.getSpec().getState(), is(ConnectorState.STOPPED));
347-
}
348-
349294
@Test
350295
public void testConnectorsWithAutoRestart() {
351296
KafkaMirrorMaker2 kmm2 = new KafkaMirrorMaker2Builder(KMM2)

cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/ConnectorMockTest.java

Lines changed: 0 additions & 136 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@
4747
import io.strimzi.operator.common.metrics.MetricsHolder;
4848
import io.strimzi.operator.common.model.Labels;
4949
import io.strimzi.platform.KubernetesVersion;
50-
import io.strimzi.test.ReadWriteUtils;
5150
import io.strimzi.test.mockkube3.MockKube3;
5251
import io.vertx.core.Future;
5352
import io.vertx.core.Promise;
@@ -1109,95 +1108,6 @@ public void testConnectorNotReadyWhenExceptionFromConnectRestApi() {
11091108
assertThat(connectors.keySet(), is(empty()));
11101109
}
11111110

1112-
/** Create connect, create connector, pause connector via deprecated pause field, resume connector */
1113-
@Test
1114-
public void testConnectorDeprecatedPauseResume() {
1115-
String connectName = "cluster";
1116-
String connectorName = "connector";
1117-
1118-
// Create KafkaConnect cluster and wait till it's ready
1119-
KafkaConnect connect = new KafkaConnectBuilder()
1120-
.withNewMetadata()
1121-
.withNamespace(namespace)
1122-
.withName(connectName)
1123-
.addToAnnotations(Annotations.STRIMZI_IO_USE_CONNECTOR_RESOURCES, "true")
1124-
.endMetadata()
1125-
.withNewSpec()
1126-
.withReplicas(1)
1127-
.withBootstrapServers("my-kafka:9092")
1128-
.endSpec()
1129-
.build();
1130-
Crds.kafkaConnectOperation(client).inNamespace(namespace).resource(connect).create();
1131-
waitForConnectReady(connectName);
1132-
1133-
// could be triggered twice (creation followed by status update) but waitForConnectReady could be satisfied with single
1134-
verify(api, atLeastOnce()).list(any(),
1135-
eq(KafkaConnectResources.qualifiedServiceName(connectName, namespace)), eq(KafkaConnectCluster.REST_API_PORT));
1136-
verify(api, never()).createOrUpdatePutRequest(any(),
1137-
eq(KafkaConnectResources.qualifiedServiceName(connectName, namespace)), eq(KafkaConnectCluster.REST_API_PORT),
1138-
eq(connectorName), any());
1139-
1140-
// Create KafkaConnector and wait till it's ready
1141-
KafkaConnector connector = new KafkaConnectorBuilder()
1142-
.withNewMetadata()
1143-
.withName(connectorName)
1144-
.withNamespace(namespace)
1145-
.addToLabels(Labels.STRIMZI_CLUSTER_LABEL, connectName)
1146-
.endMetadata()
1147-
.withNewSpec()
1148-
.withTasksMax(1)
1149-
.withClassName("Dummy")
1150-
.endSpec()
1151-
.build();
1152-
Crds.kafkaConnectorOperation(client).inNamespace(namespace).resource(connector).create();
1153-
waitForConnectorReady(connectorName);
1154-
waitForConnectorState(connectorName, "RUNNING");
1155-
1156-
verify(api, times(2)).list(any(),
1157-
eq(KafkaConnectResources.qualifiedServiceName(connectName, namespace)), eq(KafkaConnectCluster.REST_API_PORT));
1158-
verify(api, times(1)).createOrUpdatePutRequest(any(),
1159-
eq(KafkaConnectResources.qualifiedServiceName(connectName, namespace)), eq(KafkaConnectCluster.REST_API_PORT),
1160-
eq(connectorName), any());
1161-
assertThat(connectors.keySet(), is(Collections.singleton(key("cluster-connect-api.testconnectordeprecatedpauseresume.svc", connectorName))));
1162-
1163-
verify(api, never()).pause(any(),
1164-
eq(KafkaConnectResources.qualifiedServiceName(connectName, namespace)), eq(KafkaConnectCluster.REST_API_PORT),
1165-
eq(connectorName));
1166-
verify(api, never()).resume(any(),
1167-
eq(KafkaConnectResources.qualifiedServiceName(connectName, namespace)), eq(KafkaConnectCluster.REST_API_PORT),
1168-
eq(connectorName));
1169-
1170-
Crds.kafkaConnectorOperation(client).inNamespace(namespace).withName(connectorName).edit(spec -> new KafkaConnectorBuilder(spec)
1171-
.editSpec()
1172-
.withPause(true)
1173-
.endSpec()
1174-
.build());
1175-
1176-
waitForConnectorState(connectorName, "PAUSED");
1177-
1178-
verify(api, times(1)).pause(any(),
1179-
eq(KafkaConnectResources.qualifiedServiceName(connectName, namespace)), eq(KafkaConnectCluster.REST_API_PORT),
1180-
eq(connectorName));
1181-
verify(api, never()).resume(any(),
1182-
eq(KafkaConnectResources.qualifiedServiceName(connectName, namespace)), eq(KafkaConnectCluster.REST_API_PORT),
1183-
eq(connectorName));
1184-
1185-
Crds.kafkaConnectorOperation(client).inNamespace(namespace).withName(connectorName).edit(sp -> new KafkaConnectorBuilder(sp)
1186-
.editSpec()
1187-
.withPause(false)
1188-
.endSpec()
1189-
.build());
1190-
1191-
waitForConnectorState(connectorName, "RUNNING");
1192-
1193-
verify(api, times(1)).pause(any(),
1194-
eq(KafkaConnectResources.qualifiedServiceName(connectName, namespace)), eq(KafkaConnectCluster.REST_API_PORT),
1195-
eq(connectorName));
1196-
verify(api, times(1)).resume(any(),
1197-
eq(KafkaConnectResources.qualifiedServiceName(connectName, namespace)), eq(KafkaConnectCluster.REST_API_PORT),
1198-
eq(connectorName));
1199-
}
1200-
12011111
/** Create connect, create connector, pause connector, resume connector */
12021112
@Test
12031113
public void testConnectorPauseResume() {
@@ -1376,52 +1286,6 @@ public void testConnectorStopResume() {
13761286
eq(connectorName));
13771287
}
13781288

1379-
/** Create connect, create connector, stop connector but Connect does not support it */
1380-
@Test
1381-
public void testConnectorBothStateAndPause() {
1382-
String connectName = "cluster";
1383-
String connectorName = "connector";
1384-
1385-
// Create KafkaConnect cluster and wait till it's ready
1386-
Crds.kafkaConnectOperation(client).inNamespace(namespace).resource(new KafkaConnectBuilder()
1387-
.withNewMetadata()
1388-
.withNamespace(namespace)
1389-
.withName(connectName)
1390-
.addToAnnotations(Annotations.STRIMZI_IO_USE_CONNECTOR_RESOURCES, "true")
1391-
.endMetadata()
1392-
.withNewSpec()
1393-
.withReplicas(1)
1394-
.withBootstrapServers("my-kafka:9092")
1395-
.endSpec()
1396-
.build())
1397-
.create();
1398-
waitForConnectReady(connectName);
1399-
1400-
String yaml = "apiVersion: kafka.strimzi.io/v1beta2\n" +
1401-
"kind: KafkaConnector\n" +
1402-
"metadata:\n" +
1403-
" name: " + connectorName + "\n" +
1404-
" namespace: " + namespace + "\n" +
1405-
" labels:\n" +
1406-
" strimzi.io/cluster: " + connectName + "\n" +
1407-
"spec:\n" +
1408-
" class: EchoSink\n" +
1409-
" tasksMax: 1\n" +
1410-
" pause: true\n" +
1411-
" state: \"stopped\"\n" +
1412-
" config:\n" +
1413-
" level: INFO\n" +
1414-
" topics: timer-topic";
1415-
1416-
KafkaConnector kcr = ReadWriteUtils.readObjectFromYamlString(yaml, KafkaConnector.class);
1417-
Crds.kafkaConnectorOperation(client).inNamespace(namespace).resource(kcr).create();
1418-
1419-
waitForConnectorReady(connectorName);
1420-
waitForConnectorState(connectorName, "STOPPED");
1421-
waitForConnectorCondition(connectorName, "Warning", "DeprecatedFields");
1422-
waitForConnectorCondition(connectorName, "Warning", "UpdateState");
1423-
}
1424-
14251289
/** Create connect, create connector, restart connector */
14261290
@Test
14271291
public void testConnectorRestart() {

systemtest/src/test/java/io/strimzi/systemtest/connect/ConnectST.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -235,12 +235,6 @@ void testKafkaConnectAndConnectorStateWithFileSinkPlugin() {
235235
.endSpec()
236236
.build());
237237

238-
// TODO in future releases can be removed completely as 'spec.pause' property is becoming deprecated.
239-
LOGGER.info("Verify pausing and running KafkaConnector, by setting 'spec.pause' property to 'true' and 'false'");
240-
verifySinkConnectorByBlockAndUnblock(testStorage, connectPodName,
241-
connector -> connector.getSpec().setPause(true),
242-
connector -> connector.getSpec().setPause(false));
243-
244238
LOGGER.info("Verify stopping and running KafkaConnector, by setting 'spec.state' property to '' and false");
245239
verifySinkConnectorByBlockAndUnblock(testStorage, connectPodName,
246240
connector -> connector.getSpec().setState(ConnectorState.STOPPED),

0 commit comments

Comments
 (0)