diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index 2bd134a9e520..a33d9865aece 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -606,6 +606,70 @@ protected void doOffloadSegment(String segmentName) {
}
}
+ @Override
+ public void deleteSegment(String segmentName)
+ throws Exception {
+ _logger.info("Deleting segment: {}", segmentName);
+ Lock segmentLock = getSegmentLock(segmentName);
+ segmentLock.lock();
+ try {
+ if (hasSegment(segmentName)) {
+ _logger.warn("Segment: {} is still loaded, offloading it before delete", segmentName);
+ offloadSegment(segmentName);
+ }
+ doDeleteSegment(segmentName);
+ } catch (Exception e) {
+ addSegmentError(segmentName,
+ new SegmentErrorInfo(System.currentTimeMillis(), "Caught exception while deleting segment", e));
+ throw e;
+ } finally {
+ segmentLock.unlock();
+ }
+ }
+
+ protected void doDeleteSegment(String segmentName)
+ throws Exception {
+ deleteSegmentFilesFromDisk(_tableDataDir, segmentName, _instanceDataManagerConfig);
+ _logger.info("Deleted segment: {}", segmentName);
+ }
+
+ /**
+ * Single source of truth for on-disk cleanup of a physical segment. Removes the segment directory under
+ * {@code tableDataDir} and invokes the configured {@link SegmentDirectoryLoader#delete} for tier-aware cleanup.
+ *
+ *
Used by:
+ *
+ * - {@link #deleteSegment} — the TDM-driven path triggered by a *_TO_DROPPED state transition.
+ * - {@code HelixInstanceDataManager.deleteSegment} fallback — when the per-table TDM is null because the
+ * table was never instantiated, or has already been removed via {@code deleteTable()}.
+ * - TDM extensions that fan a single logical name out to multiple physical segments (e.g. segment-group
+ * member fan-out) — these reuse this helper directly while holding all required per-member locks
+ * acquired in lex order.
+ *
+ *
+ * Caller is responsible for holding the per-segment lock and for offloading the segment from the TDM if it
+ * is still loaded.
+ */
+ public static void deleteSegmentFilesFromDisk(String tableDataDir, String segmentName,
+ InstanceDataManagerConfig instanceConfig)
+ throws Exception {
+ File segmentDir = new File(tableDataDir, segmentName);
+ if (segmentDir.exists()) {
+ FileUtils.deleteQuietly(segmentDir);
+ LOGGER.info("Deleted segment directory {} on default tier", segmentDir);
+ }
+ SegmentDirectoryLoader segmentLoader =
+ SegmentDirectoryLoaderRegistry.getSegmentDirectoryLoader(instanceConfig.getSegmentDirectoryLoader());
+ if (segmentLoader != null) {
+ LOGGER.info("Deleting segment: {} further with segment loader: {}", segmentName,
+ instanceConfig.getSegmentDirectoryLoader());
+ SegmentDirectoryLoaderContext ctx = new SegmentDirectoryLoaderContext.Builder().setSegmentName(segmentName)
+ .setTableDataDir(tableDataDir)
+ .build();
+ segmentLoader.delete(ctx);
+ }
+ }
+
@Override
public void reloadSegment(String segmentName, boolean forceDownload, String reloadJobId)
throws Exception {
@@ -616,12 +680,7 @@ public void reloadSegment(String segmentName, boolean forceDownload, String relo
SegmentDataManager segmentDataManager = _segmentDataManagerMap.get(segmentName);
if (segmentDataManager != null) {
IndexLoadingConfig indexLoadingConfig = fetchIndexLoadingConfig();
- _segmentReloadSemaphore.acquire(segmentName, _logger);
- try {
- reloadSegment(segmentDataManager, indexLoadingConfig, forceDownload);
- } finally {
- _segmentReloadSemaphore.release();
- }
+ reloadSegment(segmentDataManager, indexLoadingConfig, forceDownload);
} else {
_logger.warn("Failed to find segment: {}, skipping reloading it", segmentName);
}
@@ -947,12 +1006,7 @@ protected void reloadSegments(List segmentDataManagers, Inde
CompletableFuture.allOf(segmentDataManagers.stream().map(segmentDataManager -> CompletableFuture.runAsync(() -> {
String segmentName = segmentDataManager.getSegmentName();
try {
- _segmentReloadSemaphore.acquire(segmentName, _logger);
- try {
- reloadSegment(segmentDataManager, indexLoadingConfig, forceDownload);
- } finally {
- _segmentReloadSemaphore.release();
- }
+ reloadSegment(segmentDataManager, indexLoadingConfig, forceDownload);
} catch (Throwable t) {
_logger.error("Caught exception while reloading segment: {}", segmentName, t);
failedSegments.add(segmentName);
@@ -973,36 +1027,41 @@ protected void reloadSegment(SegmentDataManager segmentDataManager, IndexLoading
boolean forceDownload)
throws Exception {
String segmentName = segmentDataManager.getSegmentName();
- if (segmentDataManager instanceof RealtimeSegmentDataManager) {
- // Use force commit to reload consuming segment
- if (_instanceDataManagerConfig.shouldReloadConsumingSegment()) {
- // Force-committing consuming segments is restricted only for tables with inconsistent state configs
- // (partial-upsert or dropOutOfOrderRecord=true with replication > 1).
- // For these tables, winner selection could incorrectly favor replicas with fewer consumed rows,
- // triggering unnecessary reconsumption and resulting in inconsistent upsert state.
- // To enable force commit for such tables, change the cluster config
- // `pinot.server.consuming.segment.consistency.mode` to PROTECTED for safer reload.
- TableConfig tableConfig = indexLoadingConfig.getTableConfig();
- ConsumingSegmentConsistencyModeListener config = ConsumingSegmentConsistencyModeListener.getInstance();
- boolean isTableTypeInconsistentDuringConsumption =
- TableConfigUtils.isTableTypeInconsistentDuringConsumption(tableConfig);
- // Allow force commit if:
- // 1. Table doesn't have inconsistent configs (non-upsert or standard upsert tables), OR
- // 2. Consistency mode is PROTECTED or UNSAFE (isForceCommitAllowed = true)
- if (tableConfig == null || (isTableTypeInconsistentDuringConsumption && !config.isForceCommitAllowed())) {
- _logger.warn("Skipping reload (force commit) on consuming segment: {} due to inconsistent state config. "
- + "Change the cluster config: {} to `PROTECTED` for safer commit", segmentName, config.getConfigKey());
+ _segmentReloadSemaphore.acquire(segmentName, _logger);
+ try {
+ if (segmentDataManager instanceof RealtimeSegmentDataManager) {
+ // Use force commit to reload consuming segment
+ if (_instanceDataManagerConfig.shouldReloadConsumingSegment()) {
+ // Force-committing consuming segments is restricted only for tables with inconsistent state configs
+ // (partial-upsert or dropOutOfOrderRecord=true with replication > 1).
+ // For these tables, winner selection could incorrectly favor replicas with fewer consumed rows,
+ // triggering unnecessary reconsumption and resulting in inconsistent upsert state.
+ // To enable force commit for such tables, change the cluster config
+ // `pinot.server.consuming.segment.consistency.mode` to PROTECTED for safer reload.
+ TableConfig tableConfig = indexLoadingConfig.getTableConfig();
+ ConsumingSegmentConsistencyModeListener config = ConsumingSegmentConsistencyModeListener.getInstance();
+ boolean isTableTypeInconsistentDuringConsumption =
+ TableConfigUtils.isTableTypeInconsistentDuringConsumption(tableConfig);
+ // Allow force commit if:
+ // 1. Table doesn't have inconsistent configs (non-upsert or standard upsert tables), OR
+ // 2. Consistency mode is PROTECTED or UNSAFE (isForceCommitAllowed = true)
+ if (tableConfig == null || (isTableTypeInconsistentDuringConsumption && !config.isForceCommitAllowed())) {
+ _logger.warn("Skipping reload (force commit) on consuming segment: {} due to inconsistent state config. "
+ + "Change the cluster config: {} to `PROTECTED` for safer commit", segmentName, config.getConfigKey());
+ } else {
+ _logger.info("Reloading (force committing) consuming segment: {}", segmentName);
+ ((RealtimeSegmentDataManager) segmentDataManager).forceCommit();
+ }
} else {
- _logger.info("Reloading (force committing) consuming segment: {}", segmentName);
- ((RealtimeSegmentDataManager) segmentDataManager).forceCommit();
+ _logger.warn("Skip reloading consuming segment: {} as configured", segmentName);
}
} else {
- _logger.warn("Skip reloading consuming segment: {} as configured", segmentName);
+ SegmentZKMetadata zkMetadata = fetchZKMetadata(segmentName);
+ SegmentMetadata localMetadata = segmentDataManager.getSegment().getSegmentMetadata();
+ reloadSegment(segmentName, indexLoadingConfig, zkMetadata, localMetadata, forceDownload);
}
- } else {
- SegmentZKMetadata zkMetadata = fetchZKMetadata(segmentName);
- SegmentMetadata localMetadata = segmentDataManager.getSegment().getSegmentMetadata();
- reloadSegment(segmentName, indexLoadingConfig, zkMetadata, localMetadata, forceDownload);
+ } finally {
+ _segmentReloadSemaphore.release();
}
}
@@ -1410,6 +1469,25 @@ protected void removeBackup(File indexDir)
@Override
public boolean tryLoadExistingSegment(SegmentZKMetadata zkMetadata, IndexLoadingConfig indexLoadingConfig) {
+ ImmutableSegment segment = tryLoadExistingSegmentInternal(zkMetadata, indexLoadingConfig);
+ if (segment == null) {
+ return false;
+ }
+ addSegment(segment, zkMetadata);
+ return true;
+ }
+
+ /**
+ * Just Loads a segment from the existing on-disk copy without registering it in {@code _segmentDataManagerMap} or
+ * invoking other hooks. Returns {@code null} when the on-disk copy is absent, has a stale CRC under
+ * {@code shouldCheckCRCOnSegmentLoad}, or fails to load — callers fall back to a fresh download in that case.
+ * Single-segment callers should use {@link #tryLoadExistingSegment} which performs the registration step.
+ * Multi-segment managers can compose this with {@link #downloadSegment} to load all members before wrapping them
+ * under a single map entry.
+ */
+ @Nullable
+ protected ImmutableSegment tryLoadExistingSegmentInternal(SegmentZKMetadata zkMetadata,
+ IndexLoadingConfig indexLoadingConfig) {
String segmentName = zkMetadata.getSegmentName();
Preconditions.checkState(!_shutDown,
"Table data manager is already shut down, cannot load existing segment: %s of table: %s", segmentName,
@@ -1442,14 +1520,14 @@ public boolean tryLoadExistingSegment(SegmentZKMetadata zkMetadata, IndexLoading
if (segmentMetadata == null) {
_logger.info("Segment: {} does not exist", segmentName);
closeSegmentDirectoryQuietly(segmentDirectory);
- return false;
+ return null;
}
if (isSegmentStatusCompleted(zkMetadata) && !hasSameCRC(zkMetadata, segmentMetadata)) {
_logger.warn("Segment: {} has CRC changed from: {} to: {}", segmentName, segmentMetadata.getCrc(),
zkMetadata.getCrc());
if (_instanceDataManagerConfig.shouldCheckCRCOnSegmentLoad()) {
closeSegmentDirectoryQuietly(segmentDirectory);
- return false;
+ return null;
}
_logger.info("Skipping CRC check for segment: {} as configured. Proceed to load segment.", segmentName);
}
@@ -1470,15 +1548,14 @@ public boolean tryLoadExistingSegment(SegmentZKMetadata zkMetadata, IndexLoading
indexLoadingConfig, zkMetadata);
}
ImmutableSegment segment = ImmutableSegmentLoader.load(segmentDirectory, indexLoadingConfig);
- addSegment(segment, zkMetadata);
_logger.info("Loaded existing segment: {} with CRC: {} on tier: {}", segmentName, zkMetadata.getCrc(),
TierConfigUtils.normalizeTierName(segmentTier));
- return true;
+ return segment;
} catch (Exception e) {
_logger.error("Failed to load existing segment: {} with CRC: {} on tier: {}", segmentName, zkMetadata.getCrc(),
TierConfigUtils.normalizeTierName(segmentTier), e);
closeSegmentDirectoryQuietly(segmentDirectory);
- return false;
+ return null;
}
}
@@ -1542,8 +1619,7 @@ public List getStaleSegments() {
return staleSegments;
}
- @VisibleForTesting
- StaleSegment isSegmentStale(IndexLoadingConfig indexLoadingConfig, SegmentDataManager segmentDataManager) {
+ protected StaleSegment isSegmentStale(IndexLoadingConfig indexLoadingConfig, SegmentDataManager segmentDataManager) {
TableConfig tableConfig = indexLoadingConfig.getTableConfig();
Schema schema = indexLoadingConfig.getSchema();
assert tableConfig != null && schema != null;
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java
index 519a3e8ff9fc..fc1789ca5eb6 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java
@@ -31,7 +31,6 @@
import org.apache.commons.io.FileUtils;
import org.apache.helix.HelixManager;
import org.apache.pinot.common.metrics.ServerMetrics;
-import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
import org.apache.pinot.core.data.manager.offline.OfflineTableDataManager;
import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
@@ -73,7 +72,7 @@ public class BaseTableDataManagerAcquireSegmentTest {
private final Set _allSegments = new HashSet<>();
private final Set _accessedSegManagers = ConcurrentHashMap.newKeySet();
private final Set _allSegManagers = ConcurrentHashMap.newKeySet();
- private Map _internalSegMap;
+ protected Map _internalSegMap;
private Throwable _exception;
private Thread _masterThread;
// Segment numbers in place.
@@ -115,7 +114,7 @@ public void beforeMethod() {
_masterThread = null;
}
- private TableDataManager makeTestableManager()
+ protected TableDataManager makeTestableManager()
throws Exception {
InstanceDataManagerConfig instanceDataManagerConfig = mock(InstanceDataManagerConfig.class);
when(instanceDataManagerConfig.getInstanceDataDir()).thenReturn(_tmpDir.getAbsolutePath());
@@ -129,18 +128,42 @@ private TableDataManager makeTestableManager()
new SegmentOperationsThrottler(4, 8, true),
new SegmentOperationsThrottler(10, 20, true),
new SegmentOperationsThrottler(4, 8, true));
- TableDataManager tableDataManager = new OfflineTableDataManager();
+ TableDataManager tableDataManager = newTableDataManager();
tableDataManager.init(instanceDataManagerConfig, mock(HelixManager.class), new SegmentLocks(), tableConfig, schema,
new SegmentReloadSemaphore(1), Executors.newSingleThreadExecutor(), null, null, segmentOperationsThrottlerSet,
false, mock(ServerReloadJobStatusCache.class));
tableDataManager.start();
Field segsMapField = BaseTableDataManager.class.getDeclaredField("_segmentDataManagerMap");
segsMapField.setAccessible(true);
- _internalSegMap = (Map) segsMapField.get(tableDataManager);
+ _internalSegMap = (Map) segsMapField.get(tableDataManager);
return tableDataManager;
}
- private ImmutableSegment makeImmutableSegment(String segmentName, int totalDocs) {
+ /**
+ * Returns the concrete {@link TableDataManager} instance under test. Default returns a stock
+ * {@link OfflineTableDataManager}; subclasses override to test a different implementation while inheriting all
+ * test bodies.
+ */
+ protected TableDataManager newTableDataManager() {
+ return new OfflineTableDataManager();
+ }
+
+ /**
+ * Returns the TDM-map key under which a segment with the given name is registered.
+ */
+ protected String tdmKey(String segmentName) {
+ return segmentName;
+ }
+
+ protected void addSegment(TableDataManager tdm, ImmutableSegment seg) {
+ tdm.addSegment(seg);
+ }
+
+ protected ImmutableSegment getInnerSegment(SegmentDataManager sdm) {
+ return (ImmutableSegment) sdm.getSegment();
+ }
+
+ protected ImmutableSegment makeImmutableSegment(String segmentName, int totalDocs) {
ImmutableSegment immutableSegment = mock(ImmutableSegment.class);
SegmentMetadata segmentMetadata = mock(SegmentMetadata.class);
when(immutableSegment.getSegmentMetadata()).thenReturn(segmentMetadata);
@@ -164,11 +187,11 @@ public void basicTest()
// Add the segment, get it for use, remove the segment, and then return it.
// Make sure that the segment is not destroyed before return.
ImmutableSegment immutableSegment = makeImmutableSegment(segmentName, totalDocs);
- tableDataManager.addSegment(immutableSegment);
+ addSegment(tableDataManager, immutableSegment);
Assert.assertEquals(tableDataManager.getNumSegments(), 1);
- SegmentDataManager segmentDataManager = tableDataManager.acquireSegment(segmentName);
+ SegmentDataManager segmentDataManager = tableDataManager.acquireSegment(tdmKey(segmentName));
Assert.assertEquals(segmentDataManager.getReferenceCount(), 2);
- tableDataManager.offloadSegment(segmentName);
+ tableDataManager.offloadSegment(tdmKey(segmentName));
Assert.assertEquals(tableDataManager.getNumSegments(), 0);
Assert.assertEquals(segmentDataManager.getReferenceCount(), 1);
Assert.assertEquals(_nDestroys, 0);
@@ -177,7 +200,7 @@ public void basicTest()
Assert.assertEquals(_nDestroys, 1);
// Now the segment should not be available for use.Also, returning a null reader is fine
- segmentDataManager = tableDataManager.acquireSegment(segmentName);
+ segmentDataManager = tableDataManager.acquireSegment(tdmKey(segmentName));
Assert.assertNull(segmentDataManager);
List segmentDataManagers = tableDataManager.acquireAllSegments();
Assert.assertEquals(segmentDataManagers.size(), 0);
@@ -185,26 +208,26 @@ public void basicTest()
// If a caller tries to acquire the deleted segment using acquireSegments, it will be returned in
// notAcquiredSegments. The isSegmentDeletedRecently method should return true.
List notAcquiredSegments = new ArrayList<>();
- tableDataManager.acquireSegments(List.of(segmentName), notAcquiredSegments);
+ tableDataManager.acquireSegments(List.of(tdmKey(segmentName)), notAcquiredSegments);
Assert.assertEquals(notAcquiredSegments.size(), 1);
- Assert.assertTrue(tableDataManager.isSegmentDeletedRecently(segmentName));
+ Assert.assertTrue(tableDataManager.isSegmentDeletedRecently(tdmKey(segmentName)));
// Adding and removing the segment again is fine. After adding the segment back, isSegmentDeletedRecently should
// return false.
- tableDataManager.addSegment(immutableSegment);
- Assert.assertFalse(tableDataManager.isSegmentDeletedRecently(segmentName));
- tableDataManager.offloadSegment(segmentName);
+ addSegment(tableDataManager, immutableSegment);
+ Assert.assertFalse(tableDataManager.isSegmentDeletedRecently(tdmKey(segmentName)));
+ tableDataManager.offloadSegment(tdmKey(segmentName));
// Removing the segment again is fine.
- tableDataManager.offloadSegment(segmentName);
+ tableDataManager.offloadSegment(tdmKey(segmentName));
Assert.assertEquals(tableDataManager.getNumSegments(), 0);
// Add a new segment and remove it in order this time.
final String anotherSeg = "AnotherSegment";
ImmutableSegment ix1 = makeImmutableSegment(anotherSeg, totalDocs);
- tableDataManager.addSegment(ix1);
+ addSegment(tableDataManager, ix1);
Assert.assertEquals(tableDataManager.getNumSegments(), 1);
- SegmentDataManager sdm1 = tableDataManager.acquireSegment(anotherSeg);
+ SegmentDataManager sdm1 = tableDataManager.acquireSegment(tdmKey(anotherSeg));
Assert.assertNotNull(sdm1);
Assert.assertEquals(sdm1.getReferenceCount(), 2);
// acquire all segments
@@ -220,15 +243,15 @@ public void basicTest()
Assert.assertEquals(sdm1.getReferenceCount(), 1);
// Now replace the segment with another one.
ImmutableSegment ix2 = makeImmutableSegment(anotherSeg, totalDocs + 1);
- tableDataManager.addSegment(ix2);
+ addSegment(tableDataManager, ix2);
Assert.assertEquals(tableDataManager.getNumSegments(), 1);
// Now the previous one should have been destroyed, and
Assert.assertEquals(sdm1.getReferenceCount(), 0);
verify(ix1, times(1)).destroy();
// Delete ix2 without accessing it.
- SegmentDataManager sdm2 = _internalSegMap.get(anotherSeg);
+ SegmentDataManager sdm2 = _internalSegMap.get(tdmKey(anotherSeg));
Assert.assertEquals(sdm2.getReferenceCount(), 1);
- tableDataManager.offloadSegment(anotherSeg);
+ tableDataManager.offloadSegment(tdmKey(anotherSeg));
Assert.assertEquals(tableDataManager.getNumSegments(), 0);
Assert.assertEquals(sdm2.getReferenceCount(), 0);
verify(ix2, times(1)).destroy();
@@ -265,8 +288,8 @@ public void testReplace()
for (int i = _lo; i <= _hi; i++) {
final String segName = SEGMENT_PREFIX + i;
- tableDataManager.addSegment(makeImmutableSegment(segName, _random.nextInt()));
- _allSegManagers.add(_internalSegMap.get(segName));
+ addSegment(tableDataManager, makeImmutableSegment(segName, _random.nextInt()));
+ _allSegManagers.add(_internalSegMap.get(tdmKey(segName)));
}
runStorageServer(numQueryThreads, runTimeSec, tableDataManager); // replaces segments while online
@@ -315,14 +338,14 @@ private void runStorageServer(int numQueryThreads, int runTimeSec, TableDataMana
for (SegmentDataManager segmentDataManager : _internalSegMap.values()) {
Assert.assertEquals(segmentDataManager.getReferenceCount(), 1);
// We should never have called destroy on these segments. Remove it from the list of accessed segments.
- verify(segmentDataManager.getSegment(), never()).destroy();
+ verify(getInnerSegment(segmentDataManager), never()).destroy();
_allSegManagers.remove(segmentDataManager);
_accessedSegManagers.remove(segmentDataManager);
}
// For the remaining segments in accessed list, destroy must have been called exactly once.
for (SegmentDataManager segmentDataManager : _allSegManagers) {
- verify(segmentDataManager.getSegment(), times(1)).destroy();
+ verify(getInnerSegment(segmentDataManager), times(1)).destroy();
// Also their count should be 0
Assert.assertEquals(segmentDataManager.getReferenceCount(), 0);
}
@@ -361,7 +384,7 @@ public void run() {
Set segmentIds = pickSegments();
List segmentList = new ArrayList<>(segmentIds.size());
for (Integer segmentId : segmentIds) {
- segmentList.add(SEGMENT_PREFIX + segmentId);
+ segmentList.add(tdmKey(SEGMENT_PREFIX + segmentId));
}
segmentDataManagers = _tableDataManager.acquireSegments(segmentList, new ArrayList<>());
}
@@ -448,8 +471,9 @@ public void run() {
private void addSegment() {
final int segmentToAdd = _hi + 1;
final String segName = SEGMENT_PREFIX + segmentToAdd;
- _tableDataManager.addSegment(makeImmutableSegment(segName, _random.nextInt()));
- _allSegManagers.add(_internalSegMap.get(segName));
+ BaseTableDataManagerAcquireSegmentTest.this.addSegment(_tableDataManager,
+ makeImmutableSegment(segName, _random.nextInt()));
+ _allSegManagers.add(_internalSegMap.get(tdmKey(segName)));
_hi = segmentToAdd;
}
@@ -457,8 +481,9 @@ private void addSegment() {
private void replaceSegment() {
int segToReplace = _random.nextInt(_hi - _lo + 1) + _lo;
final String segName = SEGMENT_PREFIX + segToReplace;
- _tableDataManager.addSegment(makeImmutableSegment(segName, _random.nextInt()));
- _allSegManagers.add(_internalSegMap.get(segName));
+ BaseTableDataManagerAcquireSegmentTest.this.addSegment(_tableDataManager,
+ makeImmutableSegment(segName, _random.nextInt()));
+ _allSegManagers.add(_internalSegMap.get(tdmKey(segName)));
}
// Remove the segment _lo and then bump _lo
@@ -466,7 +491,7 @@ private void removeSegment()
throws Exception {
// Keep at least one segment in place.
if (_hi > _lo) {
- _tableDataManager.offloadSegment(SEGMENT_PREFIX + _lo);
+ _tableDataManager.offloadSegment(tdmKey(SEGMENT_PREFIX + _lo));
_lo++;
} else {
addSegment();
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerNeedRefreshTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerNeedRefreshTest.java
index 61657b8e5fdf..c1ef555a3afe 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerNeedRefreshTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerNeedRefreshTest.java
@@ -52,6 +52,7 @@
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@@ -64,7 +65,7 @@
@Test
-public class BaseTableDataManagerNeedRefreshTest {
+public class BaseTableDataManagerNeedRefreshTest extends BaseTableDataManagerTest {
private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), "BaseTableDataManagerNeedRefreshTest");
private static final String DEFAULT_TABLE_NAME = "mytable";
private static final String OFFLINE_TABLE_NAME = TableNameBuilder.OFFLINE.tableNameWithType(DEFAULT_TABLE_NAME);
@@ -91,7 +92,7 @@ public class BaseTableDataManagerNeedRefreshTest {
private static final Schema SCHEMA;
private static final IndexLoadingConfig INDEX_LOADING_CONFIG;
private static final ImmutableSegmentDataManager IMMUTABLE_SEGMENT_DATA_MANAGER;
- private static final BaseTableDataManager BASE_TABLE_DATA_MANAGER;
+ private BaseTableDataManager _baseTableDataManager;
private String _testName = "defaultTestName";
@@ -102,12 +103,16 @@ public class BaseTableDataManagerNeedRefreshTest {
INDEX_LOADING_CONFIG = new IndexLoadingConfig(TABLE_CONFIG, SCHEMA);
IMMUTABLE_SEGMENT_DATA_MANAGER =
createImmutableSegmentDataManager(INDEX_LOADING_CONFIG, "basicSegment", generateRows());
- BASE_TABLE_DATA_MANAGER = BaseTableDataManagerTest.createTableManager();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
+ @BeforeClass
+ public void initBaseTableDataManager() {
+ _baseTableDataManager = createTableManager();
+ }
+
protected static TableConfigBuilder getTableConfigBuilder() {
return new TableConfigBuilder(TableType.OFFLINE).setTableName(DEFAULT_TABLE_NAME)
.setTimeColumnName(DEFAULT_TIME_COLUMN_NAME)
@@ -198,7 +203,7 @@ private static ImmutableSegmentDataManager createImmutableSegmentDataManager(Ind
when(segmentDataManager.getSegmentName()).thenReturn(segmentName);
File indexDir = createSegment(indexLoadingConfig, segmentName, rows);
ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(indexDir, indexLoadingConfig,
- BaseTableDataManagerTest.SEGMENT_OPERATIONS_THROTTLER);
+ SEGMENT_OPERATIONS_THROTTLER);
when(segmentDataManager.getSegment()).thenReturn(immutableSegment);
return segmentDataManager;
}
@@ -233,7 +238,7 @@ void testAddTimeColumn()
ImmutableSegmentDataManager segmentDataManager =
createImmutableSegmentDataManager(indexLoadingConfig, "noChanges", List.of(row));
- BaseTableDataManager tableDataManager = BaseTableDataManagerTest.createTableManager();
+ BaseTableDataManager tableDataManager = createTableManager();
StaleSegment response = tableDataManager.isSegmentStale(indexLoadingConfig, segmentDataManager);
assertFalse(response.isStale());
@@ -247,7 +252,7 @@ void testAddTimeColumn()
@Test
void testChangeTimeColumn() {
TableConfig tableConfig = getTableConfigBuilder().setTimeColumnName(MS_SINCE_EPOCH_COLUMN_NAME).build();
- StaleSegment response = BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(tableConfig, SCHEMA),
+ StaleSegment response = _baseTableDataManager.isSegmentStale(new IndexLoadingConfig(tableConfig, SCHEMA),
IMMUTABLE_SEGMENT_DATA_MANAGER);
assertTrue(response.isStale());
assertEquals(response.getReason(), "time column");
@@ -258,7 +263,7 @@ void testRemoveColumn()
throws Exception {
Schema schema = getSchema();
schema.removeField(TEXT_INDEX_COLUMN);
- StaleSegment response = BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(TABLE_CONFIG, schema),
+ StaleSegment response = _baseTableDataManager.isSegmentStale(new IndexLoadingConfig(TABLE_CONFIG, schema),
IMMUTABLE_SEGMENT_DATA_MANAGER);
assertTrue(response.isStale());
assertEquals(response.getReason(), "column deleted: textColumn");
@@ -270,7 +275,7 @@ void testFieldType()
Schema schema = getSchema();
schema.removeField(TEXT_INDEX_COLUMN);
schema.addField(new MetricFieldSpec(TEXT_INDEX_COLUMN, FieldSpec.DataType.STRING, true));
- StaleSegment response = BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(TABLE_CONFIG, schema),
+ StaleSegment response = _baseTableDataManager.isSegmentStale(new IndexLoadingConfig(TABLE_CONFIG, schema),
IMMUTABLE_SEGMENT_DATA_MANAGER);
assertTrue(response.isStale());
assertEquals(response.getReason(), "field type changed: textColumn");
@@ -282,7 +287,7 @@ void testChangeDataType()
Schema schema = getSchema();
schema.removeField(TEXT_INDEX_COLUMN);
schema.addField(new DimensionFieldSpec(TEXT_INDEX_COLUMN, FieldSpec.DataType.INT, true));
- StaleSegment response = BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(TABLE_CONFIG, schema),
+ StaleSegment response = _baseTableDataManager.isSegmentStale(new IndexLoadingConfig(TABLE_CONFIG, schema),
IMMUTABLE_SEGMENT_DATA_MANAGER);
assertTrue(response.isStale());
assertEquals(response.getReason(), "data type changed: textColumn");
@@ -294,7 +299,7 @@ void testChangeToMV()
Schema schema = getSchema();
schema.removeField(TEXT_INDEX_COLUMN);
schema.addField(new DimensionFieldSpec(TEXT_INDEX_COLUMN, FieldSpec.DataType.STRING, false));
- StaleSegment response = BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(TABLE_CONFIG, schema),
+ StaleSegment response = _baseTableDataManager.isSegmentStale(new IndexLoadingConfig(TABLE_CONFIG, schema),
IMMUTABLE_SEGMENT_DATA_MANAGER);
assertTrue(response.isStale());
assertEquals(response.getReason(), "single / multi value changed: textColumn");
@@ -306,7 +311,7 @@ void testChangeToSV()
Schema schema = getSchema();
schema.removeField(TEXT_INDEX_COLUMN_MV);
schema.addField(new DimensionFieldSpec(TEXT_INDEX_COLUMN_MV, FieldSpec.DataType.STRING, true));
- StaleSegment response = BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(TABLE_CONFIG, schema),
+ StaleSegment response = _baseTableDataManager.isSegmentStale(new IndexLoadingConfig(TABLE_CONFIG, schema),
IMMUTABLE_SEGMENT_DATA_MANAGER);
assertTrue(response.isStale());
assertEquals(response.getReason(), "single / multi value changed: textColumnMV");
@@ -317,12 +322,12 @@ void testSortColumnMismatch() {
// Check with a column that is not sorted
TableConfig tableConfig = getTableConfigBuilder().setSortedColumn(MS_SINCE_EPOCH_COLUMN_NAME).build();
IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(tableConfig, SCHEMA);
- StaleSegment response = BASE_TABLE_DATA_MANAGER.isSegmentStale(indexLoadingConfig, IMMUTABLE_SEGMENT_DATA_MANAGER);
+ StaleSegment response = _baseTableDataManager.isSegmentStale(indexLoadingConfig, IMMUTABLE_SEGMENT_DATA_MANAGER);
assertTrue(response.isStale());
assertEquals(response.getReason(), "sort column changed: MilliSecondsSinceEpoch");
// Check with a column that is sorted
tableConfig.getIndexingConfig().setSortedColumn(List.of(TEXT_INDEX_COLUMN));
- assertFalse(BASE_TABLE_DATA_MANAGER.isSegmentStale(indexLoadingConfig, IMMUTABLE_SEGMENT_DATA_MANAGER).isStale());
+ assertFalse(_baseTableDataManager.isSegmentStale(indexLoadingConfig, IMMUTABLE_SEGMENT_DATA_MANAGER).isStale());
}
@DataProvider(name = "testFilterArgs")
@@ -357,17 +362,17 @@ void testFilter(String segmentName, TableConfig tableConfigWithFilter, String ex
createImmutableSegmentDataManager(indexLoadingConfig, segmentName, generateRows());
// When TableConfig has a filter but segment does not have, needRefresh is true.
- StaleSegment response = BASE_TABLE_DATA_MANAGER.isSegmentStale(indexLoadingConfig, IMMUTABLE_SEGMENT_DATA_MANAGER);
+ StaleSegment response = _baseTableDataManager.isSegmentStale(indexLoadingConfig, IMMUTABLE_SEGMENT_DATA_MANAGER);
assertTrue(response.isStale());
assertEquals(response.getReason(), expectedReason);
// When TableConfig does not have a filter but segment has, needRefresh is true
- response = BASE_TABLE_DATA_MANAGER.isSegmentStale(INDEX_LOADING_CONFIG, segmentWithFilter);
+ response = _baseTableDataManager.isSegmentStale(INDEX_LOADING_CONFIG, segmentWithFilter);
assertTrue(response.isStale());
assertEquals(response.getReason(), expectedReason);
// When TableConfig has a filter AND segment also has a filter, needRefresh is false
- assertFalse(BASE_TABLE_DATA_MANAGER.isSegmentStale(indexLoadingConfig, segmentWithFilter).isStale());
+ assertFalse(_baseTableDataManager.isSegmentStale(indexLoadingConfig, segmentWithFilter).isStale());
}
@Test
@@ -380,17 +385,17 @@ void testPartition()
createImmutableSegmentDataManager(indexLoadingConfig, "partitionWithModulo", generateRows());
// when segment has no partition AND tableConfig has partitions then needRefresh = true
- StaleSegment response = BASE_TABLE_DATA_MANAGER.isSegmentStale(indexLoadingConfig, IMMUTABLE_SEGMENT_DATA_MANAGER);
+ StaleSegment response = _baseTableDataManager.isSegmentStale(indexLoadingConfig, IMMUTABLE_SEGMENT_DATA_MANAGER);
assertTrue(response.isStale());
assertEquals(response.getReason(), "partition function added: partitionedColumn");
// when segment has partitions AND tableConfig has no partitions, then needRefresh = false
- assertFalse(BASE_TABLE_DATA_MANAGER.isSegmentStale(INDEX_LOADING_CONFIG, segmentWithPartition).isStale());
+ assertFalse(_baseTableDataManager.isSegmentStale(INDEX_LOADING_CONFIG, segmentWithPartition).isStale());
// when # of partitions is different, then needRefresh = true
TableConfig partitionedTableConfig40 = getTableConfigBuilder().setSegmentPartitionConfig(new SegmentPartitionConfig(
Map.of(PARTITIONED_COLUMN_NAME, new ColumnPartitionConfig(PARTITION_FUNCTION_NAME, 40)))).build();
- response = BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(partitionedTableConfig40, SCHEMA),
+ response = _baseTableDataManager.isSegmentStale(new IndexLoadingConfig(partitionedTableConfig40, SCHEMA),
segmentWithPartition);
assertTrue(response.isStale());
assertEquals(response.getReason(), "num partitions changed: partitionedColumn");
@@ -399,7 +404,7 @@ void testPartition()
TableConfig partitionedTableConfigMurmur = getTableConfigBuilder().setSegmentPartitionConfig(
new SegmentPartitionConfig(
Map.of(PARTITIONED_COLUMN_NAME, new ColumnPartitionConfig("murmur", NUM_PARTITIONS)))).build();
- response = BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(partitionedTableConfigMurmur, SCHEMA),
+ response = _baseTableDataManager.isSegmentStale(new IndexLoadingConfig(partitionedTableConfigMurmur, SCHEMA),
segmentWithPartition);
assertTrue(response.isStale());
assertEquals(response.getReason(), "partition function name changed: partitionedColumn");
@@ -414,12 +419,12 @@ void testNullValueVector()
createImmutableSegmentDataManager(indexLoadingConfig, "withoutNullHandling", generateRows());
// If null handling is removed from table config AND segment has NVV, then NVV can be removed. needRefresh = true
- StaleSegment response = BASE_TABLE_DATA_MANAGER.isSegmentStale(indexLoadingConfig, IMMUTABLE_SEGMENT_DATA_MANAGER);
+ StaleSegment response = _baseTableDataManager.isSegmentStale(indexLoadingConfig, IMMUTABLE_SEGMENT_DATA_MANAGER);
assertTrue(response.isStale());
assertEquals(response.getReason(), "null value vector index removed from column: NullValueColumn");
// if NVV is added to table config AND segment does not have NVV, then it cannot be added. needRefresh = false
- assertFalse(BASE_TABLE_DATA_MANAGER.isSegmentStale(INDEX_LOADING_CONFIG, segmentWithoutNullHandling).isStale());
+ assertFalse(_baseTableDataManager.isSegmentStale(INDEX_LOADING_CONFIG, segmentWithoutNullHandling).isStale());
}
@Test
@@ -428,7 +433,7 @@ public void testAddMultiColumnTextIndex() {
new MultiColumnTextIndexConfig(List.of(MC_TEXT_INDEX_COLUMN_1, MC_TEXT_INDEX_COLUMN_2));
TableConfig tableConfig =
getTableConfigBuilder().setMultiColumnTextIndexConfig(newIndex).build();
- assertTrue(BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(tableConfig, SCHEMA),
+ assertTrue(_baseTableDataManager.isSegmentStale(new IndexLoadingConfig(tableConfig, SCHEMA),
IMMUTABLE_SEGMENT_DATA_MANAGER).isStale());
}
@@ -448,7 +453,7 @@ public void testMultiColumnTextIndexWithDifferentColumns()
.build();
assertTrue(
- BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
+ _baseTableDataManager.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
.isStale());
}
@@ -470,7 +475,7 @@ public void testMultiColumnTextIndexWithDifferentSharedProperties()
.build();
assertTrue(
- BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
+ _baseTableDataManager.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
.isStale());
}
@@ -492,7 +497,7 @@ public void testMultiColumnTextIndexWithDifferentColumnProperties()
.build();
assertTrue(
- BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
+ _baseTableDataManager.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
.isStale());
}
@@ -504,7 +509,7 @@ public void addStartreeIndex() {
new StarTreeIndexConfig(List.of("Carrier"), null, List.of(AggregationFunctionColumnPair.COUNT_STAR_NAME), null,
100);
TableConfig tableConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build();
- assertTrue(BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(tableConfig, SCHEMA),
+ assertTrue(_baseTableDataManager.isSegmentStale(new IndexLoadingConfig(tableConfig, SCHEMA),
IMMUTABLE_SEGMENT_DATA_MANAGER).isStale());
}
@@ -529,7 +534,7 @@ public void testStarTreeIndexWithDifferentColumn()
TableConfig newTableConfig =
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
assertTrue(
- BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
+ _baseTableDataManager.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
.isStale());
}
@@ -552,7 +557,7 @@ public void testStarTreeIndexWithManyColumns()
TableConfig newTableConfig =
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
assertTrue(
- BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
+ _baseTableDataManager.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
.isStale());
}
@@ -575,7 +580,7 @@ public void testStartIndexWithDifferentOrder()
TableConfig newTableConfig =
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
assertTrue(
- BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
+ _baseTableDataManager.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
.isStale());
}
@@ -598,7 +603,7 @@ void testStarTreeIndexWithSkipDimCols()
TableConfig newTableConfig =
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
assertTrue(
- BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
+ _baseTableDataManager.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
.isStale());
}
@@ -621,7 +626,7 @@ void testStarTreeIndexWithDiffOrderSkipDimCols()
TableConfig newTableConfig =
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
assertFalse(
- BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
+ _baseTableDataManager.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
.isStale());
}
@@ -642,7 +647,7 @@ void testStarTreeIndexRemoveSkipDimCols()
TableConfig newTableConfig =
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
assertTrue(
- BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
+ _baseTableDataManager.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
.isStale());
}
@@ -662,7 +667,7 @@ void testStarTreeIndexAddAggFn()
TableConfig newTableConfig =
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
assertTrue(
- BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
+ _baseTableDataManager.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
.isStale());
}
@@ -683,7 +688,7 @@ void testStarTreeIndexDiffOrderAggFn()
TableConfig newTableConfig =
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
assertFalse(
- BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
+ _baseTableDataManager.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
.isStale());
}
@@ -705,7 +710,7 @@ void testStarTreeIndexRemoveAggFn()
TableConfig newTableConfig =
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
assertTrue(
- BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
+ _baseTableDataManager.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
.isStale());
}
@@ -727,7 +732,7 @@ void testStarTreeIndexNewMetricAgg()
TableConfig newTableConfig =
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
assertTrue(
- BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
+ _baseTableDataManager.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
.isStale());
}
@@ -751,7 +756,7 @@ void testStarTreeIndexDiffOrderAggFn2()
TableConfig newTableConfig =
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
assertFalse(
- BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
+ _baseTableDataManager.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
.isStale());
}
@@ -771,7 +776,7 @@ void testStarTreeIndexMaxLeafNode()
TableConfig newTableConfig =
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
assertTrue(
- BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
+ _baseTableDataManager.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
.isStale());
}
@@ -784,7 +789,7 @@ void testStarTreeIndexRemove()
TableConfig tableConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build();
ImmutableSegmentDataManager segmentDataManager =
createImmutableSegmentDataManager(new IndexLoadingConfig(tableConfig, SCHEMA), _testName, generateRows());
- assertTrue(BASE_TABLE_DATA_MANAGER.isSegmentStale(INDEX_LOADING_CONFIG, segmentDataManager).isStale());
+ assertTrue(_baseTableDataManager.isSegmentStale(INDEX_LOADING_CONFIG, segmentDataManager).isStale());
}
@Test
@@ -805,7 +810,7 @@ void testStarTreeIndexAddMultiple()
TableConfig newTableConfig =
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig, newStarTreeIndexConfig)).build();
assertTrue(
- BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
+ _baseTableDataManager.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
.isStale());
}
@@ -824,7 +829,7 @@ void testStarTreeIndexEnableDefault()
TableConfig newTableConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build();
newTableConfig.getIndexingConfig().setEnableDefaultStarTree(true);
assertTrue(
- BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
+ _baseTableDataManager.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
.isStale());
}
@@ -840,7 +845,7 @@ void testStarTreeIndexNoChanges()
IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(tableConfig, SCHEMA);
ImmutableSegmentDataManager segmentDataManager =
createImmutableSegmentDataManager(indexLoadingConfig, _testName, generateRows());
- assertFalse(BASE_TABLE_DATA_MANAGER.isSegmentStale(indexLoadingConfig, segmentDataManager).isStale());
+ assertFalse(_baseTableDataManager.isSegmentStale(indexLoadingConfig, segmentDataManager).isStale());
}
@Test
@@ -859,7 +864,7 @@ void testStarTreeIndexDisableDefault()
TableConfig newTableConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build();
newTableConfig.getIndexingConfig().setEnableDefaultStarTree(false);
assertTrue(
- BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
+ _baseTableDataManager.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
.isStale());
}
@@ -877,7 +882,7 @@ void testTimestampIndexNoChange()
ImmutableSegmentDataManager segmentDataManager =
createImmutableSegmentDataManager(indexLoadingConfig, _testName, generateRows());
- assertFalse(BASE_TABLE_DATA_MANAGER.isSegmentStale(indexLoadingConfig, segmentDataManager).isStale());
+ assertFalse(_baseTableDataManager.isSegmentStale(indexLoadingConfig, segmentDataManager).isStale());
}
@Test
@@ -894,7 +899,7 @@ void testTimestampIndexAdded()
// Segment was created without the timestamp index.
StaleSegment response =
- BASE_TABLE_DATA_MANAGER.isSegmentStale(indexLoadingConfigWithTimestampIndex, IMMUTABLE_SEGMENT_DATA_MANAGER);
+ _baseTableDataManager.isSegmentStale(indexLoadingConfigWithTimestampIndex, IMMUTABLE_SEGMENT_DATA_MANAGER);
assertTrue(response.isStale());
assertEquals(response.getReason(), "timestamp index changed");
}
@@ -917,7 +922,7 @@ void testTimestampIndexRemoved()
// Evaluate staleness against a fresh config with no timestamp index.
IndexLoadingConfig indexLoadingConfigNoTimestamp = new IndexLoadingConfig(getTableConfigBuilder().build(),
getSchema());
- StaleSegment response = BASE_TABLE_DATA_MANAGER.isSegmentStale(indexLoadingConfigNoTimestamp, segmentDataManager);
+ StaleSegment response = _baseTableDataManager.isSegmentStale(indexLoadingConfigNoTimestamp, segmentDataManager);
assertTrue(response.isStale());
assertEquals(response.getReason(), "timestamp index changed");
}
@@ -942,7 +947,7 @@ void testTimestampIndexGranularityAdded()
getTableConfigBuilder().setFieldConfigList(List.of(fieldConfigDayAndWeek)).build();
IndexLoadingConfig indexLoadingConfigDayAndWeek = new IndexLoadingConfig(tableConfigDayAndWeek, getSchema());
- StaleSegment response = BASE_TABLE_DATA_MANAGER.isSegmentStale(indexLoadingConfigDayAndWeek, segmentDataManager);
+ StaleSegment response = _baseTableDataManager.isSegmentStale(indexLoadingConfigDayAndWeek, segmentDataManager);
assertTrue(response.isStale());
assertEquals(response.getReason(), "timestamp index changed");
}
@@ -968,7 +973,7 @@ void testTimestampIndexGranularityRemoved()
TableConfig tableConfigDay = getTableConfigBuilder().setFieldConfigList(List.of(fieldConfigDay)).build();
IndexLoadingConfig indexLoadingConfigDay = new IndexLoadingConfig(tableConfigDay, getSchema());
- StaleSegment response = BASE_TABLE_DATA_MANAGER.isSegmentStale(indexLoadingConfigDay, segmentDataManager);
+ StaleSegment response = _baseTableDataManager.isSegmentStale(indexLoadingConfigDay, segmentDataManager);
assertTrue(response.isStale());
assertEquals(response.getReason(), "timestamp index changed");
}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
index a21c210616ce..a98f4d8b00bc 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
@@ -890,32 +890,52 @@ public void decrypt(File origFile, File decFile) {
}
}
- static OfflineTableDataManager createTableManager() {
+ protected BaseTableDataManager createTableManager() {
return createTableManager(createDefaultInstanceDataManagerConfig());
}
- static OfflineTableDataManager createTableManagerWithAsyncSegmentRefreshEnabled() {
+ protected BaseTableDataManager createTableManagerWithAsyncSegmentRefreshEnabled() {
return createTableManagerWithAsyncSegmentRefreshEnabled(createDefaultInstanceDataManagerConfig());
}
- private static OfflineTableDataManager createTableManager(InstanceDataManagerConfig instanceDataManagerConfig) {
- OfflineTableDataManager tableDataManager = new OfflineTableDataManager();
- tableDataManager.init(instanceDataManagerConfig, mock(HelixManager.class), new SegmentLocks(), DEFAULT_TABLE_CONFIG,
+ protected BaseTableDataManager createTableManager(InstanceDataManagerConfig instanceDataManagerConfig) {
+ BaseTableDataManager tableDataManager = newTableDataManager();
+ tableDataManager.init(instanceDataManagerConfig, createHelixManagerMock(), new SegmentLocks(), DEFAULT_TABLE_CONFIG,
SCHEMA, new SegmentReloadSemaphore(1), Executors.newSingleThreadExecutor(), null, null,
SEGMENT_OPERATIONS_THROTTLER, false, mock(ServerReloadJobStatusCache.class));
return tableDataManager;
}
- private static OfflineTableDataManager createTableManagerWithAsyncSegmentRefreshEnabled(
+ protected BaseTableDataManager createTableManagerWithAsyncSegmentRefreshEnabled(
InstanceDataManagerConfig instanceDataManagerConfig) {
- OfflineTableDataManager tableDataManager = new OfflineTableDataManager();
- tableDataManager.init(instanceDataManagerConfig, mock(HelixManager.class), new SegmentLocks(), DEFAULT_TABLE_CONFIG,
+ BaseTableDataManager tableDataManager = newTableDataManager();
+ tableDataManager.init(instanceDataManagerConfig, createHelixManagerMock(), new SegmentLocks(), DEFAULT_TABLE_CONFIG,
SCHEMA, new SegmentReloadSemaphore(1), Executors.newSingleThreadExecutor(), null, null,
SEGMENT_OPERATIONS_THROTTLER, true, mock(ServerReloadJobStatusCache.class));
return tableDataManager;
}
- private static InstanceDataManagerConfig createDefaultInstanceDataManagerConfig() {
+ /**
+ * Returns the concrete {@link BaseTableDataManager} instance under test. Default returns a stock
+ * {@link OfflineTableDataManager}; subclasses override to test a different implementation while inheriting
+ * all test bodies.
+ */
+ protected BaseTableDataManager newTableDataManager() {
+ return new OfflineTableDataManager();
+ }
+
+ /**
+ * Returns the {@link HelixManager} mock wired into the TDM under test. Default returns a bare Mockito mock
+ * (no property store stubbed) — fine for the inherited test bodies which never read ZK directly. Subclasses
+ * that exercise paths reading {@code _propertyStore} (e.g. {@code fetchZKMetadata}, {@code fetchIndexLoadingConfig})
+ * override to stub {@code helixManager.getHelixPropertyStore()} with a {@code FakePropertyStore} pre-seeded
+ * with table config + schema + per-segment ZK metadata.
+ */
+ protected HelixManager createHelixManagerMock() {
+ return mock(HelixManager.class);
+ }
+
+ protected static InstanceDataManagerConfig createDefaultInstanceDataManagerConfig() {
InstanceDataManagerConfig config = mock(InstanceDataManagerConfig.class);
when(config.getInstanceDataDir()).thenReturn(TEMP_DIR.getAbsolutePath());
// Check CRC matching on segment load time.
@@ -923,7 +943,7 @@ private static InstanceDataManagerConfig createDefaultInstanceDataManagerConfig(
return config;
}
- private static File createSegment(SegmentVersion segmentVersion, int numRows)
+ protected static File createSegment(SegmentVersion segmentVersion, int numRows)
throws Exception {
SegmentGeneratorConfig config = new SegmentGeneratorConfig(DEFAULT_TABLE_CONFIG, SCHEMA);
config.setOutDir(TABLE_DATA_DIR.getAbsolutePath());
@@ -942,14 +962,14 @@ private static File createSegment(SegmentVersion segmentVersion, int numRows)
return new File(TABLE_DATA_DIR, SEGMENT_NAME);
}
- private static SegmentZKMetadata createRawSegment(SegmentVersion segmentVersion, int numRows)
+ protected static SegmentZKMetadata createRawSegment(SegmentVersion segmentVersion, int numRows)
throws Exception {
File indexDir = createSegment(segmentVersion, numRows);
return makeRawSegment(indexDir,
new File(TEMP_DIR, SEGMENT_NAME + TarCompressionUtils.TAR_COMPRESSED_FILE_EXTENSION), true);
}
- private static SegmentZKMetadata makeRawSegment(File indexDir, File rawSegmentFile, boolean deleteIndexDir)
+ protected static SegmentZKMetadata makeRawSegment(File indexDir, File rawSegmentFile, boolean deleteIndexDir)
throws Exception {
long crc = getCRC(indexDir);
SegmentZKMetadata zkMetadata = new SegmentZKMetadata(SEGMENT_NAME);
@@ -962,7 +982,7 @@ private static SegmentZKMetadata makeRawSegment(File indexDir, File rawSegmentFi
return zkMetadata;
}
- private static long getCRC(File indexDir)
+ protected static long getCRC(File indexDir)
throws IOException {
File creationMetaFile = SegmentDirectoryPaths.findCreationMetaFile(indexDir);
assertNotNull(creationMetaFile);
@@ -971,7 +991,7 @@ private static long getCRC(File indexDir)
}
}
- private IndexLoadingConfig createTierIndexLoadingConfig(TableConfig tableConfig) {
+ protected IndexLoadingConfig createTierIndexLoadingConfig(TableConfig tableConfig) {
InstanceDataManagerConfig instanceDataManagerConfig = mock(InstanceDataManagerConfig.class);
when(instanceDataManagerConfig.getSegmentDirectoryLoader()).thenReturn(TIER_SEGMENT_DIRECTORY_LOADER);
when(instanceDataManagerConfig.getConfig()).thenReturn(new PinotConfiguration());
@@ -981,7 +1001,7 @@ private IndexLoadingConfig createTierIndexLoadingConfig(TableConfig tableConfig)
return indexLoadingConfig;
}
- private ImmutableSegmentDataManager createImmutableSegmentDataManager(String segmentName, long crc) {
+ protected ImmutableSegmentDataManager createImmutableSegmentDataManager(String segmentName, long crc) {
ImmutableSegmentDataManager segmentDataManager = mock(ImmutableSegmentDataManager.class);
when(segmentDataManager.getSegmentName()).thenReturn(segmentName);
ImmutableSegment immutableSegment = mock(ImmutableSegment.class);
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
index 15a8a7565e6f..10c4d414cdec 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
@@ -194,6 +194,13 @@ void offloadSegment(String segmentName)
void offloadSegmentUnsafe(String segmentName)
throws Exception;
+ /**
+ * Deletes a segment from this table — offloads it if currently loaded, then removes its on-disk data (the per-segment
+ * directory and any tier-specific artefacts via {@link org.apache.pinot.segment.spi.loader.SegmentDirectoryLoader}).
+ */
+ void deleteSegment(String segmentName)
+ throws Exception;
+
/**
* Reloads an existing immutable segment for the table, which can be an OFFLINE or REALTIME table.
* A new segment may be downloaded if the local one has a different CRC; or can be forced to download if
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
index 9d7464cca970..4bbd90ae876a 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
@@ -48,6 +48,7 @@
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.restlet.resources.SegmentErrorInfo;
+import org.apache.pinot.core.data.manager.BaseTableDataManager;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.data.manager.LogicalTableContext;
import org.apache.pinot.core.data.manager.provider.TableDataManagerProvider;
@@ -64,9 +65,6 @@
import org.apache.pinot.segment.local.utils.ServerReloadJobStatusCache;
import org.apache.pinot.segment.local.utils.TableConfigUtils;
import org.apache.pinot.segment.spi.SegmentMetadata;
-import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoader;
-import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderContext;
-import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry;
import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.LogicalTableConfig;
@@ -383,38 +381,23 @@ public void offloadSegment(String tableNameWithType, String segmentName)
public void deleteSegment(String tableNameWithType, String segmentName)
throws Exception {
LOGGER.info("Deleting segment: {} from table: {}", segmentName, tableNameWithType);
- // Segment deletion is handled at instance level because table data manager might not exist. Acquire the lock here.
+ // Hold the per-segment lock around the TDM lookup so the lookup + delete is atomic vs. removeTableDataManager
+ // shutting the TDM down concurrently. The TDM's own deleteSegment re-acquires the same lock (ReentrantLock).
Lock segmentLock = _segmentLocks.getLock(tableNameWithType, segmentName);
segmentLock.lock();
try {
- // Check if the segment is still loaded, if so, offload it first.
- // This might happen when the server disconnected from ZK and reconnected, and the segment is still loaded.
- // TODO: Consider using table data manager to delete the segment. This will allow the table data manager to clean
- // up the segment data on all tiers. Note that table data manager might have not been created, and table
- // config might have been deleted at this point.
TableDataManager tableDataManager = _tableDataManagerMap.get(tableNameWithType);
- if (tableDataManager != null && tableDataManager.hasSegment(segmentName)) {
- LOGGER.warn("Segment: {} from table: {} is still loaded, offloading it first", segmentName, tableNameWithType);
- tableDataManager.offloadSegment(segmentName);
- }
- // Clean up the segment data on default tier unconditionally.
- File segmentDir = getSegmentDataDirectory(tableNameWithType, segmentName);
- if (segmentDir.exists()) {
- FileUtils.deleteQuietly(segmentDir);
- LOGGER.info("Deleted segment directory {} on default tier", segmentDir);
- }
- // We might clean up further more with the specific segment loader. But note that table data manager might have
- // not been created, and table config might have been deleted at this point.
- SegmentDirectoryLoader segmentLoader = SegmentDirectoryLoaderRegistry.getSegmentDirectoryLoader(
- _instanceDataManagerConfig.getSegmentDirectoryLoader());
- if (segmentLoader != null) {
- LOGGER.info("Deleting segment: {} further with segment loader: {}", segmentName,
- _instanceDataManagerConfig.getSegmentDirectoryLoader());
- SegmentDirectoryLoaderContext ctx = new SegmentDirectoryLoaderContext.Builder().setSegmentName(segmentName)
- .setTableDataDir(_instanceDataManagerConfig.getInstanceDataDir() + "/" + tableNameWithType).build();
- segmentLoader.delete(ctx);
+ if (tableDataManager != null) {
+ // The TDM owns the offload-if-loaded prelude, the on-disk dir delete, and the tier-aware
+ // segment-directory-loader cleanup.
+ tableDataManager.deleteSegment(segmentName);
+ } else {
+ // Fallback: TDM can be null if it was never instantiated, or has already been removed via deleteTable.
+ // In that case, do a path-only cleanup keyed by segment name.
+ String tableDataDir = _instanceDataManagerConfig.getInstanceDataDir() + "/" + tableNameWithType;
+ BaseTableDataManager.deleteSegmentFilesFromDisk(tableDataDir, segmentName, _instanceDataManagerConfig);
+ LOGGER.info("Deleted segment: {} from table: {}", segmentName, tableNameWithType);
}
- LOGGER.info("Deleted segment: {} from table: {}", segmentName, tableNameWithType);
} finally {
segmentLock.unlock();
}