diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java index 00db262b8d4a..f8de515bf5b5 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java @@ -434,6 +434,19 @@ public static SegmentZKMetadata getSegmentZKMetadata(ZkHelixPropertyStore propertyStore, + String tableNameWithType, String segmentName, Stat stat) { + ZNRecord znRecord = propertyStore.get(constructPropertyStorePathForSegment(tableNameWithType, segmentName), stat, + AccessOption.PERSISTENT); + return znRecord != null ? new SegmentZKMetadata(znRecord) : null; + } + @Nullable public static UserConfig getUserConfig(ZkHelixPropertyStore propertyStore, String username) { ZNRecord znRecord = diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentZKMetadata.java b/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentZKMetadata.java index 730994cab78c..3a7af3b421ef 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentZKMetadata.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentZKMetadata.java @@ -198,6 +198,18 @@ public void setUseDataCrc(boolean useDataCrc) { } } + public boolean isBeingDeleted() { + return Boolean.parseBoolean(_simpleFields.get(Segment.IS_BEING_DELETED)); + } + + public void setBeingDeleted(boolean beingDeleted) { + if (beingDeleted) { + _simpleFields.put(Segment.IS_BEING_DELETED, "true"); + } else { + _simpleFields.remove(Segment.IS_BEING_DELETED); + } + } + public String getTier() { return _simpleFields.get(Segment.TIER); } diff --git a/pinot-common/src/test/java/org/apache/pinot/common/metadata/SegmentZKMetadataTest.java b/pinot-common/src/test/java/org/apache/pinot/common/metadata/SegmentZKMetadataTest.java index 5b9764c88da0..8091447fb4bc 100644 --- a/pinot-common/src/test/java/org/apache/pinot/common/metadata/SegmentZKMetadataTest.java +++ b/pinot-common/src/test/java/org/apache/pinot/common/metadata/SegmentZKMetadataTest.java @@ -81,6 +81,26 @@ public void offlineSegmentZKMetadataConvertionTest() { new SegmentZKMetadata(offlineSegmentMetadata.toZNRecord()).hashCode()); } + @Test + public void beingDeletedFlagRoundTripTest() { + // Default-on-missing: a freshly built ZNRecord with no key reads as false. + SegmentZKMetadata fresh = new SegmentZKMetadata(new ZNRecord("seg")); + Assert.assertFalse(fresh.isBeingDeleted()); + Assert.assertFalse(fresh.toZNRecord().getSimpleFields().containsKey(CommonConstants.Segment.IS_BEING_DELETED)); + + // Set true: round-trips through ZNRecord and the simple field is "true". + fresh.setBeingDeleted(true); + ZNRecord serialized = fresh.toZNRecord(); + assertEquals(serialized.getSimpleField(CommonConstants.Segment.IS_BEING_DELETED), "true"); + SegmentZKMetadata reparsed = new SegmentZKMetadata(serialized); + Assert.assertTrue(reparsed.isBeingDeleted()); + + // Set false: the key is REMOVED (not stored as "false") to keep unrelated znodes clean. + reparsed.setBeingDeleted(false); + Assert.assertFalse(reparsed.isBeingDeleted()); + Assert.assertFalse(reparsed.toZNRecord().getSimpleFields().containsKey(CommonConstants.Segment.IS_BEING_DELETED)); + } + @Test public void segmentPartitionMetadataTest() throws IOException { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index ddab54f6bb9c..436f3bec3df9 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -1059,6 +1059,7 @@ public PinotResourceManagerResponse deleteSegments(String tableNameWithType, Lis LOGGER.info("Trying to delete segments: {} from table: {} ", segmentNames, tableNameWithType); Preconditions.checkArgument(TableNameBuilder.isTableResource(tableNameWithType), "Table name: %s is not a valid table name with type suffix", tableNameWithType); + markSegmentsAsBeingDeleted(tableNameWithType, segmentNames); HelixHelper.removeSegmentsFromIdealState(_helixZkManager, tableNameWithType, segmentNames); if (retentionPeriod != null) { _segmentDeletionManager.deleteSegments(tableNameWithType, segmentNames, @@ -1074,6 +1075,30 @@ public PinotResourceManagerResponse deleteSegments(String tableNameWithType, Lis } } + /** + * Sets the indicative {@code isBeingDeleted} flag on each segment's ZK metadata via a version-checked write so + * concurrent operations can observe an in-flight deletion and skip the segment. The flag is advisory only — it + * states that the segment is either being deleted or may be deleted in the near future, not that the segment is + * deleted. Subsequent IS removal and segment znode / deep-store cleanup may still fail or be retried later, so a + * flagged segment may remain present transiently. Aborts on the first version mismatch — the caller is expected + * to retry on the next tick. + */ + private void markSegmentsAsBeingDeleted(String tableNameWithType, List segmentNames) { + for (String segmentName : segmentNames) { + Stat stat = new Stat(); + SegmentZKMetadata metadata = + ZKMetadataProvider.getSegmentZKMetadata(_propertyStore, tableNameWithType, segmentName, stat); + if (metadata == null || metadata.isBeingDeleted()) { + continue; + } + metadata.setBeingDeleted(true); + if (!ZKMetadataProvider.setSegmentZKMetadata(_propertyStore, tableNameWithType, metadata, stat.getVersion())) { + throw new IllegalStateException( + "Failed to mark segment: " + segmentName + " of table: " + tableNameWithType + " as being deleted"); + } + } + } + /** * Delete a single segment from ideal state and remove it from the local storage. * diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java index bc643b8c92e4..52e4c99f1a00 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java @@ -1051,6 +1051,42 @@ public void testUpdateTargetTier() assertTrue(tierToSegmentsMap.isEmpty()); } + @Test + public void testDeleteSegmentsMarksFlagBeforeIdealStateRemoval() + throws Exception { + addDummySchema(RAW_TABLE_NAME); + TableConfig tableConfig = + new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setBrokerTenant(BROKER_TENANT_NAME) + .setServerTenant(SERVER_TENANT_NAME).build(); + waitForEVToDisappear(tableConfig.getTableName()); + _helixResourceManager.addTable(tableConfig); + + String segmentName = "segDeleteFlag"; + _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME, + SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, segmentName), + getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, segmentName)); + + // Sanity: znode exists with the flag unset and the segment is in the IS. + SegmentZKMetadata before = + ZKMetadataProvider.getSegmentZKMetadata(_propertyStore, OFFLINE_TABLE_NAME, segmentName); + assertNotNull(before); + assertFalse(before.isBeingDeleted()); + assertTrue(_helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME).getPartitionSet().contains(segmentName)); + + // SegmentDeletionManager schedules znode removal via a delayed executor task, so the segment znode (with whatever + // simpleFields the controller wrote during deleteSegments) remains observable for a short window after the call. + assertTrue(_helixResourceManager.deleteSegments(OFFLINE_TABLE_NAME, List.of(segmentName)).isSuccessful()); + + // After deleteSegments returns: the segment is gone from the Ideal State and the znode is marked. + assertFalse(_helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME).getPartitionSet().contains(segmentName)); + SegmentZKMetadata after = + ZKMetadataProvider.getSegmentZKMetadata(_propertyStore, OFFLINE_TABLE_NAME, segmentName); + assertNotNull(after, "Segment znode should still exist (deletion is scheduled with a delay)"); + assertTrue(after.isBeingDeleted(), "isBeingDeleted flag should be set on the znode after deleteSegments"); + + _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME); + } + /** * Tests the code path where a subset of merged segments (from the original segmentsTo list) * is passed to the endReplace API. diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index 916ca208a4aa..6e7d907d4bbd 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -1955,6 +1955,20 @@ public static class Offline { */ public static final String SEGMENT_UPLOAD_START_TIME = "segment.upload.start.time"; + /** + * Indicative (not authoritative) marker that the segment is either being deleted or may be deleted in the + * near future. Set on the segment ZK metadata before the segment is removed from the Ideal State and before + * the segment znode is deleted, so concurrent operations can observe an in-flight deletion and skip the segment. + * + *

The flag does NOT guarantee the segment is or will be deleted: the deletion may fail or be retried later, + * and a flagged segment may still be present in the Ideal State / External View transiently. Conversely, the + * absence of the flag does not guarantee the segment is live — it may be deleted by a later request. Treat this + * as a hint to avoid acting on a segment whose lifecycle is unstable, not as proof of its state. + * + *

Stored as a string (`"true"` or absent); absent is interpreted as `false`. + */ + public static final String IS_BEING_DELETED = "segment.is.being.deleted"; + public static final String SEGMENT_BACKUP_DIR_SUFFIX = ".segment.bak"; public static final String SEGMENT_TEMP_DIR_SUFFIX = ".segment.tmp";