Skip to content

Use per-table merge lock in Manager instead of single lock#6378

Open
dlmarion wants to merge 4 commits into
apache:2.1from
dlmarion:per-table-merge-lock
Open

Use per-table merge lock in Manager instead of single lock#6378
dlmarion wants to merge 4 commits into
apache:2.1from
dlmarion:per-table-merge-lock

Conversation

@dlmarion
Copy link
Copy Markdown
Contributor

This change introduces a per-table merge lock instead of a single lock to protect reading/writing to ZooKeeper.

Closes #6374

This change introduces a per-table merge lock instead of
a single lock to protect reading/writing to ZooKeeper.

Closes apache#6374
@dlmarion dlmarion added this to the 2.1.5 milestone May 18, 2026
@dlmarion dlmarion self-assigned this May 18, 2026
Copy link
Copy Markdown
Member

@ctubbsii ctubbsii left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could use a Caffeine cache to handle this. The key could also be a hash of the TableId, if you wanted to stripe this, but less aggressively than one per table.

Collections.synchronizedMap(new HashMap<>());
final Set<TServerInstance> serversToShutdown = Collections.synchronizedSet(new HashSet<>());
final Migrations migrations = new Migrations();
private final MergeLocks mergeLocks = new MergeLocks();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
private final MergeLocks mergeLocks = new MergeLocks();
private final LoadingCache<TableId,ReentrantLock> mergeLocks = Caffeine.newBuilder().weakValues()
.scheduler(Scheduler.systemScheduler()).build(k -> new ReentrantLock());

My main concern with this is that the TableId in the keys will prevent cleanup of the TableId.cache weak values. The scheduler is supposed to help with cleanup of the keys whose values have been garbage collected, but it may also be a good idea to use String (tableId.canonical()) for the key.

Comment on lines +176 to +199
private final class MergeLocks {

private final Object lock = new Object();
private Map<TableId,ReentrantLock> lockStorage = new HashMap<TableId,ReentrantLock>();

private ReentrantLock getLock(TableId tid) {
synchronized (lock) {
return lockStorage.computeIfAbsent(tid, k -> new ReentrantLock(true));
}
}

private void cleanup() {
synchronized (lock) {
Set<TableId> removals = new HashSet<>();
for (Entry<TableId,ReentrantLock> e : lockStorage.entrySet()) {
if (!getContext().tableNodeExists(e.getKey()) && !e.getValue().isLocked()) {
removals.add(e.getKey());
}
}
removals.forEach(lockStorage::remove);
}
}
}

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
private final class MergeLocks {
private final Object lock = new Object();
private Map<TableId,ReentrantLock> lockStorage = new HashMap<TableId,ReentrantLock>();
private ReentrantLock getLock(TableId tid) {
synchronized (lock) {
return lockStorage.computeIfAbsent(tid, k -> new ReentrantLock(true));
}
}
private void cleanup() {
synchronized (lock) {
Set<TableId> removals = new HashSet<>();
for (Entry<TableId,ReentrantLock> e : lockStorage.entrySet()) {
if (!getContext().tableNodeExists(e.getKey()) && !e.getValue().isLocked()) {
removals.add(e.getKey());
}
}
removals.forEach(lockStorage::remove);
}
}
}

Comment thread server/manager/src/main/java/org/apache/accumulo/manager/Manager.java Outdated
Comment thread server/manager/src/main/java/org/apache/accumulo/manager/Manager.java Outdated

public void clearMergeState(TableId tableId) throws KeeperException, InterruptedException {
synchronized (mergeLock) {
final ReentrantLock l = mergeLocks.getLock(tableId);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
final ReentrantLock l = mergeLocks.getLock(tableId);
final ReentrantLock l = mergeLocks.get(tableId);


ThreadPools.watchCriticalScheduledTask(
context.getScheduledExecutor().scheduleWithFixedDelay(mergeLocks::cleanup, 3, 3, HOURS));

Copy link
Copy Markdown
Member

@ctubbsii ctubbsii May 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need for this cleanup code with the weakValues cache's scheduler.

Copy link
Copy Markdown
Contributor

@ddanielr ddanielr left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to me.

I added a comment about some code that looks odd to me, but on a closer look it seems fine.

synchronized (mergeLock) {
String path =
getZooKeeperRoot() + Constants.ZTABLES + "/" + info.getExtent().tableId() + "/merge";
final TableId tid = info.getExtent().tableId();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code looks like we might hit a NPE on the .tableId() call if the extent doesn't exist.

MergeInfo has a no-args constructor that does not set the extent.
Instead the extent is then set via a MergeInfo.readFields() call.

However for all calls of setMergeState the extent or state of MergeInfo is checked so this doesn't seem to be an issue.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants