From 3e0f33f3ed92c463c82d0782803eaa671aba4721 Mon Sep 17 00:00:00 2001 From: Anupam Yadav Date: Wed, 6 May 2026 21:57:47 +0000 Subject: [PATCH 1/2] AWS: Add MetricPublisher configuration support for S3FileIO (#15182) Adds support for configuring a custom AWS SDK MetricPublisher for S3FileIO via the s3.metrics-publisher-impl catalog property. This enables observability into S3 SDK-level metrics (latency, retries, errors) for debugging and monitoring. - S3FileIOProperties: adds METRICS_PUBLISHER_IMPL property and the applyMetricsPublisherConfiguration() method. Loads the publisher via a static create(Map) factory when present, or a no-arg constructor otherwise. - DefaultS3FileIOAwsClientFactory: applies the metrics publisher configuration to both the sync s3() and async s3Async() client builders. - Tests cover factory-method, no-arg constructor, disabled, and invalid-class paths. This PR builds on the design and initial implementation from #15122 by @rcjverhoef (PR stale-closed). Review feedback from @geruh on that PR (apply to S3 async client, clean imports, invalid-class test) is addressed here. Closes #15182 Co-authored-by: rocco.verhoef --- .../s3/DefaultS3FileIOAwsClientFactory.java | 2 + .../iceberg/aws/s3/S3FileIOProperties.java | 51 ++++++++++++ .../aws/s3/TestS3FileIOProperties.java | 78 +++++++++++++++++++ docs/docs/aws.md | 15 ++++ 4 files changed, 146 insertions(+) diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/DefaultS3FileIOAwsClientFactory.java b/aws/src/main/java/org/apache/iceberg/aws/s3/DefaultS3FileIOAwsClientFactory.java index 2dec40e7f897..3236aee6c44e 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/s3/DefaultS3FileIOAwsClientFactory.java +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/DefaultS3FileIOAwsClientFactory.java @@ -58,6 +58,7 @@ public S3Client s3() { .applyMutation(s3FileIOProperties::applyS3AccessGrantsConfigurations) .applyMutation(s3FileIOProperties::applyUserAgentConfigurations) .applyMutation(s3FileIOProperties::applyRetryConfigurations) + .applyMutation(s3FileIOProperties::applyMetricsPublisherConfiguration) .build(); } @@ -76,6 +77,7 @@ public S3AsyncClient s3Async() { .applyMutation(awsClientProperties::applyClientCredentialConfigurations) .applyMutation(awsClientProperties::applyLegacyMd5Plugin) .applyMutation(s3FileIOProperties::applyEndpointConfigurations) + .applyMutation(s3FileIOProperties::applyMetricsPublisherConfiguration) .build(); } } diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIOProperties.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIOProperties.java index 922010d61d27..537ebe569ac7 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIOProperties.java +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIOProperties.java @@ -49,6 +49,7 @@ import software.amazon.awssdk.core.retry.conditions.RetryCondition; import software.amazon.awssdk.core.retry.conditions.RetryOnExceptionsCondition; import software.amazon.awssdk.core.retry.conditions.TokenBucketRetryCondition; +import software.amazon.awssdk.metrics.MetricPublisher; import software.amazon.awssdk.services.s3.S3BaseClientBuilder; import software.amazon.awssdk.services.s3.S3ClientBuilder; import software.amazon.awssdk.services.s3.S3Configuration; @@ -506,6 +507,14 @@ public class S3FileIOProperties implements Serializable { public static final boolean S3_DIRECTORY_BUCKET_LIST_PREFIX_AS_DIRECTORY_DEFAULT = true; + /** + * Configure a custom {@link MetricPublisher} implementation for the S3 client. + * + *

The class must implement {@link MetricPublisher}. It will be instantiated via a static + * {@code create(Map)} factory method if available, otherwise via a no-arg constructor. + */ + public static final String METRICS_PUBLISHER_IMPL = "s3.metrics-publisher-impl"; + private String sseType; private String sseKey; private String sseMd5; @@ -547,6 +556,7 @@ public class S3FileIOProperties implements Serializable { private long s3RetryMaxWaitMs; private boolean s3DirectoryBucketListPrefixAsDirectory; + private final String metricsPublisherImpl; private final Map allProperties; public S3FileIOProperties() { @@ -590,6 +600,7 @@ public S3FileIOProperties() { this.s3AnalyticsacceleratorProperties = Maps.newHashMap(); this.isS3CRTEnabled = S3_CRT_ENABLED_DEFAULT; this.s3CrtMaxConcurrency = S3_CRT_MAX_CONCURRENCY_DEFAULT; + this.metricsPublisherImpl = null; this.allProperties = Maps.newHashMap(); ValidationException.check( @@ -715,6 +726,7 @@ public S3FileIOProperties(Map properties) { this.s3CrtMaxConcurrency = PropertyUtil.propertyAsInt( properties, S3_CRT_MAX_CONCURRENCY, S3_CRT_MAX_CONCURRENCY_DEFAULT); + this.metricsPublisherImpl = properties.get(METRICS_PUBLISHER_IMPL); ValidationException.check( keyIdAccessKeyBothConfigured(), @@ -1197,4 +1209,43 @@ private T loadSdkPluginConfigurations(String impl, Map prope public Map properties() { return allProperties; } + + public String metricsPublisherImpl() { + return metricsPublisherImpl; + } + + /** + * Configure a custom {@link MetricPublisher} for an S3 client. + * + *

Sample usage: + * + *

+   *     S3Client.builder().applyMutation(s3FileIOProperties::applyMetricsPublisherConfiguration)
+   * 
+ */ + public > void applyMetricsPublisherConfiguration(T builder) { + if (metricsPublisherImpl != null) { + builder.overrideConfiguration(c -> c.addMetricPublisher(loadMetricPublisher())); + } + } + + private MetricPublisher loadMetricPublisher() { + try { + return (MetricPublisher) + DynMethods.builder("create") + .hiddenImpl(metricsPublisherImpl, Map.class) + .buildStaticChecked() + .invoke(allProperties); + } catch (NoSuchMethodException e) { + try { + return Class.forName(metricsPublisherImpl) + .asSubclass(MetricPublisher.class) + .getDeclaredConstructor() + .newInstance(); + } catch (Exception ex) { + throw new IllegalArgumentException( + String.format("Cannot create MetricPublisher from class %s", metricsPublisherImpl), ex); + } + } + } } diff --git a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIOProperties.java b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIOProperties.java index 953f73d45d4a..9b60ea4a5646 100644 --- a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIOProperties.java +++ b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIOProperties.java @@ -37,6 +37,8 @@ import org.mockito.Mockito; import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; import software.amazon.awssdk.core.retry.RetryPolicy; +import software.amazon.awssdk.metrics.MetricCollection; +import software.amazon.awssdk.metrics.MetricPublisher; import software.amazon.awssdk.services.s3.S3AsyncClientBuilder; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.S3ClientBuilder; @@ -587,4 +589,80 @@ public void testChunkedEncodingDisabled() { .as("chunked encoding should be disabled when explicitly set to false") .isFalse(); } + + @Test + public void testApplyMetricsPublisherConfigurationWithFactoryMethod() { + Map properties = Maps.newHashMap(); + properties.put( + S3FileIOProperties.METRICS_PUBLISHER_IMPL, FactoryMetricPublisher.class.getName()); + S3FileIOProperties s3FileIOProperties = new S3FileIOProperties(properties); + + S3ClientBuilder builder = S3Client.builder(); + s3FileIOProperties.applyMetricsPublisherConfiguration(builder); + + assertThat(builder.overrideConfiguration()).isNotNull(); + assertThat(builder.overrideConfiguration().metricPublishers()).hasSize(1); + assertThat(builder.overrideConfiguration().metricPublishers().get(0)) + .isInstanceOf(FactoryMetricPublisher.class); + } + + @Test + public void testApplyMetricsPublisherConfigurationWithNoArgConstructor() { + Map properties = Maps.newHashMap(); + properties.put(S3FileIOProperties.METRICS_PUBLISHER_IMPL, NoArgMetricPublisher.class.getName()); + S3FileIOProperties s3FileIOProperties = new S3FileIOProperties(properties); + + S3ClientBuilder builder = S3Client.builder(); + s3FileIOProperties.applyMetricsPublisherConfiguration(builder); + + assertThat(builder.overrideConfiguration()).isNotNull(); + assertThat(builder.overrideConfiguration().metricPublishers()).hasSize(1); + assertThat(builder.overrideConfiguration().metricPublishers().get(0)) + .isInstanceOf(NoArgMetricPublisher.class); + } + + @Test + public void testApplyMetricsPublisherConfigurationDisabled() { + Map properties = Maps.newHashMap(); + S3FileIOProperties s3FileIOProperties = new S3FileIOProperties(properties); + + S3ClientBuilder builder = S3Client.builder(); + s3FileIOProperties.applyMetricsPublisherConfiguration(builder); + + assertThat(s3FileIOProperties.metricsPublisherImpl()).isNull(); + } + + @Test + public void testApplyMetricsPublisherConfigurationInvalidClass() { + Map properties = Maps.newHashMap(); + properties.put(S3FileIOProperties.METRICS_PUBLISHER_IMPL, "com.invalid.NonExistentClass"); + S3FileIOProperties s3FileIOProperties = new S3FileIOProperties(properties); + + S3ClientBuilder builder = S3Client.builder(); + assertThatThrownBy(() -> s3FileIOProperties.applyMetricsPublisherConfiguration(builder)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Cannot create MetricPublisher from class"); + } + + public static class FactoryMetricPublisher implements MetricPublisher { + public static FactoryMetricPublisher create(Map properties) { + return new FactoryMetricPublisher(); + } + + @Override + public void publish(MetricCollection metricCollection) {} + + @Override + public void close() {} + } + + public static class NoArgMetricPublisher implements MetricPublisher { + public NoArgMetricPublisher() {} + + @Override + public void publish(MetricCollection metricCollection) {} + + @Override + public void close() {} + } } diff --git a/docs/docs/aws.md b/docs/docs/aws.md index fba4921f73a5..e2a1fe814f4c 100644 --- a/docs/docs/aws.md +++ b/docs/docs/aws.md @@ -659,6 +659,21 @@ spark-sql --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCata For more details on using S3 Dual-stack, please refer [Using dual-stack endpoints from the AWS CLI and the AWS SDKs](https://docs.aws.amazon.com/AmazonS3/latest/userguide/dual-stack-endpoints.html#dual-stack-endpoints-cli) +### S3 Custom MetricPublisher + +A custom `MetricPublisher` implementation can be plugged into the S3 client by setting the `s3.metrics-publisher-impl` catalog property to the fully qualified class name of a class that implements `software.amazon.awssdk.metrics.MetricPublisher`. + +The class will be instantiated via a static `create(Map)` factory method if available, otherwise via a no-arg constructor. + +For example, to use a custom MetricPublisher with Spark 3.5, you can start the Spark SQL shell with: +``` +spark-sql --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \ + --conf spark.sql.catalog.my_catalog.warehouse=s3://my-bucket2/my/key/prefix \ + --conf spark.sql.catalog.my_catalog.type=glue \ + --conf spark.sql.catalog.my_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO \ + --conf spark.sql.catalog.my_catalog.s3.metrics-publisher-impl=org.example.MyMetricPublisher +``` + ## AWS Client Customization Many organizations have customized their way of configuring AWS clients with their own credential provider, access proxy, retry strategy, etc. From d5a9856bb655408e8c53f8c76b47cb054179d66f Mon Sep 17 00:00:00 2001 From: Anupam Yadav Date: Wed, 6 May 2026 22:17:29 +0000 Subject: [PATCH 2/2] AWS: Improve MetricPublisher factory error reporting Split the create(Map) factory lookup from its invocation so the error message correctly identifies whether the factory method was missing or present-but-threw. Previously, a create(Map) method that existed and threw would be misreported as 'Cannot create MetricPublisher from class X' with the factory exception as the cause, giving users the false impression they needed to change the method signature when the actual issue was a configuration error inside the factory. The new error message states whether the failure occurred via create(Map) or via the no-arg constructor, so users can diagnose the right path without misreading the stack trace. Add regression tests asserting the distinction (via create(Map) vs via no-arg constructor) so the behaviour cannot silently regress. --- .../iceberg/aws/s3/S3FileIOProperties.java | 37 ++++++++---- .../aws/s3/TestS3FileIOProperties.java | 57 +++++++++++++++++++ 2 files changed, 83 insertions(+), 11 deletions(-) diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIOProperties.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIOProperties.java index 537ebe569ac7..e39517e893cf 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIOProperties.java +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIOProperties.java @@ -1230,22 +1230,37 @@ public String metricsPublisherImpl() { } private MetricPublisher loadMetricPublisher() { + // Phase 1: look up the factory. A NoSuchMethodException here means the class does not + // declare `create(Map)` — fall back to the no-arg constructor path. Any OTHER exception + // from a factory that DOES exist (e.g. the factory itself throws) should surface via the + // wrapping IllegalArgumentException in phase 2 so users can distinguish "wrong signature" + // from "misconfigured factory". + DynMethods.StaticMethod factory = null; try { - return (MetricPublisher) + factory = DynMethods.builder("create") .hiddenImpl(metricsPublisherImpl, Map.class) - .buildStaticChecked() - .invoke(allProperties); + .buildStaticChecked(); } catch (NoSuchMethodException e) { - try { - return Class.forName(metricsPublisherImpl) - .asSubclass(MetricPublisher.class) - .getDeclaredConstructor() - .newInstance(); - } catch (Exception ex) { - throw new IllegalArgumentException( - String.format("Cannot create MetricPublisher from class %s", metricsPublisherImpl), ex); + // Expected when the implementation doesn't provide a create(Map) factory — fall through. + } + + // Phase 2: invoke whichever path we found. Exceptions here are real failures and are + // surfaced with the precise path that failed so the user can diagnose. + try { + if (factory != null) { + return (MetricPublisher) factory.invoke(allProperties); } + return Class.forName(metricsPublisherImpl) + .asSubclass(MetricPublisher.class) + .getDeclaredConstructor() + .newInstance(); + } catch (Exception e) { + throw new IllegalArgumentException( + String.format( + "Cannot create MetricPublisher from class %s via %s", + metricsPublisherImpl, factory != null ? "create(Map)" : "no-arg constructor"), + e); } } } diff --git a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIOProperties.java b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIOProperties.java index 9b60ea4a5646..9fb38035c532 100644 --- a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIOProperties.java +++ b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIOProperties.java @@ -644,6 +644,39 @@ public void testApplyMetricsPublisherConfigurationInvalidClass() { .hasMessageContaining("Cannot create MetricPublisher from class"); } + @Test + public void testApplyMetricsPublisherConfigurationFactoryThrows() { + // The implementation provides a static create(Map) factory that throws. The error message + // must identify the factory path so the user can diagnose a bad implementation rather than + // being misled into thinking they need to add a no-arg constructor. + Map properties = Maps.newHashMap(); + properties.put( + S3FileIOProperties.METRICS_PUBLISHER_IMPL, ThrowingFactoryMetricPublisher.class.getName()); + S3FileIOProperties s3FileIOProperties = new S3FileIOProperties(properties); + + S3ClientBuilder builder = S3Client.builder(); + assertThatThrownBy(() -> s3FileIOProperties.applyMetricsPublisherConfiguration(builder)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("via create(Map)") + .hasMessageContaining(ThrowingFactoryMetricPublisher.class.getName()); + } + + @Test + public void testApplyMetricsPublisherConfigurationNoArgConstructorThrows() { + // The implementation does not declare create(Map) but its no-arg constructor throws. + // The error message must identify the no-arg constructor path. + Map properties = Maps.newHashMap(); + properties.put( + S3FileIOProperties.METRICS_PUBLISHER_IMPL, ThrowingNoArgMetricPublisher.class.getName()); + S3FileIOProperties s3FileIOProperties = new S3FileIOProperties(properties); + + S3ClientBuilder builder = S3Client.builder(); + assertThatThrownBy(() -> s3FileIOProperties.applyMetricsPublisherConfiguration(builder)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("via no-arg constructor") + .hasMessageContaining(ThrowingNoArgMetricPublisher.class.getName()); + } + public static class FactoryMetricPublisher implements MetricPublisher { public static FactoryMetricPublisher create(Map properties) { return new FactoryMetricPublisher(); @@ -665,4 +698,28 @@ public void publish(MetricCollection metricCollection) {} @Override public void close() {} } + + public static class ThrowingFactoryMetricPublisher implements MetricPublisher { + public static ThrowingFactoryMetricPublisher create(Map properties) { + throw new IllegalStateException("factory boom"); + } + + @Override + public void publish(MetricCollection metricCollection) {} + + @Override + public void close() {} + } + + public static class ThrowingNoArgMetricPublisher implements MetricPublisher { + public ThrowingNoArgMetricPublisher() { + throw new IllegalStateException("ctor boom"); + } + + @Override + public void publish(MetricCollection metricCollection) {} + + @Override + public void close() {} + } }