From e24487a54d52d31575c0de3fe946f0268545eafa Mon Sep 17 00:00:00 2001 From: Krishan Goyal Date: Thu, 23 Apr 2026 17:18:36 +0530 Subject: [PATCH 1/3] ZK multi path transaction API support --- .../utils/helix/PinotZkMultiResult.java | 126 ++++++++++ .../pinot/common/utils/helix/PinotZkOp.java | 129 ++++++++++ .../common/utils/helix/ZkMultiWriter.java | 138 +++++++++++ .../common/utils/helix/ZkMultiWriterTest.java | 233 ++++++++++++++++++ .../helix/core/PinotHelixResourceManager.java | 94 ++++++- ...ixResourceManagerConfigValidationTest.java | 34 +++ 6 files changed, 753 insertions(+), 1 deletion(-) create mode 100644 pinot-common/src/main/java/org/apache/pinot/common/utils/helix/PinotZkMultiResult.java create mode 100644 pinot-common/src/main/java/org/apache/pinot/common/utils/helix/PinotZkOp.java create mode 100644 pinot-common/src/main/java/org/apache/pinot/common/utils/helix/ZkMultiWriter.java create mode 100644 pinot-common/src/test/java/org/apache/pinot/common/utils/helix/ZkMultiWriterTest.java diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/PinotZkMultiResult.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/PinotZkMultiResult.java new file mode 100644 index 000000000000..25fa73988e41 --- /dev/null +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/PinotZkMultiResult.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.utils.helix; + +import java.util.Collections; +import java.util.List; +import javax.annotation.Nullable; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.data.Stat; + + +/** + * Outcome of a transactional ZK multi-write submitted through + * {@link ZkMultiWriter#multi(org.apache.helix.zookeeper.impl.client.ZkClient, java.util.List)}. + *

Immutable. ZK multi is all-or-nothing: either every op committed (success) or none of them + * did (atomic rollback). On rollback, {@link #getFailedOpIndex()} identifies the op that caused + * the rejection and {@link #getFailureCode()} is its ZK error code. Non-atomic failures + * (connectivity / session loss) are signaled by a thrown exception, not this result. + */ +public final class PinotZkMultiResult { + + private final boolean _success; + private final List _outcomes; + private final int _failedOpIndex; + @Nullable + private final KeeperException.Code _failureCode; + + private PinotZkMultiResult(boolean success, List outcomes, int failedOpIndex, + @Nullable KeeperException.Code failureCode) { + _success = success; + _outcomes = Collections.unmodifiableList(outcomes); + _failedOpIndex = failedOpIndex; + _failureCode = failureCode; + } + + static PinotZkMultiResult success(List outcomes) { + return new PinotZkMultiResult(true, outcomes, -1, null); + } + + static PinotZkMultiResult failure(List outcomes, int failedOpIndex, KeeperException.Code code) { + return new PinotZkMultiResult(false, outcomes, failedOpIndex, code); + } + + public boolean isSuccess() { + return _success; + } + + /** One entry per submitted op, in submission order. */ + public List getOutcomes() { + return _outcomes; + } + + /** Index into the submitted op list that caused the rollback, or -1 on success. */ + public int getFailedOpIndex() { + return _failedOpIndex; + } + + /** ZK error code that triggered the rollback, or {@code null} on success. */ + @Nullable + public KeeperException.Code getFailureCode() { + return _failureCode; + } + + @Override + public String toString() { + if (_success) { + return "PinotZkMultiResult{success ops=" + _outcomes.size() + "}"; + } + return "PinotZkMultiResult{rollback failedOpIndex=" + _failedOpIndex + " code=" + _failureCode + "}"; + } + + /** + * Per-op result. On success, {@link #getStat()} carries the post-commit {@link Stat} for + * create/set ops (null for delete/check). On atomic rollback, {@link #getCode()} is non-null + * on the offending op and may be {@link KeeperException.Code#RUNTIMEINCONSISTENCY} or + * {@link KeeperException.Code#OK} on the other ops (per ZK multi semantics). + */ + public static final class OpOutcome { + private final String _path; + @Nullable + private final Stat _stat; + @Nullable + private final KeeperException.Code _code; + + OpOutcome(String path, @Nullable Stat stat, @Nullable KeeperException.Code code) { + _path = path; + _stat = stat; + _code = code; + } + + public String getPath() { + return _path; + } + + @Nullable + public Stat getStat() { + return _stat; + } + + @Nullable + public KeeperException.Code getCode() { + return _code; + } + + @Override + public String toString() { + return "OpOutcome{path=" + _path + " stat=" + _stat + " code=" + _code + "}"; + } + } +} diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/PinotZkOp.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/PinotZkOp.java new file mode 100644 index 000000000000..feb15871e975 --- /dev/null +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/PinotZkOp.java @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.utils.helix; + +import com.google.common.base.Preconditions; +import javax.annotation.Nullable; +import org.apache.helix.zookeeper.datamodel.ZNRecord; +import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.Op; +import org.apache.zookeeper.ZooDefs; + + +/** + * One operation inside a transactional ZK multi-write submitted through + * {@link ZkMultiWriter#multi(org.apache.helix.zookeeper.impl.client.ZkClient, java.util.List)}. + *

Supported kinds mirror ZooKeeper's {@code multi()} primitives: + *

+ *

Instances are immutable. Payload bytes are lazily produced via {@link #toZkOp(ZNRecordSerializer)} + * when the batch is submitted. + *

Not thread-safe for concurrent mutation (irrelevant — instances are immutable). + */ +public final class PinotZkOp { + + /** Pass as {@code expectedVersion} to skip the version check. */ + public static final int ANY_VERSION = -1; + + enum Kind { + SET, CREATE, DELETE, CHECK + } + + private final Kind _kind; + private final String _path; + @Nullable + private final ZNRecord _record; + private final int _expectedVersion; + + private PinotZkOp(Kind kind, String path, @Nullable ZNRecord record, int expectedVersion) { + _kind = kind; + _path = path; + _record = record; + _expectedVersion = expectedVersion; + } + + /** + * Set (overwrite) the znode at {@code path} to {@code record}. Pass {@link #ANY_VERSION} for + * {@code expectedVersion} to skip the CAS check. + */ + public static PinotZkOp set(String path, ZNRecord record, int expectedVersion) { + Preconditions.checkNotNull(path, "path"); + Preconditions.checkNotNull(record, "record"); + return new PinotZkOp(Kind.SET, path, record, expectedVersion); + } + + /** Create a persistent znode at {@code path} with {@code record} as its data. */ + public static PinotZkOp create(String path, ZNRecord record) { + Preconditions.checkNotNull(path, "path"); + Preconditions.checkNotNull(record, "record"); + return new PinotZkOp(Kind.CREATE, path, record, ANY_VERSION); + } + + /** Delete the znode at {@code path}. Pass {@link #ANY_VERSION} to skip the version check. */ + public static PinotZkOp delete(String path, int expectedVersion) { + Preconditions.checkNotNull(path, "path"); + return new PinotZkOp(Kind.DELETE, path, null, expectedVersion); + } + + /** + * Assert the version of the znode at {@code path}. No mutation; used to gate other ops in the + * batch atomically — the whole {@code multi()} fails if the version no longer matches. + */ + public static PinotZkOp check(String path, int expectedVersion) { + Preconditions.checkNotNull(path, "path"); + return new PinotZkOp(Kind.CHECK, path, null, expectedVersion); + } + + public Kind getKind() { + return _kind; + } + + public String getPath() { + return _path; + } + + public int getExpectedVersion() { + return _expectedVersion; + } + + Op toZkOp(ZNRecordSerializer serializer) { + switch (_kind) { + case SET: + return Op.setData(_path, serializer.serialize(_record), _expectedVersion); + case CREATE: + return Op.create(_path, serializer.serialize(_record), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + case DELETE: + return Op.delete(_path, _expectedVersion); + case CHECK: + return Op.check(_path, _expectedVersion); + default: + throw new IllegalStateException("Unknown PinotZkOp kind: " + _kind); + } + } + + @Override + public String toString() { + return "PinotZkOp{" + _kind + " path=" + _path + " expectedVersion=" + _expectedVersion + "}"; + } +} diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/ZkMultiWriter.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/ZkMultiWriter.java new file mode 100644 index 000000000000..c4b985d6eda2 --- /dev/null +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/ZkMultiWriter.java @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.utils.helix; + +import com.google.common.base.Preconditions; +import java.util.ArrayList; +import java.util.List; +import org.apache.helix.zookeeper.api.client.RealmAwareZkClient; +import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer; +import org.apache.helix.zookeeper.zkclient.exception.ZkException; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.Op; +import org.apache.zookeeper.OpResult; + + +/** + * Stateless helper that submits a list of {@link PinotZkOp}s as a ZooKeeper atomic {@code multi()} + * transaction. The {@link RealmAwareZkClient} is owned by the caller; this class only + * serializes ops and translates the result or exception into a {@link PinotZkMultiResult}. + *

Atomic rollback (e.g. {@code BADVERSION}, {@code NONODE}, {@code NODEEXISTS}) is returned as a + * failed {@link PinotZkMultiResult} with the offending op index and ZK error code. Connectivity + * and session failures ({@code ZkTimeoutException}, {@code ZkInterruptedException}, etc.) are + * allowed to propagate. + *

Typed against the {@code RealmAwareZkClient} interface (not the concrete + * {@code org.apache.helix.zookeeper.zkclient.ZkClient}) so callers can pass in either a dedicated + * client or the Helix-managed one if/when that becomes reachable. + */ +public final class ZkMultiWriter { + + // ZNRecordSerializer is documented thread-safe by Helix (no mutable state beyond reusable buffers + // inside serialize/deserialize), so a single static instance is fine across concurrent callers. + private static final ZNRecordSerializer SERIALIZER = new ZNRecordSerializer(); + + private ZkMultiWriter() { + } + + /** + * Submit {@code ops} as a single atomic ZK {@code multi()} transaction against {@code zkClient}. + * Returns a {@link PinotZkMultiResult} describing per-op outcomes. Throws only on non-atomic + * failures (connection loss, session expiry, interrupt, timeout). + */ + public static PinotZkMultiResult multi(RealmAwareZkClient zkClient, List ops) { + Preconditions.checkNotNull(zkClient, "zkClient"); + Preconditions.checkNotNull(ops, "ops"); + Preconditions.checkArgument(!ops.isEmpty(), "ops must not be empty"); + + List zkOps = new ArrayList<>(ops.size()); + for (int i = 0; i < ops.size(); i++) { + PinotZkOp op = ops.get(i); + Preconditions.checkNotNull(op, "ops[%s] is null", i); + zkOps.add(op.toZkOp(SERIALIZER)); + } + + try { + List results = zkClient.multi(zkOps); + return buildSuccess(ops, results); + } catch (ZkException ze) { + Throwable cause = ze.getCause(); + if (cause instanceof KeeperException) { + return buildFailure(ops, (KeeperException) cause); + } + // Non-atomic failure (timeout, interrupt, etc.) or unknown wrapping — propagate. + throw ze; + } + } + + private static PinotZkMultiResult buildSuccess(List ops, List results) { + List outcomes = new ArrayList<>(ops.size()); + for (int i = 0; i < ops.size(); i++) { + PinotZkOp op = ops.get(i); + OpResult r = results.get(i); + outcomes.add(new PinotZkMultiResult.OpOutcome(op.getPath(), statFromOpResult(r), null)); + } + return PinotZkMultiResult.success(outcomes); + } + + private static PinotZkMultiResult buildFailure(List ops, KeeperException ke) { + List results = ke.getResults(); + List outcomes = new ArrayList<>(ops.size()); + int failedIndex = -1; + KeeperException.Code failedCode = ke.code(); + + if (results == null) { + // Multi failed before per-op results were populated — treat offender as unlocalizable. + for (PinotZkOp op : ops) { + outcomes.add(new PinotZkMultiResult.OpOutcome(op.getPath(), null, failedCode)); + } + return PinotZkMultiResult.failure(outcomes, -1, failedCode); + } + + for (int i = 0; i < ops.size(); i++) { + PinotZkOp op = ops.get(i); + OpResult r = i < results.size() ? results.get(i) : null; + KeeperException.Code code = null; + if (r instanceof OpResult.ErrorResult) { + int err = ((OpResult.ErrorResult) r).getErr(); + code = KeeperException.Code.get(err); + // The first op whose code is not OK / not the runtime-rollback marker is the offender. + if (failedIndex == -1 && code != KeeperException.Code.OK + && code != KeeperException.Code.RUNTIMEINCONSISTENCY) { + failedIndex = i; + failedCode = code; + } + } + outcomes.add(new PinotZkMultiResult.OpOutcome(op.getPath(), statFromOpResult(r), code)); + } + + // failedIndex == -1 here means we couldn't localize the offender from per-op results; + // caller sees the top-level code with index -1 ("unknown op"). + return PinotZkMultiResult.failure(outcomes, failedIndex, failedCode); + } + + private static org.apache.zookeeper.data.Stat statFromOpResult(OpResult r) { + if (r instanceof OpResult.SetDataResult) { + return ((OpResult.SetDataResult) r).getStat(); + } + if (r instanceof OpResult.CreateResult) { + return ((OpResult.CreateResult) r).getStat(); + } + return null; + } +} diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/helix/ZkMultiWriterTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/helix/ZkMultiWriterTest.java new file mode 100644 index 000000000000..9367b5b153da --- /dev/null +++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/helix/ZkMultiWriterTest.java @@ -0,0 +1,233 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.utils.helix; + +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.apache.helix.zookeeper.datamodel.ZNRecord; +import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer; +import org.apache.helix.zookeeper.impl.client.ZkClient; +import org.apache.pinot.common.utils.ZkStarter; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.data.Stat; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + + +public class ZkMultiWriterTest { + + private static final String ROOT = "/ZkMultiWriterTest"; + + private ZkStarter.ZookeeperInstance _zk; + private ZkClient _client; + + @BeforeClass + public void beforeClass() { + _zk = ZkStarter.startLocalZkServer(); + _client = new ZkClient.Builder() + .setZkServer(_zk.getZkUrl()) + .setZkSerializer(new ZNRecordSerializer()) + .build(); + Assert.assertTrue(_client.waitUntilConnected(10_000, TimeUnit.MILLISECONDS)); + } + + @AfterClass + public void afterClass() { + if (_client != null) { + _client.close(); + } + if (_zk != null) { + ZkStarter.stopLocalZkServer(_zk); + } + } + + @BeforeMethod + public void cleanRoot() { + if (_client.exists(ROOT)) { + _client.deleteRecursively(ROOT); + } + _client.createPersistent(ROOT, true); + } + + // ----------------------------------------------------------------------- + // Helpers + // ----------------------------------------------------------------------- + + private static ZNRecord record(String id, String key, String value) { + ZNRecord r = new ZNRecord(id); + r.setSimpleField(key, value); + return r; + } + + private void seed(String path, ZNRecord rec) { + _client.createPersistent(path, rec); + } + + private ZNRecord read(String path) { + return _client.readData(path, true); + } + + private int version(String path) { + Stat s = new Stat(); + _client.readData(path, s); + return s.getVersion(); + } + + // ----------------------------------------------------------------------- + // Tests + // ----------------------------------------------------------------------- + + @Test + public void testAllSetSuccess() { + String pA = ROOT + "/a"; + String pB = ROOT + "/b"; + seed(pA, record("a", "v", "1")); + seed(pB, record("b", "v", "1")); + + PinotZkMultiResult res = ZkMultiWriter.multi(_client, List.of( + PinotZkOp.set(pA, record("a", "v", "2"), 0), + PinotZkOp.set(pB, record("b", "v", "2"), 0))); + + Assert.assertTrue(res.isSuccess()); + Assert.assertEquals(res.getFailedOpIndex(), -1); + Assert.assertNull(res.getFailureCode()); + Assert.assertEquals(res.getOutcomes().size(), 2); + Assert.assertNotNull(res.getOutcomes().get(0).getStat()); + Assert.assertEquals(res.getOutcomes().get(0).getStat().getVersion(), 1); + Assert.assertEquals(read(pA).getSimpleField("v"), "2"); + Assert.assertEquals(read(pB).getSimpleField("v"), "2"); + } + + @Test + public void testMixedCreateSetDeleteSuccess() { + String pExisting = ROOT + "/existing"; + String pNew = ROOT + "/new"; + String pStale = ROOT + "/stale"; + seed(pExisting, record("existing", "v", "1")); + seed(pStale, record("stale", "v", "x")); + + PinotZkMultiResult res = ZkMultiWriter.multi(_client, List.of( + PinotZkOp.set(pExisting, record("existing", "v", "2"), 0), + PinotZkOp.create(pNew, record("new", "v", "1")), + PinotZkOp.delete(pStale, 0))); + + Assert.assertTrue(res.isSuccess(), "result: " + res); + Assert.assertEquals(read(pExisting).getSimpleField("v"), "2"); + Assert.assertEquals(read(pNew).getSimpleField("v"), "1"); + Assert.assertFalse(_client.exists(pStale)); + } + + @Test + public void testBadVersionAtomicRollback() { + String pA = ROOT + "/a"; + String pB = ROOT + "/b"; + seed(pA, record("a", "v", "1")); + seed(pB, record("b", "v", "1")); + + // Bump version on pB so the expected-0 check on it will fail. + _client.writeData(pB, record("b", "v", "bumped")); + Assert.assertEquals(version(pB), 1); + + PinotZkMultiResult res = ZkMultiWriter.multi(_client, List.of( + PinotZkOp.set(pA, record("a", "v", "2"), 0), + PinotZkOp.set(pB, record("b", "v", "2"), 0))); // stale version -> BADVERSION + + Assert.assertFalse(res.isSuccess()); + Assert.assertEquals(res.getFailedOpIndex(), 1); + Assert.assertEquals(res.getFailureCode(), KeeperException.Code.BADVERSION); + + // Atomic rollback — pA must NOT have been updated. + Assert.assertEquals(read(pA).getSimpleField("v"), "1"); + Assert.assertEquals(version(pA), 0); + Assert.assertEquals(read(pB).getSimpleField("v"), "bumped"); + } + + @Test + public void testCheckOpGatesSet() { + String pGate = ROOT + "/gate"; + String pTarget = ROOT + "/target"; + seed(pGate, record("gate", "v", "1")); + seed(pTarget, record("target", "v", "1")); + + // Bump gate's version; check(gate, 0) should fail and prevent the set. + _client.writeData(pGate, record("gate", "v", "bumped")); + + PinotZkMultiResult res = ZkMultiWriter.multi(_client, List.of( + PinotZkOp.check(pGate, 0), + PinotZkOp.set(pTarget, record("target", "v", "2"), 0))); + + Assert.assertFalse(res.isSuccess()); + Assert.assertEquals(res.getFailedOpIndex(), 0); + Assert.assertEquals(res.getFailureCode(), KeeperException.Code.BADVERSION); + Assert.assertEquals(read(pTarget).getSimpleField("v"), "1", "target must not have been mutated"); + } + + @Test + public void testDeleteNonExistentRollback() { + String pExisting = ROOT + "/existing"; + String pMissing = ROOT + "/missing"; + seed(pExisting, record("existing", "v", "1")); + + PinotZkMultiResult res = ZkMultiWriter.multi(_client, List.of( + PinotZkOp.set(pExisting, record("existing", "v", "2"), 0), + PinotZkOp.delete(pMissing, PinotZkOp.ANY_VERSION))); + + Assert.assertFalse(res.isSuccess()); + Assert.assertEquals(res.getFailedOpIndex(), 1); + Assert.assertEquals(res.getFailureCode(), KeeperException.Code.NONODE); + // pExisting must NOT have been updated. + Assert.assertEquals(read(pExisting).getSimpleField("v"), "1"); + } + + @Test + public void testCreateExistingNodeRollback() { + String pA = ROOT + "/a"; + String pB = ROOT + "/b"; + seed(pA, record("a", "v", "1")); + seed(pB, record("b", "v", "existing")); + + PinotZkMultiResult res = ZkMultiWriter.multi(_client, List.of( + PinotZkOp.set(pA, record("a", "v", "2"), 0), + PinotZkOp.create(pB, record("b", "v", "fresh")))); + + Assert.assertFalse(res.isSuccess()); + Assert.assertEquals(res.getFailedOpIndex(), 1); + Assert.assertEquals(res.getFailureCode(), KeeperException.Code.NODEEXISTS); + Assert.assertEquals(read(pA).getSimpleField("v"), "1"); + Assert.assertEquals(read(pB).getSimpleField("v"), "existing"); + } + + @Test + public void testAnyVersionSetSucceedsRegardlessOfVersion() { + String pA = ROOT + "/a"; + seed(pA, record("a", "v", "1")); + _client.writeData(pA, record("a", "v", "bumped")); + _client.writeData(pA, record("a", "v", "bumped-again")); + Assert.assertEquals(version(pA), 2); + + PinotZkMultiResult res = ZkMultiWriter.multi(_client, List.of( + PinotZkOp.set(pA, record("a", "v", "final"), PinotZkOp.ANY_VERSION))); + + Assert.assertTrue(res.isSuccess()); + Assert.assertEquals(read(pA).getSimpleField("v"), "final"); + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index ddab54f6bb9c..26fae8b2f4b5 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -87,6 +87,8 @@ import org.apache.helix.model.builder.HelixConfigScopeBuilder; import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.helix.zookeeper.datamodel.ZNRecord; +import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer; +import org.apache.helix.zookeeper.impl.client.ZkClient; import org.apache.pinot.common.assignment.InstanceAssignmentConfigUtils; import org.apache.pinot.common.assignment.InstancePartitions; import org.apache.pinot.common.assignment.InstancePartitionsUtils; @@ -128,12 +130,16 @@ import org.apache.pinot.common.utils.HashUtil; import org.apache.pinot.common.utils.LLCSegmentName; import org.apache.pinot.common.utils.LogicalTableConfigUtils; +import org.apache.pinot.common.utils.ZkStarter; import org.apache.pinot.common.utils.config.AccessControlUserConfigUtils; import org.apache.pinot.common.utils.config.InstanceUtils; import org.apache.pinot.common.utils.config.TagNameUtils; import org.apache.pinot.common.utils.config.TierConfigUtils; import org.apache.pinot.common.utils.helix.HelixHelper; import org.apache.pinot.common.utils.helix.PinotHelixPropertyStoreZnRecordProvider; +import org.apache.pinot.common.utils.helix.PinotZkMultiResult; +import org.apache.pinot.common.utils.helix.PinotZkOp; +import org.apache.pinot.common.utils.helix.ZkMultiWriter; import org.apache.pinot.controller.ControllerConf; import org.apache.pinot.controller.api.exception.ControllerApplicationException; import org.apache.pinot.controller.api.exception.InvalidTableConfigException; @@ -231,6 +237,10 @@ private enum LineageUpdateType { private final boolean _enableBatchMessageMode; private final int _deletedSegmentsRetentionInDays; private final boolean _enableTieredSegmentAssignment; + @Nullable + private final String _zkAddress; + private final int _zkSessionTimeoutMs; + private final int _zkConnectTimeoutMs; private HelixManager _helixZkManager; private HelixAdmin _helixAdmin; @@ -243,16 +253,37 @@ private enum LineageUpdateType { private TableCache _tableCache; private final LineageManager _lineageManager; private final QueryWorkloadManager _queryWorkloadManager; + // Dedicated ZkClient for transactional multi-path writes (atomic ZK multi()). Lazily built on + // first multiWriteZK call. A dedicated session is used because Helix 1.3.2 does not expose + // multi() on BaseDataAccessor, and the underlying ZkClient inside ZKHelixManager is not publicly + // reachable — reusing it would require reflection, which breaks on Helix point-release field + // renames. The resulting extra session is consistent with the controller's existing footprint + // (_propertyStore cache client, _leadControllerManager manager client are each distinct). + private volatile ZkClient _zkClient; + private volatile boolean _stopped; public PinotHelixResourceManager(String helixClusterName, @Nullable String dataDir, boolean isSingleTenantCluster, boolean enableBatchMessageMode, int deletedSegmentsRetentionInDays, boolean enableTieredSegmentAssignment, LineageManager lineageManager) { + this(helixClusterName, dataDir, isSingleTenantCluster, enableBatchMessageMode, deletedSegmentsRetentionInDays, + enableTieredSegmentAssignment, lineageManager, null, + CommonConstants.Helix.ZkClient.DEFAULT_SESSION_TIMEOUT_MS, + CommonConstants.Helix.ZkClient.DEFAULT_CONNECT_TIMEOUT_MS); + } + + private PinotHelixResourceManager(String helixClusterName, @Nullable String dataDir, + boolean isSingleTenantCluster, boolean enableBatchMessageMode, int deletedSegmentsRetentionInDays, + boolean enableTieredSegmentAssignment, LineageManager lineageManager, @Nullable String zkAddress, + int zkSessionTimeoutMs, int zkConnectTimeoutMs) { _helixClusterName = helixClusterName; _dataDir = dataDir; _isSingleTenantCluster = isSingleTenantCluster; _enableBatchMessageMode = enableBatchMessageMode; _deletedSegmentsRetentionInDays = deletedSegmentsRetentionInDays; _enableTieredSegmentAssignment = enableTieredSegmentAssignment; + _zkAddress = zkAddress; + _zkSessionTimeoutMs = zkSessionTimeoutMs; + _zkConnectTimeoutMs = zkConnectTimeoutMs; _instanceAdminEndpointCache = CacheBuilder.newBuilder().expireAfterWrite(CACHE_ENTRY_EXPIRE_TIME_HOURS, TimeUnit.HOURS) .build(new CacheLoader<>() { @@ -275,7 +306,11 @@ public PinotHelixResourceManager(ControllerConf controllerConf) { this(controllerConf.getHelixClusterName(), controllerConf.getDataDir(), controllerConf.tenantIsolationEnabled(), controllerConf.getEnableBatchMessageMode(), controllerConf.getDeletedSegmentsRetentionInDays(), controllerConf.tieredSegmentAssignmentEnabled(), - LineageManagerFactory.create(controllerConf)); + LineageManagerFactory.create(controllerConf), controllerConf.getZkStr(), + controllerConf.getProperty(CommonConstants.Helix.ZkClient.ZK_CLIENT_SESSION_TIMEOUT_MS_CONFIG, + CommonConstants.Helix.ZkClient.DEFAULT_SESSION_TIMEOUT_MS), + controllerConf.getProperty(CommonConstants.Helix.ZkClient.ZK_CLIENT_CONNECTION_TIMEOUT_MS_CONFIG, + CommonConstants.Helix.ZkClient.DEFAULT_CONNECT_TIMEOUT_MS)); } /** @@ -336,7 +371,14 @@ public void onInstanceConfigChange(List instanceConfigs, Notific * Stop the Pinot controller instance. */ public synchronized void stop() { + _stopped = true; _segmentDeletionManager.stop(); + ZkClient zkClient = _zkClient; + if (zkClient != null) { + _zkClient = null; + LOGGER.info("Closing dedicated multiWriteZK ZkClient"); + ZkStarter.closeAsync(zkClient); + } } /** @@ -2081,6 +2123,56 @@ public boolean setZKData(String path, ZNRecord record, int expectedVersion, int return _helixDataAccessor.getBaseDataAccessor().set(path, record, expectedVersion, accessOption); } + /** + * Submits a list of znode operations as a single atomic ZooKeeper {@code multi()} transaction. + * Each op is independently a {@link PinotZkOp#set}, {@link PinotZkOp#create}, + * {@link PinotZkOp#delete}, or {@link PinotZkOp#check} (version assertion without mutation) on + * any path. Either every op commits or none do. Per-op version checks make this the transactional + * CAS primitive for cross-znode controller writes. + *

Returns a {@link PinotZkMultiResult}. On atomic rollback (e.g. version mismatch, node + * missing, node already exists), the result carries the failing op index and ZK error code — + * no exception is thrown. Unlike the single-znode wrappers on this class ({@code + * setZKData}, {@code createZKNode}, {@code deleteZKPath}) which return {@code boolean} on + * any failure, {@code multiWriteZK} throws on connectivity / session failures (they are not an + * atomic outcome and cannot be meaningfully rolled back into a per-op result). + *

Requires the controller to have been constructed with a non-null zkAddress (via the + * {@link ControllerConf} constructor); otherwise throws {@link IllegalStateException}. Also + * throws {@link IllegalStateException} if called after {@link #stop()}. + */ + public PinotZkMultiResult multiWriteZK(List ops) { + return ZkMultiWriter.multi(getOrBuildMultiWriteZkClient(), ops); + } + + private ZkClient getOrBuildMultiWriteZkClient() { + ZkClient c = _zkClient; + if (c != null) { + return c; + } + synchronized (this) { + if (_zkClient == null) { + Preconditions.checkState(!_stopped, "PinotHelixResourceManager is stopped"); + Preconditions.checkState(_zkAddress != null, + "multiWriteZK unavailable: zkAddress not configured on PinotHelixResourceManager"); + LOGGER.info("Building dedicated multiWriteZK ZkClient at {} (session={}ms, connect={}ms)", + _zkAddress, _zkSessionTimeoutMs, _zkConnectTimeoutMs); + ZkClient built = new ZkClient.Builder() + .setZkServer(_zkAddress) + .setSessionTimeout(_zkSessionTimeoutMs) + .setConnectionTimeout(_zkConnectTimeoutMs) + .setZkSerializer(new ZNRecordSerializer()) + .build(); + if (!built.waitUntilConnected(_zkConnectTimeoutMs, TimeUnit.MILLISECONDS)) { + ZkStarter.closeAsync(built); + throw new RuntimeException( + "Timed out connecting to ZK at " + _zkAddress + " after " + _zkConnectTimeoutMs + + "ms for multiWriteZK"); + } + _zkClient = built; + } + return _zkClient; + } + } + public boolean createZKNode(String path, ZNRecord record, int accessOption, long ttl) { return _helixDataAccessor.getBaseDataAccessor().create(path, record, accessOption, ttl); } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerConfigValidationTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerConfigValidationTest.java index c7552479cfd5..33aa04bb7b68 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerConfigValidationTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerConfigValidationTest.java @@ -20,12 +20,15 @@ import java.lang.reflect.Field; import java.util.Collections; +import java.util.List; import java.util.TreeMap; import org.apache.helix.HelixAdmin; import org.apache.helix.HelixDataAccessor; import org.apache.helix.PropertyKey; import org.apache.helix.model.InstanceConfig; +import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.pinot.common.utils.config.InstanceUtils; +import org.apache.pinot.common.utils.helix.PinotZkOp; import org.apache.pinot.controller.helix.core.lineage.LineageManager; import org.apache.pinot.spi.config.instance.Instance; import org.apache.pinot.spi.config.instance.InstanceConfigValidatorRegistry; @@ -162,6 +165,37 @@ public void testAddInstanceSucceedsWithPassingValidator() { verify(_helixAdmin).addInstance(eq("testCluster"), any(InstanceConfig.class)); } + @Test + public void testMultiWriteZkThrowsWhenZkAddressNotConfigured() { + // 7-arg test constructor leaves _zkAddress null; multiWriteZK must refuse to lazy-build. + List ops = List.of(PinotZkOp.set("/anything", new ZNRecord("x"), 0)); + try { + _resourceManager.multiWriteZK(ops); + fail("Expected IllegalStateException"); + } catch (IllegalStateException expected) { + assertTrue(expected.getMessage().contains("zkAddress"), + "Unexpected message: " + expected.getMessage()); + } + } + + @Test + public void testMultiWriteZkThrowsAfterStop() + throws Exception { + // _segmentDeletionManager is normally wired by start(); mock it so stop() doesn't NPE. + setField("_segmentDeletionManager", Mockito.mock(SegmentDeletionManager.class)); + _resourceManager.stop(); + List ops = List.of(PinotZkOp.set("/anything", new ZNRecord("x"), 0)); + try { + _resourceManager.multiWriteZK(ops); + fail("Expected IllegalStateException"); + } catch (IllegalStateException expected) { + // either "stopped" or "zkAddress" message is acceptable — both mean "no client available" + assertTrue( + expected.getMessage().contains("stopped") || expected.getMessage().contains("zkAddress"), + "Unexpected message: " + expected.getMessage()); + } + } + private void setField(String fieldName, Object value) throws Exception { Field field = PinotHelixResourceManager.class.getDeclaredField(fieldName); From c6234282450fd4351477d6cd9a3280f3e350ab7d Mon Sep 17 00:00:00 2001 From: Krishan Goyal Date: Wed, 29 Apr 2026 16:22:27 +0530 Subject: [PATCH 2/3] [refactor] Collapse ZK multi-op API to a fluent ZkMultiWriteBuilder Replace PinotZkOp + PinotZkMultiResult + ZkMultiWriter with a single ZkMultiWriteBuilder; PinotHelixResourceManager.multiWriteZK() returns a fresh builder. execute() returns void and throws KeeperException on atomic rollback, so callers branch on BadVersionException / NoNodeException / NodeExistsException for the retry-vs-hard-error trichotomy without a custom result type. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../utils/helix/PinotZkMultiResult.java | 126 --------------- .../pinot/common/utils/helix/PinotZkOp.java | 129 --------------- .../utils/helix/ZkMultiWriteBuilder.java | 147 ++++++++++++++++++ .../common/utils/helix/ZkMultiWriter.java | 138 ---------------- ...Test.java => ZkMultiWriteBuilderTest.java} | 117 ++++++++------ .../helix/core/PinotHelixResourceManager.java | 34 ++-- ...ixResourceManagerConfigValidationTest.java | 11 +- 7 files changed, 233 insertions(+), 469 deletions(-) delete mode 100644 pinot-common/src/main/java/org/apache/pinot/common/utils/helix/PinotZkMultiResult.java delete mode 100644 pinot-common/src/main/java/org/apache/pinot/common/utils/helix/PinotZkOp.java create mode 100644 pinot-common/src/main/java/org/apache/pinot/common/utils/helix/ZkMultiWriteBuilder.java delete mode 100644 pinot-common/src/main/java/org/apache/pinot/common/utils/helix/ZkMultiWriter.java rename pinot-common/src/test/java/org/apache/pinot/common/utils/helix/{ZkMultiWriterTest.java => ZkMultiWriteBuilderTest.java} (65%) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/PinotZkMultiResult.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/PinotZkMultiResult.java deleted file mode 100644 index 25fa73988e41..000000000000 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/PinotZkMultiResult.java +++ /dev/null @@ -1,126 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.common.utils.helix; - -import java.util.Collections; -import java.util.List; -import javax.annotation.Nullable; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.data.Stat; - - -/** - * Outcome of a transactional ZK multi-write submitted through - * {@link ZkMultiWriter#multi(org.apache.helix.zookeeper.impl.client.ZkClient, java.util.List)}. - *

Immutable. ZK multi is all-or-nothing: either every op committed (success) or none of them - * did (atomic rollback). On rollback, {@link #getFailedOpIndex()} identifies the op that caused - * the rejection and {@link #getFailureCode()} is its ZK error code. Non-atomic failures - * (connectivity / session loss) are signaled by a thrown exception, not this result. - */ -public final class PinotZkMultiResult { - - private final boolean _success; - private final List _outcomes; - private final int _failedOpIndex; - @Nullable - private final KeeperException.Code _failureCode; - - private PinotZkMultiResult(boolean success, List outcomes, int failedOpIndex, - @Nullable KeeperException.Code failureCode) { - _success = success; - _outcomes = Collections.unmodifiableList(outcomes); - _failedOpIndex = failedOpIndex; - _failureCode = failureCode; - } - - static PinotZkMultiResult success(List outcomes) { - return new PinotZkMultiResult(true, outcomes, -1, null); - } - - static PinotZkMultiResult failure(List outcomes, int failedOpIndex, KeeperException.Code code) { - return new PinotZkMultiResult(false, outcomes, failedOpIndex, code); - } - - public boolean isSuccess() { - return _success; - } - - /** One entry per submitted op, in submission order. */ - public List getOutcomes() { - return _outcomes; - } - - /** Index into the submitted op list that caused the rollback, or -1 on success. */ - public int getFailedOpIndex() { - return _failedOpIndex; - } - - /** ZK error code that triggered the rollback, or {@code null} on success. */ - @Nullable - public KeeperException.Code getFailureCode() { - return _failureCode; - } - - @Override - public String toString() { - if (_success) { - return "PinotZkMultiResult{success ops=" + _outcomes.size() + "}"; - } - return "PinotZkMultiResult{rollback failedOpIndex=" + _failedOpIndex + " code=" + _failureCode + "}"; - } - - /** - * Per-op result. On success, {@link #getStat()} carries the post-commit {@link Stat} for - * create/set ops (null for delete/check). On atomic rollback, {@link #getCode()} is non-null - * on the offending op and may be {@link KeeperException.Code#RUNTIMEINCONSISTENCY} or - * {@link KeeperException.Code#OK} on the other ops (per ZK multi semantics). - */ - public static final class OpOutcome { - private final String _path; - @Nullable - private final Stat _stat; - @Nullable - private final KeeperException.Code _code; - - OpOutcome(String path, @Nullable Stat stat, @Nullable KeeperException.Code code) { - _path = path; - _stat = stat; - _code = code; - } - - public String getPath() { - return _path; - } - - @Nullable - public Stat getStat() { - return _stat; - } - - @Nullable - public KeeperException.Code getCode() { - return _code; - } - - @Override - public String toString() { - return "OpOutcome{path=" + _path + " stat=" + _stat + " code=" + _code + "}"; - } - } -} diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/PinotZkOp.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/PinotZkOp.java deleted file mode 100644 index feb15871e975..000000000000 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/PinotZkOp.java +++ /dev/null @@ -1,129 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.common.utils.helix; - -import com.google.common.base.Preconditions; -import javax.annotation.Nullable; -import org.apache.helix.zookeeper.datamodel.ZNRecord; -import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.Op; -import org.apache.zookeeper.ZooDefs; - - -/** - * One operation inside a transactional ZK multi-write submitted through - * {@link ZkMultiWriter#multi(org.apache.helix.zookeeper.impl.client.ZkClient, java.util.List)}. - *

Supported kinds mirror ZooKeeper's {@code multi()} primitives: - *

- *

Instances are immutable. Payload bytes are lazily produced via {@link #toZkOp(ZNRecordSerializer)} - * when the batch is submitted. - *

Not thread-safe for concurrent mutation (irrelevant — instances are immutable). - */ -public final class PinotZkOp { - - /** Pass as {@code expectedVersion} to skip the version check. */ - public static final int ANY_VERSION = -1; - - enum Kind { - SET, CREATE, DELETE, CHECK - } - - private final Kind _kind; - private final String _path; - @Nullable - private final ZNRecord _record; - private final int _expectedVersion; - - private PinotZkOp(Kind kind, String path, @Nullable ZNRecord record, int expectedVersion) { - _kind = kind; - _path = path; - _record = record; - _expectedVersion = expectedVersion; - } - - /** - * Set (overwrite) the znode at {@code path} to {@code record}. Pass {@link #ANY_VERSION} for - * {@code expectedVersion} to skip the CAS check. - */ - public static PinotZkOp set(String path, ZNRecord record, int expectedVersion) { - Preconditions.checkNotNull(path, "path"); - Preconditions.checkNotNull(record, "record"); - return new PinotZkOp(Kind.SET, path, record, expectedVersion); - } - - /** Create a persistent znode at {@code path} with {@code record} as its data. */ - public static PinotZkOp create(String path, ZNRecord record) { - Preconditions.checkNotNull(path, "path"); - Preconditions.checkNotNull(record, "record"); - return new PinotZkOp(Kind.CREATE, path, record, ANY_VERSION); - } - - /** Delete the znode at {@code path}. Pass {@link #ANY_VERSION} to skip the version check. */ - public static PinotZkOp delete(String path, int expectedVersion) { - Preconditions.checkNotNull(path, "path"); - return new PinotZkOp(Kind.DELETE, path, null, expectedVersion); - } - - /** - * Assert the version of the znode at {@code path}. No mutation; used to gate other ops in the - * batch atomically — the whole {@code multi()} fails if the version no longer matches. - */ - public static PinotZkOp check(String path, int expectedVersion) { - Preconditions.checkNotNull(path, "path"); - return new PinotZkOp(Kind.CHECK, path, null, expectedVersion); - } - - public Kind getKind() { - return _kind; - } - - public String getPath() { - return _path; - } - - public int getExpectedVersion() { - return _expectedVersion; - } - - Op toZkOp(ZNRecordSerializer serializer) { - switch (_kind) { - case SET: - return Op.setData(_path, serializer.serialize(_record), _expectedVersion); - case CREATE: - return Op.create(_path, serializer.serialize(_record), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - case DELETE: - return Op.delete(_path, _expectedVersion); - case CHECK: - return Op.check(_path, _expectedVersion); - default: - throw new IllegalStateException("Unknown PinotZkOp kind: " + _kind); - } - } - - @Override - public String toString() { - return "PinotZkOp{" + _kind + " path=" + _path + " expectedVersion=" + _expectedVersion + "}"; - } -} diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/ZkMultiWriteBuilder.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/ZkMultiWriteBuilder.java new file mode 100644 index 000000000000..371e3b8468af --- /dev/null +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/ZkMultiWriteBuilder.java @@ -0,0 +1,147 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.utils.helix; + +import com.google.common.base.Preconditions; +import java.util.ArrayList; +import java.util.List; +import org.apache.helix.zookeeper.api.client.RealmAwareZkClient; +import org.apache.helix.zookeeper.datamodel.ZNRecord; +import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer; +import org.apache.helix.zookeeper.zkclient.exception.ZkException; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.Op; +import org.apache.zookeeper.ZooDefs; + + +/** + * Fluent builder for an atomic ZooKeeper {@code multi()} transaction. Accumulates ops via + * {@link #set}/{@link #create}/{@link #delete}/{@link #check} and submits them as a single + * all-or-nothing batch on {@link #execute()}. + * + *

On atomic rollback (e.g. version mismatch, node missing, node already exists), {@link #execute()} + * throws the underlying {@link KeeperException} subtype ({@code BadVersionException}, + * {@code NoNodeException}, {@code NodeExistsException}, ...). Callers branch on the subtype to + * distinguish retryable concurrent-state changes from hard errors. Per-op offender info is reachable + * via {@link KeeperException#getResults()}. + * + *

Connectivity / session failures (timeout, interrupt, session expiry) are not atomic outcomes + * and propagate as the original {@link ZkException}. + * + *

Single-use: each instance can be executed at most once. Obtain a fresh builder per transaction + * (typically via {@code PinotHelixResourceManager.multiWriteZK()}). + */ +public final class ZkMultiWriteBuilder { + + /** Pass as {@code expectedVersion} to skip the version check on a {@link #set}/{@link #delete}. */ + public static final int ANY_VERSION = -1; + + // ZNRecordSerializer is documented thread-safe by Helix (no mutable state beyond per-call buffers), + // so a single static instance is fine across concurrent builders. + private static final ZNRecordSerializer SERIALIZER = new ZNRecordSerializer(); + + private final RealmAwareZkClient _zkClient; + private final List _ops = new ArrayList<>(); + private boolean _executed; + + public ZkMultiWriteBuilder(RealmAwareZkClient zkClient) { + _zkClient = Preconditions.checkNotNull(zkClient, "zkClient"); + } + + /** + * Set (overwrite) the znode at {@code path} to {@code record}, with a CAS check on + * {@code expectedVersion}. Pass {@link #ANY_VERSION} to skip the check. + */ + public ZkMultiWriteBuilder set(String path, ZNRecord record, int expectedVersion) { + checkNotExecuted(); + Preconditions.checkNotNull(path, "path"); + Preconditions.checkNotNull(record, "record"); + _ops.add(Op.setData(path, SERIALIZER.serialize(record), expectedVersion)); + return this; + } + + /** Set without a version check; equivalent to {@code set(path, record, ANY_VERSION)}. */ + public ZkMultiWriteBuilder set(String path, ZNRecord record) { + return set(path, record, ANY_VERSION); + } + + /** Create a persistent znode at {@code path} with {@code record} as its data. */ + public ZkMultiWriteBuilder create(String path, ZNRecord record) { + checkNotExecuted(); + Preconditions.checkNotNull(path, "path"); + Preconditions.checkNotNull(record, "record"); + _ops.add(Op.create(path, SERIALIZER.serialize(record), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)); + return this; + } + + /** + * Delete the znode at {@code path}, with a CAS check on {@code expectedVersion}. Pass + * {@link #ANY_VERSION} to skip the check. + */ + public ZkMultiWriteBuilder delete(String path, int expectedVersion) { + checkNotExecuted(); + Preconditions.checkNotNull(path, "path"); + _ops.add(Op.delete(path, expectedVersion)); + return this; + } + + /** Delete without a version check; equivalent to {@code delete(path, ANY_VERSION)}. */ + public ZkMultiWriteBuilder delete(String path) { + return delete(path, ANY_VERSION); + } + + /** + * Assert the version of the znode at {@code path}. No mutation; gates other ops in the batch + * atomically — the whole transaction fails with {@link KeeperException.BadVersionException} if the + * version no longer matches. + */ + public ZkMultiWriteBuilder check(String path, int expectedVersion) { + checkNotExecuted(); + Preconditions.checkNotNull(path, "path"); + _ops.add(Op.check(path, expectedVersion)); + return this; + } + + /** + * Submit the accumulated ops as a single atomic ZK {@code multi()} transaction. Throws + * {@link KeeperException} on atomic rollback (subtype identifies the cause). Throws + * {@link IllegalStateException} if called more than once or if no ops have been added. + * Connectivity / session failures propagate as the original {@link ZkException}. + */ + public void execute() + throws KeeperException { + checkNotExecuted(); + _executed = true; + Preconditions.checkState(!_ops.isEmpty(), "no ops to execute"); + try { + _zkClient.multi(_ops); + } catch (ZkException ze) { + Throwable cause = ze.getCause(); + if (cause instanceof KeeperException) { + throw (KeeperException) cause; + } + throw ze; + } + } + + private void checkNotExecuted() { + Preconditions.checkState(!_executed, "ZkMultiWriteBuilder already executed"); + } +} diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/ZkMultiWriter.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/ZkMultiWriter.java deleted file mode 100644 index c4b985d6eda2..000000000000 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/ZkMultiWriter.java +++ /dev/null @@ -1,138 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.common.utils.helix; - -import com.google.common.base.Preconditions; -import java.util.ArrayList; -import java.util.List; -import org.apache.helix.zookeeper.api.client.RealmAwareZkClient; -import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer; -import org.apache.helix.zookeeper.zkclient.exception.ZkException; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.Op; -import org.apache.zookeeper.OpResult; - - -/** - * Stateless helper that submits a list of {@link PinotZkOp}s as a ZooKeeper atomic {@code multi()} - * transaction. The {@link RealmAwareZkClient} is owned by the caller; this class only - * serializes ops and translates the result or exception into a {@link PinotZkMultiResult}. - *

Atomic rollback (e.g. {@code BADVERSION}, {@code NONODE}, {@code NODEEXISTS}) is returned as a - * failed {@link PinotZkMultiResult} with the offending op index and ZK error code. Connectivity - * and session failures ({@code ZkTimeoutException}, {@code ZkInterruptedException}, etc.) are - * allowed to propagate. - *

Typed against the {@code RealmAwareZkClient} interface (not the concrete - * {@code org.apache.helix.zookeeper.zkclient.ZkClient}) so callers can pass in either a dedicated - * client or the Helix-managed one if/when that becomes reachable. - */ -public final class ZkMultiWriter { - - // ZNRecordSerializer is documented thread-safe by Helix (no mutable state beyond reusable buffers - // inside serialize/deserialize), so a single static instance is fine across concurrent callers. - private static final ZNRecordSerializer SERIALIZER = new ZNRecordSerializer(); - - private ZkMultiWriter() { - } - - /** - * Submit {@code ops} as a single atomic ZK {@code multi()} transaction against {@code zkClient}. - * Returns a {@link PinotZkMultiResult} describing per-op outcomes. Throws only on non-atomic - * failures (connection loss, session expiry, interrupt, timeout). - */ - public static PinotZkMultiResult multi(RealmAwareZkClient zkClient, List ops) { - Preconditions.checkNotNull(zkClient, "zkClient"); - Preconditions.checkNotNull(ops, "ops"); - Preconditions.checkArgument(!ops.isEmpty(), "ops must not be empty"); - - List zkOps = new ArrayList<>(ops.size()); - for (int i = 0; i < ops.size(); i++) { - PinotZkOp op = ops.get(i); - Preconditions.checkNotNull(op, "ops[%s] is null", i); - zkOps.add(op.toZkOp(SERIALIZER)); - } - - try { - List results = zkClient.multi(zkOps); - return buildSuccess(ops, results); - } catch (ZkException ze) { - Throwable cause = ze.getCause(); - if (cause instanceof KeeperException) { - return buildFailure(ops, (KeeperException) cause); - } - // Non-atomic failure (timeout, interrupt, etc.) or unknown wrapping — propagate. - throw ze; - } - } - - private static PinotZkMultiResult buildSuccess(List ops, List results) { - List outcomes = new ArrayList<>(ops.size()); - for (int i = 0; i < ops.size(); i++) { - PinotZkOp op = ops.get(i); - OpResult r = results.get(i); - outcomes.add(new PinotZkMultiResult.OpOutcome(op.getPath(), statFromOpResult(r), null)); - } - return PinotZkMultiResult.success(outcomes); - } - - private static PinotZkMultiResult buildFailure(List ops, KeeperException ke) { - List results = ke.getResults(); - List outcomes = new ArrayList<>(ops.size()); - int failedIndex = -1; - KeeperException.Code failedCode = ke.code(); - - if (results == null) { - // Multi failed before per-op results were populated — treat offender as unlocalizable. - for (PinotZkOp op : ops) { - outcomes.add(new PinotZkMultiResult.OpOutcome(op.getPath(), null, failedCode)); - } - return PinotZkMultiResult.failure(outcomes, -1, failedCode); - } - - for (int i = 0; i < ops.size(); i++) { - PinotZkOp op = ops.get(i); - OpResult r = i < results.size() ? results.get(i) : null; - KeeperException.Code code = null; - if (r instanceof OpResult.ErrorResult) { - int err = ((OpResult.ErrorResult) r).getErr(); - code = KeeperException.Code.get(err); - // The first op whose code is not OK / not the runtime-rollback marker is the offender. - if (failedIndex == -1 && code != KeeperException.Code.OK - && code != KeeperException.Code.RUNTIMEINCONSISTENCY) { - failedIndex = i; - failedCode = code; - } - } - outcomes.add(new PinotZkMultiResult.OpOutcome(op.getPath(), statFromOpResult(r), code)); - } - - // failedIndex == -1 here means we couldn't localize the offender from per-op results; - // caller sees the top-level code with index -1 ("unknown op"). - return PinotZkMultiResult.failure(outcomes, failedIndex, failedCode); - } - - private static org.apache.zookeeper.data.Stat statFromOpResult(OpResult r) { - if (r instanceof OpResult.SetDataResult) { - return ((OpResult.SetDataResult) r).getStat(); - } - if (r instanceof OpResult.CreateResult) { - return ((OpResult.CreateResult) r).getStat(); - } - return null; - } -} diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/helix/ZkMultiWriterTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/helix/ZkMultiWriteBuilderTest.java similarity index 65% rename from pinot-common/src/test/java/org/apache/pinot/common/utils/helix/ZkMultiWriterTest.java rename to pinot-common/src/test/java/org/apache/pinot/common/utils/helix/ZkMultiWriteBuilderTest.java index 9367b5b153da..3d7cad272170 100644 --- a/pinot-common/src/test/java/org/apache/pinot/common/utils/helix/ZkMultiWriterTest.java +++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/helix/ZkMultiWriteBuilderTest.java @@ -18,7 +18,6 @@ */ package org.apache.pinot.common.utils.helix; -import java.util.List; import java.util.concurrent.TimeUnit; import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer; @@ -33,9 +32,9 @@ import org.testng.annotations.Test; -public class ZkMultiWriterTest { +public class ZkMultiWriteBuilderTest { - private static final String ROOT = "/ZkMultiWriterTest"; + private static final String ROOT = "/ZkMultiWriteBuilderTest"; private ZkStarter.ZookeeperInstance _zk; private ZkClient _client; @@ -92,45 +91,48 @@ private int version(String path) { return s.getVersion(); } + private ZkMultiWriteBuilder builder() { + return new ZkMultiWriteBuilder(_client); + } + // ----------------------------------------------------------------------- // Tests // ----------------------------------------------------------------------- @Test - public void testAllSetSuccess() { + public void testAllSetSuccess() + throws KeeperException { String pA = ROOT + "/a"; String pB = ROOT + "/b"; seed(pA, record("a", "v", "1")); seed(pB, record("b", "v", "1")); - PinotZkMultiResult res = ZkMultiWriter.multi(_client, List.of( - PinotZkOp.set(pA, record("a", "v", "2"), 0), - PinotZkOp.set(pB, record("b", "v", "2"), 0))); + builder() + .set(pA, record("a", "v", "2"), 0) + .set(pB, record("b", "v", "2"), 0) + .execute(); - Assert.assertTrue(res.isSuccess()); - Assert.assertEquals(res.getFailedOpIndex(), -1); - Assert.assertNull(res.getFailureCode()); - Assert.assertEquals(res.getOutcomes().size(), 2); - Assert.assertNotNull(res.getOutcomes().get(0).getStat()); - Assert.assertEquals(res.getOutcomes().get(0).getStat().getVersion(), 1); Assert.assertEquals(read(pA).getSimpleField("v"), "2"); Assert.assertEquals(read(pB).getSimpleField("v"), "2"); + Assert.assertEquals(version(pA), 1); + Assert.assertEquals(version(pB), 1); } @Test - public void testMixedCreateSetDeleteSuccess() { + public void testMixedCreateSetDeleteSuccess() + throws KeeperException { String pExisting = ROOT + "/existing"; String pNew = ROOT + "/new"; String pStale = ROOT + "/stale"; seed(pExisting, record("existing", "v", "1")); seed(pStale, record("stale", "v", "x")); - PinotZkMultiResult res = ZkMultiWriter.multi(_client, List.of( - PinotZkOp.set(pExisting, record("existing", "v", "2"), 0), - PinotZkOp.create(pNew, record("new", "v", "1")), - PinotZkOp.delete(pStale, 0))); + builder() + .set(pExisting, record("existing", "v", "2"), 0) + .create(pNew, record("new", "v", "1")) + .delete(pStale, 0) + .execute(); - Assert.assertTrue(res.isSuccess(), "result: " + res); Assert.assertEquals(read(pExisting).getSimpleField("v"), "2"); Assert.assertEquals(read(pNew).getSimpleField("v"), "1"); Assert.assertFalse(_client.exists(pStale)); @@ -147,13 +149,11 @@ public void testBadVersionAtomicRollback() { _client.writeData(pB, record("b", "v", "bumped")); Assert.assertEquals(version(pB), 1); - PinotZkMultiResult res = ZkMultiWriter.multi(_client, List.of( - PinotZkOp.set(pA, record("a", "v", "2"), 0), - PinotZkOp.set(pB, record("b", "v", "2"), 0))); // stale version -> BADVERSION - - Assert.assertFalse(res.isSuccess()); - Assert.assertEquals(res.getFailedOpIndex(), 1); - Assert.assertEquals(res.getFailureCode(), KeeperException.Code.BADVERSION); + Assert.expectThrows(KeeperException.BadVersionException.class, () -> + builder() + .set(pA, record("a", "v", "2"), 0) + .set(pB, record("b", "v", "2"), 0) // stale version -> BADVERSION + .execute()); // Atomic rollback — pA must NOT have been updated. Assert.assertEquals(read(pA).getSimpleField("v"), "1"); @@ -171,13 +171,12 @@ public void testCheckOpGatesSet() { // Bump gate's version; check(gate, 0) should fail and prevent the set. _client.writeData(pGate, record("gate", "v", "bumped")); - PinotZkMultiResult res = ZkMultiWriter.multi(_client, List.of( - PinotZkOp.check(pGate, 0), - PinotZkOp.set(pTarget, record("target", "v", "2"), 0))); + Assert.expectThrows(KeeperException.BadVersionException.class, () -> + builder() + .check(pGate, 0) + .set(pTarget, record("target", "v", "2"), 0) + .execute()); - Assert.assertFalse(res.isSuccess()); - Assert.assertEquals(res.getFailedOpIndex(), 0); - Assert.assertEquals(res.getFailureCode(), KeeperException.Code.BADVERSION); Assert.assertEquals(read(pTarget).getSimpleField("v"), "1", "target must not have been mutated"); } @@ -187,13 +186,12 @@ public void testDeleteNonExistentRollback() { String pMissing = ROOT + "/missing"; seed(pExisting, record("existing", "v", "1")); - PinotZkMultiResult res = ZkMultiWriter.multi(_client, List.of( - PinotZkOp.set(pExisting, record("existing", "v", "2"), 0), - PinotZkOp.delete(pMissing, PinotZkOp.ANY_VERSION))); + Assert.expectThrows(KeeperException.NoNodeException.class, () -> + builder() + .set(pExisting, record("existing", "v", "2"), 0) + .delete(pMissing) + .execute()); - Assert.assertFalse(res.isSuccess()); - Assert.assertEquals(res.getFailedOpIndex(), 1); - Assert.assertEquals(res.getFailureCode(), KeeperException.Code.NONODE); // pExisting must NOT have been updated. Assert.assertEquals(read(pExisting).getSimpleField("v"), "1"); } @@ -205,29 +203,52 @@ public void testCreateExistingNodeRollback() { seed(pA, record("a", "v", "1")); seed(pB, record("b", "v", "existing")); - PinotZkMultiResult res = ZkMultiWriter.multi(_client, List.of( - PinotZkOp.set(pA, record("a", "v", "2"), 0), - PinotZkOp.create(pB, record("b", "v", "fresh")))); + Assert.expectThrows(KeeperException.NodeExistsException.class, () -> + builder() + .set(pA, record("a", "v", "2"), 0) + .create(pB, record("b", "v", "fresh")) + .execute()); - Assert.assertFalse(res.isSuccess()); - Assert.assertEquals(res.getFailedOpIndex(), 1); - Assert.assertEquals(res.getFailureCode(), KeeperException.Code.NODEEXISTS); Assert.assertEquals(read(pA).getSimpleField("v"), "1"); Assert.assertEquals(read(pB).getSimpleField("v"), "existing"); } @Test - public void testAnyVersionSetSucceedsRegardlessOfVersion() { + public void testAnyVersionSetSucceedsRegardlessOfVersion() + throws KeeperException { String pA = ROOT + "/a"; seed(pA, record("a", "v", "1")); _client.writeData(pA, record("a", "v", "bumped")); _client.writeData(pA, record("a", "v", "bumped-again")); Assert.assertEquals(version(pA), 2); - PinotZkMultiResult res = ZkMultiWriter.multi(_client, List.of( - PinotZkOp.set(pA, record("a", "v", "final"), PinotZkOp.ANY_VERSION))); + builder().set(pA, record("a", "v", "final")).execute(); - Assert.assertTrue(res.isSuccess()); Assert.assertEquals(read(pA).getSimpleField("v"), "final"); } + + @Test + public void testBuilderRejectsDoubleExecute() + throws KeeperException { + String pA = ROOT + "/a"; + seed(pA, record("a", "v", "1")); + + // After a successful execute(), the builder rejects further calls. + ZkMultiWriteBuilder b = builder().set(pA, record("a", "v", "2"), 0); + b.execute(); + Assert.expectThrows(IllegalStateException.class, b::execute); + Assert.expectThrows(IllegalStateException.class, () -> b.set(pA, record("a", "v", "3"), 1)); + + // After a failed execute() (atomic rollback), the builder is also burned — no retry through + // the same instance. Caller must obtain a fresh builder for the retry tick. + ZkMultiWriteBuilder failed = builder().set(pA, record("a", "v", "x"), 99); // stale version + Assert.expectThrows(KeeperException.BadVersionException.class, failed::execute); + Assert.expectThrows(IllegalStateException.class, failed::execute); + Assert.expectThrows(IllegalStateException.class, () -> failed.set(pA, record("a", "v", "y"), 1)); + + // Empty execute() also burns the builder. + ZkMultiWriteBuilder empty = builder(); + Assert.expectThrows(IllegalStateException.class, empty::execute); + Assert.expectThrows(IllegalStateException.class, empty::execute); + } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index 26fae8b2f4b5..0972080911f7 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -137,9 +137,7 @@ import org.apache.pinot.common.utils.config.TierConfigUtils; import org.apache.pinot.common.utils.helix.HelixHelper; import org.apache.pinot.common.utils.helix.PinotHelixPropertyStoreZnRecordProvider; -import org.apache.pinot.common.utils.helix.PinotZkMultiResult; -import org.apache.pinot.common.utils.helix.PinotZkOp; -import org.apache.pinot.common.utils.helix.ZkMultiWriter; +import org.apache.pinot.common.utils.helix.ZkMultiWriteBuilder; import org.apache.pinot.controller.ControllerConf; import org.apache.pinot.controller.api.exception.ControllerApplicationException; import org.apache.pinot.controller.api.exception.InvalidTableConfigException; @@ -2124,23 +2122,19 @@ public boolean setZKData(String path, ZNRecord record, int expectedVersion, int } /** - * Submits a list of znode operations as a single atomic ZooKeeper {@code multi()} transaction. - * Each op is independently a {@link PinotZkOp#set}, {@link PinotZkOp#create}, - * {@link PinotZkOp#delete}, or {@link PinotZkOp#check} (version assertion without mutation) on - * any path. Either every op commits or none do. Per-op version checks make this the transactional - * CAS primitive for cross-znode controller writes. - *

Returns a {@link PinotZkMultiResult}. On atomic rollback (e.g. version mismatch, node - * missing, node already exists), the result carries the failing op index and ZK error code — - * no exception is thrown. Unlike the single-znode wrappers on this class ({@code - * setZKData}, {@code createZKNode}, {@code deleteZKPath}) which return {@code boolean} on - * any failure, {@code multiWriteZK} throws on connectivity / session failures (they are not an - * atomic outcome and cannot be meaningfully rolled back into a per-op result). - *

Requires the controller to have been constructed with a non-null zkAddress (via the - * {@link ControllerConf} constructor); otherwise throws {@link IllegalStateException}. Also - * throws {@link IllegalStateException} if called after {@link #stop()}. - */ - public PinotZkMultiResult multiWriteZK(List ops) { - return ZkMultiWriter.multi(getOrBuildMultiWriteZkClient(), ops); + * Returns a fresh {@link ZkMultiWriteBuilder} for submitting an atomic ZooKeeper {@code multi()} + * transaction (set / create / delete / version-check ops on any combination of znodes). Either + * every op commits or none do. + *

Eagerly validates the dedicated multi-write ZK client is available: requires the controller + * to have been constructed with a non-null zkAddress (via the {@link ControllerConf} constructor) + * and not yet {@link #stop()}-ped, otherwise throws {@link IllegalStateException}. + *

The builder's {@code execute()} throws {@link org.apache.zookeeper.KeeperException} on atomic + * rollback (the subtype identifies the cause: {@code BadVersionException}, {@code NoNodeException}, + * {@code NodeExistsException}, ...). Connectivity / session failures propagate as the original + * {@link org.apache.helix.zookeeper.zkclient.exception.ZkException}. + */ + public ZkMultiWriteBuilder multiWriteZK() { + return new ZkMultiWriteBuilder(getOrBuildMultiWriteZkClient()); } private ZkClient getOrBuildMultiWriteZkClient() { diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerConfigValidationTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerConfigValidationTest.java index 33aa04bb7b68..6063b0343b80 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerConfigValidationTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerConfigValidationTest.java @@ -20,15 +20,12 @@ import java.lang.reflect.Field; import java.util.Collections; -import java.util.List; import java.util.TreeMap; import org.apache.helix.HelixAdmin; import org.apache.helix.HelixDataAccessor; import org.apache.helix.PropertyKey; import org.apache.helix.model.InstanceConfig; -import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.pinot.common.utils.config.InstanceUtils; -import org.apache.pinot.common.utils.helix.PinotZkOp; import org.apache.pinot.controller.helix.core.lineage.LineageManager; import org.apache.pinot.spi.config.instance.Instance; import org.apache.pinot.spi.config.instance.InstanceConfigValidatorRegistry; @@ -167,10 +164,9 @@ public void testAddInstanceSucceedsWithPassingValidator() { @Test public void testMultiWriteZkThrowsWhenZkAddressNotConfigured() { - // 7-arg test constructor leaves _zkAddress null; multiWriteZK must refuse to lazy-build. - List ops = List.of(PinotZkOp.set("/anything", new ZNRecord("x"), 0)); + // 7-arg test constructor leaves _zkAddress null; the builder factory itself must refuse. try { - _resourceManager.multiWriteZK(ops); + _resourceManager.multiWriteZK(); fail("Expected IllegalStateException"); } catch (IllegalStateException expected) { assertTrue(expected.getMessage().contains("zkAddress"), @@ -184,9 +180,8 @@ public void testMultiWriteZkThrowsAfterStop() // _segmentDeletionManager is normally wired by start(); mock it so stop() doesn't NPE. setField("_segmentDeletionManager", Mockito.mock(SegmentDeletionManager.class)); _resourceManager.stop(); - List ops = List.of(PinotZkOp.set("/anything", new ZNRecord("x"), 0)); try { - _resourceManager.multiWriteZK(ops); + _resourceManager.multiWriteZK(); fail("Expected IllegalStateException"); } catch (IllegalStateException expected) { // either "stopped" or "zkAddress" message is acceptable — both mean "no client available" From 80f1d9aafcd6fc5f8e253af0c76cd851802c59a5 Mon Sep 17 00:00:00 2001 From: Krishan Goyal Date: Thu, 30 Apr 2026 13:58:42 +0530 Subject: [PATCH 3/3] Address PR review: tighten ZkMultiWriteBuilder + happy-path test - Switch ZkMultiWriteBuilder ctor to concrete ZkClient (matches the rest of Pinot, which uses ZkClient over RealmAwareZkClient). - Document the builder is single-use and not thread-safe. - Restrict multiWriteZK to Helix property-store paths: builder takes a propertyStoreRoot prefix and op paths are property-store-relative (the same paths callers pass to ZkHelixPropertyStore). Validates prefix shape in the constructor. - Simplify PinotHelixResourceManager multi-write client setup: derive zkAddress from the started Helix manager instead of caching it + drop unused _stopped flag. - Add PinotHelixResourceManagerStatelessTest#testMultiWriteZkSegment MetadataUpdates: pre-creates two SegmentZKMetadata znodes, updates both atomically via multiWriteZK(), and verifies round-trip read through the property store. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../utils/helix/ZkMultiWriteBuilder.java | 77 ++++++++++++------- .../utils/helix/ZkMultiWriteBuilderTest.java | 3 +- .../helix/core/PinotHelixResourceManager.java | 60 +++++---------- ...ixResourceManagerConfigValidationTest.java | 24 +----- ...inotHelixResourceManagerStatelessTest.java | 47 +++++++++++ 5 files changed, 124 insertions(+), 87 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/ZkMultiWriteBuilder.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/ZkMultiWriteBuilder.java index 371e3b8468af..1053ed414845 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/ZkMultiWriteBuilder.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/ZkMultiWriteBuilder.java @@ -21,9 +21,8 @@ import com.google.common.base.Preconditions; import java.util.ArrayList; import java.util.List; -import org.apache.helix.zookeeper.api.client.RealmAwareZkClient; import org.apache.helix.zookeeper.datamodel.ZNRecord; -import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer; +import org.apache.helix.zookeeper.impl.client.ZkClient; import org.apache.helix.zookeeper.zkclient.exception.ZkException; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; @@ -32,9 +31,14 @@ /** - * Fluent builder for an atomic ZooKeeper {@code multi()} transaction. Accumulates ops via - * {@link #set}/{@link #create}/{@link #delete}/{@link #check} and submits them as a single - * all-or-nothing batch on {@link #execute()}. + * Fluent builder for an atomic ZooKeeper {@code multi()} transaction over Helix property-store + * paths. Accumulates ops via {@link #set}/{@link #create}/{@link #delete}/{@link #check} and + * submits them as a single all-or-nothing batch on {@link #execute()}. + * + *

Paths passed to op methods are property-store-relative — i.e. the same path you would pass to + * {@code ZkHelixPropertyStore.set(...)}, e.g. {@code /SEGMENTS/{table}/{segment}}. The builder + * prepends the configured property-store root (e.g. {@code /{cluster}/PROPERTYSTORE}) before + * submitting to ZK. Multi-path writes outside the property store are intentionally not supported. * *

On atomic rollback (e.g. version mismatch, node missing, node already exists), {@link #execute()} * throws the underlying {@link KeeperException} subtype ({@code BadVersionException}, @@ -47,33 +51,44 @@ * *

Single-use: each instance can be executed at most once. Obtain a fresh builder per transaction * (typically via {@code PinotHelixResourceManager.multiWriteZK()}). + * + *

Not thread-safe: instance state ({@code _ops}, {@code _executed}) is mutated by every fluent + * call. A single builder must not be shared across threads; use a fresh builder per thread. */ public final class ZkMultiWriteBuilder { /** Pass as {@code expectedVersion} to skip the version check on a {@link #set}/{@link #delete}. */ public static final int ANY_VERSION = -1; - // ZNRecordSerializer is documented thread-safe by Helix (no mutable state beyond per-call buffers), - // so a single static instance is fine across concurrent builders. - private static final ZNRecordSerializer SERIALIZER = new ZNRecordSerializer(); - - private final RealmAwareZkClient _zkClient; + private final ZkClient _zkClient; + private final String _propertyStoreRoot; private final List _ops = new ArrayList<>(); private boolean _executed; - public ZkMultiWriteBuilder(RealmAwareZkClient zkClient) { + /** + * @param zkClient ZK client used to serialize records and submit the transaction. + * @param propertyStoreRoot absolute ZK path that all op paths are prefixed with (typically + * {@code /{clusterName}/PROPERTYSTORE}). Must start with {@code /} and not end with {@code /}. + * Pass {@code ""} to operate on raw absolute paths (test-only). + */ + public ZkMultiWriteBuilder(ZkClient zkClient, String propertyStoreRoot) { _zkClient = Preconditions.checkNotNull(zkClient, "zkClient"); + Preconditions.checkNotNull(propertyStoreRoot, "propertyStoreRoot"); + Preconditions.checkArgument( + propertyStoreRoot.isEmpty() || (propertyStoreRoot.startsWith("/") && !propertyStoreRoot.endsWith("/")), + "propertyStoreRoot must be empty or start with '/' and not end with '/': %s", propertyStoreRoot); + _propertyStoreRoot = propertyStoreRoot; } /** - * Set (overwrite) the znode at {@code path} to {@code record}, with a CAS check on - * {@code expectedVersion}. Pass {@link #ANY_VERSION} to skip the check. + * Set (overwrite) the znode at {@code path} (property-store-relative) to {@code record}, with a + * CAS check on {@code expectedVersion}. Pass {@link #ANY_VERSION} to skip the check. */ public ZkMultiWriteBuilder set(String path, ZNRecord record, int expectedVersion) { checkNotExecuted(); - Preconditions.checkNotNull(path, "path"); Preconditions.checkNotNull(record, "record"); - _ops.add(Op.setData(path, SERIALIZER.serialize(record), expectedVersion)); + String fullPath = resolve(path); + _ops.add(Op.setData(fullPath, _zkClient.serialize(record, fullPath), expectedVersion)); return this; } @@ -82,23 +97,26 @@ public ZkMultiWriteBuilder set(String path, ZNRecord record) { return set(path, record, ANY_VERSION); } - /** Create a persistent znode at {@code path} with {@code record} as its data. */ + /** + * Create a persistent znode at {@code path} (property-store-relative) with {@code record} as its + * data. + */ public ZkMultiWriteBuilder create(String path, ZNRecord record) { checkNotExecuted(); - Preconditions.checkNotNull(path, "path"); Preconditions.checkNotNull(record, "record"); - _ops.add(Op.create(path, SERIALIZER.serialize(record), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)); + String fullPath = resolve(path); + _ops.add( + Op.create(fullPath, _zkClient.serialize(record, fullPath), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)); return this; } /** - * Delete the znode at {@code path}, with a CAS check on {@code expectedVersion}. Pass - * {@link #ANY_VERSION} to skip the check. + * Delete the znode at {@code path} (property-store-relative), with a CAS check on + * {@code expectedVersion}. Pass {@link #ANY_VERSION} to skip the check. */ public ZkMultiWriteBuilder delete(String path, int expectedVersion) { checkNotExecuted(); - Preconditions.checkNotNull(path, "path"); - _ops.add(Op.delete(path, expectedVersion)); + _ops.add(Op.delete(resolve(path), expectedVersion)); return this; } @@ -108,14 +126,13 @@ public ZkMultiWriteBuilder delete(String path) { } /** - * Assert the version of the znode at {@code path}. No mutation; gates other ops in the batch - * atomically — the whole transaction fails with {@link KeeperException.BadVersionException} if the - * version no longer matches. + * Assert the version of the znode at {@code path} (property-store-relative). No mutation; gates + * other ops in the batch atomically — the whole transaction fails with + * {@link KeeperException.BadVersionException} if the version no longer matches. */ public ZkMultiWriteBuilder check(String path, int expectedVersion) { checkNotExecuted(); - Preconditions.checkNotNull(path, "path"); - _ops.add(Op.check(path, expectedVersion)); + _ops.add(Op.check(resolve(path), expectedVersion)); return this; } @@ -144,4 +161,10 @@ public void execute() private void checkNotExecuted() { Preconditions.checkState(!_executed, "ZkMultiWriteBuilder already executed"); } + + private String resolve(String path) { + Preconditions.checkNotNull(path, "path"); + Preconditions.checkArgument(path.startsWith("/"), "path must start with '/': %s", path); + return _propertyStoreRoot + path; + } } diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/helix/ZkMultiWriteBuilderTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/helix/ZkMultiWriteBuilderTest.java index 3d7cad272170..b09e316a44eb 100644 --- a/pinot-common/src/test/java/org/apache/pinot/common/utils/helix/ZkMultiWriteBuilderTest.java +++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/helix/ZkMultiWriteBuilderTest.java @@ -92,7 +92,8 @@ private int version(String path) { } private ZkMultiWriteBuilder builder() { - return new ZkMultiWriteBuilder(_client); + // Tests use absolute paths under ROOT, so pass empty prefix (no property-store rebase). + return new ZkMultiWriteBuilder(_client, ""); } // ----------------------------------------------------------------------- diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index 0972080911f7..44f13a550082 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -235,10 +235,6 @@ private enum LineageUpdateType { private final boolean _enableBatchMessageMode; private final int _deletedSegmentsRetentionInDays; private final boolean _enableTieredSegmentAssignment; - @Nullable - private final String _zkAddress; - private final int _zkSessionTimeoutMs; - private final int _zkConnectTimeoutMs; private HelixManager _helixZkManager; private HelixAdmin _helixAdmin; @@ -258,30 +254,16 @@ private enum LineageUpdateType { // renames. The resulting extra session is consistent with the controller's existing footprint // (_propertyStore cache client, _leadControllerManager manager client are each distinct). private volatile ZkClient _zkClient; - private volatile boolean _stopped; public PinotHelixResourceManager(String helixClusterName, @Nullable String dataDir, boolean isSingleTenantCluster, boolean enableBatchMessageMode, int deletedSegmentsRetentionInDays, boolean enableTieredSegmentAssignment, LineageManager lineageManager) { - this(helixClusterName, dataDir, isSingleTenantCluster, enableBatchMessageMode, deletedSegmentsRetentionInDays, - enableTieredSegmentAssignment, lineageManager, null, - CommonConstants.Helix.ZkClient.DEFAULT_SESSION_TIMEOUT_MS, - CommonConstants.Helix.ZkClient.DEFAULT_CONNECT_TIMEOUT_MS); - } - - private PinotHelixResourceManager(String helixClusterName, @Nullable String dataDir, - boolean isSingleTenantCluster, boolean enableBatchMessageMode, int deletedSegmentsRetentionInDays, - boolean enableTieredSegmentAssignment, LineageManager lineageManager, @Nullable String zkAddress, - int zkSessionTimeoutMs, int zkConnectTimeoutMs) { _helixClusterName = helixClusterName; _dataDir = dataDir; _isSingleTenantCluster = isSingleTenantCluster; _enableBatchMessageMode = enableBatchMessageMode; _deletedSegmentsRetentionInDays = deletedSegmentsRetentionInDays; _enableTieredSegmentAssignment = enableTieredSegmentAssignment; - _zkAddress = zkAddress; - _zkSessionTimeoutMs = zkSessionTimeoutMs; - _zkConnectTimeoutMs = zkConnectTimeoutMs; _instanceAdminEndpointCache = CacheBuilder.newBuilder().expireAfterWrite(CACHE_ENTRY_EXPIRE_TIME_HOURS, TimeUnit.HOURS) .build(new CacheLoader<>() { @@ -304,11 +286,7 @@ public PinotHelixResourceManager(ControllerConf controllerConf) { this(controllerConf.getHelixClusterName(), controllerConf.getDataDir(), controllerConf.tenantIsolationEnabled(), controllerConf.getEnableBatchMessageMode(), controllerConf.getDeletedSegmentsRetentionInDays(), controllerConf.tieredSegmentAssignmentEnabled(), - LineageManagerFactory.create(controllerConf), controllerConf.getZkStr(), - controllerConf.getProperty(CommonConstants.Helix.ZkClient.ZK_CLIENT_SESSION_TIMEOUT_MS_CONFIG, - CommonConstants.Helix.ZkClient.DEFAULT_SESSION_TIMEOUT_MS), - controllerConf.getProperty(CommonConstants.Helix.ZkClient.ZK_CLIENT_CONNECTION_TIMEOUT_MS_CONFIG, - CommonConstants.Helix.ZkClient.DEFAULT_CONNECT_TIMEOUT_MS)); + LineageManagerFactory.create(controllerConf)); } /** @@ -369,7 +347,6 @@ public void onInstanceConfigChange(List instanceConfigs, Notific * Stop the Pinot controller instance. */ public synchronized void stop() { - _stopped = true; _segmentDeletionManager.stop(); ZkClient zkClient = _zkClient; if (zkClient != null) { @@ -2123,18 +2100,21 @@ public boolean setZKData(String path, ZNRecord record, int expectedVersion, int /** * Returns a fresh {@link ZkMultiWriteBuilder} for submitting an atomic ZooKeeper {@code multi()} - * transaction (set / create / delete / version-check ops on any combination of znodes). Either - * every op commits or none do. - *

Eagerly validates the dedicated multi-write ZK client is available: requires the controller - * to have been constructed with a non-null zkAddress (via the {@link ControllerConf} constructor) - * and not yet {@link #stop()}-ped, otherwise throws {@link IllegalStateException}. + * transaction over Helix property-store paths (set / create / delete / version-check ops on any + * combination of property-store znodes). Either every op commits or none do. + *

Op paths are property-store-relative (e.g. {@code /SEGMENTS/{table}/{segment}}); the builder + * prepends {@code /{cluster}/PROPERTYSTORE} before submitting to ZK. Multi-path writes outside + * the property store are intentionally not supported. + *

Requires {@link #start} to have been called (so the ZK address is reachable via the Helix + * manager); throws {@link IllegalStateException} otherwise. *

The builder's {@code execute()} throws {@link org.apache.zookeeper.KeeperException} on atomic * rollback (the subtype identifies the cause: {@code BadVersionException}, {@code NoNodeException}, * {@code NodeExistsException}, ...). Connectivity / session failures propagate as the original * {@link org.apache.helix.zookeeper.zkclient.exception.ZkException}. */ public ZkMultiWriteBuilder multiWriteZK() { - return new ZkMultiWriteBuilder(getOrBuildMultiWriteZkClient()); + return new ZkMultiWriteBuilder(getOrBuildMultiWriteZkClient(), + PropertyPathBuilder.propertyStore(_helixClusterName)); } private ZkClient getOrBuildMultiWriteZkClient() { @@ -2144,21 +2124,23 @@ private ZkClient getOrBuildMultiWriteZkClient() { } synchronized (this) { if (_zkClient == null) { - Preconditions.checkState(!_stopped, "PinotHelixResourceManager is stopped"); - Preconditions.checkState(_zkAddress != null, - "multiWriteZK unavailable: zkAddress not configured on PinotHelixResourceManager"); + Preconditions.checkState(_helixZkManager != null, + "multiWriteZK unavailable: PinotHelixResourceManager has not been started"); + String zkAddress = _helixZkManager.getMetadataStoreConnectionString(); + int sessionTimeoutMs = CommonConstants.Helix.ZkClient.DEFAULT_SESSION_TIMEOUT_MS; + int connectTimeoutMs = CommonConstants.Helix.ZkClient.DEFAULT_CONNECT_TIMEOUT_MS; LOGGER.info("Building dedicated multiWriteZK ZkClient at {} (session={}ms, connect={}ms)", - _zkAddress, _zkSessionTimeoutMs, _zkConnectTimeoutMs); + zkAddress, sessionTimeoutMs, connectTimeoutMs); ZkClient built = new ZkClient.Builder() - .setZkServer(_zkAddress) - .setSessionTimeout(_zkSessionTimeoutMs) - .setConnectionTimeout(_zkConnectTimeoutMs) + .setZkServer(zkAddress) + .setSessionTimeout(sessionTimeoutMs) + .setConnectionTimeout(connectTimeoutMs) .setZkSerializer(new ZNRecordSerializer()) .build(); - if (!built.waitUntilConnected(_zkConnectTimeoutMs, TimeUnit.MILLISECONDS)) { + if (!built.waitUntilConnected(connectTimeoutMs, TimeUnit.MILLISECONDS)) { ZkStarter.closeAsync(built); throw new RuntimeException( - "Timed out connecting to ZK at " + _zkAddress + " after " + _zkConnectTimeoutMs + "Timed out connecting to ZK at " + zkAddress + " after " + connectTimeoutMs + "ms for multiWriteZK"); } _zkClient = built; diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerConfigValidationTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerConfigValidationTest.java index 6063b0343b80..2348fef884e1 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerConfigValidationTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerConfigValidationTest.java @@ -163,30 +163,14 @@ public void testAddInstanceSucceedsWithPassingValidator() { } @Test - public void testMultiWriteZkThrowsWhenZkAddressNotConfigured() { - // 7-arg test constructor leaves _zkAddress null; the builder factory itself must refuse. + public void testMultiWriteZkThrowsBeforeStart() { + // The fixture never calls start(), so _helixZkManager is null. multiWriteZK() must refuse to + // build a client — the ZK address is derived from the Helix manager. try { _resourceManager.multiWriteZK(); fail("Expected IllegalStateException"); } catch (IllegalStateException expected) { - assertTrue(expected.getMessage().contains("zkAddress"), - "Unexpected message: " + expected.getMessage()); - } - } - - @Test - public void testMultiWriteZkThrowsAfterStop() - throws Exception { - // _segmentDeletionManager is normally wired by start(); mock it so stop() doesn't NPE. - setField("_segmentDeletionManager", Mockito.mock(SegmentDeletionManager.class)); - _resourceManager.stop(); - try { - _resourceManager.multiWriteZK(); - fail("Expected IllegalStateException"); - } catch (IllegalStateException expected) { - // either "stopped" or "zkAddress" message is acceptable — both mean "no client available" - assertTrue( - expected.getMessage().contains("stopped") || expected.getMessage().contains("zkAddress"), + assertTrue(expected.getMessage().contains("not been started"), "Unexpected message: " + expected.getMessage()); } } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java index bc643b8c92e4..cd70ff00e999 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java @@ -1741,6 +1741,53 @@ public void testGetConsumerWatermarks() deleteSchema(rawTableName); } + /** + * Happy-path coverage for {@link PinotHelixResourceManager#multiWriteZK()}: pre-creates two + * segment ZK metadata znodes, atomically updates both via a single multi() transaction, then + * reads back through the property store and asserts the mutated fields round-tripped. Verifies + * the dedicated multi-write ZkClient is built correctly and the ZNRecord serialization / + * deserialization path matches what the rest of the controller uses. + */ + @Test + public void testMultiWriteZkSegmentMetadataUpdates() + throws Exception { + String segName1 = "multiWriteZk_seg_1"; + String segName2 = "multiWriteZk_seg_2"; + SegmentZKMetadata seg1 = new SegmentZKMetadata(segName1); + seg1.setCrc(1L); + seg1.setTotalDocs(10L); + SegmentZKMetadata seg2 = new SegmentZKMetadata(segName2); + seg2.setCrc(2L); + seg2.setTotalDocs(20L); + assertTrue(ZKMetadataProvider.setSegmentZKMetadata(_propertyStore, OFFLINE_TABLE_NAME, seg1)); + assertTrue(ZKMetadataProvider.setSegmentZKMetadata(_propertyStore, OFFLINE_TABLE_NAME, seg2)); + + // Mutate both and submit as a single atomic transaction. + seg1.setCrc(11L); + seg1.setTotalDocs(110L); + seg2.setCrc(22L); + seg2.setTotalDocs(220L); + String path1 = ZKMetadataProvider.constructPropertyStorePathForSegment(OFFLINE_TABLE_NAME, segName1); + String path2 = ZKMetadataProvider.constructPropertyStorePathForSegment(OFFLINE_TABLE_NAME, segName2); + _helixResourceManager.multiWriteZK() + .set(path1, seg1.toZNRecord()) + .set(path2, seg2.toZNRecord()) + .execute(); + + SegmentZKMetadata read1 = ZKMetadataProvider.getSegmentZKMetadata(_propertyStore, OFFLINE_TABLE_NAME, segName1); + SegmentZKMetadata read2 = ZKMetadataProvider.getSegmentZKMetadata(_propertyStore, OFFLINE_TABLE_NAME, segName2); + assertNotNull(read1); + assertNotNull(read2); + assertEquals(read1.getCrc(), 11L); + assertEquals(read1.getTotalDocs(), 110L); + assertEquals(read2.getCrc(), 22L); + assertEquals(read2.getTotalDocs(), 220L); + + // Cleanup + assertTrue(ZKMetadataProvider.removeSegmentZKMetadata(_propertyStore, OFFLINE_TABLE_NAME, segName1)); + assertTrue(ZKMetadataProvider.removeSegmentZKMetadata(_propertyStore, OFFLINE_TABLE_NAME, segName2)); + } + @AfterClass public void tearDown() { stopFakeInstances();