2 changes: 1 addition & 1 deletion sdk/storage/azure-storage-blob/assets.json
@@ -2,5 +2,5 @@
"AssetsRepo": "Azure/azure-sdk-assets",
"AssetsRepoPrefixPath": "java",
"TagPrefix": "java/storage/azure-storage-blob",
"Tag": "java/storage/azure-storage-blob_1f689f90f0"
"Tag": "java/storage/azure-storage-blob_f0eadf5927"
}
@@ -25,6 +25,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

@@ -875,6 +876,74 @@ public void uploadChunkedRandomSizesRoundTripDataIntegrity() {
+ ")");
}

// ---------- Fuzzy parallel upload (async) ----------

@ParameterizedTest
@MethodSource("com.azure.storage.blob.BlobTestBase#fuzzyParallelUploadPutBlobReplayableCases")
public void fuzzyParallelUploadPutBlobReplayableRoundTrip(int payloadBytes, long segmentBytes, int maxConcurrency) {
assertParallelUploadFuzzyRoundTripAsync("putBlobReplay", payloadBytes, segmentBytes, maxConcurrency);
}

@LiveOnly // Staging-only cases: Put Block URLs include random IDs.
@ParameterizedTest
@MethodSource("com.azure.storage.blob.BlobTestBase#fuzzyParallelUploadSmallPayloadStagingCases")
public void fuzzyParallelUploadSmallPayloadRoundTripRequiresLiveStaging(int payloadBytes, long segmentBytes,
int maxConcurrency) {
assertParallelUploadFuzzyRoundTripAsync("smallPayloadStaging", payloadBytes, segmentBytes, maxConcurrency);
}

@LiveOnly // payload > segment for every tuple; always staging/Put Block.
@ParameterizedTest
@MethodSource("com.azure.storage.blob.BlobTestBase#fuzzyParallelUploadSub4MiBCases")
public void fuzzyParallelUploadSubFourMiBlobRoundTrip(int payloadBytes, long segmentBytes, int maxConcurrency) {
assertParallelUploadFuzzyRoundTripAsync("subFourMiB", payloadBytes, segmentBytes, maxConcurrency);
}

@LiveOnly // Staging-only cases.
@ParameterizedTest
@MethodSource("com.azure.storage.blob.BlobTestBase#fuzzyParallelUploadFourMiBBoundaryStagingCases")
public void fuzzyParallelUploadFourMiBBoundaryRoundTripRequiresLiveStaging(int payloadBytes, long segmentBytes,
int maxConcurrency) {
assertParallelUploadFuzzyRoundTripAsync("fourMiBBoundaryStaging", payloadBytes, segmentBytes, maxConcurrency);
}

@LiveOnly // Chunked uploads only.
@ParameterizedTest
@MethodSource("com.azure.storage.blob.BlobTestBase#fuzzyParallelUploadMediumMultiPartCases")
public void fuzzyParallelUploadMediumMultiPartRoundTrip(int payloadBytes, long segmentBytes, int maxConcurrency) {
assertParallelUploadFuzzyRoundTripAsync("mediumMultiPart", payloadBytes, segmentBytes, maxConcurrency);
}

@LiveOnly // Large chunked uploads.
@ParameterizedTest
@MethodSource("com.azure.storage.blob.BlobTestBase#fuzzyParallelUploadLargeMultiPartCases")
public void fuzzyParallelUploadLargeMultiPartRoundTrip(int payloadBytes, long segmentBytes, int maxConcurrency) {
assertParallelUploadFuzzyRoundTripAsync("largeMultiPart", payloadBytes, segmentBytes, maxConcurrency);
}

private void assertParallelUploadFuzzyRoundTripAsync(String caseKind, int payloadBytes, long segmentBytes,
int maxConcurrency) {
BlobAsyncClient client = createBlobAsyncClientWithRequestSniffer(new CopyOnWriteArrayList<>());

byte[] randomData = getRandomByteArray(payloadBytes);
Flux<ByteBuffer> data = Flux.just(ByteBuffer.wrap(randomData));

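// blockSize and maxSingleUploadSize are pinned to the same segmentBytes value, so (per the supplier
// javadocs in BlobTestBase) a payload at or under segmentBytes should go out as a single Put Blob,
// while anything larger should be staged as ceil(payloadBytes / segmentBytes) Put Blocks and
// committed with one Put Block List.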
ParallelTransferOptions parallelOptions = new ParallelTransferOptions().setBlockSizeLong(segmentBytes)
.setMaxSingleUploadSizeLong(segmentBytes)
.setMaxConcurrency(maxConcurrency);

BlobParallelUploadOptions options
= new BlobParallelUploadOptions(data).setParallelTransferOptions(parallelOptions)
.setRequestConditions(new BlobRequestConditions())
.setContentValidationAlgorithm(ContentValidationAlgorithm.CRC64);

client.uploadWithResponse(options).block();

byte[] downloaded = client.downloadContent().block().toBytes();
assertArrayEquals(randomData, downloaded, "Fuzzy parallel upload [" + caseKind + "] payloadBytes="
+ payloadBytes + ", segmentBytes=" + segmentBytes + ", maxConcurrency=" + maxConcurrency);
}

@LiveOnly // This test is too large for the test proxy.
@Test
public void blockBlobSimpleUploadRandomSizeRoundTripDataIntegrity() {
@@ -30,6 +30,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;

import java.io.ByteArrayInputStream;
import java.io.File;
@@ -1133,6 +1134,74 @@ public void uploadChunkedRandomSizesRoundTripDataIntegrity() {
+ ")");
}

// ---------- Fuzzy parallel upload (deterministic grids) ----------

@ParameterizedTest
@MethodSource("com.azure.storage.blob.BlobTestBase#fuzzyParallelUploadPutBlobReplayableCases")
public void fuzzyParallelUploadPutBlobReplayableRoundTrip(int payloadBytes, long segmentBytes, int maxConcurrency) {
assertParallelUploadFuzzyRoundTrip("putBlobReplay", payloadBytes, segmentBytes, maxConcurrency);
}

@LiveOnly // Staging-only cases: Put Block URLs include random IDs; see class comment above.
@ParameterizedTest
@MethodSource("com.azure.storage.blob.BlobTestBase#fuzzyParallelUploadSmallPayloadStagingCases")
public void fuzzyParallelUploadSmallPayloadRoundTripRequiresLiveStaging(int payloadBytes, long segmentBytes,
int maxConcurrency) {
assertParallelUploadFuzzyRoundTrip("smallPayloadStaging", payloadBytes, segmentBytes, maxConcurrency);
}

@LiveOnly // payload > segment for every tuple; always staging/Put Block.
@ParameterizedTest
@MethodSource("com.azure.storage.blob.BlobTestBase#fuzzyParallelUploadSub4MiBCases")
public void fuzzyParallelUploadSubFourMiBlobRoundTrip(int payloadBytes, long segmentBytes, int maxConcurrency) {
assertParallelUploadFuzzyRoundTrip("subFourMiB", payloadBytes, segmentBytes, maxConcurrency);
}

@LiveOnly // Staging-only cases; deterministic Put Blob rows are in fuzzyParallelUploadPutBlobReplayableRoundTrip.
@ParameterizedTest
@MethodSource("com.azure.storage.blob.BlobTestBase#fuzzyParallelUploadFourMiBBoundaryStagingCases")
public void fuzzyParallelUploadFourMiBBoundaryRoundTripRequiresLiveStaging(int payloadBytes, long segmentBytes,
int maxConcurrency) {
assertParallelUploadFuzzyRoundTrip("fourMiBBoundaryStaging", payloadBytes, segmentBytes, maxConcurrency);
}

@LiveOnly // payload > segment throughout; chunked upload.
@ParameterizedTest
@MethodSource("com.azure.storage.blob.BlobTestBase#fuzzyParallelUploadMediumMultiPartCases")
public void fuzzyParallelUploadMediumMultiPartRoundTrip(int payloadBytes, long segmentBytes, int maxConcurrency) {
assertParallelUploadFuzzyRoundTrip("mediumMultiPart", payloadBytes, segmentBytes, maxConcurrency);
}

@LiveOnly // payload >> segment throughout; chunked upload / large payloads.
@ParameterizedTest
@MethodSource("com.azure.storage.blob.BlobTestBase#fuzzyParallelUploadLargeMultiPartCases")
public void fuzzyParallelUploadLargeMultiPartRoundTrip(int payloadBytes, long segmentBytes, int maxConcurrency) {
assertParallelUploadFuzzyRoundTrip("largeMultiPart", payloadBytes, segmentBytes, maxConcurrency);
}

private void assertParallelUploadFuzzyRoundTrip(String caseKind, int payloadBytes, long segmentBytes,
int maxConcurrency) {
BlobClient client = createBlobClientWithRequestSniffer(new CopyOnWriteArrayList<>());

byte[] randomData = getRandomByteArray(payloadBytes);
InputStream data = new ByteArrayInputStream(randomData);

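// With ContentValidationAlgorithm.CRC64 set below, each upload request is expected to carry a CRC64
// checksum the service can verify on write; the assertArrayEquals on the downloaded bytes then closes
// the client-side half of the round-trip integrity check.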
ParallelTransferOptions parallelOptions = new ParallelTransferOptions().setBlockSizeLong(segmentBytes)
.setMaxSingleUploadSizeLong(segmentBytes)
.setMaxConcurrency(maxConcurrency);

BlobParallelUploadOptions options
= new BlobParallelUploadOptions(data).setParallelTransferOptions(parallelOptions)
.setRequestConditions(new BlobRequestConditions())
.setContentValidationAlgorithm(ContentValidationAlgorithm.CRC64);

client.uploadWithResponse(options, null, Context.NONE);

byte[] downloaded = client.downloadContent().toBytes();
assertArrayEquals(randomData, downloaded, "Fuzzy parallel upload [" + caseKind + "] payloadBytes="
+ payloadBytes + ", segmentBytes=" + segmentBytes + ", maxConcurrency=" + maxConcurrency);
}

@LiveOnly // This test is too large for the test proxy.
@Test
public void blockBlobSimpleUploadRandomSizeRoundTripDataIntegrity() {
@@ -1518,4 +1518,129 @@ protected static long expectedStructuredMessageEncodedLengthChunked(int totalUne
}
return sum;
}

/**
* Every tuple keeps payloadBytes <= segmentBytes, so the parallel upload path issues a single Put Blob (no
* Put Block IDs), which replays under the test proxy unlike staging-heavy cases.
* <p>
* Sizes are deliberately non-power-of-two (e.g. 7 * KB + 3) and use mixed segment ceilings (64 KiB
* through multi-MiB) to catch alignment and buffer edge cases; rows include the exact 4 MiB service boundary
* and several concurrency values (1–8) to exercise parallel request fan-out without live-only staging.
*/
protected static Stream<Arguments> fuzzyParallelUploadPutBlobReplayableCases() {
return Stream.of(Arguments.of(7 * Constants.KB + 3, 64L * Constants.KB, 1),
Arguments.of(7 * Constants.KB + 3, 128L * Constants.KB, 4),
Arguments.of(41 * Constants.KB + 17, 256L * Constants.KB, 1),
Arguments.of(41 * Constants.KB + 17, 256L * Constants.KB, 8),
Arguments.of(199 * Constants.KB + 5, 512L * Constants.KB, 2),
Arguments.of(512 * Constants.KB - 31, 1L * Constants.MB, 8),
Arguments.of(896 * Constants.KB + 101, 1L * Constants.MB, 6),
Arguments.of(4 * Constants.MB, 4L * Constants.MB, 1),
Arguments.of(4 * Constants.MB, 7L * Constants.MB + 919, 3));
}
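// Request-shape sketch implied by the javadoc above (blockSize == maxSingleUploadSize == segmentBytes):
//   payloadBytes <= segmentBytes -> one Put Blob (no block IDs, so recordings replay)
//   payloadBytes >  segmentBytes -> ceil(payloadBytes / segmentBytes) Put Blocks + one Put Block List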

/**
* payloadBytes > segmentBytes, so uploads still go through Put Block staging even though totals are only
* hundreds of KiB; these rows cannot replay under the test proxy because the Put Block IDs vary per run.
* <p>
* One row pairs a ~200 KiB payload with a 64 KiB segment and moderate concurrency; the other uses a
* ~512 KiB payload with a 1 KiB segment to force many tiny blocks (stress scheduling and per-block CRC64
* framing) without the cost of the large multi-part grids.
*/
protected static Stream<Arguments> fuzzyParallelUploadSmallPayloadStagingCases() {
return Stream.of(Arguments.of(200 * Constants.KB, 64L * Constants.KB, 3),
Arguments.of(512 * Constants.KB - 31, 1L * Constants.KB, 1));
}
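// Worked block counts for the two rows above (tail = payload - fullBlocks * segment):
//   200 KiB over 64 KiB segments       -> 4 Put Blocks (3 full + one 8 KiB tail)
//   (512 KiB - 31) over 1 KiB segments -> 512 Put Blocks (511 full + one 993-byte tail)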

/**
* payloadBytes > segmentBytes and payloadBytes <= 4 * Constants.MB - 1 (the ceiling
* field), so the blob stays strictly under 4 MiB and on the transactional CRC64-header path while uploads
* remain chunked; live-only because of Put Block identity churn.
* <p>
* Values mix MiB/KiB segment sizes with offsets (e.g. + 19, - 903) so part counts and last-block
* lengths are not powers of two; the last rows hug ceiling with awkward divisors in segmentBytes to
* stress remainder handling near the sub-4 MiB limit.
*/
protected static Stream<Arguments> fuzzyParallelUploadSub4MiBCases() {
final int ceiling = (4 * Constants.MB) - 1;
return Stream.of(Arguments.of(1 * Constants.MB + 1, 1L * Constants.MB, 1),
Arguments.of(1 * Constants.MB + 1, 2L * Constants.KB, 8),
Arguments.of((5 * Constants.MB) / 4 + 19, 256L * Constants.KB, 4),
Arguments.of(2 * Constants.MB - 903, 1L * Constants.MB, 2),
Arguments.of(2 * Constants.MB + 33, 1L * Constants.KB, 1),
Arguments.of(2 * Constants.MB + 33, 1L * Constants.MB, 8),
Arguments.of((11 * Constants.MB) / 4 - 17, 512L * Constants.KB, 6),
Arguments.of(3 * Constants.MB - 777, 2L * Constants.MB, 8),
Arguments.of(3 * Constants.MB - 1, 1L * Constants.MB, 1),
Arguments.of(ceiling - 511, 1L * Constants.MB, 4),
Arguments.of(ceiling - 511, 1L * Constants.MB + 511, 2),
Arguments.of(ceiling, (long) (ceiling / 7 + 17), 3),
Arguments.of(ceiling, (long) (ceiling / 2 + 1), 8));
}
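// Worked example for the awkward-divisor row above (ceiling = 4 * MiB - 1 = 4_194_303):
//   segment = ceiling / 7 + 17 = 599_203 -> 7 Put Blocks (6 full + one 599_085-byte tail)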

/**
* Centers on 4 * Constants.MB - 1, exactly 4 * Constants.MB, and just above 4 MiB, with segment
* sizes spanning KiB through multi-MiB—exercising the SDK/service boundary where single-shot vs block staging and
* CRC64 header vs structured-message rules flip, while keeping deterministic Put Blob coverage in the replayable
* supplier above.
* <p>
* Includes near-boundary payloads (e.g. -8192, +31, +8191 from 4 MiB) so neither total size nor last segment
* length aligns to typical buffer multiples.
*/
protected static Stream<Arguments> fuzzyParallelUploadFourMiBBoundaryStagingCases() {
final int minus = (4 * Constants.MB) - 1;
final int plus = (4 * Constants.MB) + 1;
return Stream.of(Arguments.of(minus, 1L * Constants.MB, 1),
Arguments.of(minus, 512L * Constants.KB, 6),
Arguments.of(minus, 2L * Constants.MB, 8),
Arguments.of((4 * Constants.MB) - 8192, 1L * Constants.KB, 4),
Arguments.of(4 * Constants.MB, (long) (4 * Constants.MB / 2), 8),
Arguments.of(4 * Constants.MB, 256L * Constants.KB, 2),
Arguments.of(plus, 1L * Constants.MB, 1),
Arguments.of(plus, 2L * Constants.MB, 8),
Arguments.of(plus, 1L * Constants.KB, 7),
Arguments.of((4 * Constants.MB) + 31, 511L * Constants.KB + 409, 4),
Arguments.of((4 * Constants.MB) + 8191, 3L * Constants.MB - 413, 6));
}
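// Boundary arithmetic for two rows above (4 MiB = 4_194_304 bytes):
//   (4 MiB, 2 MiB segments)     -> exactly two equal Put Blocks
//   (4 MiB + 1, 2 MiB segments) -> three Put Blocks, the last carrying a single byte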

/**
* All rows keep payloadBytes > segmentBytes with totals roughly 6–80 MiB—large enough for meaningful parallel
* block fan-out and structured-message segments, but cheaper than {@link #fuzzyParallelUploadLargeMultiPartCases}.
* <p>
* Block sizes step through common service limits (1–8 MiB, half-MiB tail values); concurrency 1–8 pairs with
* imbalanced payloads (e.g. 701, 333) to flush merge/retry edge cases.
*/
protected static Stream<Arguments> fuzzyParallelUploadMediumMultiPartCases() {
return Stream.of(Arguments.of(6 * Constants.MB + 701, Constants.MB, 1),
Arguments.of(6 * Constants.MB + 701, 3L * Constants.MB + 271, 4),
Arguments.of(9 * Constants.MB + 333, 2L * Constants.MB, 1),
Arguments.of(9 * Constants.MB + 333, 3L * Constants.MB + 199, 8),
Arguments.of(12 * Constants.MB + 901, 4L * Constants.MB + 901, 2),
Arguments.of(14 * Constants.MB, 500L * Constants.KB + 13, 6),
Arguments.of(18 * Constants.MB - 4021, 5L * Constants.MB - 701, 3),
Arguments.of(24 * Constants.MB, 8L * Constants.MB, 8),
Arguments.of(28 * Constants.MB + 56789, 7L * Constants.MB + 13, 2),
Arguments.of(31 * Constants.MB, 1024L * Constants.KB + 17, 4),
Arguments.of(40 * Constants.MB + 12345, 7L * Constants.MB + 13, 3),
Arguments.of(48 * Constants.MB - 777, 5L * Constants.MB + 809L * Constants.KB, 6),
Arguments.of(56 * Constants.MB + 19, 9L * Constants.MB + 4096, 8),
Arguments.of(72 * Constants.MB, 4L * Constants.MB + 65536, 8),
Arguments.of(80 * Constants.MB + 321, 13L * Constants.MB - 3073, 1));
}
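// Worked example row: 14 MiB over (500 KiB + 13)-byte segments:
//   14_680_064 / 512_013 -> 29 Put Blocks (28 full + one 343_700-byte tail)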

/**
* Stresses high block counts and long-running parallel uploads (~96–320 MiB payloads) with service-realistic
* segment sizes (8–61 MiB) and heavy concurrency.
* <p>
* The final rows use named constants for the near-257/288/320 MiB totals, with irregular byte tails that keep
* total bytes and block remainders off common multiples while still bounding runtime for live-only CI.
*/
protected static Stream<Arguments> fuzzyParallelUploadLargeMultiPartCases() {
final int payload257MiBPlus = (int) (257L * Constants.MB + 18881);
final int payload288MiBPlus = (int) (288L * Constants.MB + 7777);
final int payload320MiBPlus = (int) (320L * Constants.MB + 1999);
return Stream.of(Arguments.of(96 * Constants.MB + 17, 8L * Constants.MB + 511, 2),
Arguments.of(112 * Constants.MB, 15L * Constants.MB + 4096, 8),
Arguments.of(128 * Constants.MB + 45673, 17L * Constants.MB - 11264 + 173, 4),
Arguments.of(160 * Constants.MB + 12345, 12L * Constants.MB + 8192, 8),
Arguments.of(192 * Constants.MB + 9876, 31L * Constants.MB - 513, 8),
Arguments.of(224 * Constants.MB, 23L * Constants.MB + 524288, 8),
Arguments.of(payload257MiBPlus, 61L * Constants.MB + 23L * Constants.KB, 6),
Arguments.of(payload288MiBPlus, 36L * Constants.MB + 513, 8),
Arguments.of(payload320MiBPlus, 16L * Constants.MB + 511, 8));
}
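// Worked example row from the grid above: (96 MiB + 17) over (8 MiB + 511)-byte segments:
//   100_663_313 / 8_389_119 -> 12 Put Blocks (11 full + one 8_383_004-byte tail)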
}