From 83d928e2d17ccca2ee50a65035b9df1420c56256 Mon Sep 17 00:00:00 2001 From: Dave Marion Date: Mon, 18 May 2026 18:59:21 +0000 Subject: [PATCH 1/4] Use per-table merge lock in Manager instead of single lock This change introduces a per-table merge lock instead of a single lock to protect reading/writing to ZooKeeper. Closes #6374 --- .../org/apache/accumulo/manager/Manager.java | 51 ++++++++++++++++--- 1 file changed, 43 insertions(+), 8 deletions(-) diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index 0b3ea6ab404..c83ba594081 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -52,6 +52,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; import org.apache.accumulo.core.Constants; @@ -172,6 +173,30 @@ public class Manager extends AbstractServer implements LiveTServerSet.Listener, TableObserver, CurrentState, HighlyAvailableService, ServerProcessService.Iface { + private final class MergeLocks { + + private final Object lock = new Object(); + private Map lockStorage = new HashMap(); + + private ReentrantLock getLock(TableId tid) { + synchronized (lock) { + return lockStorage.computeIfAbsent(tid, k -> new ReentrantLock(true)); + } + } + + private void cleanup() { + synchronized (lock) { + Set removals = new HashSet<>(); + for (Entry e : lockStorage.entrySet()) { + if (!getContext().tableNodeExists(e.getKey()) && !e.getValue().isLocked()) { + removals.add(e.getKey()); + } + } + removals.forEach(lockStorage::remove); + } + } + } + static final Logger log = LoggerFactory.getLogger(Manager.class); // When in safe mode totalAssignedOrHosted() is called every 10s @@ -201,8 +226,8 @@ public class Manager extends AbstractServer implements LiveTServerSet.Listener, Collections.synchronizedMap(new HashMap<>()); final Set serversToShutdown = Collections.synchronizedSet(new HashSet<>()); final Migrations migrations = new Migrations(); + private final MergeLocks mergeLocks = new MergeLocks(); final EventCoordinator nextEvent = new EventCoordinator(); - private final Object mergeLock = new Object(); private Thread replicationWorkThread; private Thread replicationAssignerThread; RecoveryManager recoveryManager = null; @@ -477,7 +502,8 @@ public TServerConnection getConnection(TServerInstance server) { public MergeInfo getMergeInfo(TableId tableId) { ServerContext context = getContext(); - synchronized (mergeLock) { + mergeLocks.getLock(tableId).lock(); + try { try { String path = getZooKeeperRoot() + Constants.ZTABLES + "/" + tableId + "/merge"; if (!context.getZooReaderWriter().exists(path)) { @@ -496,15 +522,18 @@ public MergeInfo getMergeInfo(TableId tableId) { log.warn("Unexpected error reading merge state", ex); return new MergeInfo(); } + } finally { + mergeLocks.getLock(tableId).unlock(); } } public void setMergeState(MergeInfo info, MergeState state) throws KeeperException, InterruptedException { ServerContext context = getContext(); - synchronized (mergeLock) { - String path = - getZooKeeperRoot() + Constants.ZTABLES + "/" + info.getExtent().tableId() + "/merge"; + final TableId tid = info.getExtent().tableId(); + mergeLocks.getLock(tid).lock(); + try { + String path = getZooKeeperRoot() + Constants.ZTABLES + "/" + tid + "/merge"; info.setState(state); if (state.equals(MergeState.NONE)) { context.getZooReaderWriter().recursiveDelete(path, NodeMissingPolicy.SKIP); @@ -519,16 +548,19 @@ public void setMergeState(MergeInfo info, MergeState state) state.equals(MergeState.STARTED) ? ZooUtil.NodeExistsPolicy.FAIL : ZooUtil.NodeExistsPolicy.OVERWRITE); } - mergeLock.notifyAll(); + } finally { + mergeLocks.getLock(tid).unlock(); } nextEvent.event("Merge state of %s set to %s", info.getExtent(), state); } public void clearMergeState(TableId tableId) throws KeeperException, InterruptedException { - synchronized (mergeLock) { + mergeLocks.getLock(tableId).lock(); + try { String path = getZooKeeperRoot() + Constants.ZTABLES + "/" + tableId + "/merge"; getContext().getZooReaderWriter().recursiveDelete(path, NodeMissingPolicy.SKIP); - mergeLock.notifyAll(); + } finally { + mergeLocks.getLock(tableId).unlock(); } nextEvent.event("Merge state of %s cleared", tableId); } @@ -1479,6 +1511,9 @@ boolean canSuspendTablets() { throw new IllegalStateException("Exception updating manager lock", e); } + ThreadPools.watchCriticalScheduledTask( + context.getScheduledExecutor().scheduleWithFixedDelay(mergeLocks::cleanup, 3, 3, HOURS)); + while (!clientService.isServing()) { sleepUninterruptibly(100, MILLISECONDS); } From a58c1bcd803329f28a2d7e26e37b79bca5c27d29 Mon Sep 17 00:00:00 2001 From: Dave Marion Date: Mon, 18 May 2026 20:00:58 +0000 Subject: [PATCH 2/4] Resolve spotbugs failure --- .../java/org/apache/accumulo/manager/Manager.java | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index c83ba594081..b8063fedec7 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -502,7 +502,8 @@ public TServerConnection getConnection(TServerInstance server) { public MergeInfo getMergeInfo(TableId tableId) { ServerContext context = getContext(); - mergeLocks.getLock(tableId).lock(); + final ReentrantLock l = mergeLocks.getLock(tableId); + l.lock(); try { try { String path = getZooKeeperRoot() + Constants.ZTABLES + "/" + tableId + "/merge"; @@ -523,7 +524,7 @@ public MergeInfo getMergeInfo(TableId tableId) { return new MergeInfo(); } } finally { - mergeLocks.getLock(tableId).unlock(); + l.unlock(); } } @@ -531,7 +532,8 @@ public void setMergeState(MergeInfo info, MergeState state) throws KeeperException, InterruptedException { ServerContext context = getContext(); final TableId tid = info.getExtent().tableId(); - mergeLocks.getLock(tid).lock(); + final ReentrantLock l = mergeLocks.getLock(tid); + l.lock(); try { String path = getZooKeeperRoot() + Constants.ZTABLES + "/" + tid + "/merge"; info.setState(state); @@ -549,18 +551,19 @@ public void setMergeState(MergeInfo info, MergeState state) : ZooUtil.NodeExistsPolicy.OVERWRITE); } } finally { - mergeLocks.getLock(tid).unlock(); + l.unlock(); } nextEvent.event("Merge state of %s set to %s", info.getExtent(), state); } public void clearMergeState(TableId tableId) throws KeeperException, InterruptedException { - mergeLocks.getLock(tableId).lock(); + final ReentrantLock l = mergeLocks.getLock(tableId); + l.lock(); try { String path = getZooKeeperRoot() + Constants.ZTABLES + "/" + tableId + "/merge"; getContext().getZooReaderWriter().recursiveDelete(path, NodeMissingPolicy.SKIP); } finally { - mergeLocks.getLock(tableId).unlock(); + l.unlock(); } nextEvent.event("Merge state of %s cleared", tableId); } From 484f5273a0c2ab1230138db140a4e53ccd4f6273 Mon Sep 17 00:00:00 2001 From: Dave Marion Date: Tue, 19 May 2026 11:31:34 +0000 Subject: [PATCH 3/4] Implemented PR suggestion --- .../org/apache/accumulo/manager/Manager.java | 41 +++++-------------- 1 file changed, 10 insertions(+), 31 deletions(-) diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index b8063fedec7..01399b8d1f5 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -158,6 +158,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.LoadingCache; +import com.github.benmanes.caffeine.cache.Scheduler; import com.google.common.collect.ImmutableSortedMap; import com.google.common.util.concurrent.RateLimiter; @@ -173,30 +176,6 @@ public class Manager extends AbstractServer implements LiveTServerSet.Listener, TableObserver, CurrentState, HighlyAvailableService, ServerProcessService.Iface { - private final class MergeLocks { - - private final Object lock = new Object(); - private Map lockStorage = new HashMap(); - - private ReentrantLock getLock(TableId tid) { - synchronized (lock) { - return lockStorage.computeIfAbsent(tid, k -> new ReentrantLock(true)); - } - } - - private void cleanup() { - synchronized (lock) { - Set removals = new HashSet<>(); - for (Entry e : lockStorage.entrySet()) { - if (!getContext().tableNodeExists(e.getKey()) && !e.getValue().isLocked()) { - removals.add(e.getKey()); - } - } - removals.forEach(lockStorage::remove); - } - } - } - static final Logger log = LoggerFactory.getLogger(Manager.class); // When in safe mode totalAssignedOrHosted() is called every 10s @@ -226,7 +205,10 @@ private void cleanup() { Collections.synchronizedMap(new HashMap<>()); final Set serversToShutdown = Collections.synchronizedSet(new HashSet<>()); final Migrations migrations = new Migrations(); - private final MergeLocks mergeLocks = new MergeLocks(); + + private final LoadingCache mergeLocks = Caffeine.newBuilder().weakValues() + .scheduler(Scheduler.systemScheduler()).build(k -> new ReentrantLock()); + final EventCoordinator nextEvent = new EventCoordinator(); private Thread replicationWorkThread; private Thread replicationAssignerThread; @@ -502,7 +484,7 @@ public TServerConnection getConnection(TServerInstance server) { public MergeInfo getMergeInfo(TableId tableId) { ServerContext context = getContext(); - final ReentrantLock l = mergeLocks.getLock(tableId); + final ReentrantLock l = mergeLocks.get(tableId.canonical()); l.lock(); try { try { @@ -532,7 +514,7 @@ public void setMergeState(MergeInfo info, MergeState state) throws KeeperException, InterruptedException { ServerContext context = getContext(); final TableId tid = info.getExtent().tableId(); - final ReentrantLock l = mergeLocks.getLock(tid); + final ReentrantLock l = mergeLocks.get(tid.canonical()); l.lock(); try { String path = getZooKeeperRoot() + Constants.ZTABLES + "/" + tid + "/merge"; @@ -557,7 +539,7 @@ public void setMergeState(MergeInfo info, MergeState state) } public void clearMergeState(TableId tableId) throws KeeperException, InterruptedException { - final ReentrantLock l = mergeLocks.getLock(tableId); + final ReentrantLock l = mergeLocks.get(tableId.canonical()); l.lock(); try { String path = getZooKeeperRoot() + Constants.ZTABLES + "/" + tableId + "/merge"; @@ -1514,9 +1496,6 @@ boolean canSuspendTablets() { throw new IllegalStateException("Exception updating manager lock", e); } - ThreadPools.watchCriticalScheduledTask( - context.getScheduledExecutor().scheduleWithFixedDelay(mergeLocks::cleanup, 3, 3, HOURS)); - while (!clientService.isServing()) { sleepUninterruptibly(100, MILLISECONDS); } From c3d7836be5b9cfb78d09a5de410067f2eeb8c21f Mon Sep 17 00:00:00 2001 From: Dave Marion Date: Tue, 19 May 2026 14:48:09 +0000 Subject: [PATCH 4/4] Add missing dependency --- server/manager/pom.xml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/server/manager/pom.xml b/server/manager/pom.xml index b2280f88f46..0da212e0240 100644 --- a/server/manager/pom.xml +++ b/server/manager/pom.xml @@ -31,6 +31,10 @@ Apache Accumulo Manager Server The manager server for Apache Accumulo for load balancing and other system-wide operations. + + com.github.ben-manes.caffeine + caffeine + com.google.code.gson gson