Skip to content
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,56 @@ protected void doOffloadSegment(String segmentName) {
}
}

@Override
public void deleteSegment(String segmentName)
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is moved from HelixInstanceDataManager to TableDataManager as this is the right place

throws Exception {
_logger.info("Deleting segment: {}", segmentName);
Lock segmentLock = getSegmentLock(segmentName);
segmentLock.lock();
try {
if (hasSegment(segmentName)) {
_logger.warn("Segment: {} is still loaded, offloading it before delete", segmentName);
offloadSegment(segmentName);
}
doDeleteSegment(segmentName);
} catch (Exception e) {
addSegmentError(segmentName,
new SegmentErrorInfo(System.currentTimeMillis(), "Caught exception while deleting segment", e));
throw e;
} finally {
segmentLock.unlock();
}
}

protected void doDeleteSegment(String segmentName)
throws Exception {
deleteSegmentFilesFromDisk(_tableDataDir, segmentName, _instanceDataManagerConfig);
_logger.info("Deleted segment: {}", segmentName);
}

/**
* Removes the segment directory locally and does tier-aware cleanup too
*/
public static void deleteSegmentFilesFromDisk(String tableDataDir, String segmentName,
InstanceDataManagerConfig instanceConfig)
throws Exception {
File segmentDir = new File(tableDataDir, segmentName);
if (segmentDir.exists()) {
FileUtils.deleteQuietly(segmentDir);
LOGGER.info("Deleted segment directory {} on default tier", segmentDir);
}
SegmentDirectoryLoader segmentLoader =
SegmentDirectoryLoaderRegistry.getSegmentDirectoryLoader(instanceConfig.getSegmentDirectoryLoader());
if (segmentLoader != null) {
LOGGER.info("Deleting segment: {} further with segment loader: {}", segmentName,
instanceConfig.getSegmentDirectoryLoader());
SegmentDirectoryLoaderContext ctx = new SegmentDirectoryLoaderContext.Builder().setSegmentName(segmentName)
.setTableDataDir(tableDataDir)
.build();
segmentLoader.delete(ctx);
}
}

@Override
public void reloadSegment(String segmentName, boolean forceDownload, String reloadJobId)
throws Exception {
Expand All @@ -616,12 +666,7 @@ public void reloadSegment(String segmentName, boolean forceDownload, String relo
SegmentDataManager segmentDataManager = _segmentDataManagerMap.get(segmentName);
if (segmentDataManager != null) {
IndexLoadingConfig indexLoadingConfig = fetchIndexLoadingConfig();
_segmentReloadSemaphore.acquire(segmentName, _logger);
try {
reloadSegment(segmentDataManager, indexLoadingConfig, forceDownload);
} finally {
_segmentReloadSemaphore.release();
}
reloadSegment(segmentDataManager, indexLoadingConfig, forceDownload);
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved the sempahore to inside reloadSegment

} else {
_logger.warn("Failed to find segment: {}, skipping reloading it", segmentName);
}
Expand Down Expand Up @@ -947,12 +992,7 @@ protected void reloadSegments(List<SegmentDataManager> segmentDataManagers, Inde
CompletableFuture.allOf(segmentDataManagers.stream().map(segmentDataManager -> CompletableFuture.runAsync(() -> {
String segmentName = segmentDataManager.getSegmentName();
try {
_segmentReloadSemaphore.acquire(segmentName, _logger);
try {
reloadSegment(segmentDataManager, indexLoadingConfig, forceDownload);
} finally {
_segmentReloadSemaphore.release();
}
reloadSegment(segmentDataManager, indexLoadingConfig, forceDownload);
} catch (Throwable t) {
_logger.error("Caught exception while reloading segment: {}", segmentName, t);
failedSegments.add(segmentName);
Expand Down Expand Up @@ -1016,7 +1056,10 @@ public void reloadSegment(String segmentName, IndexLoadingConfig indexLoadingCon
File indexDir = getSegmentDataDir(segmentName, segmentTier, indexLoadingConfig.getTableConfig());
Lock segmentLock = getSegmentLock(segmentName);
segmentLock.lock();
boolean semaphoreAcquired = false;
try {
_segmentReloadSemaphore.acquire(segmentName, _logger);
Comment thread
krishan1390 marked this conversation as resolved.
Outdated
semaphoreAcquired = true;
/*
Determines if a segment should be downloaded from deep storage based on:
1. A forced download flag.
Expand Down Expand Up @@ -1091,6 +1134,9 @@ public void reloadSegment(String segmentName, IndexLoadingConfig indexLoadingCon
reloadFailureException));
throw reloadFailureException;
} finally {
if (semaphoreAcquired) {
_segmentReloadSemaphore.release();
}
segmentLock.unlock();
}
_logger.info("Reloaded segment: {}", segmentName);
Expand Down Expand Up @@ -1410,6 +1456,22 @@ protected void removeBackup(File indexDir)

@Override
public boolean tryLoadExistingSegment(SegmentZKMetadata zkMetadata, IndexLoadingConfig indexLoadingConfig) {
ImmutableSegment segment = tryLoadExistingSegmentInternal(zkMetadata, indexLoadingConfig);
if (segment == null) {
return false;
}
addSegment(segment, zkMetadata);
return true;
}

/**
* Just Loads a segment from the existing on-disk copy without registering it in {@code _segmentDataManagerMap} or
* invoking other hooks.
* Returns {@code null} when the on-disk copy is absent, has a stale CRC under or fails to load
*/
@Nullable
protected ImmutableSegment tryLoadExistingSegmentInternal(SegmentZKMetadata zkMetadata,
IndexLoadingConfig indexLoadingConfig) {
String segmentName = zkMetadata.getSegmentName();
Preconditions.checkState(!_shutDown,
"Table data manager is already shut down, cannot load existing segment: %s of table: %s", segmentName,
Expand Down Expand Up @@ -1442,14 +1504,14 @@ public boolean tryLoadExistingSegment(SegmentZKMetadata zkMetadata, IndexLoading
if (segmentMetadata == null) {
_logger.info("Segment: {} does not exist", segmentName);
closeSegmentDirectoryQuietly(segmentDirectory);
return false;
return null;
}
if (isSegmentStatusCompleted(zkMetadata) && !hasSameCRC(zkMetadata, segmentMetadata)) {
_logger.warn("Segment: {} has CRC changed from: {} to: {}", segmentName, segmentMetadata.getCrc(),
zkMetadata.getCrc());
if (_instanceDataManagerConfig.shouldCheckCRCOnSegmentLoad()) {
closeSegmentDirectoryQuietly(segmentDirectory);
return false;
return null;
}
_logger.info("Skipping CRC check for segment: {} as configured. Proceed to load segment.", segmentName);
}
Expand All @@ -1470,15 +1532,14 @@ public boolean tryLoadExistingSegment(SegmentZKMetadata zkMetadata, IndexLoading
indexLoadingConfig, zkMetadata);
}
ImmutableSegment segment = ImmutableSegmentLoader.load(segmentDirectory, indexLoadingConfig);
addSegment(segment, zkMetadata);
_logger.info("Loaded existing segment: {} with CRC: {} on tier: {}", segmentName, zkMetadata.getCrc(),
TierConfigUtils.normalizeTierName(segmentTier));
return true;
return segment;
} catch (Exception e) {
_logger.error("Failed to load existing segment: {} with CRC: {} on tier: {}", segmentName, zkMetadata.getCrc(),
TierConfigUtils.normalizeTierName(segmentTier), e);
closeSegmentDirectoryQuietly(segmentDirectory);
return false;
return null;
}
}

Expand Down Expand Up @@ -1542,8 +1603,7 @@ public List<StaleSegment> getStaleSegments() {
return staleSegments;
}

@VisibleForTesting
StaleSegment isSegmentStale(IndexLoadingConfig indexLoadingConfig, SegmentDataManager segmentDataManager) {
protected StaleSegment isSegmentStale(IndexLoadingConfig indexLoadingConfig, SegmentDataManager segmentDataManager) {
TableConfig tableConfig = indexLoadingConfig.getTableConfig();
Schema schema = indexLoadingConfig.getSchema();
assert tableConfig != null && schema != null;
Expand Down Expand Up @@ -1847,7 +1907,7 @@ protected SegmentDirectory initSegmentDirectory(String segmentName, String segme

// CRC check can be performed on both segment CRC and data CRC (if available) based on the ZK property value of
// useDataCRC.
protected static boolean hasSameCRC(SegmentZKMetadata zkMetadata, SegmentMetadata localMetadata) {
public static boolean hasSameCRC(SegmentZKMetadata zkMetadata, SegmentMetadata localMetadata) {
if (zkMetadata.getCrc() == Long.parseLong(localMetadata.getCrc())) {
return true;
}
Expand Down
Loading
Loading