Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,15 @@
import java.util.ListIterator;
import java.util.Map;
import java.util.Objects;
import java.util.OptionalLong;
import org.apache.hadoop.hdds.client.ContainerBlockID;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.storage.StreamBuffer;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
Expand Down Expand Up @@ -255,7 +257,10 @@ void commitKey(long offset) throws IOException {
commitUploadPartInfo =
omClient.commitMultipartUploadPart(buildKeyArgs(), openID);
} else {
omClient.commitKey(buildKeyArgs(), openID);
final OptionalLong modificationTime = omClient.commitKeyWithModificationTime(buildKeyArgs(), openID);
if (modificationTime.isPresent()) {
metadata.put(OzoneConsts.MODIFICATION_TIME, String.valueOf(modificationTime.getAsLong()));
}
}
} else {
LOG.warn("Closing KeyDataStreamOutput, but key args is null");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.ListIterator;
import java.util.Map;
import java.util.Objects;
import java.util.OptionalLong;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;
import org.apache.hadoop.hdds.client.ContainerBlockID;
Expand All @@ -39,6 +40,7 @@
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.storage.BufferPool;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
Expand Down Expand Up @@ -330,7 +332,10 @@ void commitKey(long offset) throws IOException {
commitUploadPartInfo =
omClient.commitMultipartUploadPart(buildKeyArgs(), openID);
} else {
omClient.commitKey(buildKeyArgs(), openID);
final OptionalLong modificationTime = omClient.commitKeyWithModificationTime(buildKeyArgs(), openID);
if (modificationTime.isPresent()) {
metadata.put(OzoneConsts.MODIFICATION_TIME, String.valueOf(modificationTime.getAsLong()));
}
}
} else {
LOG.warn("Closing KeyOutputStream, but key args is null");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.hadoop.ozone.om.helpers;

import java.util.OptionalLong;

/**
* This class holds information about the response from commit multipart
* upload part request.
Expand All @@ -27,9 +29,16 @@ public class OmMultipartCommitUploadPartInfo {

private final String eTag;

private final Long modificationTime;

public OmMultipartCommitUploadPartInfo(String partName, String eTag) {
this(partName, eTag, null);
}

public OmMultipartCommitUploadPartInfo(String partName, String eTag, Long modificationTime) {
this.partName = partName;
this.eTag = eTag;
this.modificationTime = modificationTime;
}

public String getETag() {
Expand All @@ -39,4 +48,8 @@ public String getETag() {
public String getPartName() {
return partName;
}

public OptionalLong getModificationTime() {
return modificationTime == null ? OptionalLong.empty() : OptionalLong.of(modificationTime);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.UUID;
import org.apache.hadoop.fs.SafeModeAction;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
Expand Down Expand Up @@ -251,6 +252,17 @@ default void commitKey(OmKeyArgs args, long clientID)
"this to be implemented, as write requests use a new approach.");
}

/**
* Commit a key and optionally return the stored modification time (epoch millis).
* <p>
* This is backward compatible with older OM versions which do not return
* the value in {@code CommitKeyResponse}.
*/
default OptionalLong commitKeyWithModificationTime(OmKeyArgs args, long clientID) throws IOException {
commitKey(args, clientID);
return OptionalLong.empty();
}

/**
* Synchronize the key length. This will make the change from the client
* visible. The client is identified by the clientID.
Expand Down Expand Up @@ -1096,7 +1108,6 @@ default CancelPrepareResponse cancelOzoneManagerPrepare() throws IOException {
EchoRPCResponse echoRPCReq(byte[] payloadReq, int payloadSizeResp,
boolean writeToRatis) throws IOException;


/**
* Start the lease recovery of a file.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ public class S3Auth {
private String userPrincipal;
// Optional STS session token when using temporary credentials
private String sessionToken;
// S3 action without s3: prefix (e.g. PutObject), set by S3 Gateway for use in finer-grained STS permissions.
private String s3Action;

public S3Auth(final String stringToSign,
final String signature,
Expand Down Expand Up @@ -67,4 +69,12 @@ public String getSessionToken() {
public void setSessionToken(String sessionToken) {
this.sessionToken = sessionToken;
}

public String getS3Action() {
return s3Action;
}

public void setS3Action(String s3Action) {
this.s3Action = s3Action;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.OptionalLong;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -328,6 +329,9 @@ private OMResponse submitRequest(OMRequest omRequest)
if (threadLocalS3Auth.get().getSessionToken() != null) {
s3AuthBuilder.setSessionToken(threadLocalS3Auth.get().getSessionToken());
}
if (threadLocalS3Auth.get().getS3Action() != null) {
s3AuthBuilder.setS3Action(threadLocalS3Auth.get().getS3Action());
}

builder.setS3Authentication(s3AuthBuilder.build());
}
Expand Down Expand Up @@ -818,6 +822,11 @@ public void commitKey(OmKeyArgs args, long clientId)
updateKey(args, clientId, false, false);
}

@Override
public OptionalLong commitKeyWithModificationTime(OmKeyArgs args, long clientId) throws IOException {
return updateKeyAndGetModificationTime(args, clientId, false, false);
}

@Override
public void recoverKey(OmKeyArgs args, long clientId)
throws IOException {
Expand All @@ -839,6 +848,12 @@ public static void setReplicationConfig(ReplicationConfig replication,

private void updateKey(OmKeyArgs args, long clientId, boolean hsync, boolean recovery)
throws IOException {
// Preserve legacy behavior (ignore response payload).
updateKeyAndGetModificationTime(args, clientId, hsync, recovery);
}

private OptionalLong updateKeyAndGetModificationTime(OmKeyArgs args, long clientId, boolean hsync, boolean recovery)
throws IOException {
CommitKeyRequest.Builder req = CommitKeyRequest.newBuilder();
List<OmKeyLocationInfo> locationInfoList = args.getLocationInfoList();
Objects.requireNonNull(locationInfoList, "locationInfoList == null");
Expand All @@ -864,7 +879,11 @@ private void updateKey(OmKeyArgs args, long clientId, boolean hsync, boolean rec
.setCommitKeyRequest(req)
.build();

handleError(submitRequest(omRequest));
final OMResponse resp = handleError(submitRequest(omRequest));
if (resp.hasCommitKeyResponse() && resp.getCommitKeyResponse().hasModificationTime()) {
return OptionalLong.of(resp.getCommitKeyResponse().getModificationTime());
}
return OptionalLong.empty();
}

@Override
Expand Down Expand Up @@ -1703,10 +1722,8 @@ public OmMultipartCommitUploadPartInfo commitMultipartUploadPart(
handleError(submitRequest(omRequest))
.getCommitMultiPartUploadResponse();

OmMultipartCommitUploadPartInfo info = new
OmMultipartCommitUploadPartInfo(response.getPartName(),
response.getETag());
return info;
final Long modificationTime = response.hasModificationTime() ? response.getModificationTime() : null;
return new OmMultipartCommitUploadPartInfo(response.getPartName(), response.getETag(), modificationTime);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1572,7 +1572,8 @@ message CommitKeyRequest {
}

message CommitKeyResponse {

// Modification time of the committed key, set by OM during preExecute.
optional uint64 modificationTime = 1;
}

message AllocateBlockRequest {
Expand Down Expand Up @@ -1777,6 +1778,8 @@ message MultipartCommitUploadPartResponse {
optional string partName = 1;
// This one is returned as Etag for S3.
optional string eTag = 2;
// Modification time of the committed part key, set by OM during preExecute.
optional uint64 modificationTime = 3;
}

message MultipartUploadCompleteRequest {
Expand Down Expand Up @@ -2311,6 +2314,17 @@ message S3Authentication {
// If present, indicates this request uses STS temporary credentials
// and carries the base64-encoded session token for validation.
optional string sessionToken = 4;
// The following fields are resolved from the STS session token by OM.
// They are used to enforce STS session policies during Ratis apply.
// They must be written or cleared by the OM leader when the token is validated.
optional string resolvedStsSessionPolicy = 5;
optional string resolvedStsRoleArn = 6;
optional string resolvedStsOriginalAccessKeyId = 7;
optional string resolvedStsTempAccessKeyId = 8;
optional string resolvedStsSecretKeyId = 9;
// S3 action without the s3: prefix for this request (e.g. GetObject), set by S3 Gateway for use
// in finer-grained STS permissions.
optional string s3Action = 10;
}

message RecoverLeaseRequest {
Expand Down
18 changes: 16 additions & 2 deletions hadoop-ozone/interface-client/src/main/resources/proto.lock
Original file line number Diff line number Diff line change
Expand Up @@ -5803,7 +5803,15 @@
]
},
{
"name": "CommitKeyResponse"
"name": "CommitKeyResponse",
"fields": [
{
"id": 1,
"name": "modificationTime",
"type": "uint64",
"optional": true
}
]
},
{
"name": "AllocateBlockRequest",
Expand Down Expand Up @@ -6372,6 +6380,12 @@
"name": "eTag",
"type": "string",
"optional": true
},
{
"id": 3,
"name": "modificationTime",
"type": "uint64",
"optional": true
}
]
},
Expand Down Expand Up @@ -8656,4 +8670,4 @@
}
}
]
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.hadoop.ozone.om.helpers.OzoneFileStatusLight;
import org.apache.hadoop.ozone.om.helpers.S3VolumeContext;
import org.apache.hadoop.ozone.om.protocolPB.grpc.GrpcClientConstants;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.S3Authentication;
import org.apache.hadoop.ozone.security.STSTokenIdentifier;
import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLIdentityType;
Expand Down Expand Up @@ -236,9 +237,7 @@ public List<OzoneFileStatus> listStatus(OmKeyArgs args, boolean recursive,
try {
if (isAclEnabled) {
if (isStsS3Request()) {
// We need to be able to tell the difference between being able to download a file and merely seeing the file
// name in a list. Use READ for download ability and LIST (here) for listing.
// When listPrefix is set (original S3 ListObjects prefix), authorize LIST on that prefix for the whole
// When listPrefix is set (original S3 ListObjects prefix), authorize READ on that prefix for the whole
// listing, including FSO traversal where keyName is an internal directory (e.g. userA) under prefix user.
final String listPrefix = args.getListPrefix();
final String keyName = args.getKeyName();
Expand All @@ -258,7 +257,7 @@ public List<OzoneFileStatus> listStatus(OmKeyArgs args, boolean recursive,
} else {
aclKey = "*";
}
checkAcls(ResourceType.KEY, StoreType.OZONE, ACLType.LIST, bucket.realVolume(), bucket.realBucket(), aclKey);
checkAcls(ResourceType.KEY, StoreType.OZONE, ACLType.READ, bucket.realVolume(), bucket.realBucket(), aclKey);
} else {
checkAcls(getResourceType(args), StoreType.OZONE, ACLType.READ,
bucket, args.getKeyName());
Expand Down Expand Up @@ -304,12 +303,7 @@ public OzoneFileStatus getFileStatus(OmKeyArgs args) throws IOException {

try {
if (isAclEnabled) {
if (isStsS3Request()) {
checkAcls(getResourceType(args), StoreType.OZONE, ACLType.LIST, bucket, args.getKeyName());
} else {
checkAcls(getResourceType(args), StoreType.OZONE, ACLType.READ,
bucket, args.getKeyName());
}
checkAcls(getResourceType(args), StoreType.OZONE, ACLType.READ, bucket, args.getKeyName());
}
metrics.incNumGetFileStatus();
return keyManager.getFileStatus(args, getClientAddress());
Expand Down Expand Up @@ -384,7 +378,7 @@ public ListKeysResult listKeys(String volumeName, String bucketName,
final String aclKey = (keyPrefix == null || keyPrefix.isEmpty()) ? "*" : keyPrefix;
captureLatencyNs(
perfMetrics.getListKeysAclCheckLatencyNs(), () -> checkAcls(
ResourceType.KEY, StoreType.OZONE, ACLType.LIST, bucket.realVolume(), bucket.realBucket(), aclKey));
ResourceType.KEY, StoreType.OZONE, ACLType.READ, bucket.realVolume(), bucket.realBucket(), aclKey));
} else {
captureLatencyNs(perfMetrics.getListKeysAclCheckLatencyNs(), () ->
checkAcls(ResourceType.BUCKET, StoreType.OZONE, ACLType.LIST,
Expand Down Expand Up @@ -634,7 +628,8 @@ public boolean checkAcls(ResourceType resType, StoreType storeType,
public boolean checkAcls(OzoneObj obj, RequestContext context,
boolean throwIfPermissionDenied) throws OMException {

final RequestContext normalizedRequestContext = maybeAttachSessionPolicyFromThreadLocal(context);
final RequestContext normalizedRequestContext = maybeAttachS3ActionFromThreadLocal(
maybeAttachSessionPolicyFromThreadLocal(context));

if (!captureLatencyNs(perfMetrics::setCheckAccessLatencyNs,
() -> accessAuthorizer.checkAccess(obj, normalizedRequestContext))) {
Expand Down Expand Up @@ -692,6 +687,22 @@ private RequestContext maybeAttachSessionPolicyFromThreadLocal(RequestContext co
.build();
}

/**
* Attaches s3 action to RequestContext if an S3Authentication is found in the Ozone Manager thread local,
* and it has an s3 action. Otherwise, returns the RequestContext as it was before.
* @param context the original RequestContext
* @return RequestContext as before or with s3 action embedded
*/
private RequestContext maybeAttachS3ActionFromThreadLocal(RequestContext context) {
final S3Authentication s3Authentication = OzoneManager.getS3Auth();
if (s3Authentication == null || !s3Authentication.hasS3Action()) {
return context;
}
return context.toBuilder()
.setS3Action(s3Authentication.getS3Action())
.build();
}

static String getClientAddress() {
String clientMachine = Server.getRemoteAddress();
if (clientMachine == null) { //not a RPC client
Expand Down
Loading