diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/BatchingStateChangeUploader.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/BatchingStateChangeUploader.java
index c1e353b421933..a3e339f4260dd 100644
--- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/BatchingStateChangeUploader.java
+++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/BatchingStateChangeUploader.java
@@ -225,7 +225,10 @@ private void drainAndSave() {
return;
}
uploadBatchSizes.update(tasks.size());
- retryingExecutor.execute(retryPolicy, () -> delegate.upload(tasks));
+ retryingExecutor.execute(
+ retryPolicy,
+ () -> delegate.upload(tasks),
+ t -> tasks.forEach(task -> task.fail(t)));
} catch (Throwable t) {
tasks.forEach(task -> task.fail(t));
if (findThrowable(t, IOException.class).isPresent()) {
@@ -271,14 +274,14 @@ private UploadTask wrapWithSizeUpdate(UploadTask uploadTask, long size) {
try {
releaseCapacity(size);
} finally {
- uploadTask.successCallback.accept(result);
+ uploadTask.complete(result);
}
},
(result, error) -> {
try {
releaseCapacity(size);
} finally {
- uploadTask.failureCallback.accept(result, error);
+ uploadTask.fail(error);
}
});
}
diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/RetryingExecutor.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/RetryingExecutor.java
index f54d02b42e286..68f4edb2f7ee9 100644
--- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/RetryingExecutor.java
+++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/RetryingExecutor.java
@@ -30,8 +30,11 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.flink.util.ExceptionUtils.firstOrSuppressed;
/**
* A {@link RetriableAction} executor that schedules a next attempt upon timeout based on {@link
@@ -71,25 +74,43 @@ class RetryingExecutor implements AutoCloseable {
* <p>NOTE: the action must be idempotent because multiple instances of it can be executed
* concurrently (if the policy allows retries).
*/
-    void execute(RetryPolicy retryPolicy, RetriableAction action) {
+    void execute(
+            RetryPolicy retryPolicy, RetriableAction action, Consumer<Throwable> failureCallback) {
         LOG.debug("execute with retryPolicy: {}", retryPolicy);
+        // failureCallback travels with the first attempt and is handed on to every retry; it is
+        // invoked by RetriableTask#handleError once no active attempt is left and the action
+        // has not been completed.
         RetriableTask task =
-                new RetriableTask(
-                        action, retryPolicy, blockingExecutor, attemptsPerTaskHistogram, timer);
+                RetriableTask.initialize(
+                        action,
+                        retryPolicy,
+                        blockingExecutor,
+                        attemptsPerTaskHistogram,
+                        timer,
+                        failureCallback);
         blockingExecutor.submit(task);
     }
+ /**
+ * Stops the timer and the blocking executor and waits up to one second for each of them to
+ * terminate. An exception thrown while requesting shutdown is remembered and rethrown only
+ * after both executors have been asked to stop and both waits have finished.
+ */
@Override
public void close() throws Exception {
LOG.debug("close");
- timer.shutdownNow();
+ // Collect shutdown failures instead of propagating them immediately, so that a failure to
+ // stop the timer does not prevent stopping blockingExecutor.
+ Exception closeException = null;
+ try {
+ timer.shutdownNow();
+ } catch (Exception e) {
+ closeException = e;
+ }
+ try {
+ blockingExecutor.shutdownNow();
+ } catch (Exception e) {
+ // presumably keeps the earlier exception (if any) as the primary one and attaches the
+ // other as suppressed -- confirm against ExceptionUtils#firstOrSuppressed
+ closeException = firstOrSuppressed(e, closeException);
+ }
+ // NOTE(review): awaitTermination can throw InterruptedException; in that case the method
+ // exits here and a previously collected closeException is not rethrown -- confirm this is
+ // acceptable.
if (!timer.awaitTermination(1, TimeUnit.SECONDS)) {
LOG.warn("Unable to cleanly shutdown scheduler in 1s");
}
- blockingExecutor.shutdownNow();
if (!blockingExecutor.awaitTermination(1, TimeUnit.SECONDS)) {
LOG.warn("Unable to cleanly shutdown blockingExecutor in 1s");
}
+ // surface the shutdown failure only after both executors were given the chance to stop
+ if (closeException != null) {
+ throw closeException;
+ }
}
/**
@@ -102,6 +123,7 @@ interface RetriableAction extends RunnableWithException {}
private static final class RetriableTask implements Runnable {
private final RetriableAction runnable;
+ private final Consumer failureCallback;
private final ScheduledExecutorService blockingExecutor;
private final ScheduledExecutorService timer;
private final int current;
@@ -118,25 +140,11 @@ private static final class RetriableTask implements Runnable {
* to prevent double finalization ({@link #handleError}) by the executing thread and
* timeouting thread.
*/
- private final AtomicBoolean attemptCompleted = new AtomicBoolean(false);
+ private final AtomicBoolean attemptCompleted;
- private final Histogram attemptsPerTaskHistogram;
+ private final AtomicInteger activeAttempts;
- RetriableTask(
- RetriableAction runnable,
- RetryPolicy retryPolicy,
- ScheduledExecutorService blockingExecutor,
- Histogram attemptsPerTaskHistogram,
- ScheduledExecutorService timer) {
- this(
- 1,
- new AtomicBoolean(false),
- runnable,
- retryPolicy,
- blockingExecutor,
- attemptsPerTaskHistogram,
- timer);
- }
+ private final Histogram attemptsPerTaskHistogram;
private RetriableTask(
int current,
@@ -144,25 +152,33 @@ private RetriableTask(
RetriableAction runnable,
RetryPolicy retryPolicy,
ScheduledExecutorService blockingExecutor,
- Histogram attemptsPerTaskHistogram,
- ScheduledExecutorService timer) {
+ ScheduledExecutorService timer,
+ Consumer failureCallback,
+ AtomicInteger activeAttempts,
+ Histogram attemptsPerTaskHistogram) {
this.current = current;
this.runnable = runnable;
+ this.failureCallback = failureCallback;
this.retryPolicy = retryPolicy;
this.blockingExecutor = blockingExecutor;
this.actionCompleted = actionCompleted;
this.attemptsPerTaskHistogram = attemptsPerTaskHistogram;
this.timer = timer;
+ this.activeAttempts = activeAttempts;
+ this.attemptCompleted = new AtomicBoolean(false);
}
@Override
public void run() {
+ LOG.debug("starting attempt {}", current);
if (!actionCompleted.get()) {
Optional> timeoutFuture = scheduleTimeout();
try {
runnable.run();
- actionCompleted.set(true);
- attemptsPerTaskHistogram.update(current);
+ if (actionCompleted.compareAndSet(false, true)) {
+ LOG.debug("succeeded with {} attempts", current);
+ attemptsPerTaskHistogram.update(current);
+ }
attemptCompleted.set(true);
} catch (Exception e) {
handleError(e);
@@ -173,19 +189,49 @@ public void run() {
}
+ /**
+ * Finalizes a failed attempt: schedules a retry if the policy returns a non-negative delay,
+ * and invokes the failure callback when this was the last active attempt and the action was
+ * not completed by anyone else.
+ *
+ * @param e the failure of this attempt; passed to the retry policy and, if the whole action
+ *     fails, to the failure callback
+ */
private void handleError(Exception e) {
- LOG.info("execution attempt {} failed: {}", current, e.getMessage());
- // prevent double completion in case of a timeout and another failure
- boolean attemptTransition = attemptCompleted.compareAndSet(false, true);
- if (attemptTransition && !actionCompleted.get()) {
- long nextAttemptDelay = retryPolicy.retryAfter(current, e);
- if (nextAttemptDelay == 0L) {
- blockingExecutor.submit(next());
- } else if (nextAttemptDelay > 0L) {
- blockingExecutor.schedule(next(), nextAttemptDelay, MILLISECONDS);
- } else {
- actionCompleted.set(true);
- }
+ // The compareAndSet lets only the first caller for this attempt proceed; every later call
+ // for the same attempt returns here.
+ if (!attemptCompleted.compareAndSet(false, true) || actionCompleted.get()) {
+ // either this attempt was already completed (e.g. timed out);
+ // or another attempt completed the task
+ return;
}
+ LOG.debug("execution attempt {} failed: {}", current, e.getMessage());
+ long nextAttemptDelay = retryPolicy.retryAfter(current, e);
+ // a negative delay means no further attempt is scheduled
+ if (nextAttemptDelay >= 0L) {
+ // Count the retry BEFORE discounting this attempt below, so the shared counter cannot
+ // drop to zero while a retry is still pending.
+ // NOTE(review): if scheduleNext throws (e.g. the executor rejects the task), this
+ // increment is not undone and the failure callback below is never reached -- confirm.
+ activeAttempts.incrementAndGet();
+ scheduleNext(nextAttemptDelay, next());
+ }
+ // This attempt is over. Only when no other attempt is active, and only for the caller that
+ // wins the compareAndSet on actionCompleted, the failure is reported.
+ if (activeAttempts.decrementAndGet() == 0
+ && actionCompleted.compareAndSet(false, true)) {
+ LOG.info("failed with {} attempts: {}", current, e.getMessage());
+ failureCallback.accept(e);
+ }
+ }
+
+        /**
+         * Hands the follow-up attempt to the blocking executor: immediately for a zero delay,
+         * after {@code nextAttemptDelay} milliseconds for a positive one. A negative delay
+         * schedules nothing.
+         *
+         * @param nextAttemptDelay delay in milliseconds before the next attempt
+         * @param next the task representing the next attempt
+         */
+        private void scheduleNext(long nextAttemptDelay, RetriableTask next) {
+            if (nextAttemptDelay < 0L) {
+                // nothing to schedule for a negative delay
+                return;
+            }
+            if (nextAttemptDelay > 0L) {
+                blockingExecutor.schedule(next, nextAttemptDelay, MILLISECONDS);
+            } else {
+                blockingExecutor.submit(next);
+            }
+        }
+
+        /**
+         * Creates the task for the first attempt of an action.
+         *
+         * <p>The state created here is handed to the constructor and is meant to be shared with
+         * the follow-up attempts: the "action completed" flag starts as {@code false} and the
+         * active-attempts counter starts at one (this first attempt).
+         *
+         * @param runnable the action to run
+         * @param retryPolicy policy consulted after a failed attempt
+         * @param blockingExecutor executor that runs the attempts
+         * @param attemptsPerTaskHistogram histogram updated with the attempt number on success
+         * @param timer executor passed through to the task as its timer
+         * @param failureCallback invoked with the last error if the action ultimately fails
+         * @return the task representing attempt number 1
+         */
+        private static RetriableTask initialize(
+                RetriableAction runnable,
+                RetryPolicy retryPolicy,
+                ScheduledExecutorService blockingExecutor,
+                Histogram attemptsPerTaskHistogram,
+                ScheduledExecutorService timer,
+                Consumer<Throwable> failureCallback) {
+            return new RetriableTask(
+                    1, // attempt number
+                    new AtomicBoolean(false), // action not completed yet
+                    runnable,
+                    retryPolicy,
+                    blockingExecutor,
+                    timer,
+                    failureCallback,
+                    new AtomicInteger(1), // one active attempt: this one
+                    attemptsPerTaskHistogram);
         }
private RetriableTask next() {
@@ -195,8 +241,10 @@ private RetriableTask next() {
runnable,
retryPolicy,
blockingExecutor,
- attemptsPerTaskHistogram,
- timer);
+ timer,
+ failureCallback,
+ activeAttempts,
+ attemptsPerTaskHistogram);
}
private Optional> scheduleTimeout() {
diff --git a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/BatchingStateChangeUploaderTest.java b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/BatchingStateChangeUploaderTest.java
index e3d19ca6086c7..9f627b7123c09 100644
--- a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/BatchingStateChangeUploaderTest.java
+++ b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/BatchingStateChangeUploaderTest.java
@@ -17,6 +17,7 @@
package org.apache.flink.changelog.fs;
+import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.changelog.fs.StateChangeUploader.UploadTask;
import org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService;
@@ -28,9 +29,11 @@
import org.junit.Test;
import java.io.IOException;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.UUID;
@@ -38,6 +41,7 @@
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import static java.util.Collections.emptyList;
@@ -169,6 +173,43 @@ public void upload(UploadTask uploadTask) throws IOException {
}
}
+    /**
+     * Verifies that an upload which never returns on its own is eventually finished through
+     * the failure callback, and that the callback receives the sequence numbers of all change
+     * sets of the task.
+     */
+    @Test
+    public void testUploadTimeout() throws Exception {
+        // sequence numbers passed to the failure callback; stays null unless the upload fails
+        AtomicReference<List<SequenceNumber>> failed = new AtomicReference<>();
+        UploadTask upload =
+                new UploadTask(getChanges(4), unused -> {}, (sqn, error) -> failed.set(sqn));
+        ManuallyTriggeredScheduledExecutorService scheduler =
+                new ManuallyTriggeredScheduledExecutorService();
+        try (BatchingStateChangeUploader store =
+                new BatchingStateChangeUploader(
+                        0,
+                        0,
+                        Integer.MAX_VALUE,
+                        // presumably (maxAttempts, timeout, delay) -- confirm against RetryPolicy
+                        RetryPolicy.fixed(1, 1, 1),
+                        new BlockingUploader(),
+                        scheduler,
+                        new RetryingExecutor(
+                                1,
+                                createUnregisteredChangelogStorageMetricGroup()
+                                        .getAttemptsPerUpload()),
+                        createUnregisteredChangelogStorageMetricGroup())) {
+            store.upload(upload);
+            // the scheduler is manually triggered, so keep driving it until the task finishes
+            Deadline deadline = Deadline.fromNow(Duration.ofMinutes(5));
+            while (!upload.finished.get() && deadline.hasTimeLeft()) {
+                scheduler.triggerScheduledTasks();
+                scheduler.triggerAll();
+                Thread.sleep(10);
+            }
+        }
+
+        assertTrue(upload.finished.get());
+        assertEquals(
+                upload.changeSets.stream()
+                        .map(StateChangeSet::getSequenceNumber)
+                        .collect(Collectors.toSet()),
+                new HashSet<>(failed.get()));
+    }
+
@Test(expected = RejectedExecutionException.class)
public void testErrorHandling() throws Exception {
TestingStateChangeUploader probe = new TestingStateChangeUploader();
@@ -344,4 +385,18 @@ private Tuple2> uploadAsync(int limit, TestScena
thread.start();
return Tuple2.of(thread, future);
}
+
+    /**
+     * Test uploader whose {@link #upload} sleeps until the calling thread is interrupted, so an
+     * upload never completes on its own.
+     */
+    private static final class BlockingUploader implements StateChangeUploader {
+        @Override
+        public void upload(UploadTask uploadTask) {
+            try {
+                Thread.sleep(Long.MAX_VALUE);
+            } catch (InterruptedException e) {
+                // Thread.sleep clears the interrupt status when it throws; restore it so the
+                // interruption stays visible to the code running this upload
+                Thread.currentThread().interrupt();
+                throw new RuntimeException(e);
+            }
+        }
+
+        @Override
+        public void close() {}
+    }
}
diff --git a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/RetryingExecutorTest.java b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/RetryingExecutorTest.java
index 46166827090d4..f60437c7d4275 100644
--- a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/RetryingExecutorTest.java
+++ b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/RetryingExecutorTest.java
@@ -176,7 +176,8 @@ private void testPolicy(
} finally {
firstAttemptCompletedLatch.countDown();
}
- });
+ },
+ t -> {});
firstAttemptCompletedLatch.await(); // before closing executor
}
assertEquals(expectedAttempts, attemptsMade.get());
diff --git a/flink-dstl/flink-dstl-dfs/src/test/resources/log4j2.properties b/flink-dstl/flink-dstl-dfs/src/test/resources/log4j2-test.properties
similarity index 100%
rename from flink-dstl/flink-dstl-dfs/src/test/resources/log4j2.properties
rename to flink-dstl/flink-dstl-dfs/src/test/resources/log4j2-test.properties