Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,19 @@ public static SegmentZKMetadata getSegmentZKMetadata(ZkHelixPropertyStore<ZNReco
return znRecord != null ? new SegmentZKMetadata(znRecord) : null;
}

/**
* Reads the segment ZK metadata and populates {@code stat} with the znode's current version (and other ZK stat
* fields) so the caller can perform a version-checked write via
* {@link #setSegmentZKMetadata(ZkHelixPropertyStore, String, SegmentZKMetadata, int)}.
*/
@Nullable
public static SegmentZKMetadata getSegmentZKMetadata(ZkHelixPropertyStore<ZNRecord> propertyStore,
String tableNameWithType, String segmentName, Stat stat) {
ZNRecord znRecord = propertyStore.get(constructPropertyStorePathForSegment(tableNameWithType, segmentName), stat,
AccessOption.PERSISTENT);
return znRecord != null ? new SegmentZKMetadata(znRecord) : null;
}

@Nullable
public static UserConfig getUserConfig(ZkHelixPropertyStore<ZNRecord> propertyStore, String username) {
ZNRecord znRecord =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,18 @@ public void setUseDataCrc(boolean useDataCrc) {
}
}

public boolean isBeingDeleted() {
return Boolean.parseBoolean(_simpleFields.get(Segment.IS_BEING_DELETED));
}

public void setBeingDeleted(boolean beingDeleted) {
if (beingDeleted) {
_simpleFields.put(Segment.IS_BEING_DELETED, "true");
} else {
_simpleFields.remove(Segment.IS_BEING_DELETED);
}
}

public String getTier() {
return _simpleFields.get(Segment.TIER);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,26 @@ public void offlineSegmentZKMetadataConvertionTest() {
new SegmentZKMetadata(offlineSegmentMetadata.toZNRecord()).hashCode());
}

@Test
public void beingDeletedFlagRoundTripTest() {
// Default-on-missing: a freshly built ZNRecord with no key reads as false.
SegmentZKMetadata fresh = new SegmentZKMetadata(new ZNRecord("seg"));
Assert.assertFalse(fresh.isBeingDeleted());
Assert.assertFalse(fresh.toZNRecord().getSimpleFields().containsKey(CommonConstants.Segment.IS_BEING_DELETED));

// Set true: round-trips through ZNRecord and the simple field is "true".
fresh.setBeingDeleted(true);
ZNRecord serialized = fresh.toZNRecord();
assertEquals(serialized.getSimpleField(CommonConstants.Segment.IS_BEING_DELETED), "true");
SegmentZKMetadata reparsed = new SegmentZKMetadata(serialized);
Assert.assertTrue(reparsed.isBeingDeleted());

// Set false: the key is REMOVED (not stored as "false") to keep unrelated znodes clean.
reparsed.setBeingDeleted(false);
Assert.assertFalse(reparsed.isBeingDeleted());
Assert.assertFalse(reparsed.toZNRecord().getSimpleFields().containsKey(CommonConstants.Segment.IS_BEING_DELETED));
}

@Test
public void segmentPartitionMetadataTest()
throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1059,6 +1059,7 @@ public PinotResourceManagerResponse deleteSegments(String tableNameWithType, Lis
LOGGER.info("Trying to delete segments: {} from table: {} ", segmentNames, tableNameWithType);
Preconditions.checkArgument(TableNameBuilder.isTableResource(tableNameWithType),
"Table name: %s is not a valid table name with type suffix", tableNameWithType);
markSegmentsAsBeingDeleted(tableNameWithType, segmentNames);
HelixHelper.removeSegmentsFromIdealState(_helixZkManager, tableNameWithType, segmentNames);
if (retentionPeriod != null) {
_segmentDeletionManager.deleteSegments(tableNameWithType, segmentNames,
Expand All @@ -1074,6 +1075,28 @@ public PinotResourceManagerResponse deleteSegments(String tableNameWithType, Lis
}
}

/**
* Sets the {@code isBeingDeleted} flag on each segment's ZK metadata via a version-checked write so concurrent
* operations can observe an in-flight deletion and skip the segment. The flag is advisory; the segment znode and
* deep-store cleanup still happen in the subsequent steps. Aborts on the first version mismatch — the caller is
* expected to retry on the next tick.
*/
private void markSegmentsAsBeingDeleted(String tableNameWithType, List<String> segmentNames) {
Comment thread
krishan1390 marked this conversation as resolved.
for (String segmentName : segmentNames) {
Stat stat = new Stat();
SegmentZKMetadata metadata =
ZKMetadataProvider.getSegmentZKMetadata(_propertyStore, tableNameWithType, segmentName, stat);
if (metadata == null || metadata.isBeingDeleted()) {
continue;
}
metadata.setBeingDeleted(true);
if (!ZKMetadataProvider.setSegmentZKMetadata(_propertyStore, tableNameWithType, metadata, stat.getVersion())) {
throw new IllegalStateException(
"Failed to mark segment: " + segmentName + " of table: " + tableNameWithType + " as being deleted");
}
}
}

/**
* Delete a single segment from ideal state and remove it from the local storage.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1051,6 +1051,42 @@ public void testUpdateTargetTier()
assertTrue(tierToSegmentsMap.isEmpty());
}

@Test
public void testDeleteSegmentsMarksFlagBeforeIdealStateRemoval()
throws Exception {
addDummySchema(RAW_TABLE_NAME);
TableConfig tableConfig =
new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setBrokerTenant(BROKER_TENANT_NAME)
.setServerTenant(SERVER_TENANT_NAME).build();
waitForEVToDisappear(tableConfig.getTableName());
_helixResourceManager.addTable(tableConfig);

String segmentName = "segDeleteFlag";
_helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, segmentName),
getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, segmentName));

// Sanity: znode exists with the flag unset and the segment is in the IS.
SegmentZKMetadata before =
ZKMetadataProvider.getSegmentZKMetadata(_propertyStore, OFFLINE_TABLE_NAME, segmentName);
assertNotNull(before);
assertFalse(before.isBeingDeleted());
assertTrue(_helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME).getPartitionSet().contains(segmentName));

// SegmentDeletionManager schedules znode removal via a delayed executor task, so the segment znode (with whatever
// simpleFields the controller wrote during deleteSegments) remains observable for a short window after the call.
assertTrue(_helixResourceManager.deleteSegments(OFFLINE_TABLE_NAME, List.of(segmentName)).isSuccessful());

// After deleteSegments returns: the segment is gone from the Ideal State and the znode is marked.
assertFalse(_helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME).getPartitionSet().contains(segmentName));
SegmentZKMetadata after =
ZKMetadataProvider.getSegmentZKMetadata(_propertyStore, OFFLINE_TABLE_NAME, segmentName);
assertNotNull(after, "Segment znode should still exist (deletion is scheduled with a delay)");
assertTrue(after.isBeingDeleted(), "isBeingDeleted flag should be set on the znode after deleteSegments");

_helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME);
}

/**
* Tests the code path where a subset of merged segments (from the original segmentsTo list)
* is passed to the endReplace API.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1955,6 +1955,13 @@ public static class Offline {
*/
public static final String SEGMENT_UPLOAD_START_TIME = "segment.upload.start.time";

/**
* Marker indicating the segment is mid-deletion. Set on the segment ZK metadata before the segment is removed
* from the Ideal State and before the segment znode is deleted, so concurrent operations can observe an in-flight
* deletion and skip the segment. Stored as a string (`"true"` or absent); absent is interpreted as `false`.
Comment thread
krishan1390 marked this conversation as resolved.
Outdated
*/
public static final String IS_BEING_DELETED = "segment.is.being.deleted";

public static final String SEGMENT_BACKUP_DIR_SUFFIX = ".segment.bak";
public static final String SEGMENT_TEMP_DIR_SUFFIX = ".segment.tmp";

Expand Down
Loading