Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public S3Client s3() {
.applyMutation(s3FileIOProperties::applyS3AccessGrantsConfigurations)
.applyMutation(s3FileIOProperties::applyUserAgentConfigurations)
.applyMutation(s3FileIOProperties::applyRetryConfigurations)
.applyMutation(s3FileIOProperties::applyMetricsPublisherConfiguration)
.build();
}

Expand All @@ -76,6 +77,7 @@ public S3AsyncClient s3Async() {
.applyMutation(awsClientProperties::applyClientCredentialConfigurations)
.applyMutation(awsClientProperties::applyLegacyMd5Plugin)
.applyMutation(s3FileIOProperties::applyEndpointConfigurations)
.applyMutation(s3FileIOProperties::applyMetricsPublisherConfiguration)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import software.amazon.awssdk.core.retry.conditions.RetryCondition;
import software.amazon.awssdk.core.retry.conditions.RetryOnExceptionsCondition;
import software.amazon.awssdk.core.retry.conditions.TokenBucketRetryCondition;
import software.amazon.awssdk.metrics.MetricPublisher;
import software.amazon.awssdk.services.s3.S3BaseClientBuilder;
import software.amazon.awssdk.services.s3.S3ClientBuilder;
import software.amazon.awssdk.services.s3.S3Configuration;
Expand Down Expand Up @@ -506,6 +507,14 @@ public class S3FileIOProperties implements Serializable {

public static final boolean S3_DIRECTORY_BUCKET_LIST_PREFIX_AS_DIRECTORY_DEFAULT = true;

/**
* Configure a custom {@link MetricPublisher} implementation for the S3 client.
*
* <p>The class must implement {@link MetricPublisher}. It will be instantiated via a static
* {@code create(Map)} factory method if available, otherwise via a no-arg constructor.
*/
public static final String METRICS_PUBLISHER_IMPL = "s3.metrics-publisher-impl";

private String sseType;
private String sseKey;
private String sseMd5;
Expand Down Expand Up @@ -547,6 +556,7 @@ public class S3FileIOProperties implements Serializable {
private long s3RetryMaxWaitMs;

private boolean s3DirectoryBucketListPrefixAsDirectory;
private final String metricsPublisherImpl;
private final Map<String, String> allProperties;

public S3FileIOProperties() {
Expand Down Expand Up @@ -590,6 +600,7 @@ public S3FileIOProperties() {
this.s3AnalyticsacceleratorProperties = Maps.newHashMap();
this.isS3CRTEnabled = S3_CRT_ENABLED_DEFAULT;
this.s3CrtMaxConcurrency = S3_CRT_MAX_CONCURRENCY_DEFAULT;
this.metricsPublisherImpl = null;
this.allProperties = Maps.newHashMap();

ValidationException.check(
Expand Down Expand Up @@ -715,6 +726,7 @@ public S3FileIOProperties(Map<String, String> properties) {
this.s3CrtMaxConcurrency =
PropertyUtil.propertyAsInt(
properties, S3_CRT_MAX_CONCURRENCY, S3_CRT_MAX_CONCURRENCY_DEFAULT);
this.metricsPublisherImpl = properties.get(METRICS_PUBLISHER_IMPL);

ValidationException.check(
keyIdAccessKeyBothConfigured(),
Expand Down Expand Up @@ -1197,4 +1209,58 @@ private <T> T loadSdkPluginConfigurations(String impl, Map<String, String> prope
public Map<String, String> properties() {
return allProperties;
}

public String metricsPublisherImpl() {
return metricsPublisherImpl;
}

/**
* Configure a custom {@link MetricPublisher} for an S3 client.
*
* <p>Sample usage:
*
* <pre>
* S3Client.builder().applyMutation(s3FileIOProperties::applyMetricsPublisherConfiguration)
* </pre>
*/
public <T extends S3BaseClientBuilder<T, ?>> void applyMetricsPublisherConfiguration(T builder) {
if (metricsPublisherImpl != null) {
builder.overrideConfiguration(c -> c.addMetricPublisher(loadMetricPublisher()));
}
}

private MetricPublisher loadMetricPublisher() {
// Phase 1: look up the factory. A NoSuchMethodException here means the class does not
// declare `create(Map)` — fall back to the no-arg constructor path. Any OTHER exception
// from a factory that DOES exist (e.g. the factory itself throws) should surface via the
// wrapping IllegalArgumentException in phase 2 so users can distinguish "wrong signature"
// from "misconfigured factory".
DynMethods.StaticMethod factory = null;
try {
factory =
DynMethods.builder("create")
.hiddenImpl(metricsPublisherImpl, Map.class)
.buildStaticChecked();
} catch (NoSuchMethodException e) {
// Expected when the implementation doesn't provide a create(Map) factory — fall through.
}

// Phase 2: invoke whichever path we found. Exceptions here are real failures and are
// surfaced with the precise path that failed so the user can diagnose.
try {
if (factory != null) {
return (MetricPublisher) factory.invoke(allProperties);
}
return Class.forName(metricsPublisherImpl)
.asSubclass(MetricPublisher.class)
.getDeclaredConstructor()
.newInstance();
} catch (Exception e) {
throw new IllegalArgumentException(
String.format(
"Cannot create MetricPublisher from class %s via %s",
metricsPublisherImpl, factory != null ? "create(Map)" : "no-arg constructor"),
e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import org.mockito.Mockito;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.retry.RetryPolicy;
import software.amazon.awssdk.metrics.MetricCollection;
import software.amazon.awssdk.metrics.MetricPublisher;
import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3ClientBuilder;
Expand Down Expand Up @@ -587,4 +589,137 @@ public void testChunkedEncodingDisabled() {
.as("chunked encoding should be disabled when explicitly set to false")
.isFalse();
}

@Test
public void testApplyMetricsPublisherConfigurationWithFactoryMethod() {
Map<String, String> properties = Maps.newHashMap();
properties.put(
S3FileIOProperties.METRICS_PUBLISHER_IMPL, FactoryMetricPublisher.class.getName());
S3FileIOProperties s3FileIOProperties = new S3FileIOProperties(properties);

S3ClientBuilder builder = S3Client.builder();
s3FileIOProperties.applyMetricsPublisherConfiguration(builder);

assertThat(builder.overrideConfiguration()).isNotNull();
assertThat(builder.overrideConfiguration().metricPublishers()).hasSize(1);
assertThat(builder.overrideConfiguration().metricPublishers().get(0))
.isInstanceOf(FactoryMetricPublisher.class);
}

@Test
public void testApplyMetricsPublisherConfigurationWithNoArgConstructor() {
Map<String, String> properties = Maps.newHashMap();
properties.put(S3FileIOProperties.METRICS_PUBLISHER_IMPL, NoArgMetricPublisher.class.getName());
S3FileIOProperties s3FileIOProperties = new S3FileIOProperties(properties);

S3ClientBuilder builder = S3Client.builder();
s3FileIOProperties.applyMetricsPublisherConfiguration(builder);

assertThat(builder.overrideConfiguration()).isNotNull();
assertThat(builder.overrideConfiguration().metricPublishers()).hasSize(1);
assertThat(builder.overrideConfiguration().metricPublishers().get(0))
.isInstanceOf(NoArgMetricPublisher.class);
}

@Test
public void testApplyMetricsPublisherConfigurationDisabled() {
Map<String, String> properties = Maps.newHashMap();
S3FileIOProperties s3FileIOProperties = new S3FileIOProperties(properties);

S3ClientBuilder builder = S3Client.builder();
s3FileIOProperties.applyMetricsPublisherConfiguration(builder);

assertThat(s3FileIOProperties.metricsPublisherImpl()).isNull();
}

@Test
public void testApplyMetricsPublisherConfigurationInvalidClass() {
Map<String, String> properties = Maps.newHashMap();
properties.put(S3FileIOProperties.METRICS_PUBLISHER_IMPL, "com.invalid.NonExistentClass");
S3FileIOProperties s3FileIOProperties = new S3FileIOProperties(properties);

S3ClientBuilder builder = S3Client.builder();
assertThatThrownBy(() -> s3FileIOProperties.applyMetricsPublisherConfiguration(builder))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("Cannot create MetricPublisher from class");
}

@Test
public void testApplyMetricsPublisherConfigurationFactoryThrows() {
// The implementation provides a static create(Map) factory that throws. The error message
// must identify the factory path so the user can diagnose a bad implementation rather than
// being misled into thinking they need to add a no-arg constructor.
Map<String, String> properties = Maps.newHashMap();
properties.put(
S3FileIOProperties.METRICS_PUBLISHER_IMPL, ThrowingFactoryMetricPublisher.class.getName());
S3FileIOProperties s3FileIOProperties = new S3FileIOProperties(properties);

S3ClientBuilder builder = S3Client.builder();
assertThatThrownBy(() -> s3FileIOProperties.applyMetricsPublisherConfiguration(builder))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("via create(Map)")
.hasMessageContaining(ThrowingFactoryMetricPublisher.class.getName());
}

@Test
public void testApplyMetricsPublisherConfigurationNoArgConstructorThrows() {
// The implementation does not declare create(Map) but its no-arg constructor throws.
// The error message must identify the no-arg constructor path.
Map<String, String> properties = Maps.newHashMap();
properties.put(
S3FileIOProperties.METRICS_PUBLISHER_IMPL, ThrowingNoArgMetricPublisher.class.getName());
S3FileIOProperties s3FileIOProperties = new S3FileIOProperties(properties);

S3ClientBuilder builder = S3Client.builder();
assertThatThrownBy(() -> s3FileIOProperties.applyMetricsPublisherConfiguration(builder))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("via no-arg constructor")
.hasMessageContaining(ThrowingNoArgMetricPublisher.class.getName());
}

public static class FactoryMetricPublisher implements MetricPublisher {
public static FactoryMetricPublisher create(Map<String, String> properties) {
return new FactoryMetricPublisher();
}

@Override
public void publish(MetricCollection metricCollection) {}

@Override
public void close() {}
}

public static class NoArgMetricPublisher implements MetricPublisher {
public NoArgMetricPublisher() {}

@Override
public void publish(MetricCollection metricCollection) {}

@Override
public void close() {}
}

public static class ThrowingFactoryMetricPublisher implements MetricPublisher {
public static ThrowingFactoryMetricPublisher create(Map<String, String> properties) {
throw new IllegalStateException("factory boom");
}

@Override
public void publish(MetricCollection metricCollection) {}

@Override
public void close() {}
}

public static class ThrowingNoArgMetricPublisher implements MetricPublisher {
public ThrowingNoArgMetricPublisher() {
throw new IllegalStateException("ctor boom");
}

@Override
public void publish(MetricCollection metricCollection) {}

@Override
public void close() {}
}
}
15 changes: 15 additions & 0 deletions docs/docs/aws.md
Original file line number Diff line number Diff line change
Expand Up @@ -659,6 +659,21 @@ spark-sql --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCata

For more details on using S3 Dual-stack, please refer [Using dual-stack endpoints from the AWS CLI and the AWS SDKs](https://docs.aws.amazon.com/AmazonS3/latest/userguide/dual-stack-endpoints.html#dual-stack-endpoints-cli)

### S3 Custom MetricPublisher

A custom `MetricPublisher` implementation can be plugged into the S3 client by setting the `s3.metrics-publisher-impl` catalog property to the fully qualified class name of a class that implements `software.amazon.awssdk.metrics.MetricPublisher`.

The class will be instantiated via a static `create(Map<String, String>)` factory method if available, otherwise via a no-arg constructor.

For example, to use a custom MetricPublisher with Spark 3.5, you can start the Spark SQL shell with:
```
spark-sql --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.my_catalog.warehouse=s3://my-bucket2/my/key/prefix \
--conf spark.sql.catalog.my_catalog.type=glue \
--conf spark.sql.catalog.my_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO \
--conf spark.sql.catalog.my_catalog.s3.metrics-publisher-impl=org.example.MyMetricPublisher
```

## AWS Client Customization

Many organizations have customized their way of configuring AWS clients with their own credential provider, access proxy, retry strategy, etc.
Expand Down