Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.comet.cloud.s3;

/** Access intent passed to {@link CometS3CredentialProvider#getCredentialsForPath}. */
public enum CometS3AccessMode {
/** GET / HEAD / LIST. All Comet native scan paths request this today. */
READ,
/** PUT / POST / DELETE / multipart. Reserved for future native write paths. */
WRITE
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.comet.cloud.s3;

import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* JNI entry point invoked from native code to resolve {@link CometS3CredentialProvider}.
*
* <p>The provider is resolved once via {@link ServiceLoader} and cached in a {@code static final}
* field. A query falling back from Comet to Spark mid-execution therefore sees identical
* credentials, since both paths resolve from the same executor classpath.
*
* <p>Multiple registered impls fail fast at class-load; chaining is a vendor-side concern.
*/
public final class CometS3CredentialDispatcher {

private static final Logger LOG = LoggerFactory.getLogger(CometS3CredentialDispatcher.class);

private static final CometS3CredentialProvider PROVIDER = resolve();
private static final CometS3AccessMode[] MODES = CometS3AccessMode.values();

private CometS3CredentialDispatcher() {}

public static boolean isProviderRegistered() {
return PROVIDER != null;
}

/** Invoked by native code. {@code mode} is the {@link CometS3AccessMode} ordinal. */
public static CometS3Credentials getCredentialsForPath(String bucket, String path, int mode)
throws Exception {
if (PROVIDER == null) {
throw new IllegalStateException(
"No CometS3CredentialProvider registered; check META-INF/services on the classpath");
}
if (mode < 0 || mode >= MODES.length) {
throw new IllegalArgumentException("Invalid CometS3AccessMode ordinal: " + mode);
}
CometS3AccessMode accessMode = MODES[mode];
if (LOG.isDebugEnabled()) {
LOG.debug("Fetching credentials for bucket={} path={} mode={}", bucket, path, accessMode);
}
return PROVIDER.getCredentialsForPath(bucket, path, accessMode);
}

private static CometS3CredentialProvider resolve() {
List<CometS3CredentialProvider> impls = new ArrayList<>();
for (CometS3CredentialProvider impl : ServiceLoader.load(CometS3CredentialProvider.class)) {
impls.add(impl);
}
if (impls.isEmpty()) {
LOG.info(
"No CometS3CredentialProvider registered; native S3 readers will use the default "
+ "AWS credential chain");
return null;
}
if (impls.size() > 1) {
List<String> names =
impls.stream().map(p -> p.getClass().getName()).collect(Collectors.toList());
throw new IllegalStateException(
"Multiple CometS3CredentialProvider impls on classpath: " + names);
}
CometS3CredentialProvider provider = impls.get(0);
LOG.info("Registered CometS3CredentialProvider: {}", provider.getClass().getName());
return provider;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.comet.cloud.s3;

/**
* SPI for supplying AWS credentials to Comet's native S3 readers, which bypass Spark's Hadoop S3A
* code path and cannot reach signer-based or path-aware credential mechanisms through the standard
* parameterless {@code AWSCredentialsProvider.getCredentials()} contract.
*
* <p>Peer to {@code org.apache.hadoop.fs.s3a.AwsSignerInitializer} (Hadoop S3A) and {@code
* org.apache.iceberg.aws.AwsClientFactory} (Iceberg-Java): the same shape vendors already implement
* for those two, with a smaller surface (one method).
*
* <h2>Why a new SPI?</h2>
*
* No existing contract carries per-path AWS credentials from vendor code to Comet's native readers:
*
* <ul>
* <li>{@code org.apache.spark.deploy.security.cloud.CloudCredentialsProvider} yields a single JWT
* per service name. No path argument and does not return AWS credentials.
* <li>Hadoop S3A custom signers hide path-aware logic inside {@code Signer.sign(request,
* credentials)}. Credentials never leave the signing pipeline, and the underlying secret key
* is an HMAC key (not present in the signed output), so running the signer on a synthesized
* request cannot recover it.
* <li>Reflecting into vendor singletons encodes per-vendor class and lifecycle details in Comet
* and breaks silently on vendor upgrades.
* <li>A Comet-specific HTTP STS endpoint would push a new public API onto every vendor; vendors
* ship this logic as Java code, not HTTP.
* </ul>
*
* <p>Vendors register an implementation via {@code
* META-INF/services/org.apache.comet.cloud.s3.CometS3CredentialProvider}. {@link
* #getCredentialsForPath} may be invoked concurrently from many native tokio tasks, so
* implementations must be thread-safe.
*
* <p>Returns credentials or throws; there is no fall-through return value. A provider that is only
* authoritative for some paths should resolve the default AWS chain itself for the rest. See the
* user guide on cloud credential providers.
*/
public interface CometS3CredentialProvider {

/**
* @param bucket S3 bucket name (no scheme, no path)
* @param path object key or prefix, leading slash included (matches the URL path component)
* @param mode access intent for this request
* @return non-null credentials; {@code null} is a contract violation
*/
CometS3Credentials getCredentialsForPath(String bucket, String path, CometS3AccessMode mode)
throws Exception;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.comet.cloud.s3;

import java.util.Objects;

/**
* Credentials returned by a {@link CometS3CredentialProvider}. Fields are read back over JNI by
* name, so the field names are part of the cross-language contract.
*
* <p>{@code sessionToken} is null for non-STS credentials. {@code expirationEpochMillis} of {@code
* 0} means "unknown"; the Iceberg path then caps opendal's cache at a short fallback to avoid
* serving stale credentials for the executor lifetime.
*/
public final class CometS3Credentials {

private final String accessKeyId;
private final String secretAccessKey;
private final String sessionToken;
private final long expirationEpochMillis;

public CometS3Credentials(
String accessKeyId, String secretAccessKey, String sessionToken, long expirationEpochMillis) {
this.accessKeyId = Objects.requireNonNull(accessKeyId, "accessKeyId");
this.secretAccessKey = Objects.requireNonNull(secretAccessKey, "secretAccessKey");
this.sessionToken = sessionToken;
this.expirationEpochMillis = expirationEpochMillis;
}

public String getAccessKeyId() {
return accessKeyId;
}

public String getSecretAccessKey() {
return secretAccessKey;
}

public String getSessionToken() {
return sessionToken;
}

public long getExpirationEpochMillis() {
return expirationEpochMillis;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.comet.cloud.s3;

import org.junit.Before;
import org.junit.Test;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;

public class CometS3CredentialDispatcherTest {

private static final int READ = CometS3AccessMode.READ.ordinal();
private static final int WRITE = CometS3AccessMode.WRITE.ordinal();

@Before
public void resetTestProvider() {
TestCometS3CredentialProvider.reset();
}

@Test
public void providerIsRegisteredFromTestClasspath() {
assertTrue(CometS3CredentialDispatcher.isProviderRegistered());
}

@Test
public void getCredentialsRoundTripsThroughProvider() throws Exception {
CometS3Credentials creds =
CometS3CredentialDispatcher.getCredentialsForPath("my-bucket", "path/to/object", READ);

assertNotNull(creds);
assertEquals("AKIATEST", creds.getAccessKeyId());
assertEquals("secret", creds.getSecretAccessKey());
assertEquals("session-tok", creds.getSessionToken());
assertEquals(0L, creds.getExpirationEpochMillis());

assertEquals(1, TestCometS3CredentialProvider.callCount.get());
assertEquals("my-bucket", TestCometS3CredentialProvider.lastBucket);
assertEquals("path/to/object", TestCometS3CredentialProvider.lastPath);
assertEquals(CometS3AccessMode.READ, TestCometS3CredentialProvider.lastMode);
}

@Test
public void writeModeIsForwarded() throws Exception {
CometS3CredentialDispatcher.getCredentialsForPath("b", "k", WRITE);
assertEquals(CometS3AccessMode.WRITE, TestCometS3CredentialProvider.lastMode);
}

@Test
public void unknownModeRejected() {
assertThrows(
IllegalArgumentException.class,
() -> CometS3CredentialDispatcher.getCredentialsForPath("b", "k", 99));
}

@Test
public void providerExceptionsPropagate() {
IllegalStateException boom = new IllegalStateException("simulated STS failure");
TestCometS3CredentialProvider.throwOnNext = boom;

Exception thrown =
assertThrows(
Exception.class,
() -> CometS3CredentialDispatcher.getCredentialsForPath("b", "k", READ));
assertSame(boom, thrown);
}

@Test
public void nullSessionTokenAllowed() throws Exception {
TestCometS3CredentialProvider.nextResult = new CometS3Credentials("AKIA", "sec", null, 0L);

CometS3Credentials creds = CometS3CredentialDispatcher.getCredentialsForPath("b", "k", READ);

assertNull(creds.getSessionToken());
}

@Test
public void providerReceivesEachCallSeparately() throws Exception {
CometS3CredentialDispatcher.getCredentialsForPath("b1", "k1", READ);
CometS3CredentialDispatcher.getCredentialsForPath("b2", "k2", READ);
CometS3CredentialDispatcher.getCredentialsForPath("b3", "k3", READ);

assertEquals(3, TestCometS3CredentialProvider.callCount.get());
assertEquals("b3", TestCometS3CredentialProvider.lastBucket);
assertEquals("k3", TestCometS3CredentialProvider.lastPath);
}
}
Loading
Loading