diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java index ee5c75487573..9f94384d4df2 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java @@ -74,14 +74,6 @@ public final class ECKeyOutputStream extends KeyOutputStream private final Future flushFuture; private final AtomicLong flushCheckpoint; - /** - * Indicates if an atomic write is required. When set to true, - * the amount of data written must match the declared size during the commit. - * A mismatch will prevent the commit from succeeding. - * This is essential for operations like S3 put to ensure atomicity. - */ - private boolean atomicKeyCreation; - private volatile boolean closed; private volatile boolean closing; // how much of data is actually written yet to underlying stream @@ -130,7 +122,6 @@ private ECKeyOutputStream(Builder builder) { return flushStripeFromQueue(); }); this.flushCheckpoint = new AtomicLong(0); - this.atomicKeyCreation = builder.getAtomicKeyCreation(); } @Override @@ -489,12 +480,6 @@ public void close() throws IOException { Preconditions.checkArgument(writeOffset == offset, "Expected writeOffset= " + writeOffset + " Expected offset=" + offset); - if (atomicKeyCreation) { - long expectedSize = blockOutputStreamEntryPool.getDataSize(); - Preconditions.checkState(expectedSize == offset, String.format( - "Expected: %d and actual %d write sizes do not match", - expectedSize, offset)); - } for (CheckedRunnable preCommit : preCommits) { preCommit.run(); } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyCommitOutput.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyCommitOutput.java new file mode 100644 index 000000000000..32a1638f050d --- /dev/null +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyCommitOutput.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.client.io; + +import jakarta.annotation.Nonnull; +import java.io.IOException; +import java.util.List; +import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo; +import org.apache.ratis.util.function.CheckedRunnable; + +/** + * Common commit-time behavior for key output implementations. + */ +interface KeyCommitOutput extends KeyMetadataAware { + + void setPreCommits( + @Nonnull List> preCommits); + + OmMultipartCommitUploadPartInfo getCommitUploadPartInfo(); +} diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java index ceacd624e935..119af2f04e5c 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java @@ -60,7 +60,7 @@ * TODO : currently not support multi-thread access. */ public class KeyDataStreamOutput extends AbstractDataStreamOutput - implements KeyMetadataAware { + implements KeyCommitOutput { private static final Logger LOG = LoggerFactory.getLogger(KeyDataStreamOutput.class); @@ -77,16 +77,9 @@ public class KeyDataStreamOutput extends AbstractDataStreamOutput private long clientID; - /** - * Indicates if an atomic write is required. When set to true, - * the amount of data written must match the declared size during the commit. - * A mismatch will prevent the commit from succeeding. - * This is essential for operations like S3 put to ensure atomicity. - */ - private boolean atomicKeyCreation; - private List> preCommits = Collections.emptyList(); + @Override public void setPreCommits(@Nonnull List> preCommits) { this.preCommits = preCommits; } @@ -129,7 +122,6 @@ public KeyDataStreamOutput() { this.writeOffset = 0; this.clientID = 0L; - this.atomicKeyCreation = false; } @SuppressWarnings({"parameternumber", "squid:S00107"}) @@ -140,8 +132,7 @@ public KeyDataStreamOutput( OzoneManagerProtocol omClient, int chunkSize, String requestId, ReplicationConfig replicationConfig, String uploadID, int partNumber, boolean isMultipart, - boolean unsafeByteBufferConversion, - boolean atomicKeyCreation + boolean unsafeByteBufferConversion ) { super(HddsClientUtils.getRetryPolicyByException( config.getMaxRetryCount(), config.getRetryInterval())); @@ -162,7 +153,6 @@ public KeyDataStreamOutput( // encrypted bucket. this.writeOffset = 0; this.clientID = handler.getId(); - this.atomicKeyCreation = atomicKeyCreation; } /** @@ -457,12 +447,6 @@ public void close() throws IOException { if (!isException()) { Preconditions.checkArgument(writeOffset == offset); } - if (atomicKeyCreation) { - long expectedSize = blockDataStreamOutputEntryPool.getDataSize(); - Preconditions.checkArgument(expectedSize == offset, - String.format("Expected: %d and actual %d write sizes do not match", - expectedSize, offset)); - } for (CheckedRunnable preCommit : preCommits) { preCommit.run(); } @@ -472,6 +456,7 @@ public void close() throws IOException { } } + @Override public OmMultipartCommitUploadPartInfo getCommitUploadPartInfo() { return blockDataStreamOutputEntryPool.getCommitUploadPartInfo(); } @@ -501,7 +486,6 @@ public static class Builder { private boolean unsafeByteBufferConversion; private OzoneClientConfig clientConfig; private ReplicationConfig replicationConfig; - private boolean atomicKeyCreation = false; public Builder setMultipartUploadID(String uploadID) { this.multipartUploadID = uploadID; @@ -553,11 +537,6 @@ public Builder setReplicationConfig(ReplicationConfig replConfig) { return this; } - public Builder setAtomicKeyCreation(boolean atomicKey) { - this.atomicKeyCreation = atomicKey; - return this; - } - public KeyDataStreamOutput build() { return new KeyDataStreamOutput( clientConfig, @@ -570,8 +549,7 @@ public KeyDataStreamOutput build() { multipartUploadID, multipartNumber, isMultipartKey, - unsafeByteBufferConversion, - atomicKeyCreation); + unsafeByteBufferConversion); } } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java index 2f9edfa94ea8..4e22d22a36f7 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java @@ -76,7 +76,7 @@ * TODO : currently not support multi-thread access. */ public class KeyOutputStream extends OutputStream - implements Syncable, KeyMetadataAware { + implements Syncable, KeyCommitOutput { private static final Logger LOG = LoggerFactory.getLogger(KeyOutputStream.class); @@ -98,13 +98,6 @@ public class KeyOutputStream extends OutputStream private long clientID; private StreamBufferArgs streamBufferArgs; - /** - * Indicates if an atomic write is required. When set to true, - * the amount of data written must match the declared size during the commit. - * A mismatch will prevent the commit from succeeding. - * This is essential for operations like S3 put to ensure atomicity. - */ - private boolean atomicKeyCreation; private ContainerClientMetrics clientMetrics; private OzoneManagerVersion ozoneManagerVersion; private final Lock writeLock = new ReentrantLock(); @@ -114,6 +107,7 @@ public class KeyOutputStream extends OutputStream private final KeyOutputStreamSemaphore keyOutputStreamSemaphore; private List> preCommits = Collections.emptyList(); + @Override public void setPreCommits(@Nonnull List> preCommits) { this.preCommits = preCommits; } @@ -186,7 +180,6 @@ public KeyOutputStream(Builder b) { this.isException = false; this.writeOffset = 0; this.clientID = b.getOpenHandler().getId(); - this.atomicKeyCreation = b.getAtomicKeyCreation(); this.streamBufferArgs = b.getStreamBufferArgs(); this.clientMetrics = b.getClientMetrics(); this.ozoneManagerVersion = b.ozoneManagerVersion; @@ -656,12 +649,6 @@ private void closeInternal() throws IOException { if (!isException) { Preconditions.checkArgument(writeOffset == offset); } - if (atomicKeyCreation) { - long expectedSize = blockOutputStreamEntryPool.getDataSize(); - Preconditions.checkState(expectedSize == offset, - String.format("Expected: %d and actual %d write sizes do not match", - expectedSize, offset)); - } for (CheckedRunnable preCommit : preCommits) { preCommit.run(); } @@ -671,7 +658,8 @@ private void closeInternal() throws IOException { } } - synchronized OmMultipartCommitUploadPartInfo + @Override + public synchronized OmMultipartCommitUploadPartInfo getCommitUploadPartInfo() { return blockOutputStreamEntryPool.getCommitUploadPartInfo(); } @@ -701,7 +689,6 @@ public static class Builder { private OzoneClientConfig clientConfig; private ReplicationConfig replicationConfig; private ContainerClientMetrics clientMetrics; - private boolean atomicKeyCreation = false; private StreamBufferArgs streamBufferArgs; private Supplier executorServiceSupplier; private OzoneManagerVersion ozoneManagerVersion; @@ -800,11 +787,6 @@ public Builder setReplicationConfig(ReplicationConfig replConfig) { return this; } - public Builder setAtomicKeyCreation(boolean atomicKey) { - this.atomicKeyCreation = atomicKey; - return this; - } - public Builder setClientMetrics(ContainerClientMetrics clientMetrics) { this.clientMetrics = clientMetrics; return this; @@ -814,10 +796,6 @@ public ContainerClientMetrics getClientMetrics() { return clientMetrics; } - public boolean getAtomicKeyCreation() { - return atomicKeyCreation; - } - public Builder setExecutorServiceSupplier(Supplier executorServiceSupplier) { this.executorServiceSupplier = executorServiceSupplier; return this; diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java index 7ce3f71b375b..52e79ae5f12d 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.io.OutputStream; import java.nio.ByteBuffer; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -27,6 +28,7 @@ import org.apache.hadoop.fs.Syncable; import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput; import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo; +import org.apache.ratis.util.function.CheckedRunnable; /** * OzoneDataStreamOutput is used to write data into Ozone. @@ -100,40 +102,67 @@ public synchronized void close() throws IOException { } public OmMultipartCommitUploadPartInfo getCommitUploadPartInfo() { - KeyDataStreamOutput keyDataStreamOutput = getKeyDataStreamOutput(); - if (keyDataStreamOutput != null) { - return keyDataStreamOutput.getCommitUploadPartInfo(); + KeyCommitOutput keyCommitOutput = getKeyCommitOutput(); + if (keyCommitOutput != null) { + return keyCommitOutput.getCommitUploadPartInfo(); } // Otherwise return null. return null; } public KeyDataStreamOutput getKeyDataStreamOutput() { + if (byteBufferStreamOutput instanceof KeyDataStreamOutput) { + return ((KeyDataStreamOutput) byteBufferStreamOutput); + } if (byteBufferStreamOutput instanceof OzoneOutputStream) { OutputStream outputStream = ((OzoneOutputStream) byteBufferStreamOutput).getOutputStream(); - if (outputStream instanceof KeyDataStreamOutput) { - return ((KeyDataStreamOutput) outputStream); - } else if (outputStream instanceof CryptoOutputStream) { - OutputStream wrappedStream = - ((CryptoOutputStream) outputStream).getWrappedStream(); - if (wrappedStream instanceof KeyDataStreamOutput) { - return ((KeyDataStreamOutput) wrappedStream); - } - } else if (outputStream instanceof CipherOutputStreamOzone) { - OutputStream wrappedStream = - ((CipherOutputStreamOzone) outputStream).getWrappedStream(); - if (wrappedStream instanceof KeyDataStreamOutput) { - return ((KeyDataStreamOutput) wrappedStream); - } + OutputStream unwrappedStream = unwrap(outputStream); + if (unwrappedStream instanceof KeyDataStreamOutput) { + return ((KeyDataStreamOutput) unwrappedStream); + } + } + // Otherwise return null. + return null; + } + + private KeyCommitOutput getKeyCommitOutput() { + if (byteBufferStreamOutput instanceof KeyCommitOutput) { + return (KeyCommitOutput) byteBufferStreamOutput; + } + if (byteBufferStreamOutput instanceof OzoneOutputStream) { + OutputStream outputStream = + ((OzoneOutputStream) byteBufferStreamOutput).getOutputStream(); + OutputStream unwrappedStream = unwrap(outputStream); + if (unwrappedStream instanceof KeyCommitOutput) { + return (KeyCommitOutput) unwrappedStream; } - } else if (byteBufferStreamOutput instanceof KeyDataStreamOutput) { - return ((KeyDataStreamOutput) byteBufferStreamOutput); } // Otherwise return null. return null; } + public void setPreCommits( + List> preCommits) { + KeyCommitOutput keyCommitOutput = getKeyCommitOutput(); + if (keyCommitOutput != null) { + keyCommitOutput.setPreCommits(preCommits); + return; + } + throw new IllegalStateException( + "Output stream is not backed by KeyCommitOutput: " + + byteBufferStreamOutput.getClass()); + } + + private static OutputStream unwrap(OutputStream outputStream) { + if (outputStream instanceof CryptoOutputStream) { + return ((CryptoOutputStream) outputStream).getWrappedStream(); + } else if (outputStream instanceof CipherOutputStreamOzone) { + return ((CipherOutputStreamOzone) outputStream).getWrappedStream(); + } + return outputStream; + } + @Override public void hflush() throws IOException { hsync(); diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java index c8611043fe44..8653048f8b21 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java @@ -1224,8 +1224,6 @@ OzoneKey headObject(String volumeName, String bucketName, */ void setThreadLocalS3Auth(S3Auth s3Auth); - void setIsS3Request(boolean isS3Request); - /** * Gets the S3 Authentication information that is attached to the thread. * @return S3 Authentication information. diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java index 0b058362a30f..d1f302a5cbae 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java @@ -55,7 +55,6 @@ import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import java.util.stream.Collectors; import javax.crypto.Cipher; @@ -223,7 +222,6 @@ public class RpcClient implements ClientProtocol { private final MemoizedSupplier ecReconstructExecutor; private final ContainerClientMetrics clientMetrics; private final MemoizedSupplier writeExecutor; - private final AtomicBoolean isS3GRequest = new AtomicBoolean(false); private volatile OzoneFsServerDefaults serverDefaults; private volatile long serverDefaultsLastUpdate; private final long serverDefaultsValidityPeriod; @@ -1473,13 +1471,6 @@ private OmKeyArgs.Builder createWriteKeyArgsBuilder(String volumeName, private OzoneOutputStream openOutputStream(OmKeyArgs keyArgs, long size) throws IOException { OpenKeySession openKey = ozoneManagerClient.openKey(keyArgs); - // For bucket with layout OBJECT_STORE, when create an empty file (size=0), - // OM will set DataSize to OzoneConfigKeys#OZONE_SCM_BLOCK_SIZE, - // which will cause S3G's atomic write length check to fail, - // so reset size to 0 here. - if (isS3GRequest.get() && size == 0) { - openKey.getKeyInfo().setDataSize(0); - } return createOutputStream(openKey); } @@ -2588,15 +2579,12 @@ private OzoneDataStreamOutput createDataStreamOutput(OpenKeySession openKey) } private KeyDataStreamOutput.Builder newKeyOutputStreamBuilder() { - // Amazon S3 never adds partial objects, So for S3 requests we need to - // set atomicKeyCreation to true // refer: https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html return new KeyDataStreamOutput.Builder() .setXceiverClientManager(xceiverClientManager) .setOmClient(ozoneManagerClient) .enableUnsafeByteBufferConversion(unsafeByteBufferConversion) - .setConfig(clientConfig) - .setAtomicKeyCreation(isS3GRequest.get()); + .setConfig(clientConfig); } private OzoneOutputStream createOutputStream(OpenKeySession openKey) @@ -2670,7 +2658,6 @@ private KeyOutputStream.Builder createKeyOutputStream( .setOmClient(ozoneManagerClient) .enableUnsafeByteBufferConversion(unsafeByteBufferConversion) .setConfig(clientConfig) - .setAtomicKeyCreation(isS3GRequest.get()) .setClientMetrics(clientMetrics) .setExecutorServiceSupplier(writeExecutor) .setStreamBufferArgs(streamBufferArgs) @@ -2774,11 +2761,6 @@ public void setThreadLocalS3Auth( this.s3gUgi = UserGroupInformation.createRemoteUser(getThreadLocalS3Auth().getUserPrincipal()); } - @Override - public void setIsS3Request(boolean s3Request) { - this.isS3GRequest.set(s3Request); - } - @Override public S3Auth getThreadLocalS3Auth() { return ozoneManagerClient.getThreadLocalS3Auth(); diff --git a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneClient.java b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneClient.java index 84b423a28cab..c96fe8bfc5bf 100644 --- a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneClient.java +++ b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneClient.java @@ -220,40 +220,6 @@ public void testPutKeyWithECReplicationConfig() throws IOException { } } - /** - * This test validates that for S3G, - * the key upload process needs to be atomic. - * It simulates two mismatch scenarios where the actual write data size does - * not match the expected size. - */ - @Test - public void testPutKeySizeMismatch() throws IOException { - String value = new String(new byte[1024], UTF_8); - OzoneBucket bucket = getOzoneBucket(); - String keyName = UUID.randomUUID().toString(); - try { - // Simulating first mismatch: Write less data than expected - client.getProxy().setIsS3Request(true); - OzoneOutputStream out1 = bucket.createKey(keyName, - value.getBytes(UTF_8).length, ReplicationType.RATIS, ONE, - new HashMap<>()); - out1.write(value.substring(0, value.length() - 1).getBytes(UTF_8)); - assertThrows(IllegalStateException.class, out1::close, - "Expected IllegalArgumentException due to size mismatch."); - - // Simulating second mismatch: Write more data than expected - OzoneOutputStream out2 = bucket.createKey(keyName, - value.getBytes(UTF_8).length, ReplicationType.RATIS, ONE, - new HashMap<>()); - value += "1"; - out2.write(value.getBytes(UTF_8)); - assertThrows(IllegalStateException.class, out2::close, - "Expected IllegalArgumentException due to size mismatch."); - } finally { - client.getProxy().setIsS3Request(false); - } - } - private OzoneBucket getOzoneBucket() throws IOException { String volumeName = UUID.randomUUID().toString(); String bucketName = UUID.randomUUID().toString(); diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/EndpointBase.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/EndpointBase.java index 649b14b49cd7..776a779c7663 100644 --- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/EndpointBase.java +++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/EndpointBase.java @@ -29,6 +29,7 @@ import static org.apache.hadoop.ozone.s3.S3GatewayConfigKeys.OZONE_S3G_CLIENT_BUFFER_SIZE_DEFAULT; import static org.apache.hadoop.ozone.s3.S3GatewayConfigKeys.OZONE_S3G_CLIENT_BUFFER_SIZE_KEY; import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.INVALID_ARGUMENT; +import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.INVALID_REQUEST; import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.INVALID_TAG; import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.newError; import static org.apache.hadoop.ozone.s3.util.S3Consts.AWS_TAG_PREFIX; @@ -107,6 +108,7 @@ import org.apache.hadoop.ozone.s3.util.S3Utils; import org.apache.http.NameValuePair; import org.apache.http.client.utils.URLEncodedUtils; +import org.apache.ratis.util.function.CheckedRunnable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -198,7 +200,6 @@ public void initialization() { ClientProtocol clientProtocol = getClient().getObjectStore().getClientProxy(); clientProtocol.setThreadLocalS3Auth(s3Auth); - clientProtocol.setIsS3Request(true); bufferSize = (int) getOzoneConfiguration().getStorageSize( OZONE_S3G_CLIENT_BUFFER_SIZE_KEY, @@ -680,6 +681,19 @@ public static MessageDigest getSha256DigestInstance() { return SHA_256_PROVIDER.get(); } + protected static CheckedRunnable validateContentLength( + long expectedLength, long actualLength, String keyPath) { + return () -> { + if (actualLength != expectedLength) { + OS3Exception ex = newError(INVALID_REQUEST, keyPath); + ex.setErrorMessage(String.format( + "Request body length %d does not match expected length %d", + actualLength, expectedLength)); + throw ex; + } + }; + } + protected static String extractPartsCount(String eTag) { if (eTag.contains("-")) { String[] parts = eTag.replace("\"", "").split("-"); diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java index 5e301950465e..faedc033b383 100644 --- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java +++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java @@ -55,7 +55,6 @@ import java.time.ZoneId; import java.time.ZonedDateTime; import java.util.ArrayList; -import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -298,6 +297,7 @@ customMetadata, tags, multiDigestInputStream, getHeaders(), output.getMetadata().put(OzoneConsts.ETAG, md5Hash); List> preCommits = new ArrayList<>(); + preCommits.add(validateContentLength(length, putLength, keyPath)); String clientContentMD5 = getHeaders().getHeaderString(S3Consts.CHECKSUM_HEADER); if (clientContentMD5 != null) { @@ -912,13 +912,16 @@ private Response createMultipartKey(OzoneVolume volume, OzoneBucket ozoneBucket, new byte[getIOBufferSize(length)]); byte[] digest = multiDigestInputStream.getMessageDigest(OzoneConsts.MD5_HASH).digest(); String md5Hash = DatatypeConverter.printHexBinary(digest).toLowerCase(); + List> preCommits = new ArrayList<>(); + preCommits.add(validateContentLength(length, putLength, key)); String clientContentMD5 = getHeaders().getHeaderString(S3Consts.CHECKSUM_HEADER); if (clientContentMD5 != null) { CheckedRunnable checkContentMD5Hook = () -> { S3Utils.validateContentMD5(clientContentMD5, md5Hash, key); }; - ozoneOutputStream.getKeyOutputStream().setPreCommits(Collections.singletonList(checkContentMD5Hook)); + preCommits.add(checkContentMD5Hook); } + ozoneOutputStream.getKeyOutputStream().setPreCommits(preCommits); ozoneOutputStream.getMetadata().put(OzoneConsts.ETAG, md5Hash); outputStream = ozoneOutputStream; } diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java index 63f8281177f4..1fb292422b18 100644 --- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java +++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java @@ -29,7 +29,6 @@ import java.security.DigestInputStream; import java.security.MessageDigest; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Map; import javax.ws.rs.core.HttpHeaders; @@ -133,6 +132,7 @@ public static Pair putKeyWithStream( ((KeyMetadataAware)streamOutput).getMetadata().put(OzoneConsts.ETAG, md5Hash); List> preCommits = new ArrayList<>(); + preCommits.add(EndpointBase.validateContentLength(length, writeLen, keyPath)); String clientContentMD5 = headers.getHeaderString(S3Consts.CHECKSUM_HEADER); if (clientContentMD5 != null) { CheckedRunnable checkContentMD5Hook = () -> { @@ -154,7 +154,7 @@ public static Pair putKeyWithStream( preCommits.add(checkSha256Hook); } - streamOutput.getKeyDataStreamOutput().setPreCommits(preCommits); + streamOutput.setPreCommits(preCommits); } return Pair.of(md5Hash, writeLen); } @@ -235,13 +235,16 @@ public static Response createMultipartKey(OzoneBucket ozoneBucket, String key, writeToStreamOutput(streamOutput, body, chunkSize, length); eTag = DatatypeConverter.printHexBinary( body.getMessageDigest(OzoneConsts.MD5_HASH).digest()).toLowerCase(); + List> preCommits = new ArrayList<>(); + preCommits.add(EndpointBase.validateContentLength(length, putLength, key)); String clientContentMD5 = headers.getHeaderString(S3Consts.CHECKSUM_HEADER); if (clientContentMD5 != null) { CheckedRunnable checkContentMD5Hook = () -> { S3Utils.validateContentMD5(clientContentMD5, eTag, key); }; - streamOutput.getKeyDataStreamOutput().setPreCommits(Collections.singletonList(checkContentMD5Hook)); + preCommits.add(checkContentMD5Hook); } + streamOutput.setPreCommits(preCommits); ((KeyMetadataAware)streamOutput).getMetadata().put(OzoneConsts.ETAG, eTag); METRICS.incPutKeySuccessLength(putLength); perf.appendMetaLatencyNanos(metadataLatencyNs); diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/ClientProtocolStub.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/ClientProtocolStub.java index 35956bc1df8a..0ef0a7903885 100644 --- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/ClientProtocolStub.java +++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/ClientProtocolStub.java @@ -688,11 +688,6 @@ public void setThreadLocalS3Auth(S3Auth s3Auth) { } - @Override - public void setIsS3Request(boolean isS3Request) { - - } - @Override public S3Auth getThreadLocalS3Auth() { return null; diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java index 8a8864c398bf..10ed365b2b07 100644 --- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java +++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java @@ -145,7 +145,9 @@ public OzoneOutputStream createKey(String key, long size, new KeyMetadataAwareOutputStream(metadata) { @Override public void close() throws IOException { - keyContents.put(key, toByteArray()); + byte[] bytes = toByteArray(); + super.close(); + keyContents.put(key, bytes); keyDetails.put(key, new OzoneKeyDetails( getVolumeName(), getName(), @@ -158,7 +160,6 @@ public void close() throws IOException { UserGroupInformation.getCurrentUser().getShortUserName(), tags )); - super.close(); } }; @@ -179,7 +180,9 @@ public OzoneOutputStream rewriteKey(String keyName, long size, long existingKeyG new KeyMetadataAwareOutputStream(metadata) { @Override public void close() throws IOException { - keyContents.put(keyName, toByteArray()); + byte[] bytes = toByteArray(); + super.close(); + keyContents.put(keyName, bytes); keyDetails.put(keyName, new OzoneKeyDetails( getVolumeName(), getName(), @@ -190,7 +193,6 @@ public void close() throws IOException { new ArrayList<>(), finalReplicationCon, metadata, null, () -> readKey(keyName), true, null, null )); - super.close(); } }; @@ -326,6 +328,12 @@ public OzoneDataStreamOutput createMultipartStreamKey(String key, if (multipartInfo == null || !multipartInfo.getUploadId().equals(uploadID)) { throw new OMException(ResultCodes.NO_SUCH_MULTIPART_UPLOAD_ERROR); } else { + if (isECMultipartUpload(multipartInfo)) { + OzoneOutputStream outputStream = + createMultipartKey(key, size, partNumber, uploadID); + return new OzoneDataStreamOutputStub(outputStream, key + size); + } + ByteBufferStreamOutput byteBufferStreamOutput = new KeyMetadataAwareByteBufferStreamOutput(new HashMap<>()) { private final ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024); @@ -364,6 +372,12 @@ public void write(ByteBuffer b, int off, int len) } } + private boolean isECMultipartUpload(MultipartInfoStub multipartInfo) { + ReplicationConfig config = multipartInfo.getReplicationConfig(); + return config != null && + config.getReplicationType() == HddsProtos.ReplicationType.EC; + } + @Override public OzoneInputStream readKey(String key) throws IOException { return new OzoneInputStream(new ByteArrayInputStream(keyContents.get(key))); @@ -502,7 +516,8 @@ public OmMultipartInfo initiateMultipartUpload(String keyName, ReplicationConfig config, Map metadata, Map tags) throws IOException { String uploadID = UUID.randomUUID().toString(); - keyToMultipartUpload.put(keyName, new MultipartInfoStub(uploadID, metadata, tags)); + keyToMultipartUpload.put(keyName, + new MultipartInfoStub(uploadID, config, metadata, tags)); return new OmMultipartInfo(getVolumeName(), getName(), keyName, uploadID); } @@ -518,8 +533,10 @@ public OzoneOutputStream createMultipartKey(String key, long size, new KeyMetadataAwareOutputStream((int) size, new HashMap<>()) { @Override public void close() throws IOException { - Part part = new Part(key + size, - toByteArray(), getMetadata().get(ETAG)); + byte[] bytes = toByteArray(); + String eTag = getMetadata().get(ETAG); + super.close(); + Part part = new Part(key + size, bytes, eTag); if (partList.get(key) == null) { Map parts = new TreeMap<>(); parts.put(partNumber, part); @@ -527,7 +544,6 @@ public void close() throws IOException { } else { partList.get(key).put(partNumber, part); } - super.close(); } }; return new OzoneOutputStreamStub(keyOutputStream, key + size); @@ -911,12 +927,14 @@ public Map getMetadata() { private static class MultipartInfoStub { private final String uploadId; + private final ReplicationConfig replicationConfig; private final Map metadata; private final Map tags; - MultipartInfoStub(String uploadId, Map metadata, - Map tags) { + MultipartInfoStub(String uploadId, ReplicationConfig replicationConfig, + Map metadata, Map tags) { this.uploadId = uploadId; + this.replicationConfig = replicationConfig; this.metadata = metadata; this.tags = tags; } @@ -925,6 +943,10 @@ public String getUploadId() { return uploadId; } + public ReplicationConfig getReplicationConfig() { + return replicationConfig; + } + public Map getMetadata() { return metadata; } diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/EndpointTestUtils.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/EndpointTestUtils.java index 11ba8daf8042..b2fd5ad6ab91 100644 --- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/EndpointTestUtils.java +++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/EndpointTestUtils.java @@ -123,13 +123,28 @@ public static Response put( int partNumber, String uploadID, String content + ) throws IOException, OS3Exception { + return put(subject, bucket, key, partNumber, uploadID, + contentLength(content), content); + } + + /** Put with content, part number, upload ID, and explicit Content-Length. */ + public static Response put( + ObjectEndpoint subject, + String bucket, + String key, + int partNumber, + String uploadID, + long contentLength, + String content ) throws IOException, OS3Exception { if (uploadID != null) { subject.queryParamsForTest().set(S3Consts.QueryParams.UPLOAD_ID, uploadID); } subject.queryParamsForTest().setInt(S3Consts.QueryParams.PART_NUMBER, partNumber); when(subject.getContext().getMethod()).thenReturn(HttpMethod.PUT); - setLengthHeader(subject, content); + when(subject.getHeaders().getHeaderString(HttpHeaders.CONTENT_LENGTH)) + .thenReturn(String.valueOf(contentLength)); if (content == null) { return subject.put(bucket, key, null); @@ -264,9 +279,12 @@ public static OS3Exception assertErrorResponse(S3ErrorTable expected, CheckedSup } private static void setLengthHeader(ObjectEndpoint subject, String content) { - final long length = content != null ? content.length() : 0; when(subject.getHeaders().getHeaderString(HttpHeaders.CONTENT_LENGTH)) - .thenReturn(String.valueOf(length)); + .thenReturn(String.valueOf(contentLength(content))); + } + + private static long contentLength(String content) { + return content != null ? content.getBytes(UTF_8).length : 0; } private EndpointTestUtils() { diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java index 15398577b58b..8e8b387f6332 100644 --- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java +++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java @@ -509,6 +509,14 @@ public void testPutEmptyObject() throws Exception { assertEquals(0, bucket.getKey(KEY_NAME).getDataSize()); } + @Test + public void testPutObjectRejectsIncompleteBody() { + assertErrorResponse(INVALID_REQUEST, + () -> put(objectEndpoint, BUCKET_NAME, KEY_NAME, 0, + null, CONTENT.length() + 1, CONTENT)); + assertThrows(IOException.class, () -> bucket.getKey(KEY_NAME)); + } + @Test public void testPutObjectWithContentMD5() throws Exception { // GIVEN diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java index fa9562597a82..bb320f7deeee 100644 --- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java +++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java @@ -58,6 +58,7 @@ import org.apache.hadoop.ozone.client.OzoneMultipartUploadPartListParts; import org.apache.hadoop.ozone.s3.exception.OS3Exception; import org.apache.hadoop.ozone.s3.exception.S3ErrorTable; +import org.apache.hadoop.ozone.s3.util.S3StorageType; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.Parameter; @@ -126,12 +127,42 @@ public void testPartUpload() throws Exception { } } + @Test + public void testPartUploadWithStandardIA() throws Exception { + when(headers.getHeaderString(STORAGE_CLASS_HEADER)) + .thenReturn(S3StorageType.STANDARD_IA.name(), (String)null); + String keyName = UUID.randomUUID().toString(); + String uploadID = initiateMultipartUpload(rest, OzoneConsts.S3_BUCKET, keyName); + + String content = "Multipart Upload"; + try (Response response = put(rest, OzoneConsts.S3_BUCKET, keyName, 1, uploadID, content)) { + assertNotNull(response.getHeaderString(OzoneConsts.ETAG)); + assertEquals(200, response.getStatus()); + } + assertContentLength(uploadID, keyName, content.length()); + } + @Test public void testPartUploadWithIncorrectUploadID() { assertErrorResponse(S3ErrorTable.NO_SUCH_UPLOAD, () -> put(rest, OzoneConsts.S3_BUCKET, OzoneConsts.KEY, 1, "random", "any")); } + @Test + public void testPartUploadRejectsIncompleteBody() throws Exception { + String uploadID = initiateMultipartUpload(rest, OzoneConsts.S3_BUCKET, + OzoneConsts.KEY); + String content = "Multipart Upload"; + + assertErrorResponse(S3ErrorTable.INVALID_REQUEST, + () -> put(rest, OzoneConsts.S3_BUCKET, OzoneConsts.KEY, + 1, uploadID, content.length() + 1, content)); + OzoneMultipartUploadPartListParts parts = + client.getObjectStore().getS3Bucket(OzoneConsts.S3_BUCKET) + .listParts(OzoneConsts.KEY, uploadID, 0, 100); + assertEquals(0, parts.getPartInfoList().size()); + } + @Test public void testPartUploadStreamContentLength() throws IOException, OS3Exception {