From 53a60c050d5b813f246df0aa46b46ef9a47c7e91 Mon Sep 17 00:00:00 2001 From: peterxcli Date: Thu, 7 May 2026 02:21:51 +0800 Subject: [PATCH 1/8] HDDS-15193. Remove old atomic key creation --- .../design/overwrite-key-only-if-unchanged.md | 2 +- .../ozone/client/io/ECKeyOutputStream.java | 15 ---------- .../ozone/client/io/KeyDataStreamOutput.java | 29 ++----------------- .../ozone/client/io/KeyOutputStream.java | 24 --------------- .../hadoop/ozone/client/rpc/RpcClient.java | 13 +-------- .../hadoop/ozone/client/TestOzoneClient.java | 13 ++++----- 6 files changed, 9 insertions(+), 87 deletions(-) diff --git a/hadoop-hdds/docs/content/design/overwrite-key-only-if-unchanged.md b/hadoop-hdds/docs/content/design/overwrite-key-only-if-unchanged.md index c4d4211cabfa..991617c78345 100644 --- a/hadoop-hdds/docs/content/design/overwrite-key-only-if-unchanged.md +++ b/hadoop-hdds/docs/content/design/overwrite-key-only-if-unchanged.md @@ -91,7 +91,7 @@ The advantage of this alternative approach is that it does not require the expec However the client code required to implement this appears more complex due to having different key commit logic for Ratis and EC and the parameter needing to be passed through many method calls. -PR [#5524](https://github.com/apache/ozone/pull/5524) illustrates this approach for the atomicKeyCreation feature which was added to S3. +PR [#5524](https://github.com/apache/ozone/pull/5524) previously illustrated this approach for S3 object creation. The existing implementation for key creation stores various attributes (metadata, creation time, ACLs, ReplicationConfig) in the openKey table, so storing the expectedGeneration keeps with that convention, which is less confusing for future developers. 
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java index ee5c75487573..9f94384d4df2 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java @@ -74,14 +74,6 @@ public final class ECKeyOutputStream extends KeyOutputStream private final Future flushFuture; private final AtomicLong flushCheckpoint; - /** - * Indicates if an atomic write is required. When set to true, - * the amount of data written must match the declared size during the commit. - * A mismatch will prevent the commit from succeeding. - * This is essential for operations like S3 put to ensure atomicity. - */ - private boolean atomicKeyCreation; - private volatile boolean closed; private volatile boolean closing; // how much of data is actually written yet to underlying stream @@ -130,7 +122,6 @@ private ECKeyOutputStream(Builder builder) { return flushStripeFromQueue(); }); this.flushCheckpoint = new AtomicLong(0); - this.atomicKeyCreation = builder.getAtomicKeyCreation(); } @Override @@ -489,12 +480,6 @@ public void close() throws IOException { Preconditions.checkArgument(writeOffset == offset, "Expected writeOffset= " + writeOffset + " Expected offset=" + offset); - if (atomicKeyCreation) { - long expectedSize = blockOutputStreamEntryPool.getDataSize(); - Preconditions.checkState(expectedSize == offset, String.format( - "Expected: %d and actual %d write sizes do not match", - expectedSize, offset)); - } for (CheckedRunnable preCommit : preCommits) { preCommit.run(); } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java index ceacd624e935..36372bed778f 100644 --- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java @@ -77,14 +77,6 @@ public class KeyDataStreamOutput extends AbstractDataStreamOutput private long clientID; - /** - * Indicates if an atomic write is required. When set to true, - * the amount of data written must match the declared size during the commit. - * A mismatch will prevent the commit from succeeding. - * This is essential for operations like S3 put to ensure atomicity. - */ - private boolean atomicKeyCreation; - private List> preCommits = Collections.emptyList(); public void setPreCommits(@Nonnull List> preCommits) { @@ -129,7 +121,6 @@ public KeyDataStreamOutput() { this.writeOffset = 0; this.clientID = 0L; - this.atomicKeyCreation = false; } @SuppressWarnings({"parameternumber", "squid:S00107"}) @@ -140,8 +131,7 @@ public KeyDataStreamOutput( OzoneManagerProtocol omClient, int chunkSize, String requestId, ReplicationConfig replicationConfig, String uploadID, int partNumber, boolean isMultipart, - boolean unsafeByteBufferConversion, - boolean atomicKeyCreation + boolean unsafeByteBufferConversion ) { super(HddsClientUtils.getRetryPolicyByException( config.getMaxRetryCount(), config.getRetryInterval())); @@ -162,7 +152,6 @@ public KeyDataStreamOutput( // encrypted bucket. 
this.writeOffset = 0; this.clientID = handler.getId(); - this.atomicKeyCreation = atomicKeyCreation; } /** @@ -457,12 +446,6 @@ public void close() throws IOException { if (!isException()) { Preconditions.checkArgument(writeOffset == offset); } - if (atomicKeyCreation) { - long expectedSize = blockDataStreamOutputEntryPool.getDataSize(); - Preconditions.checkArgument(expectedSize == offset, - String.format("Expected: %d and actual %d write sizes do not match", - expectedSize, offset)); - } for (CheckedRunnable preCommit : preCommits) { preCommit.run(); } @@ -501,8 +484,6 @@ public static class Builder { private boolean unsafeByteBufferConversion; private OzoneClientConfig clientConfig; private ReplicationConfig replicationConfig; - private boolean atomicKeyCreation = false; - public Builder setMultipartUploadID(String uploadID) { this.multipartUploadID = uploadID; return this; @@ -553,11 +534,6 @@ public Builder setReplicationConfig(ReplicationConfig replConfig) { return this; } - public Builder setAtomicKeyCreation(boolean atomicKey) { - this.atomicKeyCreation = atomicKey; - return this; - } - public KeyDataStreamOutput build() { return new KeyDataStreamOutput( clientConfig, @@ -570,8 +546,7 @@ public KeyDataStreamOutput build() { multipartUploadID, multipartNumber, isMultipartKey, - unsafeByteBufferConversion, - atomicKeyCreation); + unsafeByteBufferConversion); } } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java index 2f9edfa94ea8..791709eb52ed 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java @@ -98,13 +98,6 @@ public class KeyOutputStream extends OutputStream private long clientID; private StreamBufferArgs streamBufferArgs; - /** - * Indicates if an atomic write is 
required. When set to true, - * the amount of data written must match the declared size during the commit. - * A mismatch will prevent the commit from succeeding. - * This is essential for operations like S3 put to ensure atomicity. - */ - private boolean atomicKeyCreation; private ContainerClientMetrics clientMetrics; private OzoneManagerVersion ozoneManagerVersion; private final Lock writeLock = new ReentrantLock(); @@ -186,7 +179,6 @@ public KeyOutputStream(Builder b) { this.isException = false; this.writeOffset = 0; this.clientID = b.getOpenHandler().getId(); - this.atomicKeyCreation = b.getAtomicKeyCreation(); this.streamBufferArgs = b.getStreamBufferArgs(); this.clientMetrics = b.getClientMetrics(); this.ozoneManagerVersion = b.ozoneManagerVersion; @@ -656,12 +648,6 @@ private void closeInternal() throws IOException { if (!isException) { Preconditions.checkArgument(writeOffset == offset); } - if (atomicKeyCreation) { - long expectedSize = blockOutputStreamEntryPool.getDataSize(); - Preconditions.checkState(expectedSize == offset, - String.format("Expected: %d and actual %d write sizes do not match", - expectedSize, offset)); - } for (CheckedRunnable preCommit : preCommits) { preCommit.run(); } @@ -701,7 +687,6 @@ public static class Builder { private OzoneClientConfig clientConfig; private ReplicationConfig replicationConfig; private ContainerClientMetrics clientMetrics; - private boolean atomicKeyCreation = false; private StreamBufferArgs streamBufferArgs; private Supplier executorServiceSupplier; private OzoneManagerVersion ozoneManagerVersion; @@ -800,11 +785,6 @@ public Builder setReplicationConfig(ReplicationConfig replConfig) { return this; } - public Builder setAtomicKeyCreation(boolean atomicKey) { - this.atomicKeyCreation = atomicKey; - return this; - } - public Builder setClientMetrics(ContainerClientMetrics clientMetrics) { this.clientMetrics = clientMetrics; return this; @@ -814,10 +794,6 @@ public ContainerClientMetrics getClientMetrics() { 
return clientMetrics; } - public boolean getAtomicKeyCreation() { - return atomicKeyCreation; - } - public Builder setExecutorServiceSupplier(Supplier executorServiceSupplier) { this.executorServiceSupplier = executorServiceSupplier; return this; diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java index 0b058362a30f..ad5769ba06d8 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java @@ -1473,13 +1473,6 @@ private OmKeyArgs.Builder createWriteKeyArgsBuilder(String volumeName, private OzoneOutputStream openOutputStream(OmKeyArgs keyArgs, long size) throws IOException { OpenKeySession openKey = ozoneManagerClient.openKey(keyArgs); - // For bucket with layout OBJECT_STORE, when create an empty file (size=0), - // OM will set DataSize to OzoneConfigKeys#OZONE_SCM_BLOCK_SIZE, - // which will cause S3G's atomic write length check to fail, - // so reset size to 0 here. 
- if (isS3GRequest.get() && size == 0) { - openKey.getKeyInfo().setDataSize(0); - } return createOutputStream(openKey); } @@ -2588,15 +2581,12 @@ private OzoneDataStreamOutput createDataStreamOutput(OpenKeySession openKey) } private KeyDataStreamOutput.Builder newKeyOutputStreamBuilder() { - // Amazon S3 never adds partial objects, So for S3 requests we need to - // set atomicKeyCreation to true // refer: https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html return new KeyDataStreamOutput.Builder() .setXceiverClientManager(xceiverClientManager) .setOmClient(ozoneManagerClient) .enableUnsafeByteBufferConversion(unsafeByteBufferConversion) - .setConfig(clientConfig) - .setAtomicKeyCreation(isS3GRequest.get()); + .setConfig(clientConfig); } private OzoneOutputStream createOutputStream(OpenKeySession openKey) @@ -2670,7 +2660,6 @@ private KeyOutputStream.Builder createKeyOutputStream( .setOmClient(ozoneManagerClient) .enableUnsafeByteBufferConversion(unsafeByteBufferConversion) .setConfig(clientConfig) - .setAtomicKeyCreation(isS3GRequest.get()) .setClientMetrics(clientMetrics) .setExecutorServiceSupplier(writeExecutor) .setStreamBufferArgs(streamBufferArgs) diff --git a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneClient.java b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneClient.java index 84b423a28cab..356fc14d9873 100644 --- a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneClient.java +++ b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneClient.java @@ -20,6 +20,7 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.hadoop.hdds.client.ReplicationFactor.ONE; import static org.apache.ozone.test.GenericTestUtils.getTestStartTime; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; 
import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -221,10 +222,8 @@ public void testPutKeyWithECReplicationConfig() throws IOException { } /** - * This test validates that for S3G, - * the key upload process needs to be atomic. - * It simulates two mismatch scenarios where the actual write data size does - * not match the expected size. + * This test validates that S3G object atomicity does not depend on the + * legacy client-side close check matching the expected and written sizes. */ @Test public void testPutKeySizeMismatch() throws IOException { @@ -238,8 +237,7 @@ public void testPutKeySizeMismatch() throws IOException { value.getBytes(UTF_8).length, ReplicationType.RATIS, ONE, new HashMap<>()); out1.write(value.substring(0, value.length() - 1).getBytes(UTF_8)); - assertThrows(IllegalStateException.class, out1::close, - "Expected IllegalArgumentException due to size mismatch."); + assertDoesNotThrow(out1::close); // Simulating second mismatch: Write more data than expected OzoneOutputStream out2 = bucket.createKey(keyName, @@ -247,8 +245,7 @@ public void testPutKeySizeMismatch() throws IOException { new HashMap<>()); value += "1"; out2.write(value.getBytes(UTF_8)); - assertThrows(IllegalStateException.class, out2::close, - "Expected IllegalArgumentException due to size mismatch."); + assertDoesNotThrow(out2::close); } finally { client.getProxy().setIsS3Request(false); } From 484154b4107670b74fc41c7c84c8ec37cccf671b Mon Sep 17 00:00:00 2001 From: peterxcli Date: Thu, 7 May 2026 02:24:55 +0800 Subject: [PATCH 2/8] HDDS-15193. 
Revert unrelated design doc change --- .../docs/content/design/overwrite-key-only-if-unchanged.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-hdds/docs/content/design/overwrite-key-only-if-unchanged.md b/hadoop-hdds/docs/content/design/overwrite-key-only-if-unchanged.md index 991617c78345..c4d4211cabfa 100644 --- a/hadoop-hdds/docs/content/design/overwrite-key-only-if-unchanged.md +++ b/hadoop-hdds/docs/content/design/overwrite-key-only-if-unchanged.md @@ -91,7 +91,7 @@ The advantage of this alternative approach is that it does not require the expec However the client code required to implement this appears more complex due to having different key commit logic for Ratis and EC and the parameter needing to be passed through many method calls. -PR [#5524](https://github.com/apache/ozone/pull/5524) previously illustrated this approach for S3 object creation. +PR [#5524](https://github.com/apache/ozone/pull/5524) illustrates this approach for the atomicKeyCreation feature which was added to S3. The existing implementation for key creation stores various attributes (metadata, creation time, ACLs, ReplicationConfig) in the openKey table, so storing the expectedGeneration keeps with that convention, which is less confusing for future developers. From b97a8e75d30db1cc0df9b5dd0943cce031be2e47 Mon Sep 17 00:00:00 2001 From: peterxcli Date: Thu, 7 May 2026 02:27:48 +0800 Subject: [PATCH 3/8] HDDS-15193. 
Remove obsolete size mismatch test --- .../hadoop/ozone/client/TestOzoneClient.java | 31 ------------------- 1 file changed, 31 deletions(-) diff --git a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneClient.java b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneClient.java index 356fc14d9873..c96fe8bfc5bf 100644 --- a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneClient.java +++ b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneClient.java @@ -20,7 +20,6 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.hadoop.hdds.client.ReplicationFactor.ONE; import static org.apache.ozone.test.GenericTestUtils.getTestStartTime; -import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -221,36 +220,6 @@ public void testPutKeyWithECReplicationConfig() throws IOException { } } - /** - * This test validates that S3G object atomicity does not depend on the - * legacy client-side close check matching the expected and written sizes. 
- */ - @Test - public void testPutKeySizeMismatch() throws IOException { - String value = new String(new byte[1024], UTF_8); - OzoneBucket bucket = getOzoneBucket(); - String keyName = UUID.randomUUID().toString(); - try { - // Simulating first mismatch: Write less data than expected - client.getProxy().setIsS3Request(true); - OzoneOutputStream out1 = bucket.createKey(keyName, - value.getBytes(UTF_8).length, ReplicationType.RATIS, ONE, - new HashMap<>()); - out1.write(value.substring(0, value.length() - 1).getBytes(UTF_8)); - assertDoesNotThrow(out1::close); - - // Simulating second mismatch: Write more data than expected - OzoneOutputStream out2 = bucket.createKey(keyName, - value.getBytes(UTF_8).length, ReplicationType.RATIS, ONE, - new HashMap<>()); - value += "1"; - out2.write(value.getBytes(UTF_8)); - assertDoesNotThrow(out2::close); - } finally { - client.getProxy().setIsS3Request(false); - } - } - private OzoneBucket getOzoneBucket() throws IOException { String volumeName = UUID.randomUUID().toString(); String bucketName = UUID.randomUUID().toString(); From a4e515338da4890e06379db254d127aa0d05855a Mon Sep 17 00:00:00 2001 From: peterxcli Date: Thu, 7 May 2026 02:39:24 +0800 Subject: [PATCH 4/8] HDDS-15193. 
Fix client checkstyle --- .../org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java | 1 + 1 file changed, 1 insertion(+) diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java index 36372bed778f..b38fd4d4dcac 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java @@ -484,6 +484,7 @@ public static class Builder { private boolean unsafeByteBufferConversion; private OzoneClientConfig clientConfig; private ReplicationConfig replicationConfig; + public Builder setMultipartUploadID(String uploadID) { this.multipartUploadID = uploadID; return this; From e2d2218bdaf20d7d9b3059628bd75f6e0936499e Mon Sep 17 00:00:00 2001 From: peterxcli Date: Sat, 9 May 2026 04:15:48 +0800 Subject: [PATCH 5/8] use key output stream pre commit to do the content length validation Signed-off-by: peterxcli --- .../ozone/s3/endpoint/EndpointBase.java | 15 ++++++++++++ .../ozone/s3/endpoint/ObjectEndpoint.java | 8 +++++-- .../s3/endpoint/ObjectEndpointStreaming.java | 9 +++++-- .../hadoop/ozone/client/OzoneBucketStub.java | 17 +++++++------ .../ozone/s3/endpoint/EndpointTestUtils.java | 24 ++++++++++++++++--- .../ozone/s3/endpoint/TestObjectPut.java | 8 +++++++ .../ozone/s3/endpoint/TestPartUpload.java | 15 ++++++++++++ 7 files changed, 82 insertions(+), 14 deletions(-) diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/EndpointBase.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/EndpointBase.java index 649b14b49cd7..9db960d6e921 100644 --- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/EndpointBase.java +++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/EndpointBase.java @@ -29,6 
+29,7 @@ import static org.apache.hadoop.ozone.s3.S3GatewayConfigKeys.OZONE_S3G_CLIENT_BUFFER_SIZE_DEFAULT; import static org.apache.hadoop.ozone.s3.S3GatewayConfigKeys.OZONE_S3G_CLIENT_BUFFER_SIZE_KEY; import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.INVALID_ARGUMENT; +import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.INVALID_REQUEST; import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.INVALID_TAG; import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.newError; import static org.apache.hadoop.ozone.s3.util.S3Consts.AWS_TAG_PREFIX; @@ -107,6 +108,7 @@ import org.apache.hadoop.ozone.s3.util.S3Utils; import org.apache.http.NameValuePair; import org.apache.http.client.utils.URLEncodedUtils; +import org.apache.ratis.util.function.CheckedRunnable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -680,6 +682,19 @@ public static MessageDigest getSha256DigestInstance() { return SHA_256_PROVIDER.get(); } + protected static CheckedRunnable validateContentLength( + long expectedLength, long actualLength, String keyPath) { + return () -> { + if (actualLength != expectedLength) { + OS3Exception ex = newError(INVALID_REQUEST, keyPath); + ex.setErrorMessage(String.format( + "Request body length %d does not match expected length %d", + actualLength, expectedLength)); + throw ex; + } + }; + } + protected static String extractPartsCount(String eTag) { if (eTag.contains("-")) { String[] parts = eTag.replace("\"", "").split("-"); diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java index 5e301950465e..a79b7f090c0d 100644 --- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java +++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java @@ -55,7 +55,6 @@ import java.time.ZoneId; import 
java.time.ZonedDateTime; import java.util.ArrayList; -import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -298,6 +297,8 @@ customMetadata, tags, multiDigestInputStream, getHeaders(), output.getMetadata().put(OzoneConsts.ETAG, md5Hash); List> preCommits = new ArrayList<>(); + preCommits.add(validateContentLength( + length, putLength, keyPath)); String clientContentMD5 = getHeaders().getHeaderString(S3Consts.CHECKSUM_HEADER); if (clientContentMD5 != null) { @@ -912,13 +913,16 @@ private Response createMultipartKey(OzoneVolume volume, OzoneBucket ozoneBucket, new byte[getIOBufferSize(length)]); byte[] digest = multiDigestInputStream.getMessageDigest(OzoneConsts.MD5_HASH).digest(); String md5Hash = DatatypeConverter.printHexBinary(digest).toLowerCase(); + List> preCommits = new ArrayList<>(); + preCommits.add(validateContentLength(length, putLength, key)); String clientContentMD5 = getHeaders().getHeaderString(S3Consts.CHECKSUM_HEADER); if (clientContentMD5 != null) { CheckedRunnable checkContentMD5Hook = () -> { S3Utils.validateContentMD5(clientContentMD5, md5Hash, key); }; - ozoneOutputStream.getKeyOutputStream().setPreCommits(Collections.singletonList(checkContentMD5Hook)); + preCommits.add(checkContentMD5Hook); } + ozoneOutputStream.getKeyOutputStream().setPreCommits(preCommits); ozoneOutputStream.getMetadata().put(OzoneConsts.ETAG, md5Hash); outputStream = ozoneOutputStream; } diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java index 63f8281177f4..482ddaaff2e6 100644 --- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java +++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java @@ -29,7 +29,6 @@ import java.security.DigestInputStream; import 
java.security.MessageDigest; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Map; import javax.ws.rs.core.HttpHeaders; @@ -133,6 +132,8 @@ public static Pair putKeyWithStream( ((KeyMetadataAware)streamOutput).getMetadata().put(OzoneConsts.ETAG, md5Hash); List> preCommits = new ArrayList<>(); + preCommits.add(EndpointBase.validateContentLength( + length, writeLen, keyPath)); String clientContentMD5 = headers.getHeaderString(S3Consts.CHECKSUM_HEADER); if (clientContentMD5 != null) { CheckedRunnable checkContentMD5Hook = () -> { @@ -235,13 +236,17 @@ public static Response createMultipartKey(OzoneBucket ozoneBucket, String key, writeToStreamOutput(streamOutput, body, chunkSize, length); eTag = DatatypeConverter.printHexBinary( body.getMessageDigest(OzoneConsts.MD5_HASH).digest()).toLowerCase(); + List> preCommits = new ArrayList<>(); + preCommits.add(EndpointBase.validateContentLength( + length, putLength, key)); String clientContentMD5 = headers.getHeaderString(S3Consts.CHECKSUM_HEADER); if (clientContentMD5 != null) { CheckedRunnable checkContentMD5Hook = () -> { S3Utils.validateContentMD5(clientContentMD5, eTag, key); }; - streamOutput.getKeyDataStreamOutput().setPreCommits(Collections.singletonList(checkContentMD5Hook)); + preCommits.add(checkContentMD5Hook); } + streamOutput.getKeyDataStreamOutput().setPreCommits(preCommits); ((KeyMetadataAware)streamOutput).getMetadata().put(OzoneConsts.ETAG, eTag); METRICS.incPutKeySuccessLength(putLength); perf.appendMetaLatencyNanos(metadataLatencyNs); diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java index 8a8864c398bf..0795d36bb23a 100644 --- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java +++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java @@ 
-145,7 +145,9 @@ public OzoneOutputStream createKey(String key, long size, new KeyMetadataAwareOutputStream(metadata) { @Override public void close() throws IOException { - keyContents.put(key, toByteArray()); + byte[] bytes = toByteArray(); + super.close(); + keyContents.put(key, bytes); keyDetails.put(key, new OzoneKeyDetails( getVolumeName(), getName(), @@ -158,7 +160,6 @@ public void close() throws IOException { UserGroupInformation.getCurrentUser().getShortUserName(), tags )); - super.close(); } }; @@ -179,7 +180,9 @@ public OzoneOutputStream rewriteKey(String keyName, long size, long existingKeyG new KeyMetadataAwareOutputStream(metadata) { @Override public void close() throws IOException { - keyContents.put(keyName, toByteArray()); + byte[] bytes = toByteArray(); + super.close(); + keyContents.put(keyName, bytes); keyDetails.put(keyName, new OzoneKeyDetails( getVolumeName(), getName(), @@ -190,7 +193,6 @@ public void close() throws IOException { new ArrayList<>(), finalReplicationCon, metadata, null, () -> readKey(keyName), true, null, null )); - super.close(); } }; @@ -518,8 +520,10 @@ public OzoneOutputStream createMultipartKey(String key, long size, new KeyMetadataAwareOutputStream((int) size, new HashMap<>()) { @Override public void close() throws IOException { - Part part = new Part(key + size, - toByteArray(), getMetadata().get(ETAG)); + byte[] bytes = toByteArray(); + String eTag = getMetadata().get(ETAG); + super.close(); + Part part = new Part(key + size, bytes, eTag); if (partList.get(key) == null) { Map parts = new TreeMap<>(); parts.put(partNumber, part); @@ -527,7 +531,6 @@ public void close() throws IOException { } else { partList.get(key).put(partNumber, part); } - super.close(); } }; return new OzoneOutputStreamStub(keyOutputStream, key + size); diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/EndpointTestUtils.java 
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/EndpointTestUtils.java index 11ba8daf8042..b2fd5ad6ab91 100644 --- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/EndpointTestUtils.java +++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/EndpointTestUtils.java @@ -123,13 +123,28 @@ public static Response put( int partNumber, String uploadID, String content + ) throws IOException, OS3Exception { + return put(subject, bucket, key, partNumber, uploadID, + contentLength(content), content); + } + + /** Put with content, part number, upload ID, and explicit Content-Length. */ + public static Response put( + ObjectEndpoint subject, + String bucket, + String key, + int partNumber, + String uploadID, + long contentLength, + String content ) throws IOException, OS3Exception { if (uploadID != null) { subject.queryParamsForTest().set(S3Consts.QueryParams.UPLOAD_ID, uploadID); } subject.queryParamsForTest().setInt(S3Consts.QueryParams.PART_NUMBER, partNumber); when(subject.getContext().getMethod()).thenReturn(HttpMethod.PUT); - setLengthHeader(subject, content); + when(subject.getHeaders().getHeaderString(HttpHeaders.CONTENT_LENGTH)) + .thenReturn(String.valueOf(contentLength)); if (content == null) { return subject.put(bucket, key, null); @@ -264,9 +279,12 @@ public static OS3Exception assertErrorResponse(S3ErrorTable expected, CheckedSup } private static void setLengthHeader(ObjectEndpoint subject, String content) { - final long length = content != null ? content.length() : 0; when(subject.getHeaders().getHeaderString(HttpHeaders.CONTENT_LENGTH)) - .thenReturn(String.valueOf(length)); + .thenReturn(String.valueOf(contentLength(content))); + } + + private static long contentLength(String content) { + return content != null ? 
content.getBytes(UTF_8).length : 0; } private EndpointTestUtils() { diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java index 15398577b58b..8e8b387f6332 100644 --- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java +++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java @@ -509,6 +509,14 @@ public void testPutEmptyObject() throws Exception { assertEquals(0, bucket.getKey(KEY_NAME).getDataSize()); } + @Test + public void testPutObjectRejectsIncompleteBody() { + assertErrorResponse(INVALID_REQUEST, + () -> put(objectEndpoint, BUCKET_NAME, KEY_NAME, 0, + null, CONTENT.length() + 1, CONTENT)); + assertThrows(IOException.class, () -> bucket.getKey(KEY_NAME)); + } + @Test public void testPutObjectWithContentMD5() throws Exception { // GIVEN diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java index fa9562597a82..f52648cc8409 100644 --- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java +++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java @@ -132,6 +132,21 @@ public void testPartUploadWithIncorrectUploadID() { () -> put(rest, OzoneConsts.S3_BUCKET, OzoneConsts.KEY, 1, "random", "any")); } + @Test + public void testPartUploadRejectsIncompleteBody() throws Exception { + String uploadID = initiateMultipartUpload(rest, OzoneConsts.S3_BUCKET, + OzoneConsts.KEY); + String content = "Multipart Upload"; + + assertErrorResponse(S3ErrorTable.INVALID_REQUEST, + () -> put(rest, OzoneConsts.S3_BUCKET, OzoneConsts.KEY, + 1, uploadID, content.length() + 1, content)); + OzoneMultipartUploadPartListParts parts = + 
client.getObjectStore().getS3Bucket(OzoneConsts.S3_BUCKET) + .listParts(OzoneConsts.KEY, uploadID, 0, 100); + assertEquals(0, parts.getPartInfoList().size()); + } + @Test public void testPartUploadStreamContentLength() throws IOException, OS3Exception { From 5697574c1db911c6aa51d7582ae141594c518c67 Mon Sep 17 00:00:00 2001 From: peterxcli Date: Sat, 9 May 2026 17:44:16 +0800 Subject: [PATCH 6/8] Remove unused S3 request handling from RpcClient and EndpointBase; update ClientProtocolStub accordingly. Signed-off-by: peterxcli --- .../java/org/apache/hadoop/ozone/client/rpc/RpcClient.java | 7 ------- .../org/apache/hadoop/ozone/s3/endpoint/EndpointBase.java | 1 - .../org/apache/hadoop/ozone/client/ClientProtocolStub.java | 5 ----- 3 files changed, 13 deletions(-) diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java index ad5769ba06d8..d1f302a5cbae 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java @@ -55,7 +55,6 @@ import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import java.util.stream.Collectors; import javax.crypto.Cipher; @@ -223,7 +222,6 @@ public class RpcClient implements ClientProtocol { private final MemoizedSupplier ecReconstructExecutor; private final ContainerClientMetrics clientMetrics; private final MemoizedSupplier writeExecutor; - private final AtomicBoolean isS3GRequest = new AtomicBoolean(false); private volatile OzoneFsServerDefaults serverDefaults; private volatile long serverDefaultsLastUpdate; private final long serverDefaultsValidityPeriod; @@ -2763,11 +2761,6 @@ public void setThreadLocalS3Auth( this.s3gUgi = 
UserGroupInformation.createRemoteUser(getThreadLocalS3Auth().getUserPrincipal()); } - @Override - public void setIsS3Request(boolean s3Request) { - this.isS3GRequest.set(s3Request); - } - @Override public S3Auth getThreadLocalS3Auth() { return ozoneManagerClient.getThreadLocalS3Auth(); diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/EndpointBase.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/EndpointBase.java index 9db960d6e921..776a779c7663 100644 --- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/EndpointBase.java +++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/EndpointBase.java @@ -200,7 +200,6 @@ public void initialization() { ClientProtocol clientProtocol = getClient().getObjectStore().getClientProxy(); clientProtocol.setThreadLocalS3Auth(s3Auth); - clientProtocol.setIsS3Request(true); bufferSize = (int) getOzoneConfiguration().getStorageSize( OZONE_S3G_CLIENT_BUFFER_SIZE_KEY, diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/ClientProtocolStub.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/ClientProtocolStub.java index 35956bc1df8a..0ef0a7903885 100644 --- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/ClientProtocolStub.java +++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/ClientProtocolStub.java @@ -688,11 +688,6 @@ public void setThreadLocalS3Auth(S3Auth s3Auth) { } - @Override - public void setIsS3Request(boolean isS3Request) { - - } - @Override public S3Auth getThreadLocalS3Auth() { return null; From 34555de3f2ca87e48f3f3e18da15aa5ee5487efb Mon Sep 17 00:00:00 2001 From: peterxcli Date: Sat, 9 May 2026 17:49:02 +0800 Subject: [PATCH 7/8] streamline the precommit Signed-off-by: peterxcli --- .../ozone/client/io/KeyCommitOutput.java | 35 ++++++++++ .../ozone/client/io/KeyDataStreamOutput.java | 4 +- 
.../ozone/client/io/KeyOutputStream.java | 6 +- .../client/io/OzoneDataStreamOutput.java | 67 +++++++++++++------ .../ozone/client/protocol/ClientProtocol.java | 2 - .../s3/endpoint/ObjectEndpointStreaming.java | 4 +- .../hadoop/ozone/client/OzoneBucketStub.java | 25 ++++++- .../ozone/s3/endpoint/TestPartUpload.java | 16 +++++ 8 files changed, 130 insertions(+), 29 deletions(-) create mode 100644 hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyCommitOutput.java diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyCommitOutput.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyCommitOutput.java new file mode 100644 index 000000000000..32a1638f050d --- /dev/null +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyCommitOutput.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.ozone.client.io; + +import jakarta.annotation.Nonnull; +import java.io.IOException; +import java.util.List; +import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo; +import org.apache.ratis.util.function.CheckedRunnable; + +/** + * Common commit-time behavior for key output implementations. + */ +interface KeyCommitOutput extends KeyMetadataAware { + + void setPreCommits( + @Nonnull List<CheckedRunnable<IOException>> preCommits); + + OmMultipartCommitUploadPartInfo getCommitUploadPartInfo(); +} diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java index b38fd4d4dcac..119af2f04e5c 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java @@ -60,7 +60,7 @@ * TODO : currently not support multi-thread access.
*/ public class KeyDataStreamOutput extends AbstractDataStreamOutput - implements KeyMetadataAware { + implements KeyCommitOutput { private static final Logger LOG = LoggerFactory.getLogger(KeyDataStreamOutput.class); @@ -79,6 +79,7 @@ public class KeyDataStreamOutput extends AbstractDataStreamOutput private List<CheckedRunnable<IOException>> preCommits = Collections.emptyList(); + @Override public void setPreCommits(@Nonnull List<CheckedRunnable<IOException>> preCommits) { this.preCommits = preCommits; } @@ -455,6 +456,7 @@ public void close() throws IOException { } } + @Override public OmMultipartCommitUploadPartInfo getCommitUploadPartInfo() { return blockDataStreamOutputEntryPool.getCommitUploadPartInfo(); } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java index 791709eb52ed..4e22d22a36f7 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java @@ -76,7 +76,7 @@ * TODO : currently not support multi-thread access.
*/ public class KeyOutputStream extends OutputStream - implements Syncable, KeyMetadataAware { + implements Syncable, KeyCommitOutput { private static final Logger LOG = LoggerFactory.getLogger(KeyOutputStream.class); @@ -107,6 +107,7 @@ public class KeyOutputStream extends OutputStream private final KeyOutputStreamSemaphore keyOutputStreamSemaphore; private List<CheckedRunnable<IOException>> preCommits = Collections.emptyList(); + @Override public void setPreCommits(@Nonnull List<CheckedRunnable<IOException>> preCommits) { this.preCommits = preCommits; } @@ -657,7 +658,8 @@ private void closeInternal() throws IOException { } } - synchronized OmMultipartCommitUploadPartInfo + @Override + public synchronized OmMultipartCommitUploadPartInfo getCommitUploadPartInfo() { return blockOutputStreamEntryPool.getCommitUploadPartInfo(); } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java index 7ce3f71b375b..52e79ae5f12d 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.io.OutputStream; import java.nio.ByteBuffer; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -27,6 +28,7 @@ import org.apache.hadoop.fs.Syncable; import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput; import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo; +import org.apache.ratis.util.function.CheckedRunnable; /** * OzoneDataStreamOutput is used to write data into Ozone.
@@ -100,40 +102,67 @@ public synchronized void close() throws IOException { } public OmMultipartCommitUploadPartInfo getCommitUploadPartInfo() { - KeyDataStreamOutput keyDataStreamOutput = getKeyDataStreamOutput(); - if (keyDataStreamOutput != null) { - return keyDataStreamOutput.getCommitUploadPartInfo(); + KeyCommitOutput keyCommitOutput = getKeyCommitOutput(); + if (keyCommitOutput != null) { + return keyCommitOutput.getCommitUploadPartInfo(); } // Otherwise return null. return null; } public KeyDataStreamOutput getKeyDataStreamOutput() { + if (byteBufferStreamOutput instanceof KeyDataStreamOutput) { + return ((KeyDataStreamOutput) byteBufferStreamOutput); + } if (byteBufferStreamOutput instanceof OzoneOutputStream) { OutputStream outputStream = ((OzoneOutputStream) byteBufferStreamOutput).getOutputStream(); - if (outputStream instanceof KeyDataStreamOutput) { - return ((KeyDataStreamOutput) outputStream); - } else if (outputStream instanceof CryptoOutputStream) { - OutputStream wrappedStream = - ((CryptoOutputStream) outputStream).getWrappedStream(); - if (wrappedStream instanceof KeyDataStreamOutput) { - return ((KeyDataStreamOutput) wrappedStream); - } - } else if (outputStream instanceof CipherOutputStreamOzone) { - OutputStream wrappedStream = - ((CipherOutputStreamOzone) outputStream).getWrappedStream(); - if (wrappedStream instanceof KeyDataStreamOutput) { - return ((KeyDataStreamOutput) wrappedStream); - } + OutputStream unwrappedStream = unwrap(outputStream); + if (unwrappedStream instanceof KeyDataStreamOutput) { + return ((KeyDataStreamOutput) unwrappedStream); + } + } + // Otherwise return null. 
+ return null; + } + + private KeyCommitOutput getKeyCommitOutput() { + if (byteBufferStreamOutput instanceof KeyCommitOutput) { + return (KeyCommitOutput) byteBufferStreamOutput; + } + if (byteBufferStreamOutput instanceof OzoneOutputStream) { + OutputStream outputStream = + ((OzoneOutputStream) byteBufferStreamOutput).getOutputStream(); + OutputStream unwrappedStream = unwrap(outputStream); + if (unwrappedStream instanceof KeyCommitOutput) { + return (KeyCommitOutput) unwrappedStream; + } - } else if (byteBufferStreamOutput instanceof KeyDataStreamOutput) { - return ((KeyDataStreamOutput) byteBufferStreamOutput); } // Otherwise return null. return null; } + public void setPreCommits( + List<CheckedRunnable<IOException>> preCommits) { + KeyCommitOutput keyCommitOutput = getKeyCommitOutput(); + if (keyCommitOutput != null) { + keyCommitOutput.setPreCommits(preCommits); + return; + } + throw new IllegalStateException( + "Output stream is not backed by KeyCommitOutput: " + + byteBufferStreamOutput.getClass()); + } + + private static OutputStream unwrap(OutputStream outputStream) { + if (outputStream instanceof CryptoOutputStream) { + return ((CryptoOutputStream) outputStream).getWrappedStream(); + } else if (outputStream instanceof CipherOutputStreamOzone) { + return ((CipherOutputStreamOzone) outputStream).getWrappedStream(); + } + return outputStream; + } + @Override public void hflush() throws IOException { hsync(); diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java index c8611043fe44..8653048f8b21 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java @@ -1224,8 +1224,6 @@ OzoneKey headObject(String volumeName, String bucketName, */ void setThreadLocalS3Auth(S3Auth s3Auth); - void
setIsS3Request(boolean isS3Request); - /** * Gets the S3 Authentication information that is attached to the thread. * @return S3 Authentication information. diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java index 482ddaaff2e6..2809afa13885 100644 --- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java +++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java @@ -155,7 +155,7 @@ public static Pair putKeyWithStream( preCommits.add(checkSha256Hook); } - streamOutput.getKeyDataStreamOutput().setPreCommits(preCommits); + streamOutput.setPreCommits(preCommits); } return Pair.of(md5Hash, writeLen); } @@ -246,7 +246,7 @@ public static Response createMultipartKey(OzoneBucket ozoneBucket, String key, }; preCommits.add(checkContentMD5Hook); } - streamOutput.getKeyDataStreamOutput().setPreCommits(preCommits); + streamOutput.setPreCommits(preCommits); ((KeyMetadataAware)streamOutput).getMetadata().put(OzoneConsts.ETAG, eTag); METRICS.incPutKeySuccessLength(putLength); perf.appendMetaLatencyNanos(metadataLatencyNs); diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java index 0795d36bb23a..10ed365b2b07 100644 --- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java +++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java @@ -328,6 +328,12 @@ public OzoneDataStreamOutput createMultipartStreamKey(String key, if (multipartInfo == null || !multipartInfo.getUploadId().equals(uploadID)) { throw new OMException(ResultCodes.NO_SUCH_MULTIPART_UPLOAD_ERROR); } else { + if (isECMultipartUpload(multipartInfo)) { + 
OzoneOutputStream outputStream = + createMultipartKey(key, size, partNumber, uploadID); + return new OzoneDataStreamOutputStub(outputStream, key + size); + } + ByteBufferStreamOutput byteBufferStreamOutput = new KeyMetadataAwareByteBufferStreamOutput(new HashMap<>()) { private final ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024); @@ -366,6 +372,12 @@ public void write(ByteBuffer b, int off, int len) } } + private boolean isECMultipartUpload(MultipartInfoStub multipartInfo) { + ReplicationConfig config = multipartInfo.getReplicationConfig(); + return config != null && + config.getReplicationType() == HddsProtos.ReplicationType.EC; + } + @Override public OzoneInputStream readKey(String key) throws IOException { return new OzoneInputStream(new ByteArrayInputStream(keyContents.get(key))); @@ -504,7 +516,8 @@ public OmMultipartInfo initiateMultipartUpload(String keyName, ReplicationConfig config, Map metadata, Map tags) throws IOException { String uploadID = UUID.randomUUID().toString(); - keyToMultipartUpload.put(keyName, new MultipartInfoStub(uploadID, metadata, tags)); + keyToMultipartUpload.put(keyName, + new MultipartInfoStub(uploadID, config, metadata, tags)); return new OmMultipartInfo(getVolumeName(), getName(), keyName, uploadID); } @@ -914,12 +927,14 @@ public Map getMetadata() { private static class MultipartInfoStub { private final String uploadId; + private final ReplicationConfig replicationConfig; private final Map metadata; private final Map tags; - MultipartInfoStub(String uploadId, Map metadata, - Map tags) { + MultipartInfoStub(String uploadId, ReplicationConfig replicationConfig, + Map metadata, Map tags) { this.uploadId = uploadId; + this.replicationConfig = replicationConfig; this.metadata = metadata; this.tags = tags; } @@ -928,6 +943,10 @@ public String getUploadId() { return uploadId; } + public ReplicationConfig getReplicationConfig() { + return replicationConfig; + } + public Map getMetadata() { return metadata; } diff --git 
a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java index f52648cc8409..bb320f7deeee 100644 --- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java +++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java @@ -58,6 +58,7 @@ import org.apache.hadoop.ozone.client.OzoneMultipartUploadPartListParts; import org.apache.hadoop.ozone.s3.exception.OS3Exception; import org.apache.hadoop.ozone.s3.exception.S3ErrorTable; +import org.apache.hadoop.ozone.s3.util.S3StorageType; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.Parameter; @@ -126,6 +127,21 @@ public void testPartUpload() throws Exception { } } + @Test + public void testPartUploadWithStandardIA() throws Exception { + when(headers.getHeaderString(STORAGE_CLASS_HEADER)) + .thenReturn(S3StorageType.STANDARD_IA.name(), (String)null); + String keyName = UUID.randomUUID().toString(); + String uploadID = initiateMultipartUpload(rest, OzoneConsts.S3_BUCKET, keyName); + + String content = "Multipart Upload"; + try (Response response = put(rest, OzoneConsts.S3_BUCKET, keyName, 1, uploadID, content)) { + assertNotNull(response.getHeaderString(OzoneConsts.ETAG)); + assertEquals(200, response.getStatus()); + } + assertContentLength(uploadID, keyName, content.length()); + } + @Test public void testPartUploadWithIncorrectUploadID() { assertErrorResponse(S3ErrorTable.NO_SUCH_UPLOAD, From 834e5654281b4180f743f0e13f5c398d36816af5 Mon Sep 17 00:00:00 2001 From: peterxcli Date: Sat, 9 May 2026 17:51:11 +0800 Subject: [PATCH 8/8] style Signed-off-by: peterxcli --- .../org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java | 3 +-- .../hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java | 6 ++---- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git 
a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java index a79b7f090c0d..faedc033b383 100644 --- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java +++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java @@ -297,8 +297,7 @@ customMetadata, tags, multiDigestInputStream, getHeaders(), output.getMetadata().put(OzoneConsts.ETAG, md5Hash); List<CheckedRunnable<IOException>> preCommits = new ArrayList<>(); - preCommits.add(validateContentLength( - length, putLength, keyPath)); + preCommits.add(validateContentLength(length, putLength, keyPath)); String clientContentMD5 = getHeaders().getHeaderString(S3Consts.CHECKSUM_HEADER); if (clientContentMD5 != null) { diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java index 2809afa13885..1fb292422b18 100644 --- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java +++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java @@ -132,8 +132,7 @@ public static Pair<String, Long> putKeyWithStream( ((KeyMetadataAware)streamOutput).getMetadata().put(OzoneConsts.ETAG, md5Hash); List<CheckedRunnable<IOException>> preCommits = new ArrayList<>(); - preCommits.add(EndpointBase.validateContentLength( - length, writeLen, keyPath)); + preCommits.add(EndpointBase.validateContentLength(length, writeLen, keyPath)); String clientContentMD5 = headers.getHeaderString(S3Consts.CHECKSUM_HEADER); if (clientContentMD5 != null) { CheckedRunnable<IOException> checkContentMD5Hook = () -> { @@ -237,8 +236,7 @@ public static Response createMultipartKey(OzoneBucket ozoneBucket, String key, eTag = DatatypeConverter.printHexBinary(
body.getMessageDigest(OzoneConsts.MD5_HASH).digest()).toLowerCase(); List<CheckedRunnable<IOException>> preCommits = new ArrayList<>(); - preCommits.add(EndpointBase.validateContentLength( - length, putLength, key)); + preCommits.add(EndpointBase.validateContentLength(length, putLength, key)); String clientContentMD5 = headers.getHeaderString(S3Consts.CHECKSUM_HEADER); if (clientContentMD5 != null) { CheckedRunnable<IOException> checkContentMD5Hook = () -> {