Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,70 @@ protected void doOffloadSegment(String segmentName) {
}
}

@Override
public void deleteSegment(String segmentName)
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is moved from HelixInstanceDataManager to TableDataManager as this is the right place

throws Exception {
_logger.info("Deleting segment: {}", segmentName);
Lock segmentLock = getSegmentLock(segmentName);
segmentLock.lock();
try {
if (hasSegment(segmentName)) {
_logger.warn("Segment: {} is still loaded, offloading it before delete", segmentName);
offloadSegment(segmentName);
}
doDeleteSegment(segmentName);
} catch (Exception e) {
addSegmentError(segmentName,
new SegmentErrorInfo(System.currentTimeMillis(), "Caught exception while deleting segment", e));
throw e;
} finally {
segmentLock.unlock();
}
}

protected void doDeleteSegment(String segmentName)
throws Exception {
deleteSegmentFilesFromDisk(_tableDataDir, segmentName, _instanceDataManagerConfig);
_logger.info("Deleted segment: {}", segmentName);
}

/**
* Single source of truth for on-disk cleanup of a physical segment. Removes the segment directory under
* {@code tableDataDir} and invokes the configured {@link SegmentDirectoryLoader#delete} for tier-aware cleanup.
*
* <p>Used by:
* <ul>
* <li>{@link #deleteSegment} — the TDM-driven path triggered by a *_TO_DROPPED state transition.</li>
* <li>{@code HelixInstanceDataManager.deleteSegment} fallback — when the per-table TDM is null because the
* table was never instantiated, or has already been removed via {@code deleteTable()}.</li>
* <li>TDM extensions that fan a single logical name out to multiple physical segments (e.g. segment-group
* member fan-out) — these reuse this helper directly while holding all required per-member locks
* acquired in lex order.</li>
* </ul>
*
* <p>Caller is responsible for holding the per-segment lock and for offloading the segment from the TDM if it
* is still loaded.
*/
public static void deleteSegmentFilesFromDisk(String tableDataDir, String segmentName,
InstanceDataManagerConfig instanceConfig)
throws Exception {
File segmentDir = new File(tableDataDir, segmentName);
if (segmentDir.exists()) {
FileUtils.deleteQuietly(segmentDir);
LOGGER.info("Deleted segment directory {} on default tier", segmentDir);
}
SegmentDirectoryLoader segmentLoader =
SegmentDirectoryLoaderRegistry.getSegmentDirectoryLoader(instanceConfig.getSegmentDirectoryLoader());
if (segmentLoader != null) {
LOGGER.info("Deleting segment: {} further with segment loader: {}", segmentName,
instanceConfig.getSegmentDirectoryLoader());
SegmentDirectoryLoaderContext ctx = new SegmentDirectoryLoaderContext.Builder().setSegmentName(segmentName)
.setTableDataDir(tableDataDir)
.build();
segmentLoader.delete(ctx);
}
}

@Override
public void reloadSegment(String segmentName, boolean forceDownload, String reloadJobId)
throws Exception {
Expand All @@ -616,12 +680,7 @@ public void reloadSegment(String segmentName, boolean forceDownload, String relo
SegmentDataManager segmentDataManager = _segmentDataManagerMap.get(segmentName);
if (segmentDataManager != null) {
IndexLoadingConfig indexLoadingConfig = fetchIndexLoadingConfig();
_segmentReloadSemaphore.acquire(segmentName, _logger);
try {
reloadSegment(segmentDataManager, indexLoadingConfig, forceDownload);
} finally {
_segmentReloadSemaphore.release();
}
reloadSegment(segmentDataManager, indexLoadingConfig, forceDownload);
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved the sempahore to inside reloadSegment

} else {
_logger.warn("Failed to find segment: {}, skipping reloading it", segmentName);
}
Expand Down Expand Up @@ -947,12 +1006,7 @@ protected void reloadSegments(List<SegmentDataManager> segmentDataManagers, Inde
CompletableFuture.allOf(segmentDataManagers.stream().map(segmentDataManager -> CompletableFuture.runAsync(() -> {
String segmentName = segmentDataManager.getSegmentName();
try {
_segmentReloadSemaphore.acquire(segmentName, _logger);
try {
reloadSegment(segmentDataManager, indexLoadingConfig, forceDownload);
} finally {
_segmentReloadSemaphore.release();
}
reloadSegment(segmentDataManager, indexLoadingConfig, forceDownload);
} catch (Throwable t) {
_logger.error("Caught exception while reloading segment: {}", segmentName, t);
failedSegments.add(segmentName);
Expand Down Expand Up @@ -1017,68 +1071,74 @@ public void reloadSegment(String segmentName, IndexLoadingConfig indexLoadingCon
Lock segmentLock = getSegmentLock(segmentName);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Incase we get an exception here the above semaphore might remain acquired.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a try finally block before but it led to a larger diff due to indentation changes. Which made it harder to review.

I then checked that getSegmentLock shouldn't throw an exception and so removed the try finally.

Let me know what you prefer and I can udpate

segmentLock.lock();
try {
/*
Determines if a segment should be downloaded from deep storage based on:
1. A forced download flag.
2. The segment status being marked as "DONE" in ZK metadata and a CRC mismatch
between ZK metadata and local metadata CRC.
- The "DONE" status confirms that the COMMIT_END_METADATA call succeeded
and the segment is available in deep storage or with a peer before discarding
the local copy.

Otherwise:
- Copy the backup directory back to the original index directory.
- Continue loading the segment from the index directory.
*/
boolean shouldDownload =
forceDownload || (isSegmentStatusCompleted(zkMetadata) && !hasSameCRC(zkMetadata, localMetadata)
&& _instanceDataManagerConfig.shouldCheckCRCOnSegmentLoad());
if (shouldDownload) {
// Create backup directory to handle failure of segment reloading.
createBackup(indexDir);
if (forceDownload) {
_logger.info("Force downloading segment: {}", segmentName);
_segmentReloadSemaphore.acquire(segmentName, _logger);
Comment thread
krishan1390 marked this conversation as resolved.
Outdated
try {
/*
Determines if a segment should be downloaded from deep storage based on:
1. A forced download flag.
2. The segment status being marked as "DONE" in ZK metadata and a CRC mismatch
between ZK metadata and local metadata CRC.
- The "DONE" status confirms that the COMMIT_END_METADATA call succeeded
and the segment is available in deep storage or with a peer before discarding
the local copy.

Otherwise:
- Copy the backup directory back to the original index directory.
- Continue loading the segment from the index directory.
*/
boolean shouldDownload =
forceDownload || (isSegmentStatusCompleted(zkMetadata) && !hasSameCRC(zkMetadata, localMetadata)
&& _instanceDataManagerConfig.shouldCheckCRCOnSegmentLoad());
if (shouldDownload) {
// Create backup directory to handle failure of segment reloading.
createBackup(indexDir);
if (forceDownload) {
_logger.info("Force downloading segment: {}", segmentName);
} else {
_logger.info("Downloading segment: {} because its CRC has changed from: {} to: {}", segmentName,
localMetadata.getCrc(), zkMetadata.getCrc());
}
indexDir = downloadSegment(zkMetadata);
} else {
_logger.info("Downloading segment: {} because its CRC has changed from: {} to: {}", segmentName,
localMetadata.getCrc(), zkMetadata.getCrc());
}
indexDir = downloadSegment(zkMetadata);
} else {
_logger.info("Reloading existing segment: {} on tier: {}", segmentName,
TierConfigUtils.normalizeTierName(segmentTier));
SegmentDirectory segmentDirectory =
initSegmentDirectory(segmentName, String.valueOf(zkMetadata.getCrc()), indexLoadingConfig, zkMetadata);
// We should first try to reuse existing segment directory
if (canReuseExistingDirectoryForReload(zkMetadata, segmentTier, segmentDirectory, indexLoadingConfig)) {
_logger.info("Reloading segment: {} using existing segment directory as no reprocessing needed", segmentName);
// No reprocessing needed, reuse the same segment
ImmutableSegment segment = ImmutableSegmentLoader.load(segmentDirectory, indexLoadingConfig);
addSegment(segment, zkMetadata);
return;
}
// Create backup directory to handle failure of segment reloading.
createBackup(indexDir);
// The indexDir is empty after calling createBackup, as it's renamed to a backup directory.
// The SegmentDirectory should initialize accordingly. Like for SegmentLocalFSDirectory, it
// doesn't load anything from an empty indexDir, but gets the info to complete the copyTo.
try {
segmentDirectory.copyTo(indexDir);
} finally {
segmentDirectory.close();
_logger.info("Reloading existing segment: {} on tier: {}", segmentName,
TierConfigUtils.normalizeTierName(segmentTier));
SegmentDirectory segmentDirectory =
initSegmentDirectory(segmentName, String.valueOf(zkMetadata.getCrc()), indexLoadingConfig, zkMetadata);
// We should first try to reuse existing segment directory
if (canReuseExistingDirectoryForReload(zkMetadata, segmentTier, segmentDirectory, indexLoadingConfig)) {
_logger.info("Reloading segment: {} using existing segment directory as no reprocessing needed",
segmentName);
// No reprocessing needed, reuse the same segment
ImmutableSegment segment = ImmutableSegmentLoader.load(segmentDirectory, indexLoadingConfig);
addSegment(segment, zkMetadata);
return;
}
// Create backup directory to handle failure of segment reloading.
createBackup(indexDir);
// The indexDir is empty after calling createBackup, as it's renamed to a backup directory.
// The SegmentDirectory should initialize accordingly. Like for SegmentLocalFSDirectory, it
// doesn't load anything from an empty indexDir, but gets the info to complete the copyTo.
try {
segmentDirectory.copyTo(indexDir);
} finally {
segmentDirectory.close();
}
}
}

// Load from indexDir and replace the old segment in memory. What's inside indexDir
// may come from SegmentDirectory.copyTo() or the segment downloaded from deep store.
indexLoadingConfig.setSegmentTier(zkMetadata.getTier());
_logger.info("Loading segment: {} from indexDir: {} to tier: {}", segmentName, indexDir,
TierConfigUtils.normalizeTierName(zkMetadata.getTier()));
ImmutableSegment segment = ImmutableSegmentLoader.load(indexDir, indexLoadingConfig,
_segmentOperationsThrottlerSet, zkMetadata);
addSegment(segment, zkMetadata);

// Remove backup directory to mark the completion of segment reloading.
removeBackup(indexDir);
// Load from indexDir and replace the old segment in memory. What's inside indexDir
// may come from SegmentDirectory.copyTo() or the segment downloaded from deep store.
indexLoadingConfig.setSegmentTier(zkMetadata.getTier());
_logger.info("Loading segment: {} from indexDir: {} to tier: {}", segmentName, indexDir,
TierConfigUtils.normalizeTierName(zkMetadata.getTier()));
ImmutableSegment segment = ImmutableSegmentLoader.load(indexDir, indexLoadingConfig,
_segmentOperationsThrottlerSet, zkMetadata);
addSegment(segment, zkMetadata);

// Remove backup directory to mark the completion of segment reloading.
removeBackup(indexDir);
} finally {
_segmentReloadSemaphore.release();
}
} catch (Exception reloadFailureException) {
try {
LoaderUtils.reloadFailureRecovery(indexDir);
Expand Down Expand Up @@ -1410,6 +1470,25 @@ protected void removeBackup(File indexDir)

@Override
public boolean tryLoadExistingSegment(SegmentZKMetadata zkMetadata, IndexLoadingConfig indexLoadingConfig) {
ImmutableSegment segment = tryLoadExistingSegmentInternal(zkMetadata, indexLoadingConfig);
if (segment == null) {
return false;
}
addSegment(segment, zkMetadata);
return true;
}

/**
* Just Loads a segment from the existing on-disk copy without registering it in {@code _segmentDataManagerMap} or
* invoking other hooks. Returns {@code null} when the on-disk copy is absent, has a stale CRC under
* {@code shouldCheckCRCOnSegmentLoad}, or fails to load — callers fall back to a fresh download in that case.
* Single-segment callers should use {@link #tryLoadExistingSegment} which performs the registration step.
* Multi-segment managers can compose this with {@link #downloadSegment} to load all members before wrapping them
* under a single map entry.
*/
@Nullable
protected ImmutableSegment tryLoadExistingSegmentInternal(SegmentZKMetadata zkMetadata,
IndexLoadingConfig indexLoadingConfig) {
String segmentName = zkMetadata.getSegmentName();
Preconditions.checkState(!_shutDown,
"Table data manager is already shut down, cannot load existing segment: %s of table: %s", segmentName,
Expand Down Expand Up @@ -1442,14 +1521,14 @@ public boolean tryLoadExistingSegment(SegmentZKMetadata zkMetadata, IndexLoading
if (segmentMetadata == null) {
_logger.info("Segment: {} does not exist", segmentName);
closeSegmentDirectoryQuietly(segmentDirectory);
return false;
return null;
}
if (isSegmentStatusCompleted(zkMetadata) && !hasSameCRC(zkMetadata, segmentMetadata)) {
_logger.warn("Segment: {} has CRC changed from: {} to: {}", segmentName, segmentMetadata.getCrc(),
zkMetadata.getCrc());
if (_instanceDataManagerConfig.shouldCheckCRCOnSegmentLoad()) {
closeSegmentDirectoryQuietly(segmentDirectory);
return false;
return null;
}
_logger.info("Skipping CRC check for segment: {} as configured. Proceed to load segment.", segmentName);
}
Expand All @@ -1470,15 +1549,14 @@ public boolean tryLoadExistingSegment(SegmentZKMetadata zkMetadata, IndexLoading
indexLoadingConfig, zkMetadata);
}
ImmutableSegment segment = ImmutableSegmentLoader.load(segmentDirectory, indexLoadingConfig);
addSegment(segment, zkMetadata);
_logger.info("Loaded existing segment: {} with CRC: {} on tier: {}", segmentName, zkMetadata.getCrc(),
TierConfigUtils.normalizeTierName(segmentTier));
return true;
return segment;
} catch (Exception e) {
_logger.error("Failed to load existing segment: {} with CRC: {} on tier: {}", segmentName, zkMetadata.getCrc(),
TierConfigUtils.normalizeTierName(segmentTier), e);
closeSegmentDirectoryQuietly(segmentDirectory);
return false;
return null;
}
}

Expand Down Expand Up @@ -1542,8 +1620,7 @@ public List<StaleSegment> getStaleSegments() {
return staleSegments;
}

@VisibleForTesting
StaleSegment isSegmentStale(IndexLoadingConfig indexLoadingConfig, SegmentDataManager segmentDataManager) {
protected StaleSegment isSegmentStale(IndexLoadingConfig indexLoadingConfig, SegmentDataManager segmentDataManager) {
TableConfig tableConfig = indexLoadingConfig.getTableConfig();
Schema schema = indexLoadingConfig.getSchema();
assert tableConfig != null && schema != null;
Expand Down Expand Up @@ -1847,7 +1924,7 @@ protected SegmentDirectory initSegmentDirectory(String segmentName, String segme

// CRC check can be performed on both segment CRC and data CRC (if available) based on the ZK property value of
// useDataCRC.
protected static boolean hasSameCRC(SegmentZKMetadata zkMetadata, SegmentMetadata localMetadata) {
public static boolean hasSameCRC(SegmentZKMetadata zkMetadata, SegmentMetadata localMetadata) {
if (zkMetadata.getCrc() == Long.parseLong(localMetadata.getCrc())) {
return true;
}
Expand Down
Loading
Loading