Skip to content
Open
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;

import org.apache.accumulo.core.Constants;
Expand Down Expand Up @@ -172,6 +173,30 @@
public class Manager extends AbstractServer implements LiveTServerSet.Listener, TableObserver,
CurrentState, HighlyAvailableService, ServerProcessService.Iface {

private final class MergeLocks {

private final Object lock = new Object();
private Map<TableId,ReentrantLock> lockStorage = new HashMap<TableId,ReentrantLock>();

private ReentrantLock getLock(TableId tid) {
synchronized (lock) {
return lockStorage.computeIfAbsent(tid, k -> new ReentrantLock(true));
}
}

private void cleanup() {
synchronized (lock) {
Set<TableId> removals = new HashSet<>();
for (Entry<TableId,ReentrantLock> e : lockStorage.entrySet()) {
if (!getContext().tableNodeExists(e.getKey()) && !e.getValue().isLocked()) {
removals.add(e.getKey());
}
}
removals.forEach(lockStorage::remove);
}
}
}

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
private final class MergeLocks {
private final Object lock = new Object();
private Map<TableId,ReentrantLock> lockStorage = new HashMap<TableId,ReentrantLock>();
private ReentrantLock getLock(TableId tid) {
synchronized (lock) {
return lockStorage.computeIfAbsent(tid, k -> new ReentrantLock(true));
}
}
private void cleanup() {
synchronized (lock) {
Set<TableId> removals = new HashSet<>();
for (Entry<TableId,ReentrantLock> e : lockStorage.entrySet()) {
if (!getContext().tableNodeExists(e.getKey()) && !e.getValue().isLocked()) {
removals.add(e.getKey());
}
}
removals.forEach(lockStorage::remove);
}
}
}

static final Logger log = LoggerFactory.getLogger(Manager.class);

// When in safe mode totalAssignedOrHosted() is called every 10s
Expand Down Expand Up @@ -201,8 +226,8 @@ public class Manager extends AbstractServer implements LiveTServerSet.Listener,
Collections.synchronizedMap(new HashMap<>());
final Set<TServerInstance> serversToShutdown = Collections.synchronizedSet(new HashSet<>());
final Migrations migrations = new Migrations();
private final MergeLocks mergeLocks = new MergeLocks();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
private final MergeLocks mergeLocks = new MergeLocks();
private final LoadingCache<TableId,ReentrantLock> mergeLocks = Caffeine.newBuilder().weakValues()
.scheduler(Scheduler.systemScheduler()).build(k -> new ReentrantLock());

My main concern with this is that the TableId in the keys will prevent cleanup of the TableId.cache weak values. The scheduler is supposed to help with cleanup of the keys whose values have been garbage collected, but it may also be a good idea to use String (tableId.canonical()) for the key.

final EventCoordinator nextEvent = new EventCoordinator();
private final Object mergeLock = new Object();
private Thread replicationWorkThread;
private Thread replicationAssignerThread;
RecoveryManager recoveryManager = null;
Expand Down Expand Up @@ -477,7 +502,9 @@ public TServerConnection getConnection(TServerInstance server) {

public MergeInfo getMergeInfo(TableId tableId) {
ServerContext context = getContext();
synchronized (mergeLock) {
final ReentrantLock l = mergeLocks.getLock(tableId);
Comment thread
dlmarion marked this conversation as resolved.
Outdated
l.lock();
try {
try {
String path = getZooKeeperRoot() + Constants.ZTABLES + "/" + tableId + "/merge";
if (!context.getZooReaderWriter().exists(path)) {
Expand All @@ -496,15 +523,19 @@ public MergeInfo getMergeInfo(TableId tableId) {
log.warn("Unexpected error reading merge state", ex);
return new MergeInfo();
}
} finally {
l.unlock();
}
}

public void setMergeState(MergeInfo info, MergeState state)
throws KeeperException, InterruptedException {
ServerContext context = getContext();
synchronized (mergeLock) {
String path =
getZooKeeperRoot() + Constants.ZTABLES + "/" + info.getExtent().tableId() + "/merge";
final TableId tid = info.getExtent().tableId();
final ReentrantLock l = mergeLocks.getLock(tid);
Comment thread
dlmarion marked this conversation as resolved.
Outdated
l.lock();
try {
String path = getZooKeeperRoot() + Constants.ZTABLES + "/" + tid + "/merge";
info.setState(state);
if (state.equals(MergeState.NONE)) {
context.getZooReaderWriter().recursiveDelete(path, NodeMissingPolicy.SKIP);
Expand All @@ -519,16 +550,20 @@ public void setMergeState(MergeInfo info, MergeState state)
state.equals(MergeState.STARTED) ? ZooUtil.NodeExistsPolicy.FAIL
: ZooUtil.NodeExistsPolicy.OVERWRITE);
}
mergeLock.notifyAll();
} finally {
l.unlock();
}
nextEvent.event("Merge state of %s set to %s", info.getExtent(), state);
}

public void clearMergeState(TableId tableId) throws KeeperException, InterruptedException {
synchronized (mergeLock) {
final ReentrantLock l = mergeLocks.getLock(tableId);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
final ReentrantLock l = mergeLocks.getLock(tableId);
final ReentrantLock l = mergeLocks.get(tableId);

l.lock();
try {
String path = getZooKeeperRoot() + Constants.ZTABLES + "/" + tableId + "/merge";
getContext().getZooReaderWriter().recursiveDelete(path, NodeMissingPolicy.SKIP);
mergeLock.notifyAll();
} finally {
l.unlock();
}
nextEvent.event("Merge state of %s cleared", tableId);
}
Expand Down Expand Up @@ -1479,6 +1514,9 @@ boolean canSuspendTablets() {
throw new IllegalStateException("Exception updating manager lock", e);
}

ThreadPools.watchCriticalScheduledTask(
context.getScheduledExecutor().scheduleWithFixedDelay(mergeLocks::cleanup, 3, 3, HOURS));

Copy link
Copy Markdown
Member

@ctubbsii ctubbsii May 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need for this cleanup code with the weakValues cache's scheduler.

while (!clientService.isServing()) {
sleepUninterruptibly(100, MILLISECONDS);
}
Expand Down