diff --git a/sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/implementation/SparkBridgeImplementationInternal.scala b/sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/implementation/SparkBridgeImplementationInternal.scala index 0cb2f52a886f..c3ff821f0e1d 100644 --- a/sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/implementation/SparkBridgeImplementationInternal.scala +++ b/sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/implementation/SparkBridgeImplementationInternal.scala @@ -180,6 +180,18 @@ private[cosmos] object SparkBridgeImplementationInternal extends BasicLoggingTra .toString } + def extractChangeFeedStateForRanges + ( + changeFeedState: ChangeFeedState, + feedRanges: Seq[NormalizedRange] + ): Seq[String] = { + val cosmosRanges = feedRanges.map(toCosmosRange).asJava + changeFeedState + .extractForEffectiveRanges(cosmosRanges) + .asScala + .map(_.toString) + } + def parseChangeFeedState(changeFeedStateJsonString: String): ChangeFeedState = { assert(!Strings.isNullOrWhiteSpace(changeFeedStateJsonString), s"Argument 'changeFeedStateJsonString' must not be null or empty.") ChangeFeedState.fromString(changeFeedStateJsonString) diff --git a/sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/ChangeFeedBatch.scala b/sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/ChangeFeedBatch.scala index b27aafba9b79..cc0b4ee879ac 100644 --- a/sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/ChangeFeedBatch.scala +++ b/sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/ChangeFeedBatch.scala @@ -186,15 +186,16 @@ private class ChangeFeedBatch // Latest offset above has the EndLsn specified based on the point-in-time latest offset // For batch mode instead we need to reset it so that the change feed will get fully drained val parsedInitialOffset = SparkBridgeImplementationInternal.parseChangeFeedState(initialOffsetJson) - val 
inputPartitions = latestOffset - .inputPartitions - .get - .map(partition => partition - .withContinuationState( - SparkBridgeImplementationInternal - .extractChangeFeedStateForRange(parsedInitialOffset, partition.feedRange), - clearEndLsn = !hasBatchCheckpointLocation)) - .map(_.asInstanceOf[InputPartition]) + val partitions = latestOffset.inputPartitions.get + val continuationStates = SparkBridgeImplementationInternal + .extractChangeFeedStateForRanges(parsedInitialOffset, partitions.map(_.feedRange)) + val inputPartitions = partitions + .zip(continuationStates) + .map { case (partition, continuationState) => + partition + .withContinuationState(continuationState, clearEndLsn = !hasBatchCheckpointLocation) + .asInstanceOf[InputPartition] + } log.logInfo(s"<-- planInputPartitions $batchId (creating ${inputPartitions.length} partitions)") inputPartitions diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/ChangeFeedStateTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/ChangeFeedStateTest.java index 9e23cc5e6781..4f45b817ce73 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/ChangeFeedStateTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/ChangeFeedStateTest.java @@ -17,6 +17,7 @@ import java.nio.charset.StandardCharsets; import java.time.Instant; +import java.util.ArrayList; import java.util.Base64; import java.util.HashMap; import java.util.List; @@ -364,6 +365,410 @@ public void changeFeedState_merge() { .isEqualTo(continuationCCToEE); } + private ChangeFeedState createStateWithManyTokens(int tokenCount) { + String containerRid = "/cols/" + UUID.randomUUID(); + String pkRangeId = UUID.randomUUID().toString(); + FeedRangePartitionKeyRangeImpl feedRange = new FeedRangePartitionKeyRangeImpl(pkRangeId); + + StringBuilder continuationEntries = new StringBuilder(); + for (int i = 0; i < tokenCount; i++) { + if (i > 
0) { + continuationEntries.append(","); + } + String min = String.format("%06X", i); + String max = String.format("%06X", i + 1); + String token = "token_" + i; + continuationEntries.append( + String.format("{\"token\":\"%s\",\"range\":{\"min\":\"%s\",\"max\":\"%s\"}}", token, min, max)); + } + + String continuationJson = String.format( + "{\"V\":1,\"Rid\":\"%s\",\"Continuation\":[%s],\"PKRangeId\":\"%s\"}", + containerRid, + continuationEntries, + pkRangeId); + + FeedRangeContinuation continuation = FeedRangeContinuation.convert(continuationJson); + return new ChangeFeedStateV1( + containerRid, + feedRange, + ChangeFeedMode.INCREMENTAL, + ChangeFeedStartFromInternal.createFromNow(), + continuation); + } + + @Test(groups = "unit") + public void changeFeedState_extractForEffectiveRange_multipleCallsProduceCorrectResults() { + // Verify that calling extractForEffectiveRanges (batch API) returns correct + // tokens for multiple ranges from the same continuation (sort-once). + int tokenCount = 100; + ChangeFeedState state = createStateWithManyTokens(tokenCount); + + List> ranges = new ArrayList<>(); + // First range: tokens 0-9 + String min0 = String.format("%06X", 0); + String max10 = String.format("%06X", 10); + ranges.add(new Range<>(min0, max10, true, false)); + + // Middle range: tokens 50-59 + String min50 = String.format("%06X", 50); + String max60 = String.format("%06X", 60); + ranges.add(new Range<>(min50, max60, true, false)); + + // Last range: tokens 90-99 + String min90 = String.format("%06X", 90); + String max100 = String.format("%06X", 100); + ranges.add(new Range<>(min90, max100, true, false)); + + // Single token + String min25 = String.format("%06X", 25); + String max26 = String.format("%06X", 26); + ranges.add(new Range<>(min25, max26, true, false)); + + List results = state.extractForEffectiveRanges(ranges); + assertThat(results).hasSize(4); + + // Verify first range: tokens 0-9 + List tokens1 = results.get(0).extractContinuationTokens(); + 
assertThat(tokens1).hasSize(10); + for (int i = 0; i < 10; i++) { + assertThat(tokens1.get(i).getToken()).isEqualTo("token_" + i); + assertThat(tokens1.get(i).getRange().getMin()).isEqualTo(String.format("%06X", i)); + assertThat(tokens1.get(i).getRange().getMax()).isEqualTo(String.format("%06X", i + 1)); + } + + // Verify middle range: tokens 50-59 + List tokens2 = results.get(1).extractContinuationTokens(); + assertThat(tokens2).hasSize(10); + for (int i = 0; i < 10; i++) { + assertThat(tokens2.get(i).getToken()).isEqualTo("token_" + (50 + i)); + } + + // Verify last range: tokens 90-99 + List tokens3 = results.get(2).extractContinuationTokens(); + assertThat(tokens3).hasSize(10); + for (int i = 0; i < 10; i++) { + assertThat(tokens3.get(i).getToken()).isEqualTo("token_" + (90 + i)); + } + + // Verify single token + List tokens4 = results.get(3).extractContinuationTokens(); + assertThat(tokens4).hasSize(1); + assertThat(tokens4.get(0).getToken()).isEqualTo("token_25"); + } + + @Test(groups = "unit") + public void changeFeedState_extractForEffectiveRange_largeScale() { + // Test with a moderate number of tokens to verify the optimization prevents + // quadratic behavior without being flaky on slow CI agents. 
+ int tokenCount = 5000; + ChangeFeedState state = createStateWithManyTokens(tokenCount); + + // Build all ranges upfront outside the timed section + List> ranges = new ArrayList<>(tokenCount); + for (int i = 0; i < tokenCount; i++) { + ranges.add(new Range<>( + String.format("%06X", i), + String.format("%06X", i + 1), + true, false)); + } + + long startTime = System.nanoTime(); + + // Extract for all individual partition ranges at once (simulates Spark planning) + List results = state.extractForEffectiveRanges(ranges); + + assertThat(results).hasSize(tokenCount); + for (int i = 0; i < tokenCount; i++) { + List tokens = results.get(i).extractContinuationTokens(); + assertThat(tokens).hasSize(1); + assertThat(tokens.get(0).getToken()).isEqualTo("token_" + i); + } + + long elapsedMs = (System.nanoTime() - startTime) / 1_000_000; + + // Sanity check only: the optimized O(T log T + P log T) approach should + // complete in well under 60 seconds. + assertThat(elapsedMs) + .as("5,000 extractions took %d ms, should be < 60,000 ms", elapsedMs) + .isLessThan(60_000); + } + + @Test(groups = "unit") + public void changeFeedState_microbenchmark_30k_batchVsSingleCall() { + // Microbenchmark simulating a realistic Spark planning scenario with 30K feed ranges. + // Compares: + // (a) batch API: extractForEffectiveRanges (sort once + binary search per range) + // (b) single-call loop: extractForEffectiveRange per range (sort per call) + // Both should produce identical results; batch should be significantly faster. 
+ int tokenCount = 30_000; + ChangeFeedState state = createStateWithManyTokens(tokenCount); + + // Build all ranges upfront + List> ranges = new ArrayList<>(tokenCount); + for (int i = 0; i < tokenCount; i++) { + ranges.add(new Range<>( + String.format("%06X", i), + String.format("%06X", i + 1), + true, false)); + } + + // Warmup: one small extraction to trigger class loading / JIT + state.extractForEffectiveRange(ranges.get(0)); + + // (a) Batch API + long batchStart = System.nanoTime(); + List batchResults = state.extractForEffectiveRanges(ranges); + long batchElapsedMs = (System.nanoTime() - batchStart) / 1_000_000; + + assertThat(batchResults).hasSize(tokenCount); + + // Spot-check correctness at boundaries and middle + for (int checkIdx : new int[]{0, 1, tokenCount / 2, tokenCount - 2, tokenCount - 1}) { + List tokens = batchResults.get(checkIdx).extractContinuationTokens(); + assertThat(tokens).hasSize(1); + assertThat(tokens.get(0).getToken()).isEqualTo("token_" + checkIdx); + } + + // (b) Single-call loop (extractForEffectiveRange per range) + long singleStart = System.nanoTime(); + List singleResults = new ArrayList<>(tokenCount); + for (Range range : ranges) { + singleResults.add(state.extractForEffectiveRange(range)); + } + long singleElapsedMs = (System.nanoTime() - singleStart) / 1_000_000; + + assertThat(singleResults).hasSize(tokenCount); + + // Verify parity between batch and single-call results + for (int checkIdx : new int[]{0, tokenCount / 4, tokenCount / 2, 3 * tokenCount / 4, tokenCount - 1}) { + assertThat(batchResults.get(checkIdx).toString()) + .as("Batch vs single mismatch at index %d", checkIdx) + .isEqualTo(singleResults.get(checkIdx).toString()); + } + + // Log timing for manual inspection — not a strict assertion since CI perf varies + System.out.println(String.format( + "[Microbenchmark] 30K feed ranges: batch API = %d ms, single-call loop = %d ms, speedup = %.1fx", + batchElapsedMs, + singleElapsedMs, + singleElapsedMs > 0 ? 
(double) singleElapsedMs / batchElapsedMs : Double.NaN)); + + // The batch API should complete in well under 30 seconds for 30K ranges. + // The old quadratic approach would take minutes. + assertThat(batchElapsedMs) + .as("Batch extraction of 30K ranges took %d ms, should be < 30,000 ms", batchElapsedMs) + .isLessThan(30_000); + } + + @Test(groups = "unit") + public void changeFeedState_extractContinuationTokens_nullContinuation() { + String containerRid = "/cols/" + UUID.randomUUID(); + String pkRangeId = UUID.randomUUID().toString(); + FeedRangePartitionKeyRangeImpl feedRange = new FeedRangePartitionKeyRangeImpl(pkRangeId); + + ChangeFeedState state = new ChangeFeedStateV1( + containerRid, + feedRange, + ChangeFeedMode.INCREMENTAL, + ChangeFeedStartFromInternal.createFromNow(), + null); + + // With null continuation, extractContinuationTokens returns empty + List tokens = state.extractContinuationTokens(); + assertThat(tokens).isEmpty(); + } + + @Test(groups = "unit", expectedExceptions = IllegalArgumentException.class) + public void changeFeedState_extractForEffectiveRange_nullContinuation_throws() { + String containerRid = "/cols/" + UUID.randomUUID(); + String pkRangeId = UUID.randomUUID().toString(); + FeedRangePartitionKeyRangeImpl feedRange = new FeedRangePartitionKeyRangeImpl(pkRangeId); + + ChangeFeedState state = new ChangeFeedStateV1( + containerRid, + feedRange, + ChangeFeedMode.INCREMENTAL, + ChangeFeedStartFromInternal.createFromNow(), + null); + + // With null continuation, extractForEffectiveRange passes an empty token list to + // FeedRangeContinuation.create(), which throws IllegalArgumentException. 
+ state.extractForEffectiveRange(new Range<>("AA", "BB", true, false)); + } + + @Test(groups = "unit") + public void changeFeedState_extractForEffectiveRange_singleToken() { + String containerRid = "/cols/" + UUID.randomUUID(); + String pkRangeId = UUID.randomUUID().toString(); + FeedRangePartitionKeyRangeImpl feedRange = new FeedRangePartitionKeyRangeImpl(pkRangeId); + + String continuationJson = String.format( + "{\"V\":1,\"Rid\":\"%s\",\"Continuation\":[" + + "{\"token\":\"tok1\",\"range\":{\"min\":\"AA\",\"max\":\"CC\"}}" + + "],\"PKRangeId\":\"%s\"}", + containerRid, pkRangeId); + + FeedRangeContinuation continuation = FeedRangeContinuation.convert(continuationJson); + ChangeFeedState state = new ChangeFeedStateV1( + containerRid, feedRange, ChangeFeedMode.INCREMENTAL, + ChangeFeedStartFromInternal.createFromNow(), continuation); + + // Full range overlap + List tokens = + state.extractForEffectiveRange(new Range<>("AA", "CC", true, false)) + .extractContinuationTokens(); + assertThat(tokens).hasSize(1); + assertThat(tokens.get(0).getToken()).isEqualTo("tok1"); + assertThat(tokens.get(0).getRange()).isEqualTo(new Range<>("AA", "CC", true, false)); + + // Partial overlap + tokens = state.extractForEffectiveRange(new Range<>("AB", "BB", true, false)) + .extractContinuationTokens(); + assertThat(tokens).hasSize(1); + assertThat(tokens.get(0).getToken()).isEqualTo("tok1"); + assertThat(tokens.get(0).getRange()).isEqualTo(new Range<>("AB", "BB", true, false)); + } + + @Test(groups = "unit") + public void changeFeedState_extractForEffectiveRange_lastTokenExactMatch() { + // When query range exactly matches the last token, verify it is found correctly. 
+ ChangeFeedState state = createStateWithManyTokens(5); + + // Range overlapping the last token + String min4 = String.format("%06X", 4); + String max5 = String.format("%06X", 5); + List tokens = + state.extractForEffectiveRange(new Range<>(min4, max5, true, false)) + .extractContinuationTokens(); + + assertThat(tokens).hasSize(1); + assertThat(tokens.get(0).getToken()).isEqualTo("token_4"); + } + + @Test(groups = "unit") + public void changeFeedState_extractForEffectiveRange_fullRange() { + int tokenCount = 20; + ChangeFeedState state = createStateWithManyTokens(tokenCount); + + // Extract full range should return all tokens + List tokens = state.extractContinuationTokens(); + + assertThat(tokens).hasSize(tokenCount); + for (int i = 0; i < tokenCount; i++) { + assertThat(tokens.get(i).getToken()).isEqualTo("token_" + i); + } + } + + @Test(groups = "unit") + public void changeFeedState_extractForEffectiveRange_partialOverlapAcrossTokenBoundary() { + // Two tokens: [AA, CC) and [CC, EE), query range [BB, DD) + // should get both tokens trimmed to [BB, CC) and [CC, DD) + String continuationAAToCC = "tok_aa_cc"; + String continuationCCToEE = "tok_cc_ee"; + ChangeFeedState state = this.createDefaultStateWithContinuation(continuationAAToCC, continuationCCToEE); + + List tokens = + state.extractForEffectiveRange(new Range<>("BB", "DD", true, false)) + .extractContinuationTokens(); + + assertThat(tokens).hasSize(2); + assertThat(tokens.get(0).getRange()).isEqualTo(new Range<>("BB", "CC", true, false)); + assertThat(tokens.get(0).getToken()).isEqualTo(continuationAAToCC); + assertThat(tokens.get(1).getRange()).isEqualTo(new Range<>("CC", "DD", true, false)); + assertThat(tokens.get(1).getToken()).isEqualTo(continuationCCToEE); + } + + @Test(groups = "unit", expectedExceptions = IllegalArgumentException.class) + public void changeFeedState_extractForEffectiveRange_noOverlapReturnsEmpty() { + // Query range entirely outside all token ranges should cause 
extractForEffectiveRange + // to throw, because FeedRangeContinuation.create rejects empty token lists. + ChangeFeedState state = createStateWithManyTokens(5); + + // All tokens are in [000000, 000005). Query a range beyond all tokens. + state.extractForEffectiveRange(new Range<>("000010", "000011", true, false)); + } + + @Test(groups = "unit") + public void changeFeedState_extractForEffectiveRange_unsortedInput() { + // Tokens provided in reverse order to verify that the sort is actually exercised. + // Without sorting, binary search and overlap detection would produce wrong results. + ChangeFeedState state = createStateWithTokenRanges(new String[][] { + {"CC", "DD"}, + {"AA", "BB"}, + {"EE", "FF"} + }); + + // After sorting, tokens should be [AA,BB), [CC,DD), [EE,FF) + // Query [AA, FF) should return all three + List allTokens = + state.extractForEffectiveRange(new Range<>("AA", "FF", true, false)) + .extractContinuationTokens(); + + assertThat(allTokens).hasSize(3); + // Verify sorted order in results + assertThat(allTokens.get(0).getRange().getMin()).isEqualTo("AA"); + assertThat(allTokens.get(1).getRange().getMin()).isEqualTo("CC"); + assertThat(allTokens.get(2).getRange().getMin()).isEqualTo("EE"); + + // Query a specific middle range + List middleToken = + state.extractForEffectiveRange(new Range<>("CC", "DD", true, false)) + .extractContinuationTokens(); + + assertThat(middleToken).hasSize(1); + assertThat(middleToken.get(0).getRange()).isEqualTo(new Range<>("CC", "DD", true, false)); + } + + @Test(groups = "unit") + public void changeFeedState_extractForEffectiveRanges_singleBatchAndSingleCallParity() { + // Verifies that extractForEffectiveRange (single) and extractForEffectiveRanges (batch) + // produce identical results for the same input ranges. 
+ ChangeFeedState state = createStateWithManyTokens(20); + + List> ranges = new ArrayList<>(); + ranges.add(new Range<>(String.format("%06X", 0), String.format("%06X", 5), true, false)); + ranges.add(new Range<>(String.format("%06X", 10), String.format("%06X", 15), true, false)); + ranges.add(new Range<>(String.format("%06X", 18), String.format("%06X", 20), true, false)); + + List batchResults = state.extractForEffectiveRanges(ranges); + + for (int i = 0; i < ranges.size(); i++) { + ChangeFeedState singleResult = state.extractForEffectiveRange(ranges.get(i)); + assertThat(batchResults.get(i).toString()) + .as("Batch result at index %d should match single-range result", i) + .isEqualTo(singleResult.toString()); + } + } + + private ChangeFeedState createStateWithTokenRanges(String[][] tokenRanges) { + String containerRid = "/cols/" + UUID.randomUUID(); + String pkRangeId = UUID.randomUUID().toString(); + FeedRangePartitionKeyRangeImpl feedRange = new FeedRangePartitionKeyRangeImpl(pkRangeId); + + StringBuilder entries = new StringBuilder(); + for (int i = 0; i < tokenRanges.length; i++) { + if (i > 0) { + entries.append(","); + } + entries.append(String.format( + "{\"token\":\"tok_%d\",\"range\":{\"min\":\"%s\",\"max\":\"%s\"}}", + i, tokenRanges[i][0], tokenRanges[i][1])); + } + + String continuationJson = String.format( + "{\"V\":1,\"Rid\":\"%s\",\"Continuation\":[%s],\"PKRangeId\":\"%s\"}", + containerRid, entries, pkRangeId); + + FeedRangeContinuation continuation = FeedRangeContinuation.convert(continuationJson); + return new ChangeFeedStateV1( + containerRid, + feedRange, + ChangeFeedMode.INCREMENTAL, + ChangeFeedStartFromInternal.createFromNow(), + continuation); + } + @Test(dataProvider = "populateRequestArgProvider", groups = "unit") public void changeFeedState_populateRequest( ChangeFeedMode changeFeedMode, diff --git a/sdk/cosmos/azure-cosmos/CHANGELOG.md b/sdk/cosmos/azure-cosmos/CHANGELOG.md index 13c8130c8f1d..e7eff2f75301 100644 --- 
a/sdk/cosmos/azure-cosmos/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos/CHANGELOG.md @@ -7,6 +7,7 @@ #### Breaking Changes #### Bugs Fixed +* Fixed quadratic O(P × T log T) performance in `ChangeFeedState.extractForEffectiveRange` that caused excessive driver-side microbatch planning time in the Spark change feed connector for containers with many feed ranges / continuation tokens. Added batch API `extractForEffectiveRanges(List)` that sorts continuation tokens once and uses binary search for overlap detection (matching the `InMemoryCollectionRoutingMap` pattern), reducing complexity to O(T log T + P log T). - See [PR 49084](https://github.com/Azure/azure-sdk-for-java/pull/49084) #### Other Changes diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/common/ChangeFeedState.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/common/ChangeFeedState.java index 727a25586f72..c51279933e44 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/common/ChangeFeedState.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/common/ChangeFeedState.java @@ -33,6 +33,7 @@ public abstract class ChangeFeedState extends JsonSerializable { private static final Comparator> MIN_RANGE_COMPARATOR = new Range.MinComparator<>(); private static final Comparator> MAX_RANGE_COMPARATOR = new Range.MaxComparator<>(); + ChangeFeedState() { } @@ -91,40 +92,129 @@ public String toString() { public abstract void populateRequest(RxDocumentServiceRequest request, int maxItemCount); public List extractContinuationTokens() { - return extractContinuationTokens(PartitionKeyInternalHelper.FullRange).getLeft(); + return extractContinuationTokensForRange( + PartitionKeyInternalHelper.FullRange, + getSortedContinuationTokens()).getLeft(); } - private Pair, Range> extractContinuationTokens( - Range effectiveRange) { - + /** + * Extracts a {@link 
ChangeFeedState} for a single effective range. + * <p/>

+ * For callers that need to extract states for multiple ranges from the same + * continuation, prefer {@link #extractForEffectiveRanges(List)} which sorts + * the continuation tokens once and reuses the sorted list across all ranges. + * + * @param effectiveRange the partition range to extract for + * @return a new {@link ChangeFeedState} scoped to the given range + */ + public ChangeFeedState extractForEffectiveRange(Range effectiveRange) { checkNotNull(effectiveRange); + return extractForEffectiveRanges(Collections.singletonList(effectiveRange)).get(0); + } - List extractedContinuationTokens = new ArrayList<>(); + /** + * Extracts a list of {@link ChangeFeedState} instances, one per input effective range. + *

+ * Continuation tokens are sorted once (O(T log T)) and then each range lookup uses + * binary search (O(log T)) to find the starting position, followed by a forward scan + * through contiguous overlapping tokens. This follows the same binary search pattern as + * {@link com.azure.cosmos.implementation.routing.InMemoryCollectionRoutingMap#getOverlappingRanges}. + *

+ * The returned list preserves the input order: result.get(i) corresponds to + * effectiveRanges.get(i). + * + * @param effectiveRanges the list of partition ranges to extract for + * @return a list of {@link ChangeFeedState}, one per input range, in the same order + */ + public List extractForEffectiveRanges(List> effectiveRanges) { + checkNotNull(effectiveRanges, "Argument 'effectiveRanges' must not be null."); + checkArgument(!effectiveRanges.isEmpty(), "Argument 'effectiveRanges' must not be empty."); + + List sortedTokens = getSortedContinuationTokens(); + + List results = new ArrayList<>(effectiveRanges.size()); + for (Range effectiveRange : effectiveRanges) { + checkNotNull(effectiveRange, "Effective range must not be null."); + + Pair, Range> extracted = + extractContinuationTokensForRange(effectiveRange, sortedTokens); + + List tokens = extracted.getLeft(); + Range totalRange = extracted.getRight(); + + FeedRangeEpkImpl feedRange = new FeedRangeEpkImpl(totalRange); + + results.add(new ChangeFeedStateV1( + this.getContainerRid(), + feedRange, + this.getMode(), + this.getStartFromSettings(), + FeedRangeContinuation.create( + this.getContainerRid(), + feedRange, + tokens + ) + )); + } + + return results; + } + + private List getSortedContinuationTokens() { FeedRangeContinuation continuation = this.getContinuation(); + if (continuation == null) { + return Collections.emptyList(); + } + + List sortedTokens = new ArrayList<>(); + Collections.addAll(sortedTokens, continuation.getCurrentContinuationTokens()); + sortedTokens.sort(ContinuationTokenRangeComparator.SINGLETON_INSTANCE); + return sortedTokens; + } + + /** + * Finds overlapping continuation tokens for an effective range using binary search + * to locate the starting position, then scanning forward through contiguous overlapping + * tokens. 
This follows the same pattern as + * {@link com.azure.cosmos.implementation.routing.InMemoryCollectionRoutingMap#getOverlappingRanges} + * and assumes non-overlapping, contiguous partition ranges (Cosmos DB contract). + */ + private Pair, Range> extractContinuationTokensForRange( + Range effectiveRange, + List sortedTokens) { + + checkNotNull(effectiveRange); + + List extractedTokens = new ArrayList<>(); String min = null; String max = null; - if (continuation != null) { - List continuationTokensSnapshot = new ArrayList<>(); - Collections.addAll(continuationTokensSnapshot, continuation.getCurrentContinuationTokens()); - continuationTokensSnapshot.sort(ContinuationTokenRangeComparator.SINGLETON_INSTANCE); - for (CompositeContinuationToken compositeContinuationToken : continuationTokensSnapshot) { - if (Range.checkOverlapping(effectiveRange, compositeContinuationToken.getRange())) { - Range overlappingRange = - getOverlappingRange(effectiveRange, compositeContinuationToken.getRange()); - - extractedContinuationTokens.add( - new CompositeContinuationToken(compositeContinuationToken.getToken(), - overlappingRange)); + if (!sortedTokens.isEmpty()) { + // Binary search for the first potential overlap position, + // matching InMemoryCollectionRoutingMap.getOverlappingRanges pattern + int startIndex = Collections.binarySearch( + sortedTokens, new CompositeContinuationToken(null, effectiveRange), + ContinuationTokenRangeComparator.SINGLETON_INSTANCE); + if (startIndex < 0) { + // Insertion point - 1: back up to catch a token whose range.min <= effectiveRange.min + // but whose range.max extends past effectiveRange.min + startIndex = Math.max(0, -startIndex - 2); + } + // Scan forward from startIndex through contiguous overlapping tokens + for (int i = startIndex; i < sortedTokens.size(); i++) { + CompositeContinuationToken token = sortedTokens.get(i); + if (Range.checkOverlapping(effectiveRange, token.getRange())) { + Range overlappingRange = 
getOverlappingRange(effectiveRange, token.getRange()); + extractedTokens.add(new CompositeContinuationToken( + token.getToken(), overlappingRange)); if (min == null) { min = overlappingRange.getMin(); } max = overlappingRange.getMax(); - } else { - if (extractedContinuationTokens.size() > 0) { - break; - } + } else if (!extractedTokens.isEmpty()) { + // Non-overlapping after finding overlaps — contiguous ranges are exhausted + break; } } } @@ -135,31 +225,7 @@ private Pair, Range> extractContinuatio true, false); - return Pair.of(extractedContinuationTokens, totalRange); - } - - public ChangeFeedState extractForEffectiveRange(Range effectiveRange) { - checkNotNull(effectiveRange); - - Pair, Range> effectiveTokensAndMinMax = - this.extractContinuationTokens(effectiveRange); - - List extractedContinuationTokens = effectiveTokensAndMinMax.getLeft(); - Range totalRange = effectiveTokensAndMinMax.getRight(); - - FeedRangeEpkImpl feedRange = new FeedRangeEpkImpl(totalRange); - - return new ChangeFeedStateV1( - this.getContainerRid(), - feedRange, - this.getMode(), - this.getStartFromSettings(), - FeedRangeContinuation.create( - this.getContainerRid(), - feedRange, - extractedContinuationTokens - ) - ); + return Pair.of(extractedTokens, totalRange); } private Range getOverlappingRange(Range left, Range right) {