Skip to content

Commit 4836c7d

Browse files
committed
Fix default Kafka Connetor state (#12666)
Signed-off-by: Jakub Scholz <www@scholzj.com>
1 parent 34ca797 commit 4836c7d

2 files changed

Lines changed: 94 additions & 1 deletion

File tree

cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/AbstractConnectOperator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -479,7 +479,7 @@ private Future<List<Condition>> updateState(Reconciliation reconciliation, Strin
479479
if (!(path instanceof String state)) {
480480
return Future.failedFuture("JSON response lacked $.connector.state");
481481
} else {
482-
ConnectorState targetState = connectorSpec.getState();
482+
ConnectorState targetState = connectorSpec.getState() != null ? connectorSpec.getState() : ConnectorState.RUNNING;
483483
Future<Void> future = Future.succeededFuture();
484484

485485
switch (state) {

cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/ConnectorMockTest.java

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1282,6 +1282,99 @@ public void testConnectorStopResume() {
12821282
eq(connectorName));
12831283
}
12841284

1285+
/** Create connect, create connector, stop connector, resume connector by removing the state */
1286+
@Test
1287+
public void testConnectorStopResumeByUnset() {
1288+
String connectName = "cluster";
1289+
String connectorName = "connector";
1290+
1291+
// Create KafkaConnect cluster and wait till it's ready
1292+
KafkaConnect connect = new KafkaConnectBuilder()
1293+
.withNewMetadata()
1294+
.withNamespace(namespace)
1295+
.withName(connectName)
1296+
.addToAnnotations(Annotations.STRIMZI_IO_USE_CONNECTOR_RESOURCES, "true")
1297+
.endMetadata()
1298+
.withNewSpec()
1299+
.withReplicas(1)
1300+
.withBootstrapServers("my-kafka:9092")
1301+
.withGroupId("my-group")
1302+
.withConfigStorageTopic("my-config-topic")
1303+
.withOffsetStorageTopic("my-offset-topic")
1304+
.withStatusStorageTopic("my-status-topic")
1305+
.endSpec()
1306+
.build();
1307+
Crds.kafkaConnectOperation(client).inNamespace(namespace).resource(connect).create();
1308+
waitForConnectReady(connectName);
1309+
1310+
// could be triggered twice (creation followed by status update) but waitForConnectReady could be satisfied with single
1311+
verify(api, atLeastOnce()).list(any(),
1312+
eq(KafkaConnectResources.qualifiedServiceName(connectName, namespace)), eq(KafkaConnectCluster.REST_API_PORT));
1313+
verify(api, never()).createOrUpdatePutRequest(any(),
1314+
eq(KafkaConnectResources.qualifiedServiceName(connectName, namespace)), eq(KafkaConnectCluster.REST_API_PORT),
1315+
eq(connectorName), any());
1316+
1317+
// Create KafkaConnector and wait till it's ready
1318+
KafkaConnector connector = new KafkaConnectorBuilder()
1319+
.withNewMetadata()
1320+
.withName(connectorName)
1321+
.withNamespace(namespace)
1322+
.addToLabels(Labels.STRIMZI_CLUSTER_LABEL, connectName)
1323+
.endMetadata()
1324+
.withNewSpec()
1325+
.withTasksMax(1)
1326+
.withClassName("Dummy")
1327+
.endSpec()
1328+
.build();
1329+
Crds.kafkaConnectorOperation(client).inNamespace(namespace).resource(connector).create();
1330+
waitForConnectorReady(connectorName);
1331+
waitForConnectorState(connectorName, "RUNNING");
1332+
1333+
verify(api, times(2)).list(any(),
1334+
eq(KafkaConnectResources.qualifiedServiceName(connectName, namespace)), eq(KafkaConnectCluster.REST_API_PORT));
1335+
verify(api, times(1)).createOrUpdatePutRequest(any(),
1336+
eq(KafkaConnectResources.qualifiedServiceName(connectName, namespace)), eq(KafkaConnectCluster.REST_API_PORT),
1337+
eq(connectorName), any());
1338+
assertThat(connectors.keySet(), is(Collections.singleton(key("cluster-connect-api.testconnectorstopresumebyunset.svc", connectorName))));
1339+
1340+
verify(api, never()).stop(any(),
1341+
eq(KafkaConnectResources.qualifiedServiceName(connectName, namespace)), eq(KafkaConnectCluster.REST_API_PORT),
1342+
eq(connectorName));
1343+
verify(api, never()).resume(any(),
1344+
eq(KafkaConnectResources.qualifiedServiceName(connectName, namespace)), eq(KafkaConnectCluster.REST_API_PORT),
1345+
eq(connectorName));
1346+
1347+
Crds.kafkaConnectorOperation(client).inNamespace(namespace).withName(connectorName).edit(spec -> new KafkaConnectorBuilder(spec)
1348+
.editSpec()
1349+
.withState(ConnectorState.STOPPED)
1350+
.endSpec()
1351+
.build());
1352+
1353+
waitForConnectorState(connectorName, "STOPPED");
1354+
1355+
verify(api, times(1)).stop(any(),
1356+
eq(KafkaConnectResources.qualifiedServiceName(connectName, namespace)), eq(KafkaConnectCluster.REST_API_PORT),
1357+
eq(connectorName));
1358+
verify(api, never()).resume(any(),
1359+
eq(KafkaConnectResources.qualifiedServiceName(connectName, namespace)), eq(KafkaConnectCluster.REST_API_PORT),
1360+
eq(connectorName));
1361+
1362+
Crds.kafkaConnectorOperation(client).inNamespace(namespace).withName(connectorName).edit(sp -> new KafkaConnectorBuilder(sp)
1363+
.editSpec()
1364+
.withState(null)
1365+
.endSpec()
1366+
.build());
1367+
1368+
waitForConnectorState(connectorName, "RUNNING");
1369+
1370+
verify(api, times(1)).stop(any(),
1371+
eq(KafkaConnectResources.qualifiedServiceName(connectName, namespace)), eq(KafkaConnectCluster.REST_API_PORT),
1372+
eq(connectorName));
1373+
verify(api, times(1)).resume(any(),
1374+
eq(KafkaConnectResources.qualifiedServiceName(connectName, namespace)), eq(KafkaConnectCluster.REST_API_PORT),
1375+
eq(connectorName));
1376+
}
1377+
12851378
/** Create connect, create connector, restart connector */
12861379
@Test
12871380
public void testConnectorRestart() {

0 commit comments

Comments
 (0)