Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,70 @@ protected void doOffloadSegment(String segmentName) {
}
}

/**
 * Deletes the given segment under its per-segment lock. If the segment is still loaded in this data
 * manager it is offloaded first, then the physical delete is delegated to {@link #doDeleteSegment}.
 * Any failure is recorded via {@code addSegmentError} before being rethrown.
 */
@Override
public void deleteSegment(String segmentName)
    throws Exception {
  _logger.info("Deleting segment: {}", segmentName);
  Lock lock = getSegmentLock(segmentName);
  lock.lock();
  try {
    // A still-loaded segment must be offloaded before its files are removed from disk.
    boolean stillLoaded = hasSegment(segmentName);
    if (stillLoaded) {
      _logger.warn("Segment: {} is still loaded, offloading it before delete", segmentName);
      offloadSegment(segmentName);
    }
    doDeleteSegment(segmentName);
  } catch (Exception e) {
    // Record the failure so it surfaces through the segment-error tracking, then propagate.
    SegmentErrorInfo errorInfo =
        new SegmentErrorInfo(System.currentTimeMillis(), "Caught exception while deleting segment", e);
    addSegmentError(segmentName, errorInfo);
    throw e;
  } finally {
    lock.unlock();
  }
}

/**
 * Physical-deletion hook for a single segment: removes its on-disk files and logs completion.
 * Caller ({@link #deleteSegment}) is expected to hold the per-segment lock.
 */
protected void doDeleteSegment(String segmentName)
    throws Exception {
  deleteSegmentFilesFromDisk(_tableDataDir, segmentName, _instanceDataManagerConfig);
  _logger.info("Deleted segment: {}", segmentName);
}

/**
 * Single source of truth for on-disk cleanup of a physical segment. Removes the segment directory under
 * {@code tableDataDir} and invokes the configured {@link SegmentDirectoryLoader#delete} for tier-aware cleanup.
 *
 * <p>Used by:
 * <ul>
 *   <li>{@link #deleteSegment} — the TDM-driven path triggered by a *_TO_DROPPED state transition.</li>
 *   <li>{@code HelixInstanceDataManager.deleteSegment} fallback — when the per-table TDM is null because the
 *       table was never instantiated, or has already been removed via {@code deleteTable()}.</li>
 *   <li>TDM extensions that fan a single logical name out to multiple physical segments (e.g. segment-group
 *       member fan-out) — these reuse this helper directly while holding all required per-member locks
 *       acquired in lex order.</li>
 * </ul>
 *
 * <p>Caller is responsible for holding the per-segment lock and for offloading the segment from the TDM if it
 * is still loaded.
 *
 * @param tableDataDir root data directory of the table on the default tier
 * @param segmentName name of the physical segment to delete
 * @param instanceConfig instance config used to look up the configured segment directory loader
 * @throws Exception if the tier-aware {@link SegmentDirectoryLoader#delete} fails
 */
public static void deleteSegmentFilesFromDisk(String tableDataDir, String segmentName,
    InstanceDataManagerConfig instanceConfig)
    throws Exception {
  File segmentDir = new File(tableDataDir, segmentName);
  if (segmentDir.exists()) {
    // deleteQuietly swallows I/O errors; check its result so a failed delete (stale data left on disk)
    // is at least visible to operators instead of being silently ignored.
    if (FileUtils.deleteQuietly(segmentDir)) {
      LOGGER.info("Deleted segment directory {} on default tier", segmentDir);
    } else {
      LOGGER.warn("Failed to delete segment directory {} on default tier", segmentDir);
    }
  }
  // Hoist the loader name: it is used both for the registry lookup and for logging.
  String segmentDirectoryLoaderName = instanceConfig.getSegmentDirectoryLoader();
  SegmentDirectoryLoader segmentLoader =
      SegmentDirectoryLoaderRegistry.getSegmentDirectoryLoader(segmentDirectoryLoaderName);
  if (segmentLoader != null) {
    LOGGER.info("Deleting segment: {} further with segment loader: {}", segmentName, segmentDirectoryLoaderName);
    SegmentDirectoryLoaderContext ctx = new SegmentDirectoryLoaderContext.Builder().setSegmentName(segmentName)
        .setTableDataDir(tableDataDir)
        .build();
    segmentLoader.delete(ctx);
  }
}

@Override
public void reloadSegment(String segmentName, boolean forceDownload, String reloadJobId)
throws Exception {
Expand All @@ -616,12 +680,7 @@ public void reloadSegment(String segmentName, boolean forceDownload, String relo
SegmentDataManager segmentDataManager = _segmentDataManagerMap.get(segmentName);
if (segmentDataManager != null) {
IndexLoadingConfig indexLoadingConfig = fetchIndexLoadingConfig();
_segmentReloadSemaphore.acquire(segmentName, _logger);
try {
reloadSegment(segmentDataManager, indexLoadingConfig, forceDownload);
} finally {
_segmentReloadSemaphore.release();
}
reloadSegment(segmentDataManager, indexLoadingConfig, forceDownload);
} else {
_logger.warn("Failed to find segment: {}, skipping reloading it", segmentName);
}
Expand Down Expand Up @@ -947,12 +1006,7 @@ protected void reloadSegments(List<SegmentDataManager> segmentDataManagers, Inde
CompletableFuture.allOf(segmentDataManagers.stream().map(segmentDataManager -> CompletableFuture.runAsync(() -> {
String segmentName = segmentDataManager.getSegmentName();
try {
_segmentReloadSemaphore.acquire(segmentName, _logger);
try {
reloadSegment(segmentDataManager, indexLoadingConfig, forceDownload);
} finally {
_segmentReloadSemaphore.release();
}
reloadSegment(segmentDataManager, indexLoadingConfig, forceDownload);
} catch (Throwable t) {
_logger.error("Caught exception while reloading segment: {}", segmentName, t);
failedSegments.add(segmentName);
Expand All @@ -973,36 +1027,41 @@ protected void reloadSegment(SegmentDataManager segmentDataManager, IndexLoading
boolean forceDownload)
throws Exception {
String segmentName = segmentDataManager.getSegmentName();
if (segmentDataManager instanceof RealtimeSegmentDataManager) {
// Use force commit to reload consuming segment
if (_instanceDataManagerConfig.shouldReloadConsumingSegment()) {
// Force-committing consuming segments is restricted only for tables with inconsistent state configs
// (partial-upsert or dropOutOfOrderRecord=true with replication > 1).
// For these tables, winner selection could incorrectly favor replicas with fewer consumed rows,
// triggering unnecessary reconsumption and resulting in inconsistent upsert state.
// To enable force commit for such tables, change the cluster config
// `pinot.server.consuming.segment.consistency.mode` to PROTECTED for safer reload.
TableConfig tableConfig = indexLoadingConfig.getTableConfig();
ConsumingSegmentConsistencyModeListener config = ConsumingSegmentConsistencyModeListener.getInstance();
boolean isTableTypeInconsistentDuringConsumption =
TableConfigUtils.isTableTypeInconsistentDuringConsumption(tableConfig);
// Allow force commit if:
// 1. Table doesn't have inconsistent configs (non-upsert or standard upsert tables), OR
// 2. Consistency mode is PROTECTED or UNSAFE (isForceCommitAllowed = true)
if (tableConfig == null || (isTableTypeInconsistentDuringConsumption && !config.isForceCommitAllowed())) {
_logger.warn("Skipping reload (force commit) on consuming segment: {} due to inconsistent state config. "
+ "Change the cluster config: {} to `PROTECTED` for safer commit", segmentName, config.getConfigKey());
_segmentReloadSemaphore.acquire(segmentName, _logger);
try {
if (segmentDataManager instanceof RealtimeSegmentDataManager) {
// Use force commit to reload consuming segment
if (_instanceDataManagerConfig.shouldReloadConsumingSegment()) {
// Force-committing consuming segments is restricted only for tables with inconsistent state configs
// (partial-upsert or dropOutOfOrderRecord=true with replication > 1).
// For these tables, winner selection could incorrectly favor replicas with fewer consumed rows,
// triggering unnecessary reconsumption and resulting in inconsistent upsert state.
// To enable force commit for such tables, change the cluster config
// `pinot.server.consuming.segment.consistency.mode` to PROTECTED for safer reload.
TableConfig tableConfig = indexLoadingConfig.getTableConfig();
ConsumingSegmentConsistencyModeListener config = ConsumingSegmentConsistencyModeListener.getInstance();
boolean isTableTypeInconsistentDuringConsumption =
TableConfigUtils.isTableTypeInconsistentDuringConsumption(tableConfig);
// Allow force commit if:
// 1. Table doesn't have inconsistent configs (non-upsert or standard upsert tables), OR
// 2. Consistency mode is PROTECTED or UNSAFE (isForceCommitAllowed = true)
if (tableConfig == null || (isTableTypeInconsistentDuringConsumption && !config.isForceCommitAllowed())) {
_logger.warn("Skipping reload (force commit) on consuming segment: {} due to inconsistent state config. "
+ "Change the cluster config: {} to `PROTECTED` for safer commit", segmentName, config.getConfigKey());
} else {
_logger.info("Reloading (force committing) consuming segment: {}", segmentName);
((RealtimeSegmentDataManager) segmentDataManager).forceCommit();
}
} else {
_logger.info("Reloading (force committing) consuming segment: {}", segmentName);
((RealtimeSegmentDataManager) segmentDataManager).forceCommit();
_logger.warn("Skip reloading consuming segment: {} as configured", segmentName);
}
} else {
_logger.warn("Skip reloading consuming segment: {} as configured", segmentName);
SegmentZKMetadata zkMetadata = fetchZKMetadata(segmentName);
SegmentMetadata localMetadata = segmentDataManager.getSegment().getSegmentMetadata();
reloadSegment(segmentName, indexLoadingConfig, zkMetadata, localMetadata, forceDownload);
}
} else {
SegmentZKMetadata zkMetadata = fetchZKMetadata(segmentName);
SegmentMetadata localMetadata = segmentDataManager.getSegment().getSegmentMetadata();
reloadSegment(segmentName, indexLoadingConfig, zkMetadata, localMetadata, forceDownload);
} finally {
_segmentReloadSemaphore.release();
}
}

Expand Down Expand Up @@ -1410,6 +1469,25 @@ protected void removeBackup(File indexDir)

/**
 * Tries to load the segment from its existing on-disk copy via {@link #tryLoadExistingSegmentInternal}
 * and, on success, registers it with this data manager.
 *
 * @return {@code true} if the existing copy was loaded and added, {@code false} otherwise
 */
@Override
public boolean tryLoadExistingSegment(SegmentZKMetadata zkMetadata, IndexLoadingConfig indexLoadingConfig) {
  ImmutableSegment segment = tryLoadExistingSegmentInternal(zkMetadata, indexLoadingConfig);
  if (segment != null) {
    addSegment(segment, zkMetadata);
    return true;
  }
  return false;
}

/**
 * Loads a segment from the existing on-disk copy without registering it in {@code _segmentDataManagerMap} or
* invoking other hooks. Returns {@code null} when the on-disk copy is absent, has a stale CRC under
* {@code shouldCheckCRCOnSegmentLoad}, or fails to load — callers fall back to a fresh download in that case.
* Single-segment callers should use {@link #tryLoadExistingSegment} which performs the registration step.
* Multi-segment managers can compose this with {@link #downloadSegment} to load all members before wrapping them
* under a single map entry.
*/
@Nullable
protected ImmutableSegment tryLoadExistingSegmentInternal(SegmentZKMetadata zkMetadata,
IndexLoadingConfig indexLoadingConfig) {
String segmentName = zkMetadata.getSegmentName();
Preconditions.checkState(!_shutDown,
"Table data manager is already shut down, cannot load existing segment: %s of table: %s", segmentName,
Expand Down Expand Up @@ -1442,14 +1520,14 @@ public boolean tryLoadExistingSegment(SegmentZKMetadata zkMetadata, IndexLoading
if (segmentMetadata == null) {
_logger.info("Segment: {} does not exist", segmentName);
closeSegmentDirectoryQuietly(segmentDirectory);
return false;
return null;
}
if (isSegmentStatusCompleted(zkMetadata) && !hasSameCRC(zkMetadata, segmentMetadata)) {
_logger.warn("Segment: {} has CRC changed from: {} to: {}", segmentName, segmentMetadata.getCrc(),
zkMetadata.getCrc());
if (_instanceDataManagerConfig.shouldCheckCRCOnSegmentLoad()) {
closeSegmentDirectoryQuietly(segmentDirectory);
return false;
return null;
}
_logger.info("Skipping CRC check for segment: {} as configured. Proceed to load segment.", segmentName);
}
Expand All @@ -1470,15 +1548,14 @@ public boolean tryLoadExistingSegment(SegmentZKMetadata zkMetadata, IndexLoading
indexLoadingConfig, zkMetadata);
}
ImmutableSegment segment = ImmutableSegmentLoader.load(segmentDirectory, indexLoadingConfig);
addSegment(segment, zkMetadata);
_logger.info("Loaded existing segment: {} with CRC: {} on tier: {}", segmentName, zkMetadata.getCrc(),
TierConfigUtils.normalizeTierName(segmentTier));
return true;
return segment;
} catch (Exception e) {
_logger.error("Failed to load existing segment: {} with CRC: {} on tier: {}", segmentName, zkMetadata.getCrc(),
TierConfigUtils.normalizeTierName(segmentTier), e);
closeSegmentDirectoryQuietly(segmentDirectory);
return false;
return null;
}
}

Expand Down Expand Up @@ -1542,8 +1619,7 @@ public List<StaleSegment> getStaleSegments() {
return staleSegments;
}

@VisibleForTesting
StaleSegment isSegmentStale(IndexLoadingConfig indexLoadingConfig, SegmentDataManager segmentDataManager) {
protected StaleSegment isSegmentStale(IndexLoadingConfig indexLoadingConfig, SegmentDataManager segmentDataManager) {
TableConfig tableConfig = indexLoadingConfig.getTableConfig();
Schema schema = indexLoadingConfig.getSchema();
assert tableConfig != null && schema != null;
Expand Down
Loading
Loading