Skip to content

Commit

Permalink
[FLINK-26396][state/changelog] Fail upload if last attempt times out
Browse files Browse the repository at this point in the history
  • Loading branch information
rkhachatryan committed Mar 7, 2022
1 parent 276a40a commit 2292c19
Show file tree
Hide file tree
Showing 5 changed files with 151 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,10 @@ private void drainAndSave() {
return;
}
uploadBatchSizes.update(tasks.size());
retryingExecutor.execute(retryPolicy, () -> delegate.upload(tasks));
retryingExecutor.execute(
retryPolicy,
() -> delegate.upload(tasks),
t -> tasks.forEach(task -> task.fail(t)));
} catch (Throwable t) {
tasks.forEach(task -> task.fail(t));
if (findThrowable(t, IOException.class).isPresent()) {
Expand Down Expand Up @@ -271,14 +274,14 @@ private UploadTask wrapWithSizeUpdate(UploadTask uploadTask, long size) {
try {
releaseCapacity(size);
} finally {
uploadTask.successCallback.accept(result);
uploadTask.complete(result);
}
},
(result, error) -> {
try {
releaseCapacity(size);
} finally {
uploadTask.failureCallback.accept(result, error);
uploadTask.fail(error);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,11 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.flink.util.ExceptionUtils.firstOrSuppressed;

/**
* A {@link RetriableAction} executor that schedules a next attempt upon timeout based on {@link
Expand Down Expand Up @@ -71,25 +74,43 @@ class RetryingExecutor implements AutoCloseable {
* <p>NOTE: the action must be idempotent because multiple instances of it can be executed
* concurrently (if the policy allows retries).
*/
void execute(RetryPolicy retryPolicy, RetriableAction action) {
void execute(
RetryPolicy retryPolicy, RetriableAction action, Consumer<Throwable> failureCallback) {
LOG.debug("execute with retryPolicy: {}", retryPolicy);
RetriableTask task =
new RetriableTask(
action, retryPolicy, blockingExecutor, attemptsPerTaskHistogram, timer);
RetriableTask.initialize(
action,
retryPolicy,
blockingExecutor,
attemptsPerTaskHistogram,
timer,
failureCallback);
blockingExecutor.submit(task);
}

@Override
public void close() throws Exception {
LOG.debug("close");
timer.shutdownNow();
Exception closeException = null;
try {
timer.shutdownNow();
} catch (Exception e) {
closeException = e;
}
try {
blockingExecutor.shutdownNow();
} catch (Exception e) {
closeException = firstOrSuppressed(e, closeException);
}
if (!timer.awaitTermination(1, TimeUnit.SECONDS)) {
LOG.warn("Unable to cleanly shutdown scheduler in 1s");
}
blockingExecutor.shutdownNow();
if (!blockingExecutor.awaitTermination(1, TimeUnit.SECONDS)) {
LOG.warn("Unable to cleanly shutdown blockingExecutor in 1s");
}
if (closeException != null) {
throw closeException;
}
}

/**
Expand All @@ -102,6 +123,7 @@ interface RetriableAction extends RunnableWithException {}

private static final class RetriableTask implements Runnable {
private final RetriableAction runnable;
private final Consumer<Throwable> failureCallback;
private final ScheduledExecutorService blockingExecutor;
private final ScheduledExecutorService timer;
private final int current;
Expand All @@ -118,51 +140,45 @@ private static final class RetriableTask implements Runnable {
* to prevent double finalization ({@link #handleError}) by the executing thread and
* the thread that fires the timeout.
*/
private final AtomicBoolean attemptCompleted = new AtomicBoolean(false);
private final AtomicBoolean attemptCompleted;

private final Histogram attemptsPerTaskHistogram;
private final AtomicInteger activeAttempts;

RetriableTask(
RetriableAction runnable,
RetryPolicy retryPolicy,
ScheduledExecutorService blockingExecutor,
Histogram attemptsPerTaskHistogram,
ScheduledExecutorService timer) {
this(
1,
new AtomicBoolean(false),
runnable,
retryPolicy,
blockingExecutor,
attemptsPerTaskHistogram,
timer);
}
private final Histogram attemptsPerTaskHistogram;

private RetriableTask(
int current,
AtomicBoolean actionCompleted,
RetriableAction runnable,
RetryPolicy retryPolicy,
ScheduledExecutorService blockingExecutor,
Histogram attemptsPerTaskHistogram,
ScheduledExecutorService timer) {
ScheduledExecutorService timer,
Consumer<Throwable> failureCallback,
AtomicInteger activeAttempts,
Histogram attemptsPerTaskHistogram) {
this.current = current;
this.runnable = runnable;
this.failureCallback = failureCallback;
this.retryPolicy = retryPolicy;
this.blockingExecutor = blockingExecutor;
this.actionCompleted = actionCompleted;
this.attemptsPerTaskHistogram = attemptsPerTaskHistogram;
this.timer = timer;
this.activeAttempts = activeAttempts;
this.attemptCompleted = new AtomicBoolean(false);
}

@Override
public void run() {
LOG.debug("starting attempt {}", current);
if (!actionCompleted.get()) {
Optional<ScheduledFuture<?>> timeoutFuture = scheduleTimeout();
try {
runnable.run();
actionCompleted.set(true);
attemptsPerTaskHistogram.update(current);
if (actionCompleted.compareAndSet(false, true)) {
LOG.debug("succeeded with {} attempts", current);
attemptsPerTaskHistogram.update(current);
}
attemptCompleted.set(true);
} catch (Exception e) {
handleError(e);
Expand All @@ -173,19 +189,49 @@ public void run() {
}

private void handleError(Exception e) {
LOG.info("execution attempt {} failed: {}", current, e.getMessage());
// prevent double completion in case of a timeout and another failure
boolean attemptTransition = attemptCompleted.compareAndSet(false, true);
if (attemptTransition && !actionCompleted.get()) {
long nextAttemptDelay = retryPolicy.retryAfter(current, e);
if (nextAttemptDelay == 0L) {
blockingExecutor.submit(next());
} else if (nextAttemptDelay > 0L) {
blockingExecutor.schedule(next(), nextAttemptDelay, MILLISECONDS);
} else {
actionCompleted.set(true);
}
if (!attemptCompleted.compareAndSet(false, true) || actionCompleted.get()) {
// either this attempt was already completed (e.g. timed out);
// or another attempt completed the task
return;
}
LOG.debug("execution attempt {} failed: {}", current, e.getMessage());
long nextAttemptDelay = retryPolicy.retryAfter(current, e);
if (nextAttemptDelay >= 0L) {
activeAttempts.incrementAndGet();
scheduleNext(nextAttemptDelay, next());
}
if (activeAttempts.decrementAndGet() == 0
&& actionCompleted.compareAndSet(false, true)) {
LOG.info("failed with {} attempts: {}", current, e.getMessage());
failureCallback.accept(e);
}
}

/**
 * Hands the given attempt over to {@code blockingExecutor}.
 *
 * <p>A delay of zero submits the attempt right away, a positive delay schedules it to run
 * after that many milliseconds, and a negative delay results in nothing being scheduled.
 *
 * @param nextAttemptDelay delay in milliseconds before the attempt should run
 * @param next the attempt to run
 */
private void scheduleNext(long nextAttemptDelay, RetriableTask next) {
    if (nextAttemptDelay < 0L) {
        // negative delay: nothing to schedule
        return;
    }
    if (nextAttemptDelay > 0L) {
        blockingExecutor.schedule(next, nextAttemptDelay, MILLISECONDS);
    } else {
        blockingExecutor.submit(next);
    }
}

/**
 * Creates the task representing the first attempt of an action.
 *
 * <p>The task starts with attempt number 1, a fresh (unset) completion flag and an
 * active-attempts counter of 1; all remaining arguments are passed through to the
 * constructor unchanged.
 */
private static RetriableTask initialize(
        RetriableAction runnable,
        RetryPolicy retryPolicy,
        ScheduledExecutorService blockingExecutor,
        Histogram attemptsPerTaskHistogram,
        ScheduledExecutorService timer,
        Consumer<Throwable> failureCallback) {
    final int firstAttempt = 1;
    final AtomicBoolean notYetCompleted = new AtomicBoolean(false);
    final AtomicInteger singleActiveAttempt = new AtomicInteger(1);
    return new RetriableTask(
            firstAttempt,
            notYetCompleted,
            runnable,
            retryPolicy,
            blockingExecutor,
            timer,
            failureCallback,
            singleActiveAttempt,
            attemptsPerTaskHistogram);
}

private RetriableTask next() {
Expand All @@ -195,8 +241,10 @@ private RetriableTask next() {
runnable,
retryPolicy,
blockingExecutor,
attemptsPerTaskHistogram,
timer);
timer,
failureCallback,
activeAttempts,
attemptsPerTaskHistogram);
}

private Optional<ScheduledFuture<?>> scheduleTimeout() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.flink.changelog.fs;

import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.changelog.fs.StateChangeUploader.UploadTask;
import org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService;
Expand All @@ -28,16 +29,19 @@
import org.junit.Test;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import static java.util.Collections.emptyList;
Expand Down Expand Up @@ -169,6 +173,43 @@ public void upload(UploadTask uploadTask) throws IOException {
}
}

/**
 * Uploads a task through a {@link BlockingUploader} (whose upload call never returns on its
 * own) and polls, for at most five minutes, until the task is marked finished. Then checks
 * that the failure callback received exactly the sequence numbers of the uploaded change
 * sets.
 */
@Test
public void testUploadTimeout() throws Exception {
    AtomicReference<List<SequenceNumber>> failedSqns = new AtomicReference<>();
    UploadTask uploadTask =
            new UploadTask(
                    getChanges(4), unused -> {}, (sqns, error) -> failedSqns.set(sqns));
    ManuallyTriggeredScheduledExecutorService manualScheduler =
            new ManuallyTriggeredScheduledExecutorService();
    try (BatchingStateChangeUploader uploader =
            new BatchingStateChangeUploader(
                    0,
                    0,
                    Integer.MAX_VALUE,
                    RetryPolicy.fixed(1, 1, 1),
                    new BlockingUploader(),
                    manualScheduler,
                    new RetryingExecutor(
                            1,
                            createUnregisteredChangelogStorageMetricGroup()
                                    .getAttemptsPerUpload()),
                    createUnregisteredChangelogStorageMetricGroup())) {
        uploader.upload(uploadTask);
        Deadline pollingDeadline = Deadline.fromNow(Duration.ofMinutes(5));
        // keep driving the manually triggered scheduler until the task finishes
        while (!uploadTask.finished.get() && pollingDeadline.hasTimeLeft()) {
            manualScheduler.triggerScheduledTasks();
            manualScheduler.triggerAll();
            Thread.sleep(10);
        }
    }

    assertTrue(uploadTask.finished.get());
    Collection<SequenceNumber> expectedSqns =
            uploadTask.changeSets.stream()
                    .map(StateChangeSet::getSequenceNumber)
                    .collect(Collectors.toSet());
    assertEquals(expectedSqns, new HashSet<>(failedSqns.get()));
}

@Test(expected = RejectedExecutionException.class)
public void testErrorHandling() throws Exception {
TestingStateChangeUploader probe = new TestingStateChangeUploader();
Expand Down Expand Up @@ -344,4 +385,18 @@ private Tuple2<Thread, CompletableFuture<Void>> uploadAsync(int limit, TestScena
thread.start();
return Tuple2.of(thread, future);
}

/**
 * A {@link StateChangeUploader} whose {@link #upload} call never completes normally: it
 * sleeps until the calling thread is interrupted and then fails with a {@link
 * RuntimeException} wrapping the {@link InterruptedException}.
 */
private static final class BlockingUploader implements StateChangeUploader {
    @Override
    public void upload(UploadTask uploadTask) {
        try {
            Thread.sleep(Long.MAX_VALUE);
        } catch (InterruptedException e) {
            // Thread.sleep clears the interrupt status when it throws; restore it so that
            // code further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /** Nothing to release: this uploader holds no resources. */
    @Override
    public void close() {}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,8 @@ private void testPolicy(
} finally {
firstAttemptCompletedLatch.countDown();
}
});
},
t -> {});
firstAttemptCompletedLatch.await(); // before closing executor
}
assertEquals(expectedAttempts, attemptsMade.get());
Expand Down

0 comments on commit 2292c19

Please sign in to comment.