Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -976,20 +976,23 @@ void copy(OzoneVolume volume, DigestInputStream src, long srcKeyLen,
ReplicationConfig replication,
Map<String, String> metadata,
PerformanceStringBuilder perf, long startNanos,
Map<String, String> tags)
Map<String, String> tags,
S3ConditionalRequest.WriteConditions writeConditions)
throws IOException {
long copyLength;

if (isDatastreamEnabled() && !(replication != null &&
replication.getReplicationType() == EC) &&
srcKeyLen > getDatastreamMinLength()) {
perf.appendStreamMode();
copyLength = ObjectEndpointStreaming
.copyKeyWithStream(volume.getBucket(destBucket), destKey, srcKeyLen,
getChunkSize(), replication, metadata, src, perf, startNanos, tags);
getChunkSize(), replication, metadata, src, perf, startNanos, tags,
writeConditions);
} else {
try (OzoneOutputStream dest = getClientProtocol()
.createKey(volume.getName(), destBucket, destKey, srcKeyLen,
replication, metadata, tags)) {
try (OzoneOutputStream dest = openKeyForPut(
volume.getName(), destBucket, destKey, srcKeyLen,
replication, metadata, tags, writeConditions)) {
long metadataLatencyNs =
getMetrics().updateCopyKeyMetadataStats(startNanos);
perf.appendMetaLatencyNanos(metadataLatencyNs);
Expand Down Expand Up @@ -1050,6 +1053,14 @@ private CopyObjectResponse copyObject(OzoneVolume volume,
return copyObjectResponse;
}
}

String sourceKeyPath = sourceBucket + "/" + sourceKey;
S3ConditionalRequest.evaluateCopySourcePreconditions(
getHeaders(), sourceKeyPath, sourceKeyDetails);

S3ConditionalRequest.WriteConditions writeConditions =
S3ConditionalRequest.parseWriteConditions(getHeaders(), destkey);

long sourceKeyLen = sourceKeyDetails.getDataSize();

// Object tagging in copyObject with tagging directive
Expand Down Expand Up @@ -1091,7 +1102,7 @@ private CopyObjectResponse copyObject(OzoneVolume volume,
getMetrics().updateCopyKeyMetadataStats(startNanos);
sourceDigestInputStream = new DigestInputStream(src, getMD5DigestInstance());
copy(volume, sourceDigestInputStream, sourceKeyLen, destkey, destBucket, replicationConfig,
customMetadata, perf, startNanos, tags);
customMetadata, perf, startNanos, tags, writeConditions);
}

final OzoneKeyDetails destKeyDetails = getClientProtocol().getKeyDetails(
Expand All @@ -1104,9 +1115,17 @@ private CopyObjectResponse copyObject(OzoneVolume volume,
return copyObjectResponse;
} catch (OMException ex) {
if (ex.getResult() == ResultCodes.KEY_NOT_FOUND) {
if (getHeaders().getHeaderString(S3Consts.IF_MATCH_HEADER) != null) {
throw newError(PRECOND_FAILED, destkey, ex);
}
throw newError(S3ErrorTable.NO_SUCH_KEY, sourceKey, ex);
} else if (ex.getResult() == ResultCodes.BUCKET_NOT_FOUND) {
throw newError(S3ErrorTable.NO_SUCH_BUCKET, sourceBucket, ex);
} else if (ex.getResult() == ResultCodes.ATOMIC_WRITE_CONFLICT
|| ex.getResult() == ResultCodes.KEY_ALREADY_EXISTS
|| ex.getResult() == ResultCodes.ETAG_MISMATCH
|| ex.getResult() == ResultCodes.ETAG_NOT_AVAILABLE) {
throw newError(PRECOND_FAILED, destkey, ex);
}
throw newError(destBucket + "/" + destkey, ex);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,11 +186,13 @@ public static long copyKeyWithStream(
ReplicationConfig replicationConfig,
Map<String, String> keyMetadata,
DigestInputStream body, PerformanceStringBuilder perf, long startNanos,
Map<String, String> tags)
Map<String, String> tags,
S3ConditionalRequest.WriteConditions writeConditions)
throws IOException {
long writeLen;
try (OzoneDataStreamOutput streamOutput = bucket.createStreamKey(keyPath,
length, replicationConfig, keyMetadata, tags)) {
try (OzoneDataStreamOutput streamOutput = openStreamKeyForPut(bucket,
keyPath, length, replicationConfig, keyMetadata, tags,
writeConditions)) {
long metadataLatencyNs =
METRICS.updateCopyKeyMetadataStats(startNanos);
writeLen = writeToStreamOutput(streamOutput, body, bufferSize, length);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,46 @@ static Response evaluateReadPreconditions(HttpHeaders headers,
return null;
}

/**
* Evaluates copy source preconditions for CopyObject operation.
* Checks x-amz-copy-source-if-match, x-amz-copy-source-if-none-match,
* x-amz-copy-source-if-modified-since, x-amz-copy-source-if-unmodified-since.
*
* @param headers HTTP headers containing the conditional headers
* @param sourceKeyPath path of the source key for error messages
* @param sourceKey the source key metadata
* @throws OS3Exception with 412 Precondition Failed if conditions are not met
*/
static void evaluateCopySourcePreconditions(HttpHeaders headers,
Comment thread
YutaLin marked this conversation as resolved.
Outdated
String sourceKeyPath, OzoneKey sourceKey) throws OS3Exception {
String currentETag = sourceKey.getMetadata().get(OzoneConsts.ETAG);

String ifMatch = headers.getHeaderString(S3Consts.COPY_SOURCE_IF_MATCH);
if (ifMatch != null && !eTagMatches(ifMatch, currentETag)) {
throw newError(PRECOND_FAILED, sourceKeyPath);
}

String ifUnmodifiedSince = headers.getHeaderString(
S3Consts.COPY_SOURCE_IF_UNMODIFIED_SINCE);
if (ifMatch == null && ifUnmodifiedSince != null
&& !matchesIfUnmodifiedSince(sourceKey, ifUnmodifiedSince)) {
throw newError(PRECOND_FAILED, sourceKeyPath);
}

String ifNoneMatch = headers.getHeaderString(
S3Consts.COPY_SOURCE_IF_NONE_MATCH);
if (ifNoneMatch != null && eTagMatches(ifNoneMatch, currentETag)) {
throw newError(PRECOND_FAILED, sourceKeyPath);
}

String ifModifiedSince = headers.getHeaderString(
S3Consts.COPY_SOURCE_IF_MODIFIED_SINCE);
if (ifNoneMatch == null && ifModifiedSince != null
&& !matchesIfModifiedSince(sourceKey, ifModifiedSince)) {
throw newError(PRECOND_FAILED, sourceKeyPath);
}
}

static boolean checkCopySourceModificationTime(Long lastModificationTime,
String copySourceIfModifiedSinceStr,
String copySourceIfUnmodifiedSinceStr) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ public final class S3Consts {

// Constants related to Range Header
public static final String COPY_SOURCE_IF_PREFIX = "x-amz-copy-source-if-";
public static final String COPY_SOURCE_IF_MATCH =
COPY_SOURCE_IF_PREFIX + "match";
public static final String COPY_SOURCE_IF_NONE_MATCH =
COPY_SOURCE_IF_PREFIX + "none-match";
public static final String COPY_SOURCE_IF_MODIFIED_SINCE =
COPY_SOURCE_IF_PREFIX + "modified-since";
public static final String COPY_SOURCE_IF_UNMODIFIED_SINCE =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
import org.apache.hadoop.ozone.s3.exception.OS3Exception;
import org.apache.hadoop.ozone.s3.exception.S3ErrorTable;
import org.apache.hadoop.ozone.s3.util.S3Consts;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
Expand Down Expand Up @@ -460,6 +461,123 @@ public void testCopyObjectWithTags() throws Exception {
assertThat(e.getErrorMessage()).contains("The tagging copy directive specified is invalid");
}

@Test
void testCopyObjectWithSourceIfMatchSuccess() throws Exception {
assertSucceeds(() -> putObject(CONTENT));
OzoneKeyDetails sourceKey = bucket.getKey(KEY_NAME);
String sourceETag = sourceKey.getMetadata().get(OzoneConsts.ETAG);

when(headers.getHeaderString(COPY_SOURCE_HEADER)).thenReturn(
BUCKET_NAME + "/" + urlEncode(KEY_NAME));
when(headers.getHeaderString(S3Consts.COPY_SOURCE_IF_MATCH)).thenReturn("\"" + sourceETag + "\"");

assertSucceeds(() -> put(objectEndpoint, DEST_BUCKET_NAME, DEST_KEY, CONTENT));
assertKeyContent(destBucket, DEST_KEY, CONTENT);
}

@Test
void testCopyObjectWithSourceIfMatchFails() throws Exception {
assertSucceeds(() -> putObject(CONTENT));

when(headers.getHeaderString(COPY_SOURCE_HEADER)).thenReturn(
BUCKET_NAME + "/" + urlEncode(KEY_NAME));
when(headers.getHeaderString(S3Consts.COPY_SOURCE_IF_MATCH)).thenReturn("\"wrong-etag\"");

assertErrorResponse(S3ErrorTable.PRECOND_FAILED,
() -> put(objectEndpoint, DEST_BUCKET_NAME, DEST_KEY, CONTENT));
}

@Test
void testCopyObjectWithSourceIfNoneMatchSuccess() throws Exception {
assertSucceeds(() -> putObject(CONTENT));

when(headers.getHeaderString(COPY_SOURCE_HEADER)).thenReturn(
BUCKET_NAME + "/" + urlEncode(KEY_NAME));
when(headers.getHeaderString(S3Consts.COPY_SOURCE_IF_NONE_MATCH)).thenReturn("\"different-etag\"");

assertSucceeds(() -> put(objectEndpoint, DEST_BUCKET_NAME, DEST_KEY, CONTENT));
assertKeyContent(destBucket, DEST_KEY, CONTENT);
}

@Test
void testCopyObjectWithSourceIfNoneMatchFails() throws Exception {
assertSucceeds(() -> putObject(CONTENT));
OzoneKeyDetails sourceKey = bucket.getKey(KEY_NAME);
String sourceETag = sourceKey.getMetadata().get(OzoneConsts.ETAG);

when(headers.getHeaderString(COPY_SOURCE_HEADER)).thenReturn(
BUCKET_NAME + "/" + urlEncode(KEY_NAME));
when(headers.getHeaderString(S3Consts.COPY_SOURCE_IF_NONE_MATCH)).thenReturn("\"" + sourceETag + "\"");

assertErrorResponse(S3ErrorTable.PRECOND_FAILED,
() -> put(objectEndpoint, DEST_BUCKET_NAME, DEST_KEY, CONTENT));
}

@Test
void testCopyObjectWithDestinationIfNoneMatchSuccess() throws Exception {
assertSucceeds(() -> putObject(CONTENT));

when(headers.getHeaderString(COPY_SOURCE_HEADER)).thenReturn(
BUCKET_NAME + "/" + urlEncode(KEY_NAME));
when(headers.getHeaderString(S3Consts.IF_NONE_MATCH_HEADER)).thenReturn("*");

assertSucceeds(() -> put(objectEndpoint, DEST_BUCKET_NAME, DEST_KEY, CONTENT));
assertKeyContent(destBucket, DEST_KEY, CONTENT);
}

@Test
void testCopyObjectWithDestinationIfNoneMatchFails() throws Exception {
assertSucceeds(() -> putObject(CONTENT));

when(headers.getHeaderString(COPY_SOURCE_HEADER)).thenReturn(
BUCKET_NAME + "/" + urlEncode(KEY_NAME));
assertSucceeds(() -> put(objectEndpoint, DEST_BUCKET_NAME, DEST_KEY, CONTENT));

when(headers.getHeaderString(S3Consts.IF_NONE_MATCH_HEADER)).thenReturn("*");
assertErrorResponse(S3ErrorTable.PRECOND_FAILED,
() -> put(objectEndpoint, DEST_BUCKET_NAME, DEST_KEY, CONTENT));
}

@Test
void testCopyObjectWithDestinationIfMatchSuccess() throws Exception {
assertSucceeds(() -> putObject(CONTENT));

when(headers.getHeaderString(COPY_SOURCE_HEADER)).thenReturn(
BUCKET_NAME + "/" + urlEncode(KEY_NAME));
assertSucceeds(() -> put(objectEndpoint, DEST_BUCKET_NAME, DEST_KEY, CONTENT));
OzoneKeyDetails destKey = destBucket.getKey(DEST_KEY);
String destETag = destKey.getMetadata().get(OzoneConsts.ETAG);

when(headers.getHeaderString(S3Consts.IF_MATCH_HEADER)).thenReturn("\"" + destETag + "\"");
assertSucceeds(() -> put(objectEndpoint, DEST_BUCKET_NAME, DEST_KEY, CONTENT));
}

@Test
void testCopyObjectWithDestinationIfMatchFails() throws Exception {
assertSucceeds(() -> putObject(CONTENT));

when(headers.getHeaderString(COPY_SOURCE_HEADER)).thenReturn(
BUCKET_NAME + "/" + urlEncode(KEY_NAME));
assertSucceeds(() -> put(objectEndpoint, DEST_BUCKET_NAME, DEST_KEY, CONTENT));

when(headers.getHeaderString(S3Consts.IF_MATCH_HEADER)).thenReturn("\"wrong-etag\"");

assertErrorResponse(S3ErrorTable.PRECOND_FAILED,
() -> put(objectEndpoint, DEST_BUCKET_NAME, DEST_KEY, CONTENT));
}

@Test
void testCopyObjectWithDestinationIfMatchKeyNotFound() throws Exception {
assertSucceeds(() -> putObject(CONTENT));

when(headers.getHeaderString(COPY_SOURCE_HEADER)).thenReturn(
BUCKET_NAME + "/" + urlEncode(KEY_NAME));
when(headers.getHeaderString(S3Consts.IF_MATCH_HEADER)).thenReturn("\"some-etag\"");

assertErrorResponse(S3ErrorTable.PRECOND_FAILED,
() -> put(objectEndpoint, DEST_BUCKET_NAME, DEST_KEY, CONTENT));
}

@Test
void testInvalidStorageType() {
when(headers.getHeaderString(STORAGE_CLASS_HEADER)).thenReturn("random");
Expand Down