-
Notifications
You must be signed in to change notification settings - Fork 975
Fix core dumps triggered by rocksdb compacting when shutdown bk #4706
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
bc91be9
3e60a8d
67a6ac2
7018ea1
abee9aa
076d7d7
9aca965
fb5df96
16061f2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,12 +20,14 @@ | |
| */ | ||
| package org.apache.bookkeeper.bookie.storage.ldb; | ||
|
|
||
| import com.google.common.annotations.VisibleForTesting; | ||
| import com.google.common.collect.Iterables; | ||
| import java.io.Closeable; | ||
| import java.io.IOException; | ||
| import java.util.Map.Entry; | ||
| import java.util.Set; | ||
| import java.util.concurrent.TimeUnit; | ||
| import java.util.concurrent.atomic.AtomicBoolean; | ||
| import org.apache.bookkeeper.bookie.Bookie; | ||
| import org.apache.bookkeeper.bookie.EntryLocation; | ||
| import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorage.Batch; | ||
|
|
@@ -48,7 +50,8 @@ public class EntryLocationIndex implements Closeable { | |
| private final KeyValueStorage locationsDb; | ||
| private final ConcurrentLongHashSet deletedLedgers = ConcurrentLongHashSet.newBuilder().build(); | ||
| private final EntryLocationIndexStats stats; | ||
| private boolean isCompacting; | ||
| @VisibleForTesting | ||
| final AtomicBoolean compacting = new AtomicBoolean(false); | ||
|
|
||
| public EntryLocationIndex(ServerConfiguration conf, KeyValueStorageFactory storageFactory, String basePath, | ||
| StatsLogger stats) throws IOException { | ||
|
|
@@ -67,7 +70,19 @@ public EntryLocationIndex(ServerConfiguration conf, KeyValueStorageFactory stora | |
|
|
||
| @Override | ||
| public void close() throws IOException { | ||
| log.info("Closing EntryLocationIndex"); | ||
| while (!compacting.compareAndSet(false, true)) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should not wait without any timeout; the Bookie pod shutdown will be blocked forever. |
||
| // Wait till the locationsDb stops compacting | ||
| log.info("Waiting the locationsDb stops compacting"); | ||
| try { | ||
| Thread.sleep(1000); | ||
| } catch (InterruptedException e) { | ||
| Thread.currentThread().interrupt(); | ||
| throw new IOException(e); | ||
| } | ||
| } | ||
| locationsDb.close(); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We need to test this case:
|
||
| log.info("Closed EntryLocationIndex"); | ||
| } | ||
|
|
||
| public long getLocation(long ledgerId, long entryId) throws IOException { | ||
|
|
@@ -203,15 +218,17 @@ public String getEntryLocationDBPath() { | |
|
|
||
| public void compact() throws IOException { | ||
| try { | ||
| isCompacting = true; | ||
| if (!compacting.compareAndSet(false, true)) { | ||
|
lhotari marked this conversation as resolved.
|
||
| return; | ||
| } | ||
| locationsDb.compact(); | ||
| } finally { | ||
| isCompacting = false; | ||
| compacting.set(false); | ||
| } | ||
| } | ||
|
|
||
| public boolean isCompacting() { | ||
| return isCompacting; | ||
| return compacting.get(); | ||
| } | ||
|
|
||
| public void removeOffsetFromDeletedLedgers() throws IOException { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -81,6 +81,7 @@ public class KeyValueStorageRocksDB implements KeyValueStorage { | |
| private final RocksDB db; | ||
| private RocksObject options; | ||
| private List<ColumnFamilyDescriptor> columnFamilyDescriptors; | ||
| private List<ColumnFamilyHandle> columnFamilyHandles; | ||
|
|
||
| private final WriteOptions optionSync; | ||
| private final WriteOptions optionDontSync; | ||
|
|
@@ -175,6 +176,7 @@ private RocksDB initializeRocksDBWithConfFile(String basePath, String subPath, D | |
| this.dbPath = FileSystems.getDefault().getPath(basePath, subPath).toFile().toString(); | ||
| this.options = dbOptions; | ||
| this.columnFamilyDescriptors = cfDescs; | ||
| this.columnFamilyHandles = cfHandles; | ||
| if (readOnly) { | ||
| return RocksDB.openReadOnly(dbOptions, dbPath, cfDescs, cfHandles); | ||
| } else { | ||
|
|
@@ -297,6 +299,12 @@ public void close() throws IOException { | |
| try { | ||
| closedLock.writeLock().lock(); | ||
| closed = true; | ||
| db.cancelAllBackgroundWork(true); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. wrapping this call in a separate try-catch would be recommended
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. locationsDb.compact(); is not a background action, and cancel here doesn't work. |
||
| if (columnFamilyHandles != null) { | ||
| for (ColumnFamilyHandle cfh : columnFamilyHandles) { | ||
| cfh.close(); | ||
| } | ||
| } | ||
| db.close(); | ||
| } finally { | ||
| closedLock.writeLock().unlock(); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,14 +21,19 @@ | |
| package org.apache.bookkeeper.bookie.storage.ldb; | ||
|
|
||
| import static org.junit.Assert.assertEquals; | ||
| import static org.junit.Assert.assertFalse; | ||
| import static org.junit.Assert.assertTrue; | ||
|
|
||
| import java.io.File; | ||
| import java.io.IOException; | ||
| import java.util.concurrent.atomic.AtomicBoolean; | ||
| import java.util.concurrent.atomic.AtomicLong; | ||
| import org.apache.bookkeeper.conf.ServerConfiguration; | ||
| import org.apache.bookkeeper.stats.NullStatsLogger; | ||
| import org.apache.bookkeeper.test.TestStatsProvider; | ||
| import org.awaitility.Awaitility; | ||
| import org.junit.Test; | ||
| import org.junit.jupiter.api.Timeout; | ||
|
|
||
| /** | ||
| * Unit test for {@link EntryLocationIndex}. | ||
|
|
@@ -231,4 +236,39 @@ public void testEntryIndexLookupLatencyStats() throws IOException { | |
| assertEquals(1, lookupEntryLocationOpStats.getFailureCount()); | ||
| assertEquals(1, lookupEntryLocationOpStats.getSuccessCount()); | ||
| } | ||
|
|
||
| @Test | ||
| @Timeout(60) | ||
| public void testClose() throws Exception { | ||
| File tmpDir = File.createTempFile("bkTest", ".dir"); | ||
| tmpDir.delete(); | ||
| tmpDir.mkdir(); | ||
| tmpDir.deleteOnExit(); | ||
|
|
||
| EntryLocationIndex idx = new EntryLocationIndex(serverConfiguration, KeyValueStorageRocksDB.factory, | ||
| tmpDir.getAbsolutePath(), NullStatsLogger.INSTANCE); | ||
|
|
||
| // mock EntryLocationIndex is compacting | ||
| idx.compacting.set(true); | ||
| AtomicBoolean closeFlag = new AtomicBoolean(false); | ||
| AtomicLong closeEscapedMills = new AtomicLong(0); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: escaped -> elapsed |
||
| new Thread(() -> { | ||
| try { | ||
| long start = System.currentTimeMillis(); | ||
| idx.close(); | ||
| closeEscapedMills.set(System.currentTimeMillis() - start); | ||
| closeFlag.set(true); | ||
| } catch (IOException e) { | ||
| throw new RuntimeException(e); | ||
| } | ||
| }).start(); | ||
| long sleepMills = 10_000; | ||
| Thread.sleep(sleepMills); | ||
| assertFalse(closeFlag.get()); | ||
|
|
||
| // mock EntryLocationIndex finish compacting | ||
| idx.compacting.set(false); | ||
| Awaitility.await().untilAsserted(() -> assertTrue(closeFlag.get())); | ||
| assertTrue(closeEscapedMills.get() >= sleepMills); | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.