diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/DefaultS3FileIOAwsClientFactory.java b/aws/src/main/java/org/apache/iceberg/aws/s3/DefaultS3FileIOAwsClientFactory.java index 2dec40e7f897..3236aee6c44e 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/s3/DefaultS3FileIOAwsClientFactory.java +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/DefaultS3FileIOAwsClientFactory.java @@ -58,6 +58,7 @@ public S3Client s3() { .applyMutation(s3FileIOProperties::applyS3AccessGrantsConfigurations) .applyMutation(s3FileIOProperties::applyUserAgentConfigurations) .applyMutation(s3FileIOProperties::applyRetryConfigurations) + .applyMutation(s3FileIOProperties::applyMetricsPublisherConfiguration) .build(); } @@ -76,6 +77,7 @@ public S3AsyncClient s3Async() { .applyMutation(awsClientProperties::applyClientCredentialConfigurations) .applyMutation(awsClientProperties::applyLegacyMd5Plugin) .applyMutation(s3FileIOProperties::applyEndpointConfigurations) + .applyMutation(s3FileIOProperties::applyMetricsPublisherConfiguration) .build(); } } diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIOProperties.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIOProperties.java index 922010d61d27..e39517e893cf 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIOProperties.java +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIOProperties.java @@ -49,6 +49,7 @@ import software.amazon.awssdk.core.retry.conditions.RetryCondition; import software.amazon.awssdk.core.retry.conditions.RetryOnExceptionsCondition; import software.amazon.awssdk.core.retry.conditions.TokenBucketRetryCondition; +import software.amazon.awssdk.metrics.MetricPublisher; import software.amazon.awssdk.services.s3.S3BaseClientBuilder; import software.amazon.awssdk.services.s3.S3ClientBuilder; import software.amazon.awssdk.services.s3.S3Configuration; @@ -506,6 +507,14 @@ public class S3FileIOProperties implements Serializable { public static final boolean S3_DIRECTORY_BUCKET_LIST_PREFIX_AS_DIRECTORY_DEFAULT = true; + /** + * Configure a custom {@link MetricPublisher} implementation for the S3 client. + * + *

The class must implement {@link MetricPublisher}. It will be instantiated via a static + * {@code create(Map)} factory method if available, otherwise via a no-arg constructor. + */ + public static final String METRICS_PUBLISHER_IMPL = "s3.metrics-publisher-impl"; + private String sseType; private String sseKey; private String sseMd5; @@ -547,6 +556,7 @@ public class S3FileIOProperties implements Serializable { private long s3RetryMaxWaitMs; private boolean s3DirectoryBucketListPrefixAsDirectory; + private final String metricsPublisherImpl; private final Map allProperties; public S3FileIOProperties() { @@ -590,6 +600,7 @@ public S3FileIOProperties() { this.s3AnalyticsacceleratorProperties = Maps.newHashMap(); this.isS3CRTEnabled = S3_CRT_ENABLED_DEFAULT; this.s3CrtMaxConcurrency = S3_CRT_MAX_CONCURRENCY_DEFAULT; + this.metricsPublisherImpl = null; this.allProperties = Maps.newHashMap(); ValidationException.check( @@ -715,6 +726,7 @@ public S3FileIOProperties(Map properties) { this.s3CrtMaxConcurrency = PropertyUtil.propertyAsInt( properties, S3_CRT_MAX_CONCURRENCY, S3_CRT_MAX_CONCURRENCY_DEFAULT); + this.metricsPublisherImpl = properties.get(METRICS_PUBLISHER_IMPL); ValidationException.check( keyIdAccessKeyBothConfigured(), @@ -1197,4 +1209,58 @@ private T loadSdkPluginConfigurations(String impl, Map prope public Map properties() { return allProperties; } + + public String metricsPublisherImpl() { + return metricsPublisherImpl; + } + + /** + * Configure a custom {@link MetricPublisher} for an S3 client. + * + *

Sample usage: + * + *

+   *     S3Client.builder().applyMutation(s3FileIOProperties::applyMetricsPublisherConfiguration)
+   * 
+ */ + public > void applyMetricsPublisherConfiguration(T builder) { + if (metricsPublisherImpl != null) { + builder.overrideConfiguration(c -> c.addMetricPublisher(loadMetricPublisher())); + } + } + + private MetricPublisher loadMetricPublisher() { + // Phase 1: look up the factory. A NoSuchMethodException here means the class does not + // declare `create(Map)` — fall back to the no-arg constructor path. Any OTHER exception + // from a factory that DOES exist (e.g. the factory itself throws) should surface via the + // wrapping IllegalArgumentException in phase 2 so users can distinguish "wrong signature" + // from "misconfigured factory". + DynMethods.StaticMethod factory = null; + try { + factory = + DynMethods.builder("create") + .hiddenImpl(metricsPublisherImpl, Map.class) + .buildStaticChecked(); + } catch (NoSuchMethodException e) { + // Expected when the implementation doesn't provide a create(Map) factory — fall through. + } + + // Phase 2: invoke whichever path we found. Exceptions here are real failures and are + // surfaced with the precise path that failed so the user can diagnose. + try { + if (factory != null) { + return (MetricPublisher) factory.invoke(allProperties); + } + return Class.forName(metricsPublisherImpl) + .asSubclass(MetricPublisher.class) + .getDeclaredConstructor() + .newInstance(); + } catch (Exception e) { + throw new IllegalArgumentException( + String.format( + "Cannot create MetricPublisher from class %s via %s", + metricsPublisherImpl, factory != null ? "create(Map)" : "no-arg constructor"), + e); + } + } } diff --git a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIOProperties.java b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIOProperties.java index 953f73d45d4a..9fb38035c532 100644 --- a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIOProperties.java +++ b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIOProperties.java @@ -37,6 +37,8 @@ import org.mockito.Mockito; import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; import software.amazon.awssdk.core.retry.RetryPolicy; +import software.amazon.awssdk.metrics.MetricCollection; +import software.amazon.awssdk.metrics.MetricPublisher; import software.amazon.awssdk.services.s3.S3AsyncClientBuilder; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.S3ClientBuilder; @@ -587,4 +589,137 @@ public void testChunkedEncodingDisabled() { .as("chunked encoding should be disabled when explicitly set to false") .isFalse(); } + + @Test + public void testApplyMetricsPublisherConfigurationWithFactoryMethod() { + Map properties = Maps.newHashMap(); + properties.put( + S3FileIOProperties.METRICS_PUBLISHER_IMPL, FactoryMetricPublisher.class.getName()); + S3FileIOProperties s3FileIOProperties = new S3FileIOProperties(properties); + + S3ClientBuilder builder = S3Client.builder(); + s3FileIOProperties.applyMetricsPublisherConfiguration(builder); + + assertThat(builder.overrideConfiguration()).isNotNull(); + assertThat(builder.overrideConfiguration().metricPublishers()).hasSize(1); + assertThat(builder.overrideConfiguration().metricPublishers().get(0)) + .isInstanceOf(FactoryMetricPublisher.class); + } + + @Test + public void testApplyMetricsPublisherConfigurationWithNoArgConstructor() { + Map properties = Maps.newHashMap(); + properties.put(S3FileIOProperties.METRICS_PUBLISHER_IMPL, NoArgMetricPublisher.class.getName()); + S3FileIOProperties s3FileIOProperties = new S3FileIOProperties(properties); + + S3ClientBuilder builder = S3Client.builder(); + s3FileIOProperties.applyMetricsPublisherConfiguration(builder); + + assertThat(builder.overrideConfiguration()).isNotNull(); + assertThat(builder.overrideConfiguration().metricPublishers()).hasSize(1); + assertThat(builder.overrideConfiguration().metricPublishers().get(0)) + .isInstanceOf(NoArgMetricPublisher.class); + } + + @Test + public void testApplyMetricsPublisherConfigurationDisabled() { + Map properties = Maps.newHashMap(); + S3FileIOProperties s3FileIOProperties = new S3FileIOProperties(properties); + + S3ClientBuilder builder = S3Client.builder(); + s3FileIOProperties.applyMetricsPublisherConfiguration(builder); + + assertThat(s3FileIOProperties.metricsPublisherImpl()).isNull(); + } + + @Test + public void testApplyMetricsPublisherConfigurationInvalidClass() { + Map properties = Maps.newHashMap(); + properties.put(S3FileIOProperties.METRICS_PUBLISHER_IMPL, "com.invalid.NonExistentClass"); + S3FileIOProperties s3FileIOProperties = new S3FileIOProperties(properties); + + S3ClientBuilder builder = S3Client.builder(); + assertThatThrownBy(() -> s3FileIOProperties.applyMetricsPublisherConfiguration(builder)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Cannot create MetricPublisher from class"); + } + + @Test + public void testApplyMetricsPublisherConfigurationFactoryThrows() { + // The implementation provides a static create(Map) factory that throws. The error message + // must identify the factory path so the user can diagnose a bad implementation rather than + // being misled into thinking they need to add a no-arg constructor. + Map properties = Maps.newHashMap(); + properties.put( + S3FileIOProperties.METRICS_PUBLISHER_IMPL, ThrowingFactoryMetricPublisher.class.getName()); + S3FileIOProperties s3FileIOProperties = new S3FileIOProperties(properties); + + S3ClientBuilder builder = S3Client.builder(); + assertThatThrownBy(() -> s3FileIOProperties.applyMetricsPublisherConfiguration(builder)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("via create(Map)") + .hasMessageContaining(ThrowingFactoryMetricPublisher.class.getName()); + } + + @Test + public void testApplyMetricsPublisherConfigurationNoArgConstructorThrows() { + // The implementation does not declare create(Map) but its no-arg constructor throws. + // The error message must identify the no-arg constructor path. + Map properties = Maps.newHashMap(); + properties.put( + S3FileIOProperties.METRICS_PUBLISHER_IMPL, ThrowingNoArgMetricPublisher.class.getName()); + S3FileIOProperties s3FileIOProperties = new S3FileIOProperties(properties); + + S3ClientBuilder builder = S3Client.builder(); + assertThatThrownBy(() -> s3FileIOProperties.applyMetricsPublisherConfiguration(builder)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("via no-arg constructor") + .hasMessageContaining(ThrowingNoArgMetricPublisher.class.getName()); + } + + public static class FactoryMetricPublisher implements MetricPublisher { + public static FactoryMetricPublisher create(Map properties) { + return new FactoryMetricPublisher(); + } + + @Override + public void publish(MetricCollection metricCollection) {} + + @Override + public void close() {} + } + + public static class NoArgMetricPublisher implements MetricPublisher { + public NoArgMetricPublisher() {} + + @Override + public void publish(MetricCollection metricCollection) {} + + @Override + public void close() {} + } + + public static class ThrowingFactoryMetricPublisher implements MetricPublisher { + public static ThrowingFactoryMetricPublisher create(Map properties) { + throw new IllegalStateException("factory boom"); + } + + @Override + public void publish(MetricCollection metricCollection) {} + + @Override + public void close() {} + } + + public static class ThrowingNoArgMetricPublisher implements MetricPublisher { + public ThrowingNoArgMetricPublisher() { + throw new IllegalStateException("ctor boom"); + } + + @Override + public void publish(MetricCollection metricCollection) {} + + @Override + public void close() {} + } } diff --git a/docs/docs/aws.md b/docs/docs/aws.md index fba4921f73a5..e2a1fe814f4c 100644 --- a/docs/docs/aws.md +++ b/docs/docs/aws.md @@ -659,6 +659,21 @@ spark-sql --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCata For more details on using S3 Dual-stack, please refer [Using dual-stack endpoints from the AWS CLI and the AWS SDKs](https://docs.aws.amazon.com/AmazonS3/latest/userguide/dual-stack-endpoints.html#dual-stack-endpoints-cli) +### S3 Custom MetricPublisher + +A custom `MetricPublisher` implementation can be plugged into the S3 client by setting the `s3.metrics-publisher-impl` catalog property to the fully qualified class name of a class that implements `software.amazon.awssdk.metrics.MetricPublisher`. + +The class will be instantiated via a static `create(Map)` factory method if available, otherwise via a no-arg constructor. + +For example, to use a custom MetricPublisher with Spark 3.5, you can start the Spark SQL shell with: +``` +spark-sql --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \ + --conf spark.sql.catalog.my_catalog.warehouse=s3://my-bucket2/my/key/prefix \ + --conf spark.sql.catalog.my_catalog.type=glue \ + --conf spark.sql.catalog.my_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO \ + --conf spark.sql.catalog.my_catalog.s3.metrics-publisher-impl=org.example.MyMetricPublisher +``` + ## AWS Client Customization Many organizations have customized their way of configuring AWS clients with their own credential provider, access proxy, retry strategy, etc.