feat: add automatic S3 request retry with exponential backoff #1701
base: master
Changes from 15 commits
Commits: 9524c1f, a79aa11, beead6e, 53da859, 60aeaed, d1c52e0, 0d687ec, 34abd64, 1997b48, b2cad44, ba65312, 6d39a12, 756e294, 2f2fb4f, 7fe6e87, 9a5c8e4, 93e4f47, 0ae488b
```diff
@@ -1,6 +1,6 @@
 /*
  * MinIO Java SDK for Amazon S3 Compatible Cloud Storage,
- * (C) 2025 MinIO, Inc.
+ * (C) 2025-2026 MinIO, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
```
```diff
@@ -51,13 +51,11 @@
 import java.io.OutputStreamWriter;
 import java.io.PrintWriter;
 import java.nio.charset.StandardCharsets;
-import java.security.SecureRandom;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Locale;
 import java.util.Map;
-import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
```
```diff
@@ -92,7 +90,6 @@ public abstract class BaseS3Client implements AutoCloseable {
       "ServerSideEncryptionConfigurationNotFoundError";
   // maximum allowed bucket policy size is 20KiB
   protected static final int MAX_BUCKET_POLICY_SIZE = 20 * 1024;
-  protected static final Random RANDOM = new Random(new SecureRandom().nextLong());
   protected static final ObjectMapper OBJECT_MAPPER =
       JsonMapper.builder()
           .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
```
```diff
@@ -104,6 +101,7 @@ public abstract class BaseS3Client implements AutoCloseable {
   private static final String UPLOAD_ID = "uploadId";
   private static final Set<String> TRACE_QUERY_PARAMS =
       ImmutableSet.of("retention", "legal-hold", "tagging", UPLOAD_ID, "acl", "attributes");
+
   private PrintWriter traceStream;
   protected final Map<String, String> regionCache = new ConcurrentHashMap<>();
   protected String userAgent = Utils.getDefaultUserAgent();
```
```diff
@@ -113,19 +111,59 @@ public abstract class BaseS3Client implements AutoCloseable {
   protected OkHttpClient httpClient;
   protected boolean closeHttpClient;

+  /**
+   * Maximum attempts per S3 request. Default {@link Retry#MAX_RETRY}. Read on every request via the
+   * retry interceptor's supplier so runtime tuning via {@link #setMaxRetries(int)} takes immediate
+   * effect. Package-private so the {@code Builder} classes in this package can seed it via the
+   * setter; subclasses must go through {@link #setMaxRetries(int)}.
+   */
+  volatile int maxRetries = Retry.MAX_RETRY;
+
   protected BaseS3Client(
       Http.BaseUrl baseUrl, Provider provider, OkHttpClient httpClient, boolean closeHttpClient) {
     this.baseUrl = baseUrl;
     this.provider = provider;
-    this.httpClient = httpClient;
     this.closeHttpClient = closeHttpClient;
+    this.httpClient = wrapWithRetry(httpClient);
   }

   protected BaseS3Client(BaseS3Client client) {
     this.baseUrl = client.baseUrl;
     this.provider = client.provider;
-    this.httpClient = client.httpClient;
     this.closeHttpClient = client.closeHttpClient;
+    this.maxRetries = client.maxRetries;
+    this.httpClient = wrapWithRetry(client.httpClient);
   }

+  /**
+   * Re-wires the retry interceptor on {@code client} so it reads {@link #maxRetries} from this
+   * instance. Strips any prior {@link Http.RetryInterceptor} from BOTH the application-interceptor
+   * and network-interceptor chains (e.g. one bound to a different instance via the copy
+   * constructor, or accidentally registered on the network chain) before installing this one as an
+   * application interceptor.
+   *
+   * <p>Also forces {@code retryOnConnectionFailure(false)} on the underlying client. The SDK's own
+   * {@link Http.RetryInterceptor} is the single source of retry policy; layering OkHttp's
+   * connection-level retry on top would double-count attempts and obscure the {@link #maxRetries}
+   * budget. This silently overrides any {@code retryOnConnectionFailure(true)} that a
+   * caller-supplied client was configured with — by design.
+   */
+  private OkHttpClient wrapWithRetry(OkHttpClient client) {
+    OkHttpClient.Builder builder = client.newBuilder().retryOnConnectionFailure(false);
+    builder.interceptors().removeIf(i -> i instanceof Http.RetryInterceptor);
+    builder.networkInterceptors().removeIf(i -> i instanceof Http.RetryInterceptor);
+    return builder.addInterceptor(new Http.RetryInterceptor(() -> this.maxRetries)).build();
+  }
+
+  /**
+   * Sets the maximum number of attempts for transient HTTP failures. Pass {@code 1} to disable
+   * automatic retries. Defaults to {@code 10}.
+   *
+   * @param maxRetries maximum attempts (must be {@code >= 1}).
+   */
+  public void setMaxRetries(int maxRetries) {
+    if (maxRetries < 1) throw new IllegalArgumentException("maxRetries must be >= 1");
+    this.maxRetries = maxRetries;
+  }
+
   /** Closes underneath HTTP client. */
```
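The interceptor's internals are not part of this hunk, but the wiring above implies its general shape: it takes an attempt-budget supplier and re-reads it per request. The following is a hypothetical sketch, not the PR's actual `Http.RetryInterceptor`; the retryability checks, backoff constants, and use of `ThreadLocalRandom` for jitter (plausible given the removal of the `RANDOM` field above, but not shown in this diff) are all assumptions.

```java
import java.io.IOException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.IntSupplier;
import okhttp3.Interceptor;
import okhttp3.Response;

/** Hypothetical sketch of a supplier-driven retry interceptor; not the PR's actual class. */
final class RetryInterceptorSketch implements Interceptor {
  private final IntSupplier maxRetries; // read per request, so setMaxRetries() applies immediately

  RetryInterceptorSketch(IntSupplier maxRetries) {
    this.maxRetries = maxRetries;
  }

  @Override
  public Response intercept(Chain chain) throws IOException {
    int attempts = maxRetries.getAsInt(); // snapshot the budget for this request
    IOException lastFailure = null;
    for (int attempt = 1; attempt <= attempts; attempt++) {
      try {
        // Application interceptors may call proceed() more than once,
        // which is what makes this retry loop legal in OkHttp.
        Response response = chain.proceed(chain.request());
        if (!isRetryable(response.code()) || attempt == attempts) return response;
        response.close(); // release the retryable response before the next attempt
      } catch (IOException e) {
        if (attempt == attempts) throw e;
        lastFailure = e;
      }
      sleepWithBackoff(attempt);
    }
    throw lastFailure != null ? lastFailure : new IOException("retry budget exhausted");
  }

  // Illustrative status set; the real policy also inspects S3 error codes.
  private static boolean isRetryable(int code) {
    return code == 429 || code == 500 || code == 502 || code == 503 || code == 504;
  }

  // Full-jitter exponential backoff: sleep in [0, min(cap, base * 2^attempt)).
  // The base and cap values here are illustrative, not the SDK's.
  private static void sleepWithBackoff(int attempt) throws IOException {
    long cap = 30_000L, base = 250L;
    long bound = Math.min(cap, base << Math.min(attempt, 20));
    try {
      Thread.sleep(ThreadLocalRandom.current().nextLong(bound));
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IOException("interrupted during retry backoff", e);
    }
  }
}
```

Installing it last on the application chain, as `wrapWithRetry` does, keeps earlier caller-added application interceptors outside the retry loop while network interceptors still run once per attempt.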
```diff
@@ -182,6 +220,11 @@ public void setAppInfo(String name, String version) {
   /**
    * Enables HTTP call tracing and written to traceStream.
    *
+   * <p><b>Retry caveat.</b> Tracing happens at the SDK callback level, so only the final response
+   * of a retry sequence is recorded. Per-attempt traces (which the {@link Http.RetryInterceptor}
+   * sees and discards) are not surfaced here. To inspect individual retry attempts, register an
+   * OkHttp {@code HttpLoggingInterceptor} on a custom client.
+   *
    * @param traceStream {@link OutputStream} for writing HTTP call tracing.
    * @see #traceOff
    */
```
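As a concrete illustration of the caveat, per-attempt visibility requires hooking OkHttp itself. A sketch, assuming the standard `okhttp3-logging-interceptor` artifact and the `MinioClient.Builder#httpClient` method from current minio-java; endpoint and credentials are placeholders. The logger is registered on the network chain, which runs once per physical attempt:

```java
import io.minio.MinioClient;
import okhttp3.OkHttpClient;
import okhttp3.logging.HttpLoggingInterceptor;

public class PerAttemptTracing {
  public static void main(String[] args) {
    // Log request/response lines once per physical network attempt.
    HttpLoggingInterceptor logging = new HttpLoggingInterceptor();
    logging.setLevel(HttpLoggingInterceptor.Level.BASIC);

    // Network interceptors fire on every attempt, so retried requests are
    // visible here even though the SDK trace stream only sees the final one.
    OkHttpClient customClient =
        new OkHttpClient.Builder().addNetworkInterceptor(logging).build();

    MinioClient client =
        MinioClient.builder()
            .endpoint("https://play.min.io") // illustrative endpoint
            .credentials("ACCESS_KEY", "SECRET_KEY") // placeholder credentials
            .httpClient(customClient)
            .build();
  }
}
```

Because `wrapWithRetry` installs the SDK's retry interceptor on the application chain of this same client, the network-chain logger above fires for each attempt the retry loop makes.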
```diff
@@ -268,7 +311,13 @@ private String[] handleRedirectResponse(
     return new String[] {code, message};
   }

-  /** Execute HTTP request asynchronously for given parameters. */
+  /**
+   * Execute HTTP request asynchronously for given parameters. Retries on retryable IOException,
+   * HTTP status, and S3 error code are handled by {@link Http.RetryInterceptor}, which {@link
+   * #wrapWithRetry} installs on the underlying {@link OkHttpClient} regardless of whether the
+   * client is the default or caller-supplied. The attempt budget is read from {@link #maxRetries}
+   * on every request.
+   */
   protected CompletableFuture<Response> executeAsync(Http.S3Request s3request, String region) {
     Credentials credentials = (provider == null) ? null : provider.fetch();
     Http.Request request = null;
```
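Since the budget is read per request, tuning it at runtime is a one-liner. A usage fragment, assuming `client` is an already-built concrete client exposing the public `setMaxRetries` from `BaseS3Client`:

```java
// Disable automatic retries entirely, e.g. for a latency-sensitive health check.
client.setMaxRetries(1);

// Restore a more forgiving budget for bulk work; subsequent requests pick the
// new value up immediately because the interceptor reads it through a supplier.
client.setMaxRetries(10);
```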
```diff
@@ -282,15 +331,9 @@ protected CompletableFuture<Response> executeAsync(Http.S3Request s3request, Str
     PrintWriter traceStream = this.traceStream;
     if (traceStream != null) traceStream.print(request.httpTraces());

-    OkHttpClient httpClient = this.httpClient;
-
-    // FIXME: enable retry for all request.
-    // if (!s3request.retryFailure()) {
-    //   httpClient = httpClient.newBuilder().retryOnConnectionFailure(false).build();
-    // }
-
     okhttp3.Request httpRequest = request.httpRequest();
     CompletableFuture<Response> completableFuture = newCompleteableFuture();
-    httpClient
+    this.httpClient
         .newCall(httpRequest)
         .enqueue(
             new Callback() {
```
```diff
@@ -469,9 +512,10 @@ private void onResponse(final Response response) throws IOException {
                     response.header("x-amz-id-2"));
               }

-              // invalidate region cache if needed
-              if (errorResponse.code().equals(NO_SUCH_BUCKET)
-                  || errorResponse.code().equals(RETRY_HEAD)) {
+              // invalidate region cache if needed (bucket may be null for e.g. listBuckets)
+              if (s3request.bucket() != null
+                  && (errorResponse.code().equals(NO_SUCH_BUCKET)
+                      || errorResponse.code().equals(RETRY_HEAD))) {
                 regionCache.remove(s3request.bucket());
               }
```
```diff
@@ -1223,6 +1267,15 @@ private Object[] createBody(PutObjectAPIBaseArgs args, MediaType contentType)
     boolean checksumHeader = headers.namePrefixAny("x-amz-checksum-");
     String md5Hash = headers.getFirst(Http.Headers.CONTENT_MD5);

+    long fileStartPos = 0;
+    if (args.file() != null) {
+      try {
+        fileStartPos = args.file().getFilePointer();
+      } catch (IOException e) {
+        throw new MinioException(e);
+      }
+    }
+
     if (sha256HexString == null && sha256Base64String == null) {
       if (!baseUrl.isHttps()) {
         Checksum.Hasher hasher = Checksum.Algorithm.SHA256.hasher();
```
**Member:** Why do we need this?

**Contributor (author):** Each `file.read(...)` advances the file pointer. After hashing a `length`-byte file, the pointer sits at `fileStartPos + length`. Whatever position the pointer holds when `new Http.Body(...)` runs becomes the offset that subsequent retries seek back to before each network attempt. Without the restore at line 1335, that captured offset would be `fileStartPos + length` instead of `fileStartPos`, and every PUT, first attempt and retries alike, would write from the wrong offset (typically EOF, producing an empty or truncated upload). `Http.Body` doesn't handle this itself. minio-go does the same in `api-put-object-streaming.go:682-705`.

**Contributor (author):** Removing this. To do so, `Checksum.update`'s `RandomAccessFile` branch must be rewritten to use `FileChannel.read(ByteBuffer, long position)` so it no longer mutates the file's pointer.
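The follow-up suggestion, hashing without ever moving the pointer, would look roughly like the sketch below. It is illustrative only: `Checksum.update`'s real signature is not shown in this diff, and the method name `hashWithoutMovingPointer` is invented. The key property is that `FileChannel.read(ByteBuffer, long position)` is a positional read, so neither the channel position nor the shared `RandomAccessFile` pointer moves.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class PositionalHashing {
  // Hash `length` bytes starting at the file's current position without
  // advancing the RandomAccessFile pointer, removing the need for a
  // capture-and-seek dance around the hashing code.
  static byte[] hashWithoutMovingPointer(RandomAccessFile file, long length)
      throws IOException, NoSuchAlgorithmException {
    MessageDigest digest = MessageDigest.getInstance("SHA-256");
    FileChannel channel = file.getChannel();
    long position = file.getFilePointer(); // the fileStartPos equivalent
    ByteBuffer buffer = ByteBuffer.allocate(16 * 1024);
    long remaining = length;
    while (remaining > 0) {
      buffer.clear();
      buffer.limit((int) Math.min(buffer.capacity(), remaining));
      int read = channel.read(buffer, position); // positional read: pointer untouched
      if (read < 0) throw new IOException("unexpected EOF while hashing");
      buffer.flip();
      digest.update(buffer);
      position += read;
      remaining -= read;
    }
    return digest.digest();
  }
}
```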
```diff
@@ -1278,6 +1331,14 @@ private Object[] createBody(PutObjectAPIBaseArgs args, MediaType contentType)
       }
     }

+    if (args.file() != null) {
+      try {
+        args.file().seek(fileStartPos);
+      } catch (IOException e) {
+        throw new MinioException(e);
+      }
+    }
+
     Http.Body body = null;
     if (args.file() != null) {
       body = new Http.Body(args.file(), args.length(), contentType, sha256HexString, md5Hash);
```
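Taken together, a caller sees this feature as a single knob. An end-to-end usage sketch, assuming `setMaxRetries` is reachable on `MinioClient` through `BaseS3Client` and using placeholder endpoint, credentials, bucket, and file names:

```java
import io.minio.MinioClient;
import io.minio.UploadObjectArgs;

public class RetryUsage {
  public static void main(String[] args) throws Exception {
    MinioClient client =
        MinioClient.builder()
            .endpoint("https://play.min.io") // placeholder endpoint
            .credentials("ACCESS_KEY", "SECRET_KEY") // placeholder credentials
            .build();

    // Allow up to 5 attempts per request; retryable IOExceptions, HTTP
    // statuses, and S3 error codes back off exponentially inside the
    // interceptor, invisible at this call site.
    client.setMaxRetries(5);

    client.uploadObject(
        UploadObjectArgs.builder()
            .bucket("my-bucket") // placeholder bucket
            .object("backup.tar.gz")
            .filename("/tmp/backup.tar.gz")
            .build());
  }
}
```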