-
Notifications
You must be signed in to change notification settings - Fork 523
feat: add automatic S3 request retry with exponential backoff #1701
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 11 commits
9524c1f
a79aa11
beead6e
53da859
60aeaed
d1c52e0
0d687ec
34abd64
1997b48
b2cad44
ba65312
6d39a12
756e294
2f2fb4f
7fe6e87
9a5c8e4
93e4f47
0ae488b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -51,13 +51,11 @@ | |
| import java.io.OutputStreamWriter; | ||
| import java.io.PrintWriter; | ||
| import java.nio.charset.StandardCharsets; | ||
| import java.security.SecureRandom; | ||
| import java.util.Arrays; | ||
| import java.util.Collections; | ||
| import java.util.HashMap; | ||
| import java.util.Locale; | ||
| import java.util.Map; | ||
| import java.util.Random; | ||
| import java.util.Set; | ||
| import java.util.concurrent.CompletableFuture; | ||
| import java.util.concurrent.CompletionException; | ||
|
|
@@ -92,7 +90,6 @@ public abstract class BaseS3Client implements AutoCloseable { | |
| "ServerSideEncryptionConfigurationNotFoundError"; | ||
| // maximum allowed bucket policy size is 20KiB | ||
| protected static final int MAX_BUCKET_POLICY_SIZE = 20 * 1024; | ||
| protected static final Random RANDOM = new Random(new SecureRandom().nextLong()); | ||
| protected static final ObjectMapper OBJECT_MAPPER = | ||
| JsonMapper.builder() | ||
| .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) | ||
|
|
@@ -104,6 +101,7 @@ public abstract class BaseS3Client implements AutoCloseable { | |
| private static final String UPLOAD_ID = "uploadId"; | ||
| private static final Set<String> TRACE_QUERY_PARAMS = | ||
| ImmutableSet.of("retention", "legal-hold", "tagging", UPLOAD_ID, "acl", "attributes"); | ||
|
|
||
| private PrintWriter traceStream; | ||
| protected final Map<String, String> regionCache = new ConcurrentHashMap<>(); | ||
| protected String userAgent = Utils.getDefaultUserAgent(); | ||
|
|
@@ -268,7 +266,10 @@ private String[] handleRedirectResponse( | |
| return new String[] {code, message}; | ||
| } | ||
|
|
||
| /** Execute HTTP request asynchronously for given parameters. */ | ||
| /** | ||
| * Execute HTTP request asynchronously for given parameters. Retries on retryable HTTP status | ||
| * codes are handled by {@link Http.RetryInterceptor} installed on the default HTTP client. | ||
|
allanrogerr marked this conversation as resolved.
Outdated
|
||
| */ | ||
| protected CompletableFuture<Response> executeAsync(Http.S3Request s3request, String region) { | ||
| Credentials credentials = (provider == null) ? null : provider.fetch(); | ||
| Http.Request request = null; | ||
|
|
@@ -282,15 +283,9 @@ protected CompletableFuture<Response> executeAsync(Http.S3Request s3request, Str | |
| PrintWriter traceStream = this.traceStream; | ||
| if (traceStream != null) traceStream.print(request.httpTraces()); | ||
|
|
||
| OkHttpClient httpClient = this.httpClient; | ||
|
allanrogerr marked this conversation as resolved.
|
||
| // FIXME: enable retry for all request. | ||
| // if (!s3request.retryFailure()) { | ||
| // httpClient = httpClient.newBuilder().retryOnConnectionFailure(false).build(); | ||
| // } | ||
|
|
||
| okhttp3.Request httpRequest = request.httpRequest(); | ||
| CompletableFuture<Response> completableFuture = newCompleteableFuture(); | ||
| httpClient | ||
| this.httpClient | ||
| .newCall(httpRequest) | ||
| .enqueue( | ||
| new Callback() { | ||
|
|
@@ -469,9 +464,10 @@ private void onResponse(final Response response) throws IOException { | |
| response.header("x-amz-id-2")); | ||
| } | ||
|
|
||
| // invalidate region cache if needed | ||
| if (errorResponse.code().equals(NO_SUCH_BUCKET) | ||
| || errorResponse.code().equals(RETRY_HEAD)) { | ||
| // invalidate region cache if needed (bucket may be null for e.g. listBuckets) | ||
|
allanrogerr marked this conversation as resolved.
Outdated
|
||
| if (s3request.bucket() != null | ||
|
allanrogerr marked this conversation as resolved.
Outdated
|
||
| && (errorResponse.code().equals(NO_SUCH_BUCKET) | ||
| || errorResponse.code().equals(RETRY_HEAD))) { | ||
| regionCache.remove(s3request.bucket()); | ||
| } | ||
|
|
||
|
|
@@ -1223,6 +1219,15 @@ private Object[] createBody(PutObjectAPIBaseArgs args, MediaType contentType) | |
| boolean checksumHeader = headers.namePrefixAny("x-amz-checksum-"); | ||
| String md5Hash = headers.getFirst(Http.Headers.CONTENT_MD5); | ||
|
|
||
| long fileStartPos = 0; | ||
| if (args.file() != null) { | ||
| try { | ||
| fileStartPos = args.file().getFilePointer(); | ||
| } catch (IOException e) { | ||
| throw new MinioException(e); | ||
| } | ||
| } | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do we need?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Each file.read(...) advances the file pointer. After hashing a length-byte file, the pointer is at fileStartPos + length. So whatever position the pointer is at when new Http.Body(...) runs becomes the offset that subsequent retries seek back to before each network attempt. Without the restore at line 1335, that captured offset would be fileStartPos + length instead of fileStartPos, and every PUT — first attempt and retries alike — would write from the wrong offset (typically EOF, producing an empty or truncated upload). Http.Body doesnt handle this. minio-go does this in api-put-object-streaming.go:682-705
This comment was marked as duplicate.
Sorry, something went wrong.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Removing this. in order to do so, Checksum.update's RandomAccessFile branch must be rewritten to use FileChannel.read(ByteBuffer, long position) so it no longer mutates the file's pointer. |
||
|
|
||
| if (sha256HexString == null && sha256Base64String == null) { | ||
| if (!baseUrl.isHttps()) { | ||
| Checksum.Hasher hasher = Checksum.Algorithm.SHA256.hasher(); | ||
|
|
@@ -1278,6 +1283,14 @@ private Object[] createBody(PutObjectAPIBaseArgs args, MediaType contentType) | |
| } | ||
| } | ||
|
|
||
| if (args.file() != null) { | ||
| try { | ||
| args.file().seek(fileStartPos); | ||
| } catch (IOException e) { | ||
| throw new MinioException(e); | ||
| } | ||
| } | ||
|
allanrogerr marked this conversation as resolved.
Outdated
|
||
|
|
||
| Http.Body body = null; | ||
| if (args.file() != null) { | ||
| body = new Http.Body(args.file(), args.length(), contentType, sha256HexString, md5Hash); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,89 @@ | ||
| /* | ||
| * MinIO Java SDK for Amazon S3 Compatible Cloud Storage, (C) 2026 MinIO, Inc. | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package io.minio; | ||
|
|
||
| import io.minio.errors.ErrorResponseException; | ||
| import io.minio.errors.MinioException; | ||
| import java.io.IOException; | ||
| import okhttp3.mockwebserver.MockResponse; | ||
| import okhttp3.mockwebserver.MockWebServer; | ||
| import okio.Buffer; | ||
| import org.junit.Assert; | ||
| import org.junit.Test; | ||
|
|
||
| /** Integration tests for {@link Http.RetryInterceptor}. */ | ||
| public class RetryTest { | ||
| private static final String LIST_BUCKETS_OK = | ||
| "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" | ||
| + "<ListAllMyBucketsResult xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">" | ||
| + "<Owner><ID>test</ID><DisplayName>test</DisplayName></Owner>" | ||
| + "<Buckets/></ListAllMyBucketsResult>"; | ||
|
|
||
| private MockResponse successResponse() { | ||
| return new MockResponse() | ||
| .setResponseCode(200) | ||
| .setHeader("Content-Type", "application/xml") | ||
| .setBody(new Buffer().writeUtf8(LIST_BUCKETS_OK)); | ||
| } | ||
|
|
||
| private MockResponse retryableServerError() { | ||
| return new MockResponse() | ||
| .setResponseCode(503) | ||
| .setHeader("Content-Type", "text/html") | ||
| .setBody(new Buffer().writeUtf8("<html>Service Unavailable</html>")); | ||
| } | ||
|
|
||
| @Test | ||
| public void testRetryOnRetryableStatusThenSuccess() throws IOException, MinioException { | ||
| try (MockWebServer server = new MockWebServer()) { | ||
| server.enqueue(retryableServerError()); | ||
| server.enqueue(successResponse()); | ||
| server.start(); | ||
|
|
||
| MinioClient client = MinioClient.builder().endpoint(server.url("").toString()).build(); | ||
| client.listBuckets(); | ||
|
|
||
| Assert.assertEquals(2, server.getRequestCount()); | ||
| } | ||
| } | ||
|
allanrogerr marked this conversation as resolved.
|
||
|
|
||
| @Test | ||
| public void testNoRetryOn4xx() throws IOException, MinioException { | ||
| String notFoundXml = | ||
| "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" | ||
| + "<Error><Code>NoSuchBucket</Code><Message>not found</Message>" | ||
| + "<Resource>/</Resource><RequestId>abc</RequestId></Error>"; | ||
|
|
||
| try (MockWebServer server = new MockWebServer()) { | ||
| server.enqueue( | ||
| new MockResponse() | ||
| .setResponseCode(404) | ||
| .setHeader("Content-Type", "application/xml") | ||
| .setBody(new Buffer().writeUtf8(notFoundXml))); | ||
| server.start(); | ||
|
|
||
| MinioClient client = MinioClient.builder().endpoint(server.url("").toString()).build(); | ||
| try { | ||
| client.listBuckets(); | ||
| Assert.fail("expected ErrorResponseException"); | ||
| } catch (ErrorResponseException e) { | ||
| Assert.assertEquals("NoSuchBucket", e.errorResponse().code()); | ||
| } | ||
|
allanrogerr marked this conversation as resolved.
|
||
| Assert.assertEquals(1, server.getRequestCount()); | ||
| } | ||
|
allanrogerr marked this conversation as resolved.
|
||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.