-
Notifications
You must be signed in to change notification settings - Fork 523
feat: add automatic S3 request retry with exponential backoff #1701
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 2 commits
9524c1f
a79aa11
beead6e
53da859
60aeaed
d1c52e0
0d687ec
34abd64
1997b48
b2cad44
ba65312
6d39a12
756e294
2f2fb4f
7fe6e87
9a5c8e4
93e4f47
0ae488b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -62,6 +62,9 @@ | |
| import java.util.concurrent.CompletableFuture; | ||
| import java.util.concurrent.CompletionException; | ||
| import java.util.concurrent.ConcurrentHashMap; | ||
| import java.util.concurrent.Executors; | ||
| import java.util.concurrent.ScheduledExecutorService; | ||
| import java.util.concurrent.TimeUnit; | ||
| import java.util.function.Supplier; | ||
| import java.util.logging.Logger; | ||
| import javax.annotation.Nonnull; | ||
|
|
@@ -103,6 +106,15 @@ public abstract class BaseS3Client implements AutoCloseable { | |
| private static final String UPLOAD_ID = "uploadId"; | ||
| private static final Set<String> TRACE_QUERY_PARAMS = | ||
| ImmutableSet.of("retention", "legal-hold", "tagging", UPLOAD_ID, "acl", "attributes"); | ||
|
|
||
| /**
 * Scheduler used to delay retry attempts. It is static, so one single-threaded instance is shared
 * by every client in the JVM; its thread is named "minio-retry-scheduler".
 */ private static final ScheduledExecutorService RETRY_SCHEDULER = | ||
| Executors.newSingleThreadScheduledExecutor( | ||
| r -> { | ||
| Thread t = new Thread(r, "minio-retry-scheduler"); | ||
| t.setDaemon(true); /* daemon thread: does not keep the JVM alive on its own */ | ||
| return t; | ||
| }); | ||
|
|
||
| private PrintWriter traceStream; | ||
| protected final Map<String, String> regionCache = new ConcurrentHashMap<>(); | ||
| protected String userAgent = Utils.getDefaultUserAgent(); | ||
|
|
@@ -111,6 +123,7 @@ public abstract class BaseS3Client implements AutoCloseable { | |
| protected Provider provider; | ||
| protected OkHttpClient httpClient; | ||
| protected boolean closeHttpClient; | ||
| protected volatile int maxRetries = Retry.MAX_RETRY; | ||
|
|
||
| protected BaseS3Client( | ||
| Http.BaseUrl baseUrl, Provider provider, OkHttpClient httpClient, boolean closeHttpClient) { | ||
|
|
@@ -125,6 +138,7 @@ protected BaseS3Client(BaseS3Client client) { | |
| this.provider = client.provider; | ||
| this.httpClient = client.httpClient; | ||
| this.closeHttpClient = client.closeHttpClient; | ||
| this.maxRetries = client.maxRetries; | ||
| } | ||
|
|
||
| /** Closes underneath HTTP client. */ | ||
|
|
@@ -136,6 +150,18 @@ public void close() { | |
| } | ||
| } | ||
|
|
||
| /** | ||
| * Sets the maximum number of attempts for an S3 request, counting the initial attempt plus any | ||
| * automatic retries. Requests with non-seekable bodies are never retried regardless of this | ||
| * value. The default is {@code Retry.MAX_RETRY} (10). Pass 1 to disable automatic retries. | ||
| * | ||
| * @param maxRetries maximum attempts (must be >= 1). | ||
| * @throws IllegalArgumentException if {@code maxRetries} is less than 1. | ||
| */ | ||
| public void setMaxRetries(int maxRetries) { | ||
| if (maxRetries < 1) throw new IllegalArgumentException("maxRetries must be >= 1"); | ||
| this.maxRetries = maxRetries; | ||
| } | ||
|
|
||
| /** | ||
| * Sets HTTP connect, write and read timeouts. A value of 0 means no timeout, otherwise values | ||
| * must be between 1 and Integer.MAX_VALUE when converted to milliseconds. | ||
|
|
@@ -270,8 +296,49 @@ private String[] handleRedirectResponse( | |
| return new String[] {code, message}; | ||
| } | ||
|
|
||
| /** Execute HTTP request asynchronously for given parameters. */ | ||
| /** | ||
| * Execute HTTP request asynchronously for given parameters, with automatic retry. | ||
| * | ||
| * <p>The attempt limit is this client's {@code maxRetries}, except that a request whose body | ||
| * reports {@code isHttpRequestBody()} is limited to a single attempt. | ||
| * | ||
| * @param s3request request to execute. | ||
| * @param region region passed through to each attempt. | ||
| * @return future for the response, as produced by {@code executeWithRetry} starting at attempt 0. | ||
| */ | ||
| protected CompletableFuture<Response> executeAsync(Http.S3Request s3request, String region) { | ||
| // Non-seekable bodies (raw okhttp3 RequestBody) cannot be replayed — single attempt only. | ||
| Http.Body body = s3request.body(); | ||
| int maxAttempts = (body != null && body.isHttpRequestBody()) ? 1 : this.maxRetries; | ||
|
allanrogerr marked this conversation as resolved.
Outdated
|
||
| return executeWithRetry(s3request, region, maxAttempts, 0); | ||
| } | ||
|
|
||
| /** | ||
| * Runs one attempt via {@code doExecuteAsync} and, when it fails with an error accepted by | ||
| * {@code Retry.isRetryable}, schedules the next attempt after the delay returned by | ||
| * {@code Retry.computeBackoffMs}. | ||
| * | ||
| * @param s3request request to execute. | ||
| * @param region region passed to each attempt. | ||
| * @param maxAttempts total number of attempts allowed, including the first. | ||
| * @param attempt zero-based index of the current attempt. | ||
| * @return future completed with the first successful response, or exceptionally with the | ||
| *     failure that ended the attempts. | ||
| */ | ||
| private CompletableFuture<Response> executeWithRetry( | ||
| Http.S3Request s3request, String region, int maxAttempts, int attempt) { | ||
| return doExecuteAsync(s3request, region) | ||
| .handle( | ||
| (response, throwable) -> { | ||
| if (throwable == null) { | ||
| return CompletableFuture.completedFuture(response); | ||
| } | ||
| // Unwrap CompletionException so the retry decision sees the underlying failure. | ||
| Throwable cause = | ||
| (throwable instanceof CompletionException) ? throwable.getCause() : throwable; | ||
| if (cause == null) cause = throwable; | ||
| if (attempt + 1 >= maxAttempts || !Retry.isRetryable(cause)) { | ||
| return Utils.<Response>failedFuture(cause); | ||
| } | ||
| long delayMs = Retry.computeBackoffMs(attempt + 1, RANDOM); | ||
| CompletableFuture<Response> retryFuture = new CompletableFuture<>(); | ||
| RETRY_SCHEDULER.schedule( | ||
|
allanrogerr marked this conversation as resolved.
Outdated
|
||
| () -> | ||
| // Hand the attempt off so the single scheduler thread is not tied up by request setup. | ||
| CompletableFuture.runAsync( | ||
| () -> { | ||
| try { | ||
| executeWithRetry(s3request, region, maxAttempts, attempt + 1) | ||
| .whenComplete( | ||
| (r, t) -> { | ||
| if (t != null) retryFuture.completeExceptionally(t); | ||
| else retryFuture.complete(r); | ||
| }); | ||
| } catch (RuntimeException e) { | ||
| // The next attempt threw before returning a future. The future returned by | ||
| // runAsync is discarded, so without this retryFuture would never complete | ||
| // and the caller would wait forever. | ||
| retryFuture.completeExceptionally(e); | ||
| } | ||
| }), | ||
|
allanrogerr marked this conversation as resolved.
Outdated
|
||
| delayMs, | ||
| TimeUnit.MILLISECONDS); | ||
| return retryFuture; | ||
| }) | ||
| // handle() yields a future of a future; flatten it to the attempt's own outcome. | ||
| .thenCompose(cf -> cf); | ||
| } | ||
|
|
||
| /** Execute single HTTP request attempt asynchronously for given parameters. */ | ||
| private CompletableFuture<Response> doExecuteAsync(Http.S3Request s3request, String region) { | ||
| Credentials credentials = (provider == null) ? null : provider.fetch(); | ||
| Http.Request request = null; | ||
| try { | ||
|
|
@@ -285,11 +352,6 @@ protected CompletableFuture<Response> executeAsync(Http.S3Request s3request, Str | |
| if (traceStream != null) traceStream.print(request.httpTraces()); | ||
|
|
||
| OkHttpClient httpClient = this.httpClient; | ||
|
allanrogerr marked this conversation as resolved.
Outdated
allanrogerr marked this conversation as resolved.
|
||
| // FIXME: enable retry for all request. | ||
| // if (!s3request.retryFailure()) { | ||
| // httpClient = httpClient.newBuilder().retryOnConnectionFailure(false).build(); | ||
| // } | ||
|
|
||
| okhttp3.Request httpRequest = request.httpRequest(); | ||
| CompletableFuture<Response> completableFuture = newCompleteableFuture(); | ||
| httpClient | ||
|
|
@@ -1225,6 +1287,15 @@ private Object[] createBody(PutObjectAPIBaseArgs args, MediaType contentType) | |
| boolean checksumHeader = headers.namePrefixAny("x-amz-checksum-"); | ||
| String md5Hash = headers.getFirst(Http.Headers.CONTENT_MD5); | ||
|
|
||
| long fileStartPos = 0; | ||
| if (args.file() != null) { | ||
| try { | ||
| fileStartPos = args.file().getFilePointer(); | ||
| } catch (IOException e) { | ||
| throw new MinioException(e); | ||
| } | ||
| } | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Why do we need this?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Each file.read(...) advances the file pointer. After hashing a length-byte file, the pointer is at fileStartPos + length. So whatever position the pointer is at when new Http.Body(...) runs becomes the offset that subsequent retries seek back to before each network attempt. Without the restore at line 1335, that captured offset would be fileStartPos + length instead of fileStartPos, and every PUT — first attempt and retries alike — would write from the wrong offset (typically EOF, producing an empty or truncated upload). Http.Body doesn't handle this. minio-go does this in api-put-object-streaming.go:682-705.
This comment was marked as duplicate.
Sorry, something went wrong.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Removing this. In order to do so, Checksum.update's RandomAccessFile branch must be rewritten to use FileChannel.read(ByteBuffer, long position) so it no longer mutates the file's pointer. |
||
|
|
||
| if (sha256HexString == null && sha256Base64String == null) { | ||
| if (!baseUrl.isHttps()) { | ||
| Checksum.Hasher hasher = Checksum.Algorithm.SHA256.hasher(); | ||
|
|
@@ -1280,6 +1351,14 @@ private Object[] createBody(PutObjectAPIBaseArgs args, MediaType contentType) | |
| } | ||
| } | ||
|
|
||
| if (args.file() != null) { | ||
| try { | ||
| args.file().seek(fileStartPos); | ||
| } catch (IOException e) { | ||
| throw new MinioException(e); | ||
| } | ||
| } | ||
|
allanrogerr marked this conversation as resolved.
Outdated
|
||
|
|
||
| Http.Body body = null; | ||
| if (args.file() != null) { | ||
| body = new Http.Body(args.file(), args.length(), contentType, sha256HexString, md5Hash); | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.