diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/ZkMultiWriteBuilder.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/ZkMultiWriteBuilder.java
new file mode 100644
index 000000000000..1053ed414845
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/ZkMultiWriteBuilder.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils.helix;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.impl.client.ZkClient;
+import org.apache.helix.zookeeper.zkclient.exception.ZkException;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Op;
+import org.apache.zookeeper.ZooDefs;
+
+
+/**
+ * Fluent builder for an atomic ZooKeeper {@code multi()} transaction over Helix property-store
+ * paths. Accumulates ops via {@link #set}/{@link #create}/{@link #delete}/{@link #check} and
+ * submits them as a single all-or-nothing batch on {@link #execute()}.
+ *
+ *
Paths passed to op methods are property-store-relative — i.e. the same path you would pass to
+ * {@code ZkHelixPropertyStore.set(...)}, e.g. {@code /SEGMENTS/{table}/{segment}}. The builder
+ * prepends the configured property-store root (e.g. {@code /{cluster}/PROPERTYSTORE}) before
+ * submitting to ZK. Multi-path writes outside the property store are intentionally not supported.
+ *
+ *
On atomic rollback (e.g. version mismatch, node missing, node already exists), {@link #execute()}
+ * throws the underlying {@link KeeperException} subtype ({@code BadVersionException},
+ * {@code NoNodeException}, {@code NodeExistsException}, ...). Callers branch on the subtype to
+ * distinguish retryable concurrent-state changes from hard errors. Per-op offender info is reachable
+ * via {@link KeeperException#getResults()}.
+ *
+ *
Connectivity / session failures (timeout, interrupt, session expiry) are not atomic outcomes
+ * and propagate as the original {@link ZkException}.
+ *
+ *
Single-use: each instance can be executed at most once. Obtain a fresh builder per transaction
+ * (typically via {@code PinotHelixResourceManager.multiWriteZK()}).
+ *
+ *
Not thread-safe: instance state ({@code _ops}, {@code _executed}) is mutated by every fluent
+ * call. A single builder must not be shared across threads; use a fresh builder per thread.
+ */
+public final class ZkMultiWriteBuilder {
+
+ /** Pass as {@code expectedVersion} to skip the version check on a {@link #set}/{@link #delete}. */
+ public static final int ANY_VERSION = -1;
+
+ private final ZkClient _zkClient;
+ private final String _propertyStoreRoot;
+ private final List _ops = new ArrayList<>();
+ private boolean _executed;
+
+ /**
+ * @param zkClient ZK client used to serialize records and submit the transaction.
+ * @param propertyStoreRoot absolute ZK path that all op paths are prefixed with (typically
+ * {@code /{clusterName}/PROPERTYSTORE}). Must start with {@code /} and not end with {@code /}.
+ * Pass {@code ""} to operate on raw absolute paths (test-only).
+ */
+ public ZkMultiWriteBuilder(ZkClient zkClient, String propertyStoreRoot) {
+ _zkClient = Preconditions.checkNotNull(zkClient, "zkClient");
+ Preconditions.checkNotNull(propertyStoreRoot, "propertyStoreRoot");
+ Preconditions.checkArgument(
+ propertyStoreRoot.isEmpty() || (propertyStoreRoot.startsWith("/") && !propertyStoreRoot.endsWith("/")),
+ "propertyStoreRoot must be empty or start with '/' and not end with '/': %s", propertyStoreRoot);
+ _propertyStoreRoot = propertyStoreRoot;
+ }
+
+ /**
+ * Set (overwrite) the znode at {@code path} (property-store-relative) to {@code record}, with a
+ * CAS check on {@code expectedVersion}. Pass {@link #ANY_VERSION} to skip the check.
+ */
+ public ZkMultiWriteBuilder set(String path, ZNRecord record, int expectedVersion) {
+ checkNotExecuted();
+ Preconditions.checkNotNull(record, "record");
+ String fullPath = resolve(path);
+ _ops.add(Op.setData(fullPath, _zkClient.serialize(record, fullPath), expectedVersion));
+ return this;
+ }
+
+ /** Set without a version check; equivalent to {@code set(path, record, ANY_VERSION)}. */
+ public ZkMultiWriteBuilder set(String path, ZNRecord record) {
+ return set(path, record, ANY_VERSION);
+ }
+
+ /**
+ * Create a persistent znode at {@code path} (property-store-relative) with {@code record} as its
+ * data.
+ */
+ public ZkMultiWriteBuilder create(String path, ZNRecord record) {
+ checkNotExecuted();
+ Preconditions.checkNotNull(record, "record");
+ String fullPath = resolve(path);
+ _ops.add(
+ Op.create(fullPath, _zkClient.serialize(record, fullPath), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
+ return this;
+ }
+
+ /**
+ * Delete the znode at {@code path} (property-store-relative), with a CAS check on
+ * {@code expectedVersion}. Pass {@link #ANY_VERSION} to skip the check.
+ */
+ public ZkMultiWriteBuilder delete(String path, int expectedVersion) {
+ checkNotExecuted();
+ _ops.add(Op.delete(resolve(path), expectedVersion));
+ return this;
+ }
+
+ /** Delete without a version check; equivalent to {@code delete(path, ANY_VERSION)}. */
+ public ZkMultiWriteBuilder delete(String path) {
+ return delete(path, ANY_VERSION);
+ }
+
+ /**
+ * Assert the version of the znode at {@code path} (property-store-relative). No mutation; gates
+ * other ops in the batch atomically — the whole transaction fails with
+ * {@link KeeperException.BadVersionException} if the version no longer matches.
+ */
+ public ZkMultiWriteBuilder check(String path, int expectedVersion) {
+ checkNotExecuted();
+ _ops.add(Op.check(resolve(path), expectedVersion));
+ return this;
+ }
+
+ /**
+ * Submit the accumulated ops as a single atomic ZK {@code multi()} transaction. Throws
+ * {@link KeeperException} on atomic rollback (subtype identifies the cause). Throws
+ * {@link IllegalStateException} if called more than once or if no ops have been added.
+ * Connectivity / session failures propagate as the original {@link ZkException}.
+ */
+ public void execute()
+ throws KeeperException {
+ checkNotExecuted();
+ _executed = true;
+ Preconditions.checkState(!_ops.isEmpty(), "no ops to execute");
+ try {
+ _zkClient.multi(_ops);
+ } catch (ZkException ze) {
+ Throwable cause = ze.getCause();
+ if (cause instanceof KeeperException) {
+ throw (KeeperException) cause;
+ }
+ throw ze;
+ }
+ }
+
+ private void checkNotExecuted() {
+ Preconditions.checkState(!_executed, "ZkMultiWriteBuilder already executed");
+ }
+
+ private String resolve(String path) {
+ Preconditions.checkNotNull(path, "path");
+ Preconditions.checkArgument(path.startsWith("/"), "path must start with '/': %s", path);
+ return _propertyStoreRoot + path;
+ }
+}
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/helix/ZkMultiWriteBuilderTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/helix/ZkMultiWriteBuilderTest.java
new file mode 100644
index 000000000000..b09e316a44eb
--- /dev/null
+++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/helix/ZkMultiWriteBuilderTest.java
@@ -0,0 +1,255 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils.helix;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
+import org.apache.helix.zookeeper.impl.client.ZkClient;
+import org.apache.pinot.common.utils.ZkStarter;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+
+public class ZkMultiWriteBuilderTest {
+
+ private static final String ROOT = "/ZkMultiWriteBuilderTest";
+
+ private ZkStarter.ZookeeperInstance _zk;
+ private ZkClient _client;
+
+ @BeforeClass
+ public void beforeClass() {
+ _zk = ZkStarter.startLocalZkServer();
+ _client = new ZkClient.Builder()
+ .setZkServer(_zk.getZkUrl())
+ .setZkSerializer(new ZNRecordSerializer())
+ .build();
+ Assert.assertTrue(_client.waitUntilConnected(10_000, TimeUnit.MILLISECONDS));
+ }
+
+ @AfterClass
+ public void afterClass() {
+ if (_client != null) {
+ _client.close();
+ }
+ if (_zk != null) {
+ ZkStarter.stopLocalZkServer(_zk);
+ }
+ }
+
+ @BeforeMethod
+ public void cleanRoot() {
+ if (_client.exists(ROOT)) {
+ _client.deleteRecursively(ROOT);
+ }
+ _client.createPersistent(ROOT, true);
+ }
+
+ // -----------------------------------------------------------------------
+ // Helpers
+ // -----------------------------------------------------------------------
+
+ private static ZNRecord record(String id, String key, String value) {
+ ZNRecord r = new ZNRecord(id);
+ r.setSimpleField(key, value);
+ return r;
+ }
+
+ private void seed(String path, ZNRecord rec) {
+ _client.createPersistent(path, rec);
+ }
+
+ private ZNRecord read(String path) {
+ return _client.readData(path, true);
+ }
+
+ private int version(String path) {
+ Stat s = new Stat();
+ _client.readData(path, s);
+ return s.getVersion();
+ }
+
+ private ZkMultiWriteBuilder builder() {
+ // Tests use absolute paths under ROOT, so pass empty prefix (no property-store rebase).
+ return new ZkMultiWriteBuilder(_client, "");
+ }
+
+ // -----------------------------------------------------------------------
+ // Tests
+ // -----------------------------------------------------------------------
+
+ @Test
+ public void testAllSetSuccess()
+ throws KeeperException {
+ String pA = ROOT + "/a";
+ String pB = ROOT + "/b";
+ seed(pA, record("a", "v", "1"));
+ seed(pB, record("b", "v", "1"));
+
+ builder()
+ .set(pA, record("a", "v", "2"), 0)
+ .set(pB, record("b", "v", "2"), 0)
+ .execute();
+
+ Assert.assertEquals(read(pA).getSimpleField("v"), "2");
+ Assert.assertEquals(read(pB).getSimpleField("v"), "2");
+ Assert.assertEquals(version(pA), 1);
+ Assert.assertEquals(version(pB), 1);
+ }
+
+ @Test
+ public void testMixedCreateSetDeleteSuccess()
+ throws KeeperException {
+ String pExisting = ROOT + "/existing";
+ String pNew = ROOT + "/new";
+ String pStale = ROOT + "/stale";
+ seed(pExisting, record("existing", "v", "1"));
+ seed(pStale, record("stale", "v", "x"));
+
+ builder()
+ .set(pExisting, record("existing", "v", "2"), 0)
+ .create(pNew, record("new", "v", "1"))
+ .delete(pStale, 0)
+ .execute();
+
+ Assert.assertEquals(read(pExisting).getSimpleField("v"), "2");
+ Assert.assertEquals(read(pNew).getSimpleField("v"), "1");
+ Assert.assertFalse(_client.exists(pStale));
+ }
+
+ @Test
+ public void testBadVersionAtomicRollback() {
+ String pA = ROOT + "/a";
+ String pB = ROOT + "/b";
+ seed(pA, record("a", "v", "1"));
+ seed(pB, record("b", "v", "1"));
+
+ // Bump version on pB so the expected-0 check on it will fail.
+ _client.writeData(pB, record("b", "v", "bumped"));
+ Assert.assertEquals(version(pB), 1);
+
+ Assert.expectThrows(KeeperException.BadVersionException.class, () ->
+ builder()
+ .set(pA, record("a", "v", "2"), 0)
+ .set(pB, record("b", "v", "2"), 0) // stale version -> BADVERSION
+ .execute());
+
+ // Atomic rollback — pA must NOT have been updated.
+ Assert.assertEquals(read(pA).getSimpleField("v"), "1");
+ Assert.assertEquals(version(pA), 0);
+ Assert.assertEquals(read(pB).getSimpleField("v"), "bumped");
+ }
+
+ @Test
+ public void testCheckOpGatesSet() {
+ String pGate = ROOT + "/gate";
+ String pTarget = ROOT + "/target";
+ seed(pGate, record("gate", "v", "1"));
+ seed(pTarget, record("target", "v", "1"));
+
+ // Bump gate's version; check(gate, 0) should fail and prevent the set.
+ _client.writeData(pGate, record("gate", "v", "bumped"));
+
+ Assert.expectThrows(KeeperException.BadVersionException.class, () ->
+ builder()
+ .check(pGate, 0)
+ .set(pTarget, record("target", "v", "2"), 0)
+ .execute());
+
+ Assert.assertEquals(read(pTarget).getSimpleField("v"), "1", "target must not have been mutated");
+ }
+
+ @Test
+ public void testDeleteNonExistentRollback() {
+ String pExisting = ROOT + "/existing";
+ String pMissing = ROOT + "/missing";
+ seed(pExisting, record("existing", "v", "1"));
+
+ Assert.expectThrows(KeeperException.NoNodeException.class, () ->
+ builder()
+ .set(pExisting, record("existing", "v", "2"), 0)
+ .delete(pMissing)
+ .execute());
+
+ // pExisting must NOT have been updated.
+ Assert.assertEquals(read(pExisting).getSimpleField("v"), "1");
+ }
+
+ @Test
+ public void testCreateExistingNodeRollback() {
+ String pA = ROOT + "/a";
+ String pB = ROOT + "/b";
+ seed(pA, record("a", "v", "1"));
+ seed(pB, record("b", "v", "existing"));
+
+ Assert.expectThrows(KeeperException.NodeExistsException.class, () ->
+ builder()
+ .set(pA, record("a", "v", "2"), 0)
+ .create(pB, record("b", "v", "fresh"))
+ .execute());
+
+ Assert.assertEquals(read(pA).getSimpleField("v"), "1");
+ Assert.assertEquals(read(pB).getSimpleField("v"), "existing");
+ }
+
+ @Test
+ public void testAnyVersionSetSucceedsRegardlessOfVersion()
+ throws KeeperException {
+ String pA = ROOT + "/a";
+ seed(pA, record("a", "v", "1"));
+ _client.writeData(pA, record("a", "v", "bumped"));
+ _client.writeData(pA, record("a", "v", "bumped-again"));
+ Assert.assertEquals(version(pA), 2);
+
+ builder().set(pA, record("a", "v", "final")).execute();
+
+ Assert.assertEquals(read(pA).getSimpleField("v"), "final");
+ }
+
+ @Test
+ public void testBuilderRejectsDoubleExecute()
+ throws KeeperException {
+ String pA = ROOT + "/a";
+ seed(pA, record("a", "v", "1"));
+
+ // After a successful execute(), the builder rejects further calls.
+ ZkMultiWriteBuilder b = builder().set(pA, record("a", "v", "2"), 0);
+ b.execute();
+ Assert.expectThrows(IllegalStateException.class, b::execute);
+ Assert.expectThrows(IllegalStateException.class, () -> b.set(pA, record("a", "v", "3"), 1));
+
+ // After a failed execute() (atomic rollback), the builder is also burned — no retry through
+ // the same instance. Caller must obtain a fresh builder for the retry tick.
+ ZkMultiWriteBuilder failed = builder().set(pA, record("a", "v", "x"), 99); // stale version
+ Assert.expectThrows(KeeperException.BadVersionException.class, failed::execute);
+ Assert.expectThrows(IllegalStateException.class, failed::execute);
+ Assert.expectThrows(IllegalStateException.class, () -> failed.set(pA, record("a", "v", "y"), 1));
+
+ // Empty execute() also burns the builder.
+ ZkMultiWriteBuilder empty = builder();
+ Assert.expectThrows(IllegalStateException.class, empty::execute);
+ Assert.expectThrows(IllegalStateException.class, empty::execute);
+ }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index ddab54f6bb9c..a923ec3721a6 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -87,6 +87,8 @@
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
+import org.apache.helix.zookeeper.impl.client.ZkClient;
import org.apache.pinot.common.assignment.InstanceAssignmentConfigUtils;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.assignment.InstancePartitionsUtils;
@@ -128,12 +130,14 @@
import org.apache.pinot.common.utils.HashUtil;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.LogicalTableConfigUtils;
+import org.apache.pinot.common.utils.ZkStarter;
import org.apache.pinot.common.utils.config.AccessControlUserConfigUtils;
import org.apache.pinot.common.utils.config.InstanceUtils;
import org.apache.pinot.common.utils.config.TagNameUtils;
import org.apache.pinot.common.utils.config.TierConfigUtils;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.common.utils.helix.PinotHelixPropertyStoreZnRecordProvider;
+import org.apache.pinot.common.utils.helix.ZkMultiWriteBuilder;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.api.exception.ControllerApplicationException;
import org.apache.pinot.controller.api.exception.InvalidTableConfigException;
@@ -231,6 +235,8 @@ private enum LineageUpdateType {
private final boolean _enableBatchMessageMode;
private final int _deletedSegmentsRetentionInDays;
private final boolean _enableTieredSegmentAssignment;
+ @Nullable
+ private final ControllerConf _controllerConf;
private HelixManager _helixZkManager;
private HelixAdmin _helixAdmin;
@@ -243,16 +249,25 @@ private enum LineageUpdateType {
private TableCache _tableCache;
private final LineageManager _lineageManager;
private final QueryWorkloadManager _queryWorkloadManager;
+ // Dedicated ZkClient for transactional multi-path writes (atomic ZK multi()). Lazily built on
+ // first multiWriteZK call. A dedicated session is used because Helix 1.3.2 does not expose
+ // multi() on BaseDataAccessor, and the underlying ZkClient inside ZKHelixManager is not publicly
+ // reachable — reusing it would require reflection, which breaks on Helix point-release field
+ // renames. The resulting extra session is consistent with the controller's existing footprint
+ // (_propertyStore cache client, _leadControllerManager manager client are each distinct).
+ private volatile ZkClient _zkClient;
public PinotHelixResourceManager(String helixClusterName, @Nullable String dataDir,
boolean isSingleTenantCluster, boolean enableBatchMessageMode, int deletedSegmentsRetentionInDays,
- boolean enableTieredSegmentAssignment, LineageManager lineageManager) {
+ boolean enableTieredSegmentAssignment, LineageManager lineageManager,
+ @Nullable ControllerConf controllerConf) {
_helixClusterName = helixClusterName;
_dataDir = dataDir;
_isSingleTenantCluster = isSingleTenantCluster;
_enableBatchMessageMode = enableBatchMessageMode;
_deletedSegmentsRetentionInDays = deletedSegmentsRetentionInDays;
_enableTieredSegmentAssignment = enableTieredSegmentAssignment;
+ _controllerConf = controllerConf;
_instanceAdminEndpointCache =
CacheBuilder.newBuilder().expireAfterWrite(CACHE_ENTRY_EXPIRE_TIME_HOURS, TimeUnit.HOURS)
.build(new CacheLoader<>() {
@@ -275,7 +290,7 @@ public PinotHelixResourceManager(ControllerConf controllerConf) {
this(controllerConf.getHelixClusterName(), controllerConf.getDataDir(),
controllerConf.tenantIsolationEnabled(), controllerConf.getEnableBatchMessageMode(),
controllerConf.getDeletedSegmentsRetentionInDays(), controllerConf.tieredSegmentAssignmentEnabled(),
- LineageManagerFactory.create(controllerConf));
+ LineageManagerFactory.create(controllerConf), controllerConf);
}
/**
@@ -337,6 +352,12 @@ public void onInstanceConfigChange(List instanceConfigs, Notific
*/
public synchronized void stop() {
_segmentDeletionManager.stop();
+ ZkClient zkClient = _zkClient;
+ if (zkClient != null) {
+ _zkClient = null;
+ LOGGER.info("Closing dedicated multiWriteZK ZkClient");
+ ZkStarter.closeAsync(zkClient);
+ }
}
/**
@@ -2081,6 +2102,68 @@ public boolean setZKData(String path, ZNRecord record, int expectedVersion, int
return _helixDataAccessor.getBaseDataAccessor().set(path, record, expectedVersion, accessOption);
}
+ /**
+ * Returns a fresh {@link ZkMultiWriteBuilder} for submitting an atomic ZooKeeper {@code multi()}
+ * transaction over Helix property-store paths (set / create / delete / version-check ops on any
+ * combination of property-store znodes). Either every op commits or none do.
+ * Op paths are property-store-relative (e.g. {@code /SEGMENTS/{table}/{segment}}); the builder
+ * prepends {@code /{cluster}/PROPERTYSTORE} before submitting to ZK. Multi-path writes outside
+ * the property store are intentionally not supported.
+ *
Requires {@link #start} to have been called (so the ZK address is reachable via the Helix
+ * manager); throws {@link IllegalStateException} otherwise.
+ *
The builder's {@code execute()} throws {@link org.apache.zookeeper.KeeperException} on atomic
+ * rollback (the subtype identifies the cause: {@code BadVersionException}, {@code NoNodeException},
+ * {@code NodeExistsException}, ...). Connectivity / session failures propagate as the original
+ * {@link org.apache.helix.zookeeper.zkclient.exception.ZkException}.
+ *
The dedicated underlying {@link ZkClient} honors the controller's
+ * {@value CommonConstants.Helix.ZkClient#ZK_CLIENT_SESSION_TIMEOUT_MS_CONFIG} and
+ * {@value CommonConstants.Helix.ZkClient#ZK_CLIENT_CONNECTION_TIMEOUT_MS_CONFIG} overrides; JVM-level
+ * ZooKeeper system properties (e.g. {@code jute.maxbuffer}) are picked up automatically by the
+ * ZooKeeper client library itself.
+ */
+ public ZkMultiWriteBuilder multiWriteZK() {
+ return new ZkMultiWriteBuilder(getOrBuildMultiWriteZkClient(),
+ PropertyPathBuilder.propertyStore(_helixClusterName));
+ }
+
+ private ZkClient getOrBuildMultiWriteZkClient() {
+ ZkClient c = _zkClient;
+ if (c != null) {
+ return c;
+ }
+ synchronized (this) {
+ if (_zkClient == null) {
+ Preconditions.checkState(_helixZkManager != null,
+ "multiWriteZK unavailable: PinotHelixResourceManager has not been started");
+ String zkAddress = _helixZkManager.getMetadataStoreConnectionString();
+ int sessionTimeoutMs = _controllerConf != null
+ ? _controllerConf.getProperty(CommonConstants.Helix.ZkClient.ZK_CLIENT_SESSION_TIMEOUT_MS_CONFIG,
+ CommonConstants.Helix.ZkClient.DEFAULT_SESSION_TIMEOUT_MS)
+ : CommonConstants.Helix.ZkClient.DEFAULT_SESSION_TIMEOUT_MS;
+ int connectTimeoutMs = _controllerConf != null
+ ? _controllerConf.getProperty(CommonConstants.Helix.ZkClient.ZK_CLIENT_CONNECTION_TIMEOUT_MS_CONFIG,
+ CommonConstants.Helix.ZkClient.DEFAULT_CONNECT_TIMEOUT_MS)
+ : CommonConstants.Helix.ZkClient.DEFAULT_CONNECT_TIMEOUT_MS;
+ LOGGER.info("Building dedicated multiWriteZK ZkClient at {} (session={}ms, connect={}ms)",
+ zkAddress, sessionTimeoutMs, connectTimeoutMs);
+ ZkClient built = new ZkClient.Builder()
+ .setZkServer(zkAddress)
+ .setSessionTimeout(sessionTimeoutMs)
+ .setConnectionTimeout(connectTimeoutMs)
+ .setZkSerializer(new ZNRecordSerializer())
+ .build();
+ if (!built.waitUntilConnected(connectTimeoutMs, TimeUnit.MILLISECONDS)) {
+ ZkStarter.closeAsync(built);
+ throw new RuntimeException(
+ "Timed out connecting to ZK at " + zkAddress + " after " + connectTimeoutMs
+ + "ms for multiWriteZK");
+ }
+ _zkClient = built;
+ }
+ return _zkClient;
+ }
+ }
+
public boolean createZKNode(String path, ZNRecord record, int accessOption, long ttl) {
return _helixDataAccessor.getBaseDataAccessor().create(path, record, accessOption, ttl);
}
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerConfigValidationTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerConfigValidationTest.java
index c7552479cfd5..a2b9f78d4054 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerConfigValidationTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerConfigValidationTest.java
@@ -52,7 +52,8 @@ public class PinotHelixResourceManagerConfigValidationTest {
public void setUp()
throws Exception {
LineageManager lineageManager = Mockito.mock(LineageManager.class);
- _resourceManager = new PinotHelixResourceManager("testCluster", null, false, false, 7, false, lineageManager);
+ _resourceManager =
+ new PinotHelixResourceManager("testCluster", null, false, false, 7, false, lineageManager, null);
_helixAdmin = Mockito.mock(HelixAdmin.class);
_helixDataAccessor = Mockito.mock(HelixDataAccessor.class);
@@ -162,6 +163,19 @@ public void testAddInstanceSucceedsWithPassingValidator() {
verify(_helixAdmin).addInstance(eq("testCluster"), any(InstanceConfig.class));
}
+ @Test
+ public void testMultiWriteZkThrowsBeforeStart() {
+ // The fixture never calls start(), so _helixZkManager is null. multiWriteZK() must refuse to
+ // build a client — the ZK address is derived from the Helix manager.
+ try {
+ _resourceManager.multiWriteZK();
+ fail("Expected IllegalStateException");
+ } catch (IllegalStateException expected) {
+ assertTrue(expected.getMessage().contains("not been started"),
+ "Unexpected message: " + expected.getMessage());
+ }
+ }
+
private void setField(String fieldName, Object value)
throws Exception {
Field field = PinotHelixResourceManager.class.getDeclaredField(fieldName);
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
index bc643b8c92e4..cd70ff00e999 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
@@ -1741,6 +1741,53 @@ public void testGetConsumerWatermarks()
deleteSchema(rawTableName);
}
+ /**
+ * Happy-path coverage for {@link PinotHelixResourceManager#multiWriteZK()}: pre-creates two
+ * segment ZK metadata znodes, atomically updates both via a single multi() transaction, then
+ * reads back through the property store and asserts the mutated fields round-tripped. Verifies
+ * the dedicated multi-write ZkClient is built correctly and the ZNRecord serialization /
+ * deserialization path matches what the rest of the controller uses.
+ */
+ @Test
+ public void testMultiWriteZkSegmentMetadataUpdates()
+ throws Exception {
+ String segName1 = "multiWriteZk_seg_1";
+ String segName2 = "multiWriteZk_seg_2";
+ SegmentZKMetadata seg1 = new SegmentZKMetadata(segName1);
+ seg1.setCrc(1L);
+ seg1.setTotalDocs(10L);
+ SegmentZKMetadata seg2 = new SegmentZKMetadata(segName2);
+ seg2.setCrc(2L);
+ seg2.setTotalDocs(20L);
+ assertTrue(ZKMetadataProvider.setSegmentZKMetadata(_propertyStore, OFFLINE_TABLE_NAME, seg1));
+ assertTrue(ZKMetadataProvider.setSegmentZKMetadata(_propertyStore, OFFLINE_TABLE_NAME, seg2));
+
+ // Mutate both and submit as a single atomic transaction.
+ seg1.setCrc(11L);
+ seg1.setTotalDocs(110L);
+ seg2.setCrc(22L);
+ seg2.setTotalDocs(220L);
+ String path1 = ZKMetadataProvider.constructPropertyStorePathForSegment(OFFLINE_TABLE_NAME, segName1);
+ String path2 = ZKMetadataProvider.constructPropertyStorePathForSegment(OFFLINE_TABLE_NAME, segName2);
+ _helixResourceManager.multiWriteZK()
+ .set(path1, seg1.toZNRecord())
+ .set(path2, seg2.toZNRecord())
+ .execute();
+
+ SegmentZKMetadata read1 = ZKMetadataProvider.getSegmentZKMetadata(_propertyStore, OFFLINE_TABLE_NAME, segName1);
+ SegmentZKMetadata read2 = ZKMetadataProvider.getSegmentZKMetadata(_propertyStore, OFFLINE_TABLE_NAME, segName2);
+ assertNotNull(read1);
+ assertNotNull(read2);
+ assertEquals(read1.getCrc(), 11L);
+ assertEquals(read1.getTotalDocs(), 110L);
+ assertEquals(read2.getCrc(), 22L);
+ assertEquals(read2.getTotalDocs(), 220L);
+
+ // Cleanup
+ assertTrue(ZKMetadataProvider.removeSegmentZKMetadata(_propertyStore, OFFLINE_TABLE_NAME, segName1));
+ assertTrue(ZKMetadataProvider.removeSegmentZKMetadata(_propertyStore, OFFLINE_TABLE_NAME, segName2));
+ }
+
@AfterClass
public void tearDown() {
stopFakeInstances();