From f3cd165b80642ec23e468b2b25ca03fa01335fa2 Mon Sep 17 00:00:00 2001 From: Krishan Goyal Date: Wed, 29 Apr 2026 16:13:04 +0530 Subject: [PATCH 1/5] Refactor BaseTableDataManager so multi-segment managers can compose its building blocks Three small refactors that keep single-segment behavior identical and let a future multi-segment SegmentDataManager (e.g. a wrapper around N constituent segments) reuse the same load and reload primitives without forking them: 1. Extract `protected ImmutableSegment loadSegment(zkMetadata, ilc)` from `downloadAndLoadSegment`. The new helper performs only the download + `ImmutableSegmentLoader.load`, returning the segment without registering it in `_segmentDataManagerMap` or invoking upsert hooks. Single-segment callers continue to use `downloadAndLoadSegment`, which now composes the helper + `addSegment(...)`. This lets a multi-segment manager load all of its members first and register a single wrapper entry under one name. 2. Push `_segmentReloadSemaphore` acquire/release down into `reloadSegment(SegmentDataManager, IndexLoadingConfig, boolean)`. The public `reloadSegment(String)` and the private parallel `reloadSegments(List)` both used to wrap the inner call with the semaphore; that acquire is now inside the per-physical-segment body and the outer wrappers are removed (which would otherwise double-acquire on a non-reentrant semaphore). For non-group tables this is structurally identical (one segment -> one acquire -> one release; same concurrency bound). For multi-segment managers that fan out N reloads, each member contends for a slot independently. 3. Drop `@VisibleForTesting` on `isSegmentStale(IndexLoadingConfig, SegmentDataManager)` and widen to plain `protected` so subclasses can call it from group-aware overrides of `getStaleSegments` / `needReloadSegments`. The semaphore stays at the orchestration boundary in `doReplaceSegment` (not relocated into `replaceSegmentIfCrcMismatch`), because subclasses commonly override `replaceSegmentIfCrcMismatch` and a relocation there would leak the acquire across paths that bypass the override; subclasses needing per-member acquire on a multi-segment replace can wrap the call themselves. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../data/manager/BaseTableDataManager.java | 90 ++++++++++--------- 1 file changed, 47 insertions(+), 43 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java index 2bd134a9e520..fd17d880649b 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java @@ -486,14 +486,24 @@ public void downloadAndLoadSegment(SegmentZKMetadata zkMetadata, IndexLoadingCon throws Exception { String segmentName = zkMetadata.getSegmentName(); _logger.info("Downloading and loading segment: {}", segmentName); - File indexDir = downloadSegment(zkMetadata); - ImmutableSegment immutableSegment = - ImmutableSegmentLoader.load(indexDir, indexLoadingConfig, _segmentOperationsThrottlerSet, zkMetadata); + ImmutableSegment immutableSegment = loadSegment(zkMetadata, indexLoadingConfig); addSegment(immutableSegment, zkMetadata); _logger.info("Downloaded and loaded segment: {} with CRC: {} on tier: {}", segmentName, zkMetadata.getCrc(), TierConfigUtils.normalizeTierName(zkMetadata.getTier())); } + /** + * Downloads and loads a segment without registering it in the segment data manager map and without invoking upsert + * hooks. Returns the loaded {@link ImmutableSegment} so callers can compose registration separately. Single-segment + * callers should use {@link #downloadAndLoadSegment} which performs the registration step. Multi-segment managers + * may load several physical segments before wrapping them under a single map entry. + */ + protected ImmutableSegment loadSegment(SegmentZKMetadata zkMetadata, IndexLoadingConfig indexLoadingConfig) + throws Exception { + File indexDir = downloadSegment(zkMetadata); + return ImmutableSegmentLoader.load(indexDir, indexLoadingConfig, _segmentOperationsThrottlerSet, zkMetadata); + } + @Override public void replaceSegment(String segmentName) throws Exception { @@ -616,12 +626,7 @@ public void reloadSegment(String segmentName, boolean forceDownload, String relo SegmentDataManager segmentDataManager = _segmentDataManagerMap.get(segmentName); if (segmentDataManager != null) { IndexLoadingConfig indexLoadingConfig = fetchIndexLoadingConfig(); - _segmentReloadSemaphore.acquire(segmentName, _logger); - try { - reloadSegment(segmentDataManager, indexLoadingConfig, forceDownload); - } finally { - _segmentReloadSemaphore.release(); - } + reloadSegment(segmentDataManager, indexLoadingConfig, forceDownload); } else { _logger.warn("Failed to find segment: {}, skipping reloading it", segmentName); } @@ -947,12 +952,7 @@ protected void reloadSegments(List segmentDataManagers, Inde CompletableFuture.allOf(segmentDataManagers.stream().map(segmentDataManager -> CompletableFuture.runAsync(() -> { String segmentName = segmentDataManager.getSegmentName(); try { - _segmentReloadSemaphore.acquire(segmentName, _logger); - try { - reloadSegment(segmentDataManager, indexLoadingConfig, forceDownload); - } finally { - _segmentReloadSemaphore.release(); - } + reloadSegment(segmentDataManager, indexLoadingConfig, forceDownload); } catch (Throwable t) { _logger.error("Caught exception while reloading segment: {}", segmentName, t); failedSegments.add(segmentName); @@ -973,36 +973,41 @@ protected void reloadSegment(SegmentDataManager segmentDataManager, IndexLoading boolean forceDownload) throws Exception { String segmentName = segmentDataManager.getSegmentName(); - if (segmentDataManager instanceof RealtimeSegmentDataManager) { - // Use force commit to reload consuming segment - if (_instanceDataManagerConfig.shouldReloadConsumingSegment()) { - // Force-committing consuming segments is restricted only for tables with inconsistent state configs - // (partial-upsert or dropOutOfOrderRecord=true with replication > 1). - // For these tables, winner selection could incorrectly favor replicas with fewer consumed rows, - // triggering unnecessary reconsumption and resulting in inconsistent upsert state. - // To enable force commit for such tables, change the cluster config - // `pinot.server.consuming.segment.consistency.mode` to PROTECTED for safer reload. - TableConfig tableConfig = indexLoadingConfig.getTableConfig(); - ConsumingSegmentConsistencyModeListener config = ConsumingSegmentConsistencyModeListener.getInstance(); - boolean isTableTypeInconsistentDuringConsumption = - TableConfigUtils.isTableTypeInconsistentDuringConsumption(tableConfig); - // Allow force commit if: - // 1. Table doesn't have inconsistent configs (non-upsert or standard upsert tables), OR - // 2. Consistency mode is PROTECTED or UNSAFE (isForceCommitAllowed = true) - if (tableConfig == null || (isTableTypeInconsistentDuringConsumption && !config.isForceCommitAllowed())) { - _logger.warn("Skipping reload (force commit) on consuming segment: {} due to inconsistent state config. " - + "Change the cluster config: {} to `PROTECTED` for safer commit", segmentName, config.getConfigKey()); + _segmentReloadSemaphore.acquire(segmentName, _logger); + try { + if (segmentDataManager instanceof RealtimeSegmentDataManager) { + // Use force commit to reload consuming segment + if (_instanceDataManagerConfig.shouldReloadConsumingSegment()) { + // Force-committing consuming segments is restricted only for tables with inconsistent state configs + // (partial-upsert or dropOutOfOrderRecord=true with replication > 1). + // For these tables, winner selection could incorrectly favor replicas with fewer consumed rows, + // triggering unnecessary reconsumption and resulting in inconsistent upsert state. + // To enable force commit for such tables, change the cluster config + // `pinot.server.consuming.segment.consistency.mode` to PROTECTED for safer reload. + TableConfig tableConfig = indexLoadingConfig.getTableConfig(); + ConsumingSegmentConsistencyModeListener config = ConsumingSegmentConsistencyModeListener.getInstance(); + boolean isTableTypeInconsistentDuringConsumption = + TableConfigUtils.isTableTypeInconsistentDuringConsumption(tableConfig); + // Allow force commit if: + // 1. Table doesn't have inconsistent configs (non-upsert or standard upsert tables), OR + // 2. Consistency mode is PROTECTED or UNSAFE (isForceCommitAllowed = true) + if (tableConfig == null || (isTableTypeInconsistentDuringConsumption && !config.isForceCommitAllowed())) { + _logger.warn("Skipping reload (force commit) on consuming segment: {} due to inconsistent state config. " + + "Change the cluster config: {} to `PROTECTED` for safer commit", segmentName, config.getConfigKey()); + } else { + _logger.info("Reloading (force committing) consuming segment: {}", segmentName); + ((RealtimeSegmentDataManager) segmentDataManager).forceCommit(); + } } else { - _logger.info("Reloading (force committing) consuming segment: {}", segmentName); - ((RealtimeSegmentDataManager) segmentDataManager).forceCommit(); + _logger.warn("Skip reloading consuming segment: {} as configured", segmentName); } } else { - _logger.warn("Skip reloading consuming segment: {} as configured", segmentName); + SegmentZKMetadata zkMetadata = fetchZKMetadata(segmentName); + SegmentMetadata localMetadata = segmentDataManager.getSegment().getSegmentMetadata(); + reloadSegment(segmentName, indexLoadingConfig, zkMetadata, localMetadata, forceDownload); } - } else { - SegmentZKMetadata zkMetadata = fetchZKMetadata(segmentName); - SegmentMetadata localMetadata = segmentDataManager.getSegment().getSegmentMetadata(); - reloadSegment(segmentName, indexLoadingConfig, zkMetadata, localMetadata, forceDownload); + } finally { + _segmentReloadSemaphore.release(); } } @@ -1542,8 +1547,7 @@ public List getStaleSegments() { return staleSegments; } - @VisibleForTesting - StaleSegment isSegmentStale(IndexLoadingConfig indexLoadingConfig, SegmentDataManager segmentDataManager) { + protected StaleSegment isSegmentStale(IndexLoadingConfig indexLoadingConfig, SegmentDataManager segmentDataManager) { TableConfig tableConfig = indexLoadingConfig.getTableConfig(); Schema schema = indexLoadingConfig.getSchema(); assert tableConfig != null && schema != null; From cc2afba2db257f7394e297a29b15919d94b278c0 Mon Sep 17 00:00:00 2001 From: Krishan Goyal Date: Wed, 29 Apr 2026 16:20:57 +0530 Subject: [PATCH 2/5] Cleanup comment --- .../pinot/core/data/manager/BaseTableDataManager.java | 6 ------ 1 file changed, 6 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java index fd17d880649b..b3daecf132ff 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java @@ -492,12 +492,6 @@ public void downloadAndLoadSegment(SegmentZKMetadata zkMetadata, IndexLoadingCon TierConfigUtils.normalizeTierName(zkMetadata.getTier())); } - /** - * Downloads and loads a segment without registering it in the segment data manager map and without invoking upsert - * hooks. Returns the loaded {@link ImmutableSegment} so callers can compose registration separately. Single-segment - * callers should use {@link #downloadAndLoadSegment} which performs the registration step. Multi-segment managers - * may load several physical segments before wrapping them under a single map entry. - */ protected ImmutableSegment loadSegment(SegmentZKMetadata zkMetadata, IndexLoadingConfig indexLoadingConfig) throws Exception { File indexDir = downloadSegment(zkMetadata); From ab53cb5abed47146b831ffa538393e0d74e93385 Mon Sep 17 00:00:00 2001 From: Krishan Goyal Date: Wed, 6 May 2026 13:44:34 +0530 Subject: [PATCH 3/5] Refactor BaseTDM and its tests to enable cleaner extensions, abstractions and ownership for future work --- .../data/manager/BaseTableDataManager.java | 99 +++++++++++++++-- ...aseTableDataManagerAcquireSegmentTest.java | 87 +++++++++------ .../BaseTableDataManagerNeedRefreshTest.java | 103 +++++++++--------- .../manager/BaseTableDataManagerTest.java | 50 ++++++--- .../local/data/manager/TableDataManager.java | 7 ++ .../helix/HelixInstanceDataManager.java | 50 +++------ 6 files changed, 257 insertions(+), 139 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java index b3daecf132ff..5d47d700bad7 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java @@ -486,18 +486,14 @@ public void downloadAndLoadSegment(SegmentZKMetadata zkMetadata, IndexLoadingCon throws Exception { String segmentName = zkMetadata.getSegmentName(); _logger.info("Downloading and loading segment: {}", segmentName); - ImmutableSegment immutableSegment = loadSegment(zkMetadata, indexLoadingConfig); + File indexDir = downloadSegment(zkMetadata); + ImmutableSegment immutableSegment = + ImmutableSegmentLoader.load(indexDir, indexLoadingConfig, _segmentOperationsThrottlerSet, zkMetadata); addSegment(immutableSegment, zkMetadata); _logger.info("Downloaded and loaded segment: {} with CRC: {} on tier: {}", segmentName, zkMetadata.getCrc(), TierConfigUtils.normalizeTierName(zkMetadata.getTier())); } - protected ImmutableSegment loadSegment(SegmentZKMetadata zkMetadata, IndexLoadingConfig indexLoadingConfig) - throws Exception { - File indexDir = downloadSegment(zkMetadata); - return ImmutableSegmentLoader.load(indexDir, indexLoadingConfig, _segmentOperationsThrottlerSet, zkMetadata); - } - @Override public void replaceSegment(String segmentName) throws Exception { @@ -610,6 +606,67 @@ protected void doOffloadSegment(String segmentName) { } } + /** + * Default per-segment delete: takes the segment lock, offloads if currently loaded (using the standard + * {@link #offloadSegment(String)} entry point — the per-segment lock is a {@link + * java.util.concurrent.locks.ReentrantLock}, so re-acquiring it inside {@code offloadSegment} is safe), then removes + * the on-disk directory and any tier-specific artefacts via {@link #deleteSegmentFilesFromDisk}. + */ + @Override + public void deleteSegment(String segmentName) + throws Exception { + _logger.info("Deleting segment: {}", segmentName); + Lock segmentLock = getSegmentLock(segmentName); + segmentLock.lock(); + try { + if (hasSegment(segmentName)) { + _logger.warn("Segment: {} is still loaded, offloading it before delete", segmentName); + offloadSegment(segmentName); + } + deleteSegmentFilesFromDisk(_tableDataDir, segmentName, _instanceDataManagerConfig); + _logger.info("Deleted segment: {}", segmentName); + } finally { + segmentLock.unlock(); + } + } + + /** + * Single source of truth for on-disk cleanup of a physical segment. Removes the segment directory under + * {@code tableDataDir} and invokes the configured {@link SegmentDirectoryLoader#delete} for tier-aware cleanup. + * + *

Used by: + *

    + *
  • {@link #deleteSegment} — the TDM-driven path triggered by a *_TO_DROPPED state transition.
  • + *
  • {@code HelixInstanceDataManager.deleteSegment} fallback — when the per-table TDM is null because the + * table was never instantiated, or has already been removed via {@code deleteTable()}.
  • + *
  • TDM extensions that fan a single logical name out to multiple physical segments (e.g. segment-group + * member fan-out) — these reuse this helper directly while holding all required per-member locks + * acquired in lex order.
  • + *
+ * + *

Caller is responsible for holding the per-segment lock and for offloading the segment from the TDM if it + * is still loaded. + */ + public static void deleteSegmentFilesFromDisk(String tableDataDir, String segmentName, + InstanceDataManagerConfig instanceConfig) + throws Exception { + File segmentDir = new File(tableDataDir, segmentName); + if (segmentDir.exists()) { + FileUtils.deleteQuietly(segmentDir); + LOGGER.info("Deleted segment directory: {}", segmentDir); + } + SegmentDirectoryLoader segmentLoader = + SegmentDirectoryLoaderRegistry.getSegmentDirectoryLoader(instanceConfig.getSegmentDirectoryLoader()); + if (segmentLoader != null) { + LOGGER.info("Deleting segment: {} further with segment loader: {}", segmentName, + instanceConfig.getSegmentDirectoryLoader()); + SegmentDirectoryLoaderContext ctx = new SegmentDirectoryLoaderContext.Builder().setSegmentName(segmentName) + .setTableDataDir(tableDataDir) + .build(); + segmentLoader.delete(ctx); + } + } + @Override public void reloadSegment(String segmentName, boolean forceDownload, String reloadJobId) throws Exception { @@ -1409,6 +1466,25 @@ protected void removeBackup(File indexDir) @Override public boolean tryLoadExistingSegment(SegmentZKMetadata zkMetadata, IndexLoadingConfig indexLoadingConfig) { + ImmutableSegment segment = tryLoadExistingSegmentInternal(zkMetadata, indexLoadingConfig); + if (segment == null) { + return false; + } + addSegment(segment, zkMetadata); + return true; + } + + /** + * Just Loads a segment from the existing on-disk copy without registering it in {@code _segmentDataManagerMap} or + * invoking other hooks. Returns {@code null} when the on-disk copy is absent, has a stale CRC under + * {@code shouldCheckCRCOnSegmentLoad}, or fails to load — callers fall back to a fresh download in that case. + * Single-segment callers should use {@link #tryLoadExistingSegment} which performs the registration step. + * Multi-segment managers can compose this with {@link #downloadSegment} to load all members before wrapping them + * under a single map entry. + */ + @Nullable + protected ImmutableSegment tryLoadExistingSegmentInternal(SegmentZKMetadata zkMetadata, + IndexLoadingConfig indexLoadingConfig) { String segmentName = zkMetadata.getSegmentName(); Preconditions.checkState(!_shutDown, "Table data manager is already shut down, cannot load existing segment: %s of table: %s", segmentName, @@ -1441,14 +1517,14 @@ public boolean tryLoadExistingSegment(SegmentZKMetadata zkMetadata, IndexLoading if (segmentMetadata == null) { _logger.info("Segment: {} does not exist", segmentName); closeSegmentDirectoryQuietly(segmentDirectory); - return false; + return null; } if (isSegmentStatusCompleted(zkMetadata) && !hasSameCRC(zkMetadata, segmentMetadata)) { _logger.warn("Segment: {} has CRC changed from: {} to: {}", segmentName, segmentMetadata.getCrc(), zkMetadata.getCrc()); if (_instanceDataManagerConfig.shouldCheckCRCOnSegmentLoad()) { closeSegmentDirectoryQuietly(segmentDirectory); - return false; + return null; } _logger.info("Skipping CRC check for segment: {} as configured. Proceed to load segment.", segmentName); } @@ -1469,15 +1545,14 @@ public boolean tryLoadExistingSegment(SegmentZKMetadata zkMetadata, IndexLoading indexLoadingConfig, zkMetadata); } ImmutableSegment segment = ImmutableSegmentLoader.load(segmentDirectory, indexLoadingConfig); - addSegment(segment, zkMetadata); _logger.info("Loaded existing segment: {} with CRC: {} on tier: {}", segmentName, zkMetadata.getCrc(), TierConfigUtils.normalizeTierName(segmentTier)); - return true; + return segment; } catch (Exception e) { _logger.error("Failed to load existing segment: {} with CRC: {} on tier: {}", segmentName, zkMetadata.getCrc(), TierConfigUtils.normalizeTierName(segmentTier), e); closeSegmentDirectoryQuietly(segmentDirectory); - return false; + return null; } } diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java index 519a3e8ff9fc..fc1789ca5eb6 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java @@ -31,7 +31,6 @@ import org.apache.commons.io.FileUtils; import org.apache.helix.HelixManager; import org.apache.pinot.common.metrics.ServerMetrics; -import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager; import org.apache.pinot.core.data.manager.offline.OfflineTableDataManager; import org.apache.pinot.segment.local.data.manager.SegmentDataManager; import org.apache.pinot.segment.local.data.manager.TableDataManager; @@ -73,7 +72,7 @@ public class BaseTableDataManagerAcquireSegmentTest { private final Set _allSegments = new HashSet<>(); private final Set _accessedSegManagers = ConcurrentHashMap.newKeySet(); private final Set _allSegManagers = ConcurrentHashMap.newKeySet(); - private Map _internalSegMap; + protected Map _internalSegMap; private Throwable _exception; private Thread _masterThread; // Segment numbers in place. @@ -115,7 +114,7 @@ public void beforeMethod() { _masterThread = null; } - private TableDataManager makeTestableManager() + protected TableDataManager makeTestableManager() throws Exception { InstanceDataManagerConfig instanceDataManagerConfig = mock(InstanceDataManagerConfig.class); when(instanceDataManagerConfig.getInstanceDataDir()).thenReturn(_tmpDir.getAbsolutePath()); @@ -129,18 +128,42 @@ private TableDataManager makeTestableManager() new SegmentOperationsThrottler(4, 8, true), new SegmentOperationsThrottler(10, 20, true), new SegmentOperationsThrottler(4, 8, true)); - TableDataManager tableDataManager = new OfflineTableDataManager(); + TableDataManager tableDataManager = newTableDataManager(); tableDataManager.init(instanceDataManagerConfig, mock(HelixManager.class), new SegmentLocks(), tableConfig, schema, new SegmentReloadSemaphore(1), Executors.newSingleThreadExecutor(), null, null, segmentOperationsThrottlerSet, false, mock(ServerReloadJobStatusCache.class)); tableDataManager.start(); Field segsMapField = BaseTableDataManager.class.getDeclaredField("_segmentDataManagerMap"); segsMapField.setAccessible(true); - _internalSegMap = (Map) segsMapField.get(tableDataManager); + _internalSegMap = (Map) segsMapField.get(tableDataManager); return tableDataManager; } - private ImmutableSegment makeImmutableSegment(String segmentName, int totalDocs) { + /** + * Returns the concrete {@link TableDataManager} instance under test. Default returns a stock + * {@link OfflineTableDataManager}; subclasses override to test a different implementation while inheriting all + * test bodies. + */ + protected TableDataManager newTableDataManager() { + return new OfflineTableDataManager(); + } + + /** + * Returns the TDM-map key under which a segment with the given name is registered. + */ + protected String tdmKey(String segmentName) { + return segmentName; + } + + protected void addSegment(TableDataManager tdm, ImmutableSegment seg) { + tdm.addSegment(seg); + } + + protected ImmutableSegment getInnerSegment(SegmentDataManager sdm) { + return (ImmutableSegment) sdm.getSegment(); + } + + protected ImmutableSegment makeImmutableSegment(String segmentName, int totalDocs) { ImmutableSegment immutableSegment = mock(ImmutableSegment.class); SegmentMetadata segmentMetadata = mock(SegmentMetadata.class); when(immutableSegment.getSegmentMetadata()).thenReturn(segmentMetadata); @@ -164,11 +187,11 @@ public void basicTest() // Add the segment, get it for use, remove the segment, and then return it. // Make sure that the segment is not destroyed before return. ImmutableSegment immutableSegment = makeImmutableSegment(segmentName, totalDocs); - tableDataManager.addSegment(immutableSegment); + addSegment(tableDataManager, immutableSegment); Assert.assertEquals(tableDataManager.getNumSegments(), 1); - SegmentDataManager segmentDataManager = tableDataManager.acquireSegment(segmentName); + SegmentDataManager segmentDataManager = tableDataManager.acquireSegment(tdmKey(segmentName)); Assert.assertEquals(segmentDataManager.getReferenceCount(), 2); - tableDataManager.offloadSegment(segmentName); + tableDataManager.offloadSegment(tdmKey(segmentName)); Assert.assertEquals(tableDataManager.getNumSegments(), 0); Assert.assertEquals(segmentDataManager.getReferenceCount(), 1); Assert.assertEquals(_nDestroys, 0); @@ -177,7 +200,7 @@ public void basicTest() Assert.assertEquals(_nDestroys, 1); // Now the segment should not be available for use.Also, returning a null reader is fine - segmentDataManager = tableDataManager.acquireSegment(segmentName); + segmentDataManager = tableDataManager.acquireSegment(tdmKey(segmentName)); Assert.assertNull(segmentDataManager); List segmentDataManagers = tableDataManager.acquireAllSegments(); Assert.assertEquals(segmentDataManagers.size(), 0); @@ -185,26 +208,26 @@ public void basicTest() // If a caller tries to acquire the deleted segment using acquireSegments, it will be returned in // notAcquiredSegments. The isSegmentDeletedRecently method should return true. List notAcquiredSegments = new ArrayList<>(); - tableDataManager.acquireSegments(List.of(segmentName), notAcquiredSegments); + tableDataManager.acquireSegments(List.of(tdmKey(segmentName)), notAcquiredSegments); Assert.assertEquals(notAcquiredSegments.size(), 1); - Assert.assertTrue(tableDataManager.isSegmentDeletedRecently(segmentName)); + Assert.assertTrue(tableDataManager.isSegmentDeletedRecently(tdmKey(segmentName))); // Adding and removing the segment again is fine. After adding the segment back, isSegmentDeletedRecently should // return false. - tableDataManager.addSegment(immutableSegment); - Assert.assertFalse(tableDataManager.isSegmentDeletedRecently(segmentName)); - tableDataManager.offloadSegment(segmentName); + addSegment(tableDataManager, immutableSegment); + Assert.assertFalse(tableDataManager.isSegmentDeletedRecently(tdmKey(segmentName))); + tableDataManager.offloadSegment(tdmKey(segmentName)); // Removing the segment again is fine. - tableDataManager.offloadSegment(segmentName); + tableDataManager.offloadSegment(tdmKey(segmentName)); Assert.assertEquals(tableDataManager.getNumSegments(), 0); // Add a new segment and remove it in order this time. final String anotherSeg = "AnotherSegment"; ImmutableSegment ix1 = makeImmutableSegment(anotherSeg, totalDocs); - tableDataManager.addSegment(ix1); + addSegment(tableDataManager, ix1); Assert.assertEquals(tableDataManager.getNumSegments(), 1); - SegmentDataManager sdm1 = tableDataManager.acquireSegment(anotherSeg); + SegmentDataManager sdm1 = tableDataManager.acquireSegment(tdmKey(anotherSeg)); Assert.assertNotNull(sdm1); Assert.assertEquals(sdm1.getReferenceCount(), 2); // acquire all segments @@ -220,15 +243,15 @@ public void basicTest() Assert.assertEquals(sdm1.getReferenceCount(), 1); // Now replace the segment with another one. ImmutableSegment ix2 = makeImmutableSegment(anotherSeg, totalDocs + 1); - tableDataManager.addSegment(ix2); + addSegment(tableDataManager, ix2); Assert.assertEquals(tableDataManager.getNumSegments(), 1); // Now the previous one should have been destroyed, and Assert.assertEquals(sdm1.getReferenceCount(), 0); verify(ix1, times(1)).destroy(); // Delete ix2 without accessing it. - SegmentDataManager sdm2 = _internalSegMap.get(anotherSeg); + SegmentDataManager sdm2 = _internalSegMap.get(tdmKey(anotherSeg)); Assert.assertEquals(sdm2.getReferenceCount(), 1); - tableDataManager.offloadSegment(anotherSeg); + tableDataManager.offloadSegment(tdmKey(anotherSeg)); Assert.assertEquals(tableDataManager.getNumSegments(), 0); Assert.assertEquals(sdm2.getReferenceCount(), 0); verify(ix2, times(1)).destroy(); @@ -265,8 +288,8 @@ public void testReplace() for (int i = _lo; i <= _hi; i++) { final String segName = SEGMENT_PREFIX + i; - tableDataManager.addSegment(makeImmutableSegment(segName, _random.nextInt())); - _allSegManagers.add(_internalSegMap.get(segName)); + addSegment(tableDataManager, makeImmutableSegment(segName, _random.nextInt())); + _allSegManagers.add(_internalSegMap.get(tdmKey(segName))); } runStorageServer(numQueryThreads, runTimeSec, tableDataManager); // replaces segments while online @@ -315,14 +338,14 @@ private void runStorageServer(int numQueryThreads, int runTimeSec, TableDataMana for (SegmentDataManager segmentDataManager : _internalSegMap.values()) { Assert.assertEquals(segmentDataManager.getReferenceCount(), 1); // We should never have called destroy on these segments. Remove it from the list of accessed segments. - verify(segmentDataManager.getSegment(), never()).destroy(); + verify(getInnerSegment(segmentDataManager), never()).destroy(); _allSegManagers.remove(segmentDataManager); _accessedSegManagers.remove(segmentDataManager); } // For the remaining segments in accessed list, destroy must have been called exactly once. for (SegmentDataManager segmentDataManager : _allSegManagers) { - verify(segmentDataManager.getSegment(), times(1)).destroy(); + verify(getInnerSegment(segmentDataManager), times(1)).destroy(); // Also their count should be 0 Assert.assertEquals(segmentDataManager.getReferenceCount(), 0); } @@ -361,7 +384,7 @@ public void run() { Set segmentIds = pickSegments(); List segmentList = new ArrayList<>(segmentIds.size()); for (Integer segmentId : segmentIds) { - segmentList.add(SEGMENT_PREFIX + segmentId); + segmentList.add(tdmKey(SEGMENT_PREFIX + segmentId)); } segmentDataManagers = _tableDataManager.acquireSegments(segmentList, new ArrayList<>()); } @@ -448,8 +471,9 @@ public void run() { private void addSegment() { final int segmentToAdd = _hi + 1; final String segName = SEGMENT_PREFIX + segmentToAdd; - _tableDataManager.addSegment(makeImmutableSegment(segName, _random.nextInt())); - _allSegManagers.add(_internalSegMap.get(segName)); + BaseTableDataManagerAcquireSegmentTest.this.addSegment(_tableDataManager, + makeImmutableSegment(segName, _random.nextInt())); + _allSegManagers.add(_internalSegMap.get(tdmKey(segName))); _hi = segmentToAdd; } @@ -457,8 +481,9 @@ private void addSegment() { private void replaceSegment() { int segToReplace = _random.nextInt(_hi - _lo + 1) + _lo; final String segName = SEGMENT_PREFIX + segToReplace; - _tableDataManager.addSegment(makeImmutableSegment(segName, _random.nextInt())); - _allSegManagers.add(_internalSegMap.get(segName)); + BaseTableDataManagerAcquireSegmentTest.this.addSegment(_tableDataManager, + makeImmutableSegment(segName, _random.nextInt())); + _allSegManagers.add(_internalSegMap.get(tdmKey(segName))); } // Remove the segment _lo and then bump _lo @@ -466,7 +491,7 @@ private void removeSegment() throws Exception { // Keep at least one segment in place. if (_hi > _lo) { - _tableDataManager.offloadSegment(SEGMENT_PREFIX + _lo); + _tableDataManager.offloadSegment(tdmKey(SEGMENT_PREFIX + _lo)); _lo++; } else { addSegment(); diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerNeedRefreshTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerNeedRefreshTest.java index 61657b8e5fdf..c1ef555a3afe 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerNeedRefreshTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerNeedRefreshTest.java @@ -52,6 +52,7 @@ import org.apache.pinot.spi.data.readers.GenericRow; import org.apache.pinot.spi.utils.builder.TableConfigBuilder; import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeMethod; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -64,7 +65,7 @@ @Test -public class BaseTableDataManagerNeedRefreshTest { +public class BaseTableDataManagerNeedRefreshTest extends BaseTableDataManagerTest { private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), "BaseTableDataManagerNeedRefreshTest"); private static final String DEFAULT_TABLE_NAME = "mytable"; private static final String OFFLINE_TABLE_NAME = TableNameBuilder.OFFLINE.tableNameWithType(DEFAULT_TABLE_NAME); @@ -91,7 +92,7 @@ public class BaseTableDataManagerNeedRefreshTest { private static final Schema SCHEMA; private static final IndexLoadingConfig INDEX_LOADING_CONFIG; private static final ImmutableSegmentDataManager IMMUTABLE_SEGMENT_DATA_MANAGER; - private static final BaseTableDataManager BASE_TABLE_DATA_MANAGER; + private BaseTableDataManager _baseTableDataManager; private String _testName = "defaultTestName"; @@ -102,12 +103,16 @@ public class BaseTableDataManagerNeedRefreshTest { INDEX_LOADING_CONFIG = new IndexLoadingConfig(TABLE_CONFIG, SCHEMA); IMMUTABLE_SEGMENT_DATA_MANAGER = createImmutableSegmentDataManager(INDEX_LOADING_CONFIG, "basicSegment", generateRows()); - BASE_TABLE_DATA_MANAGER = BaseTableDataManagerTest.createTableManager(); } catch (Exception e) { throw new RuntimeException(e); } } + @BeforeClass + public void initBaseTableDataManager() { + _baseTableDataManager = createTableManager(); + } + protected static TableConfigBuilder getTableConfigBuilder() { return new TableConfigBuilder(TableType.OFFLINE).setTableName(DEFAULT_TABLE_NAME) .setTimeColumnName(DEFAULT_TIME_COLUMN_NAME) @@ -198,7 +203,7 @@ private static ImmutableSegmentDataManager createImmutableSegmentDataManager(Ind when(segmentDataManager.getSegmentName()).thenReturn(segmentName); File indexDir = createSegment(indexLoadingConfig, segmentName, rows); ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(indexDir, indexLoadingConfig, - BaseTableDataManagerTest.SEGMENT_OPERATIONS_THROTTLER); + SEGMENT_OPERATIONS_THROTTLER); when(segmentDataManager.getSegment()).thenReturn(immutableSegment); return segmentDataManager; } @@ -233,7 +238,7 @@ void testAddTimeColumn() ImmutableSegmentDataManager segmentDataManager = createImmutableSegmentDataManager(indexLoadingConfig, "noChanges", List.of(row)); - BaseTableDataManager tableDataManager = BaseTableDataManagerTest.createTableManager(); + BaseTableDataManager tableDataManager = createTableManager(); StaleSegment response = tableDataManager.isSegmentStale(indexLoadingConfig, segmentDataManager); assertFalse(response.isStale()); @@ -247,7 +252,7 @@ void testAddTimeColumn() @Test void testChangeTimeColumn() { TableConfig tableConfig = getTableConfigBuilder().setTimeColumnName(MS_SINCE_EPOCH_COLUMN_NAME).build(); - StaleSegment response = BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(tableConfig, SCHEMA), + StaleSegment response = _baseTableDataManager.isSegmentStale(new IndexLoadingConfig(tableConfig, SCHEMA), IMMUTABLE_SEGMENT_DATA_MANAGER); assertTrue(response.isStale()); assertEquals(response.getReason(), "time column"); @@ -258,7 +263,7 @@ void testRemoveColumn() throws Exception { Schema schema = getSchema(); schema.removeField(TEXT_INDEX_COLUMN); - StaleSegment response = BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(TABLE_CONFIG, schema), + StaleSegment response = _baseTableDataManager.isSegmentStale(new IndexLoadingConfig(TABLE_CONFIG, schema), IMMUTABLE_SEGMENT_DATA_MANAGER); assertTrue(response.isStale()); assertEquals(response.getReason(), "column deleted: textColumn"); @@ -270,7 +275,7 @@ void testFieldType() Schema schema = getSchema(); schema.removeField(TEXT_INDEX_COLUMN); schema.addField(new MetricFieldSpec(TEXT_INDEX_COLUMN, FieldSpec.DataType.STRING, true)); - StaleSegment response = BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(TABLE_CONFIG, schema), + StaleSegment response = _baseTableDataManager.isSegmentStale(new IndexLoadingConfig(TABLE_CONFIG, schema), IMMUTABLE_SEGMENT_DATA_MANAGER); assertTrue(response.isStale()); assertEquals(response.getReason(), "field type changed: textColumn"); @@ -282,7 +287,7 @@ void testChangeDataType() Schema schema = getSchema(); schema.removeField(TEXT_INDEX_COLUMN); schema.addField(new DimensionFieldSpec(TEXT_INDEX_COLUMN, FieldSpec.DataType.INT, true)); - StaleSegment response = BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(TABLE_CONFIG, schema), + StaleSegment response = _baseTableDataManager.isSegmentStale(new IndexLoadingConfig(TABLE_CONFIG, schema), IMMUTABLE_SEGMENT_DATA_MANAGER); assertTrue(response.isStale()); assertEquals(response.getReason(), "data type changed: textColumn"); @@ -294,7 +299,7 @@ void testChangeToMV() Schema schema = getSchema(); schema.removeField(TEXT_INDEX_COLUMN); schema.addField(new DimensionFieldSpec(TEXT_INDEX_COLUMN, FieldSpec.DataType.STRING, false)); - StaleSegment response = BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(TABLE_CONFIG, schema), + StaleSegment response = _baseTableDataManager.isSegmentStale(new IndexLoadingConfig(TABLE_CONFIG, schema), IMMUTABLE_SEGMENT_DATA_MANAGER); assertTrue(response.isStale()); assertEquals(response.getReason(), "single / multi value changed: textColumn"); @@ -306,7 +311,7 @@ void testChangeToSV() Schema schema = getSchema(); schema.removeField(TEXT_INDEX_COLUMN_MV); schema.addField(new DimensionFieldSpec(TEXT_INDEX_COLUMN_MV, FieldSpec.DataType.STRING, true)); - StaleSegment response = BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(TABLE_CONFIG, schema), + StaleSegment response = _baseTableDataManager.isSegmentStale(new IndexLoadingConfig(TABLE_CONFIG, schema), IMMUTABLE_SEGMENT_DATA_MANAGER); assertTrue(response.isStale()); assertEquals(response.getReason(), "single / multi value changed: textColumnMV"); @@ -317,12 +322,12 @@ void testSortColumnMismatch() { // Check with a column that is not sorted TableConfig tableConfig = getTableConfigBuilder().setSortedColumn(MS_SINCE_EPOCH_COLUMN_NAME).build(); IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(tableConfig, SCHEMA); - StaleSegment response = BASE_TABLE_DATA_MANAGER.isSegmentStale(indexLoadingConfig, IMMUTABLE_SEGMENT_DATA_MANAGER); + StaleSegment response = _baseTableDataManager.isSegmentStale(indexLoadingConfig, IMMUTABLE_SEGMENT_DATA_MANAGER); assertTrue(response.isStale()); assertEquals(response.getReason(), "sort column changed: MilliSecondsSinceEpoch"); // Check with a column that is sorted tableConfig.getIndexingConfig().setSortedColumn(List.of(TEXT_INDEX_COLUMN)); - assertFalse(BASE_TABLE_DATA_MANAGER.isSegmentStale(indexLoadingConfig, IMMUTABLE_SEGMENT_DATA_MANAGER).isStale()); + assertFalse(_baseTableDataManager.isSegmentStale(indexLoadingConfig, IMMUTABLE_SEGMENT_DATA_MANAGER).isStale()); } @DataProvider(name = "testFilterArgs") @@ -357,17 +362,17 @@ void testFilter(String segmentName, TableConfig tableConfigWithFilter, String ex createImmutableSegmentDataManager(indexLoadingConfig, segmentName, generateRows()); // When TableConfig has a filter but segment does not have, needRefresh is true. - StaleSegment response = BASE_TABLE_DATA_MANAGER.isSegmentStale(indexLoadingConfig, IMMUTABLE_SEGMENT_DATA_MANAGER); + StaleSegment response = _baseTableDataManager.isSegmentStale(indexLoadingConfig, IMMUTABLE_SEGMENT_DATA_MANAGER); assertTrue(response.isStale()); assertEquals(response.getReason(), expectedReason); // When TableConfig does not have a filter but segment has, needRefresh is true - response = BASE_TABLE_DATA_MANAGER.isSegmentStale(INDEX_LOADING_CONFIG, segmentWithFilter); + response = _baseTableDataManager.isSegmentStale(INDEX_LOADING_CONFIG, segmentWithFilter); assertTrue(response.isStale()); assertEquals(response.getReason(), expectedReason); // When TableConfig has a filter AND segment also has a filter, needRefresh is false - assertFalse(BASE_TABLE_DATA_MANAGER.isSegmentStale(indexLoadingConfig, segmentWithFilter).isStale()); + assertFalse(_baseTableDataManager.isSegmentStale(indexLoadingConfig, segmentWithFilter).isStale()); } @Test @@ -380,17 +385,17 @@ void testPartition() createImmutableSegmentDataManager(indexLoadingConfig, "partitionWithModulo", generateRows()); // when segment has no partition AND tableConfig has partitions then needRefresh = true - StaleSegment response = BASE_TABLE_DATA_MANAGER.isSegmentStale(indexLoadingConfig, IMMUTABLE_SEGMENT_DATA_MANAGER); + StaleSegment response = _baseTableDataManager.isSegmentStale(indexLoadingConfig, IMMUTABLE_SEGMENT_DATA_MANAGER); assertTrue(response.isStale()); assertEquals(response.getReason(), "partition function added: partitionedColumn"); // when segment has partitions AND tableConfig has no partitions, then needRefresh = false - assertFalse(BASE_TABLE_DATA_MANAGER.isSegmentStale(INDEX_LOADING_CONFIG, segmentWithPartition).isStale()); + assertFalse(_baseTableDataManager.isSegmentStale(INDEX_LOADING_CONFIG, segmentWithPartition).isStale()); // when # of partitions is different, then needRefresh = true TableConfig partitionedTableConfig40 = getTableConfigBuilder().setSegmentPartitionConfig(new SegmentPartitionConfig( Map.of(PARTITIONED_COLUMN_NAME, new ColumnPartitionConfig(PARTITION_FUNCTION_NAME, 40)))).build(); - response = BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(partitionedTableConfig40, SCHEMA), + response = _baseTableDataManager.isSegmentStale(new IndexLoadingConfig(partitionedTableConfig40, SCHEMA), segmentWithPartition); assertTrue(response.isStale()); assertEquals(response.getReason(), "num partitions changed: partitionedColumn"); @@ -399,7 +404,7 @@ void testPartition() TableConfig partitionedTableConfigMurmur = getTableConfigBuilder().setSegmentPartitionConfig( new SegmentPartitionConfig( Map.of(PARTITIONED_COLUMN_NAME, new ColumnPartitionConfig("murmur", NUM_PARTITIONS)))).build(); - response = BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(partitionedTableConfigMurmur, SCHEMA), + response = _baseTableDataManager.isSegmentStale(new IndexLoadingConfig(partitionedTableConfigMurmur, SCHEMA), segmentWithPartition); assertTrue(response.isStale()); assertEquals(response.getReason(), "partition function name changed: partitionedColumn"); @@ -414,12 +419,12 @@ void testNullValueVector() createImmutableSegmentDataManager(indexLoadingConfig, "withoutNullHandling", generateRows()); // If null handling is removed from table config AND segment has NVV, then NVV can be removed. needRefresh = true - StaleSegment response = BASE_TABLE_DATA_MANAGER.isSegmentStale(indexLoadingConfig, IMMUTABLE_SEGMENT_DATA_MANAGER); + StaleSegment response = _baseTableDataManager.isSegmentStale(indexLoadingConfig, IMMUTABLE_SEGMENT_DATA_MANAGER); assertTrue(response.isStale()); assertEquals(response.getReason(), "null value vector index removed from column: NullValueColumn"); // if NVV is added to table config AND segment does not have NVV, then it cannot be added. needRefresh = false - assertFalse(BASE_TABLE_DATA_MANAGER.isSegmentStale(INDEX_LOADING_CONFIG, segmentWithoutNullHandling).isStale()); + assertFalse(_baseTableDataManager.isSegmentStale(INDEX_LOADING_CONFIG, segmentWithoutNullHandling).isStale()); } @Test @@ -428,7 +433,7 @@ public void testAddMultiColumnTextIndex() { new MultiColumnTextIndexConfig(List.of(MC_TEXT_INDEX_COLUMN_1, MC_TEXT_INDEX_COLUMN_2)); TableConfig tableConfig = getTableConfigBuilder().setMultiColumnTextIndexConfig(newIndex).build(); - assertTrue(BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(tableConfig, SCHEMA), + assertTrue(_baseTableDataManager.isSegmentStale(new IndexLoadingConfig(tableConfig, SCHEMA), IMMUTABLE_SEGMENT_DATA_MANAGER).isStale()); } @@ -448,7 +453,7 @@ public void testMultiColumnTextIndexWithDifferentColumns() .build(); assertTrue( - BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager) + _baseTableDataManager.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager) .isStale()); } @@ -470,7 +475,7 @@ public void testMultiColumnTextIndexWithDifferentSharedProperties() .build(); assertTrue( - BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager) + _baseTableDataManager.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager) .isStale()); } @@ -492,7 +497,7 @@ public void testMultiColumnTextIndexWithDifferentColumnProperties() .build(); assertTrue( - BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager) + _baseTableDataManager.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager) .isStale()); } @@ -504,7 +509,7 @@ public void addStartreeIndex() { new StarTreeIndexConfig(List.of("Carrier"), null, List.of(AggregationFunctionColumnPair.COUNT_STAR_NAME), null, 100); TableConfig tableConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build(); - assertTrue(BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(tableConfig, SCHEMA), + assertTrue(_baseTableDataManager.isSegmentStale(new IndexLoadingConfig(tableConfig, SCHEMA), IMMUTABLE_SEGMENT_DATA_MANAGER).isStale()); } @@ -529,7 +534,7 @@ public void testStarTreeIndexWithDifferentColumn() TableConfig newTableConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build(); assertTrue( - BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager) + _baseTableDataManager.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager) .isStale()); } @@ -552,7 +557,7 @@ public void testStarTreeIndexWithManyColumns() TableConfig newTableConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build(); assertTrue( - BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager) + _baseTableDataManager.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager) .isStale()); } @@ -575,7 +580,7 @@ public void testStartIndexWithDifferentOrder() TableConfig newTableConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build(); assertTrue( - BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager) + _baseTableDataManager.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager) .isStale()); } @@ -598,7 +603,7 @@ void testStarTreeIndexWithSkipDimCols() TableConfig newTableConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build(); assertTrue( - BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager) + _baseTableDataManager.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager) .isStale()); } @@ -621,7 +626,7 @@ void testStarTreeIndexWithDiffOrderSkipDimCols() TableConfig newTableConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build(); assertFalse( - BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager) + _baseTableDataManager.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager) .isStale()); } @@ -642,7 +647,7 @@ void testStarTreeIndexRemoveSkipDimCols() TableConfig newTableConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build(); assertTrue( - BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager) + _baseTableDataManager.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager) .isStale()); } @@ -662,7 +667,7 @@ void testStarTreeIndexAddAggFn() TableConfig newTableConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build(); assertTrue( - BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager) + _baseTableDataManager.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager) .isStale()); } @@ -683,7 +688,7 @@ void testStarTreeIndexDiffOrderAggFn() TableConfig newTableConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build(); assertFalse( - BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager) + _baseTableDataManager.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager) .isStale()); } @@ -705,7 +710,7 @@ void testStarTreeIndexRemoveAggFn() TableConfig newTableConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build(); assertTrue( - BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager) + _baseTableDataManager.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager) .isStale()); } @@ -727,7 +732,7 @@ void testStarTreeIndexNewMetricAgg() TableConfig newTableConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build(); assertTrue( - BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager) + _baseTableDataManager.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager) .isStale()); } @@ -751,7 +756,7 @@ void testStarTreeIndexDiffOrderAggFn2() TableConfig newTableConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build(); assertFalse( - BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager) + _baseTableDataManager.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager) .isStale()); } @@ -771,7 +776,7 @@ void testStarTreeIndexMaxLeafNode() TableConfig newTableConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build(); assertTrue( - BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager) + _baseTableDataManager.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager) .isStale()); } @@ -784,7 +789,7 @@ void testStarTreeIndexRemove() TableConfig tableConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build(); ImmutableSegmentDataManager segmentDataManager = createImmutableSegmentDataManager(new IndexLoadingConfig(tableConfig, SCHEMA), _testName, generateRows()); - assertTrue(BASE_TABLE_DATA_MANAGER.isSegmentStale(INDEX_LOADING_CONFIG, segmentDataManager).isStale()); + assertTrue(_baseTableDataManager.isSegmentStale(INDEX_LOADING_CONFIG, segmentDataManager).isStale()); } @Test @@ -805,7 +810,7 @@ void testStarTreeIndexAddMultiple() TableConfig newTableConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig, newStarTreeIndexConfig)).build(); assertTrue( - BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager) + _baseTableDataManager.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager) .isStale()); } @@ -824,7 +829,7 @@ void testStarTreeIndexEnableDefault() TableConfig newTableConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build(); newTableConfig.getIndexingConfig().setEnableDefaultStarTree(true); assertTrue( - BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager) + _baseTableDataManager.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager) .isStale()); } @@ -840,7 +845,7 @@ void testStarTreeIndexNoChanges() IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(tableConfig, SCHEMA); ImmutableSegmentDataManager segmentDataManager = createImmutableSegmentDataManager(indexLoadingConfig, _testName, generateRows()); - assertFalse(BASE_TABLE_DATA_MANAGER.isSegmentStale(indexLoadingConfig, segmentDataManager).isStale()); + assertFalse(_baseTableDataManager.isSegmentStale(indexLoadingConfig, segmentDataManager).isStale()); } @Test @@ -859,7 +864,7 @@ void testStarTreeIndexDisableDefault() TableConfig newTableConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build(); newTableConfig.getIndexingConfig().setEnableDefaultStarTree(false); assertTrue( - BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager) + _baseTableDataManager.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager) .isStale()); } @@ -877,7 +882,7 @@ void testTimestampIndexNoChange() ImmutableSegmentDataManager segmentDataManager = createImmutableSegmentDataManager(indexLoadingConfig, _testName, generateRows()); - assertFalse(BASE_TABLE_DATA_MANAGER.isSegmentStale(indexLoadingConfig, segmentDataManager).isStale()); + assertFalse(_baseTableDataManager.isSegmentStale(indexLoadingConfig, segmentDataManager).isStale()); } @Test @@ -894,7 +899,7 @@ void testTimestampIndexAdded() // Segment was created without the timestamp index. StaleSegment response = - BASE_TABLE_DATA_MANAGER.isSegmentStale(indexLoadingConfigWithTimestampIndex, IMMUTABLE_SEGMENT_DATA_MANAGER); + _baseTableDataManager.isSegmentStale(indexLoadingConfigWithTimestampIndex, IMMUTABLE_SEGMENT_DATA_MANAGER); assertTrue(response.isStale()); assertEquals(response.getReason(), "timestamp index changed"); } @@ -917,7 +922,7 @@ void testTimestampIndexRemoved() // Evaluate staleness against a fresh config with no timestamp index. IndexLoadingConfig indexLoadingConfigNoTimestamp = new IndexLoadingConfig(getTableConfigBuilder().build(), getSchema()); - StaleSegment response = BASE_TABLE_DATA_MANAGER.isSegmentStale(indexLoadingConfigNoTimestamp, segmentDataManager); + StaleSegment response = _baseTableDataManager.isSegmentStale(indexLoadingConfigNoTimestamp, segmentDataManager); assertTrue(response.isStale()); assertEquals(response.getReason(), "timestamp index changed"); } @@ -942,7 +947,7 @@ void testTimestampIndexGranularityAdded() getTableConfigBuilder().setFieldConfigList(List.of(fieldConfigDayAndWeek)).build(); IndexLoadingConfig indexLoadingConfigDayAndWeek = new IndexLoadingConfig(tableConfigDayAndWeek, getSchema()); - StaleSegment response = BASE_TABLE_DATA_MANAGER.isSegmentStale(indexLoadingConfigDayAndWeek, segmentDataManager); + StaleSegment response = _baseTableDataManager.isSegmentStale(indexLoadingConfigDayAndWeek, segmentDataManager); assertTrue(response.isStale()); assertEquals(response.getReason(), "timestamp index changed"); } @@ -968,7 +973,7 @@ void testTimestampIndexGranularityRemoved() TableConfig tableConfigDay = getTableConfigBuilder().setFieldConfigList(List.of(fieldConfigDay)).build(); IndexLoadingConfig indexLoadingConfigDay = new IndexLoadingConfig(tableConfigDay, getSchema()); - StaleSegment response = BASE_TABLE_DATA_MANAGER.isSegmentStale(indexLoadingConfigDay, segmentDataManager); + StaleSegment response = _baseTableDataManager.isSegmentStale(indexLoadingConfigDay, segmentDataManager); assertTrue(response.isStale()); assertEquals(response.getReason(), "timestamp index changed"); } diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java index a21c210616ce..a98f4d8b00bc 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java @@ -890,32 +890,52 @@ public void decrypt(File origFile, File decFile) { } } - static OfflineTableDataManager createTableManager() { + protected BaseTableDataManager createTableManager() { return createTableManager(createDefaultInstanceDataManagerConfig()); } - static OfflineTableDataManager createTableManagerWithAsyncSegmentRefreshEnabled() { + protected BaseTableDataManager createTableManagerWithAsyncSegmentRefreshEnabled() { return createTableManagerWithAsyncSegmentRefreshEnabled(createDefaultInstanceDataManagerConfig()); } - private static OfflineTableDataManager createTableManager(InstanceDataManagerConfig instanceDataManagerConfig) { - OfflineTableDataManager tableDataManager = new OfflineTableDataManager(); - tableDataManager.init(instanceDataManagerConfig, mock(HelixManager.class), new SegmentLocks(), DEFAULT_TABLE_CONFIG, + protected BaseTableDataManager createTableManager(InstanceDataManagerConfig instanceDataManagerConfig) { + BaseTableDataManager tableDataManager = newTableDataManager(); + tableDataManager.init(instanceDataManagerConfig, createHelixManagerMock(), new SegmentLocks(), DEFAULT_TABLE_CONFIG, SCHEMA, new SegmentReloadSemaphore(1), Executors.newSingleThreadExecutor(), null, null, SEGMENT_OPERATIONS_THROTTLER, false, mock(ServerReloadJobStatusCache.class)); return tableDataManager; } - private static OfflineTableDataManager createTableManagerWithAsyncSegmentRefreshEnabled( + protected BaseTableDataManager createTableManagerWithAsyncSegmentRefreshEnabled( InstanceDataManagerConfig instanceDataManagerConfig) { - OfflineTableDataManager tableDataManager = new OfflineTableDataManager(); - tableDataManager.init(instanceDataManagerConfig, mock(HelixManager.class), new SegmentLocks(), DEFAULT_TABLE_CONFIG, + BaseTableDataManager tableDataManager = newTableDataManager(); + tableDataManager.init(instanceDataManagerConfig, createHelixManagerMock(), new SegmentLocks(), DEFAULT_TABLE_CONFIG, SCHEMA, new SegmentReloadSemaphore(1), Executors.newSingleThreadExecutor(), null, null, SEGMENT_OPERATIONS_THROTTLER, true, mock(ServerReloadJobStatusCache.class)); return tableDataManager; } - private static InstanceDataManagerConfig createDefaultInstanceDataManagerConfig() { + /** + * Returns the concrete {@link BaseTableDataManager} instance under test. Default returns a stock + * {@link OfflineTableDataManager}; subclasses override to test a different implementation while inheriting + * all test bodies. + */ + protected BaseTableDataManager newTableDataManager() { + return new OfflineTableDataManager(); + } + + /** + * Returns the {@link HelixManager} mock wired into the TDM under test. Default returns a bare Mockito mock + * (no property store stubbed) — fine for the inherited test bodies which never read ZK directly. Subclasses + * that exercise paths reading {@code _propertyStore} (e.g. {@code fetchZKMetadata}, {@code fetchIndexLoadingConfig}) + * override to stub {@code helixManager.getHelixPropertyStore()} with a {@code FakePropertyStore} pre-seeded + * with table config + schema + per-segment ZK metadata. + */ + protected HelixManager createHelixManagerMock() { + return mock(HelixManager.class); + } + + protected static InstanceDataManagerConfig createDefaultInstanceDataManagerConfig() { InstanceDataManagerConfig config = mock(InstanceDataManagerConfig.class); when(config.getInstanceDataDir()).thenReturn(TEMP_DIR.getAbsolutePath()); // Check CRC matching on segment load time. @@ -923,7 +943,7 @@ private static InstanceDataManagerConfig createDefaultInstanceDataManagerConfig( return config; } - private static File createSegment(SegmentVersion segmentVersion, int numRows) + protected static File createSegment(SegmentVersion segmentVersion, int numRows) throws Exception { SegmentGeneratorConfig config = new SegmentGeneratorConfig(DEFAULT_TABLE_CONFIG, SCHEMA); config.setOutDir(TABLE_DATA_DIR.getAbsolutePath()); @@ -942,14 +962,14 @@ private static File createSegment(SegmentVersion segmentVersion, int numRows) return new File(TABLE_DATA_DIR, SEGMENT_NAME); } - private static SegmentZKMetadata createRawSegment(SegmentVersion segmentVersion, int numRows) + protected static SegmentZKMetadata createRawSegment(SegmentVersion segmentVersion, int numRows) throws Exception { File indexDir = createSegment(segmentVersion, numRows); return makeRawSegment(indexDir, new File(TEMP_DIR, SEGMENT_NAME + TarCompressionUtils.TAR_COMPRESSED_FILE_EXTENSION), true); } - private static SegmentZKMetadata makeRawSegment(File indexDir, File rawSegmentFile, boolean deleteIndexDir) + protected static SegmentZKMetadata makeRawSegment(File indexDir, File rawSegmentFile, boolean deleteIndexDir) throws Exception { long crc = getCRC(indexDir); SegmentZKMetadata zkMetadata = new SegmentZKMetadata(SEGMENT_NAME); @@ -962,7 +982,7 @@ private static SegmentZKMetadata makeRawSegment(File indexDir, File rawSegmentFi return zkMetadata; } - private static long getCRC(File indexDir) + protected static long getCRC(File indexDir) throws IOException { File creationMetaFile = SegmentDirectoryPaths.findCreationMetaFile(indexDir); assertNotNull(creationMetaFile); @@ -971,7 +991,7 @@ private static long getCRC(File indexDir) } } - private IndexLoadingConfig createTierIndexLoadingConfig(TableConfig tableConfig) { + protected IndexLoadingConfig createTierIndexLoadingConfig(TableConfig tableConfig) { InstanceDataManagerConfig instanceDataManagerConfig = mock(InstanceDataManagerConfig.class); when(instanceDataManagerConfig.getSegmentDirectoryLoader()).thenReturn(TIER_SEGMENT_DIRECTORY_LOADER); when(instanceDataManagerConfig.getConfig()).thenReturn(new PinotConfiguration()); @@ -981,7 +1001,7 @@ private IndexLoadingConfig createTierIndexLoadingConfig(TableConfig tableConfig) return indexLoadingConfig; } - private ImmutableSegmentDataManager createImmutableSegmentDataManager(String segmentName, long crc) { + protected ImmutableSegmentDataManager createImmutableSegmentDataManager(String segmentName, long crc) { ImmutableSegmentDataManager segmentDataManager = mock(ImmutableSegmentDataManager.class); when(segmentDataManager.getSegmentName()).thenReturn(segmentName); ImmutableSegment immutableSegment = mock(ImmutableSegment.class); diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java index 15a8a7565e6f..10c4d414cdec 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java @@ -194,6 +194,13 @@ void offloadSegment(String segmentName) void offloadSegmentUnsafe(String segmentName) throws Exception; + /** + * Deletes a segment from this table — offloads it if currently loaded, then removes its on-disk data (the per-segment + * directory and any tier-specific artefacts via {@link org.apache.pinot.segment.spi.loader.SegmentDirectoryLoader}). + */ + void deleteSegment(String segmentName) + throws Exception; + /** * Reloads an existing immutable segment for the table, which can be an OFFLINE or REALTIME table. * A new segment may be downloaded if the local one has a different CRC; or can be forced to download if diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java index 9d7464cca970..591eb8126591 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java @@ -48,6 +48,7 @@ import org.apache.pinot.common.metadata.ZKMetadataProvider; import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.common.restlet.resources.SegmentErrorInfo; +import org.apache.pinot.core.data.manager.BaseTableDataManager; import org.apache.pinot.core.data.manager.InstanceDataManager; import org.apache.pinot.core.data.manager.LogicalTableContext; import org.apache.pinot.core.data.manager.provider.TableDataManagerProvider; @@ -64,9 +65,6 @@ import org.apache.pinot.segment.local.utils.ServerReloadJobStatusCache; import org.apache.pinot.segment.local.utils.TableConfigUtils; import org.apache.pinot.segment.spi.SegmentMetadata; -import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoader; -import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderContext; -import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry; import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.data.LogicalTableConfig; @@ -383,38 +381,26 @@ public void offloadSegment(String tableNameWithType, String segmentName) public void deleteSegment(String tableNameWithType, String segmentName) throws Exception { LOGGER.info("Deleting segment: {} from table: {}", segmentName, tableNameWithType); - // Segment deletion is handled at instance level because table data manager might not exist. Acquire the lock here. + TableDataManager tableDataManager = _tableDataManagerMap.get(tableNameWithType); + if (tableDataManager != null) { + // The TDM owns the per-segment lock, the offload-if-loaded prelude, the on-disk dir delete, and the tier-aware + // segment-directory-loader cleanup. + tableDataManager.deleteSegment(segmentName); + } else { + // Fallback: TDM can be null if it was never instantiated, or has already been removed via deleteTable. + // In that case, do a path-only cleanup keyed by segment name. + deleteSegmentFilesFallback(tableNameWithType, segmentName); + LOGGER.info("Deleted segment: {} from table: {} ", segmentName, tableNameWithType); + } + } + + private void deleteSegmentFilesFallback(String tableNameWithType, String segmentName) + throws Exception { Lock segmentLock = _segmentLocks.getLock(tableNameWithType, segmentName); segmentLock.lock(); try { - // Check if the segment is still loaded, if so, offload it first. - // This might happen when the server disconnected from ZK and reconnected, and the segment is still loaded. - // TODO: Consider using table data manager to delete the segment. This will allow the table data manager to clean - // up the segment data on all tiers. Note that table data manager might have not been created, and table - // config might have been deleted at this point. - TableDataManager tableDataManager = _tableDataManagerMap.get(tableNameWithType); - if (tableDataManager != null && tableDataManager.hasSegment(segmentName)) { - LOGGER.warn("Segment: {} from table: {} is still loaded, offloading it first", segmentName, tableNameWithType); - tableDataManager.offloadSegment(segmentName); - } - // Clean up the segment data on default tier unconditionally. - File segmentDir = getSegmentDataDirectory(tableNameWithType, segmentName); - if (segmentDir.exists()) { - FileUtils.deleteQuietly(segmentDir); - LOGGER.info("Deleted segment directory {} on default tier", segmentDir); - } - // We might clean up further more with the specific segment loader. But note that table data manager might have - // not been created, and table config might have been deleted at this point. - SegmentDirectoryLoader segmentLoader = SegmentDirectoryLoaderRegistry.getSegmentDirectoryLoader( - _instanceDataManagerConfig.getSegmentDirectoryLoader()); - if (segmentLoader != null) { - LOGGER.info("Deleting segment: {} further with segment loader: {}", segmentName, - _instanceDataManagerConfig.getSegmentDirectoryLoader()); - SegmentDirectoryLoaderContext ctx = new SegmentDirectoryLoaderContext.Builder().setSegmentName(segmentName) - .setTableDataDir(_instanceDataManagerConfig.getInstanceDataDir() + "/" + tableNameWithType).build(); - segmentLoader.delete(ctx); - } - LOGGER.info("Deleted segment: {} from table: {}", segmentName, tableNameWithType); + String tableDataDir = _instanceDataManagerConfig.getInstanceDataDir() + "/" + tableNameWithType; + BaseTableDataManager.deleteSegmentFilesFromDisk(tableDataDir, segmentName, _instanceDataManagerConfig); } finally { segmentLock.unlock(); } From 0eb9597afb68e6142c8b7c869c360930dfce81ab Mon Sep 17 00:00:00 2001 From: Krishan Goyal Date: Wed, 6 May 2026 17:31:05 +0530 Subject: [PATCH 4/5] Cleanup deleteSegment --- .../data/manager/BaseTableDataManager.java | 27 +++++++++------- .../helix/HelixInstanceDataManager.java | 31 +++++++++---------- 2 files changed, 29 insertions(+), 29 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java index 5d47d700bad7..88f5cce7c481 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java @@ -606,12 +606,6 @@ protected void doOffloadSegment(String segmentName) { } } - /** - * Default per-segment delete: takes the segment lock, offloads if currently loaded (using the standard - * {@link #offloadSegment(String)} entry point — the per-segment lock is a {@link - * java.util.concurrent.locks.ReentrantLock}, so re-acquiring it inside {@code offloadSegment} is safe), then removes - * the on-disk directory and any tier-specific artefacts via {@link #deleteSegmentFilesFromDisk}. - */ @Override public void deleteSegment(String segmentName) throws Exception { @@ -619,17 +613,26 @@ public void deleteSegment(String segmentName) Lock segmentLock = getSegmentLock(segmentName); segmentLock.lock(); try { - if (hasSegment(segmentName)) { - _logger.warn("Segment: {} is still loaded, offloading it before delete", segmentName); - offloadSegment(segmentName); - } - deleteSegmentFilesFromDisk(_tableDataDir, segmentName, _instanceDataManagerConfig); - _logger.info("Deleted segment: {}", segmentName); + doDeleteSegment(segmentName); + } catch (Exception e) { + addSegmentError(segmentName, + new SegmentErrorInfo(System.currentTimeMillis(), "Caught exception while deleting segment", e)); + throw e; } finally { segmentLock.unlock(); } } + protected void doDeleteSegment(String segmentName) + throws Exception { + if (hasSegment(segmentName)) { + _logger.warn("Segment: {} is still loaded, offloading it before delete", segmentName); + offloadSegment(segmentName); + } + deleteSegmentFilesFromDisk(_tableDataDir, segmentName, _instanceDataManagerConfig); + _logger.info("Deleted segment: {}", segmentName); + } + /** * Single source of truth for on-disk cleanup of a physical segment. Removes the segment directory under * {@code tableDataDir} and invokes the configured {@link SegmentDirectoryLoader#delete} for tier-aware cleanup. diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java index 591eb8126591..4bbd90ae876a 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java @@ -381,26 +381,23 @@ public void offloadSegment(String tableNameWithType, String segmentName) public void deleteSegment(String tableNameWithType, String segmentName) throws Exception { LOGGER.info("Deleting segment: {} from table: {}", segmentName, tableNameWithType); - TableDataManager tableDataManager = _tableDataManagerMap.get(tableNameWithType); - if (tableDataManager != null) { - // The TDM owns the per-segment lock, the offload-if-loaded prelude, the on-disk dir delete, and the tier-aware - // segment-directory-loader cleanup. - tableDataManager.deleteSegment(segmentName); - } else { - // Fallback: TDM can be null if it was never instantiated, or has already been removed via deleteTable. - // In that case, do a path-only cleanup keyed by segment name. - deleteSegmentFilesFallback(tableNameWithType, segmentName); - LOGGER.info("Deleted segment: {} from table: {} ", segmentName, tableNameWithType); - } - } - - private void deleteSegmentFilesFallback(String tableNameWithType, String segmentName) - throws Exception { + // Hold the per-segment lock around the TDM lookup so the lookup + delete is atomic vs. removeTableDataManager + // shutting the TDM down concurrently. The TDM's own deleteSegment re-acquires the same lock (ReentrantLock). Lock segmentLock = _segmentLocks.getLock(tableNameWithType, segmentName); segmentLock.lock(); try { - String tableDataDir = _instanceDataManagerConfig.getInstanceDataDir() + "/" + tableNameWithType; - BaseTableDataManager.deleteSegmentFilesFromDisk(tableDataDir, segmentName, _instanceDataManagerConfig); + TableDataManager tableDataManager = _tableDataManagerMap.get(tableNameWithType); + if (tableDataManager != null) { + // The TDM owns the offload-if-loaded prelude, the on-disk dir delete, and the tier-aware + // segment-directory-loader cleanup. + tableDataManager.deleteSegment(segmentName); + } else { + // Fallback: TDM can be null if it was never instantiated, or has already been removed via deleteTable. + // In that case, do a path-only cleanup keyed by segment name. + String tableDataDir = _instanceDataManagerConfig.getInstanceDataDir() + "/" + tableNameWithType; + BaseTableDataManager.deleteSegmentFilesFromDisk(tableDataDir, segmentName, _instanceDataManagerConfig); + LOGGER.info("Deleted segment: {} from table: {}", segmentName, tableNameWithType); + } } finally { segmentLock.unlock(); } From 953952b5ab5443e800bda503e547c4aa18be7c63 Mon Sep 17 00:00:00 2001 From: Krishan Goyal Date: Wed, 6 May 2026 19:53:34 +0530 Subject: [PATCH 5/5] Move offloadSegment to deleteSegment --- .../pinot/core/data/manager/BaseTableDataManager.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java index 88f5cce7c481..a33d9865aece 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java @@ -613,6 +613,10 @@ public void deleteSegment(String segmentName) Lock segmentLock = getSegmentLock(segmentName); segmentLock.lock(); try { + if (hasSegment(segmentName)) { + _logger.warn("Segment: {} is still loaded, offloading it before delete", segmentName); + offloadSegment(segmentName); + } doDeleteSegment(segmentName); } catch (Exception e) { addSegmentError(segmentName, @@ -625,10 +629,6 @@ public void deleteSegment(String segmentName) protected void doDeleteSegment(String segmentName) throws Exception { - if (hasSegment(segmentName)) { - _logger.warn("Segment: {} is still loaded, offloading it before delete", segmentName); - offloadSegment(segmentName); - } deleteSegmentFilesFromDisk(_tableDataDir, segmentName, _instanceDataManagerConfig); _logger.info("Deleted segment: {}", segmentName); } @@ -656,7 +656,7 @@ public static void deleteSegmentFilesFromDisk(String tableDataDir, String segmen File segmentDir = new File(tableDataDir, segmentName); if (segmentDir.exists()) { FileUtils.deleteQuietly(segmentDir); - LOGGER.info("Deleted segment directory: {}", segmentDir); + LOGGER.info("Deleted segment directory {} on default tier", segmentDir); } SegmentDirectoryLoader segmentLoader = SegmentDirectoryLoaderRegistry.getSegmentDirectoryLoader(instanceConfig.getSegmentDirectoryLoader());