[FLINK-26485][state/changelog] Discard unnecessarily uploaded state
rkhachatryan committed Mar 9, 2022
1 parent 1f1480a commit acd2238
Showing 5 changed files with 292 additions and 85 deletions.
@@ -17,6 +17,7 @@

package org.apache.flink.changelog.fs;

import org.apache.flink.changelog.fs.StateChangeUploader.UploadTasksResult;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.runtime.io.AvailabilityProvider;
import org.apache.flink.runtime.io.AvailabilityProvider.AvailabilityHelper;
@@ -226,10 +227,7 @@ private void drainAndSave() {
return;
}
uploadBatchSizes.update(tasks.size());
retryingExecutor.execute(
retryPolicy,
() -> delegate.upload(tasks).complete(),
t -> tasks.forEach(task -> task.fail(t)));
retryingExecutor.execute(retryPolicy, asRetriableAction(tasks));
} catch (Throwable t) {
tasks.forEach(task -> task.fail(t));
if (findThrowable(t, IOException.class).isPresent()) {
@@ -296,4 +294,29 @@ public AvailabilityProvider getAvailabilityProvider() {
// or back-pressured hard trying to seize capacity in upload()
return availabilityHelper;
}

private RetryingExecutor.RetriableAction<UploadTasksResult> asRetriableAction(
Collection<UploadTask> tasks) {
return new RetryingExecutor.RetriableAction<UploadTasksResult>() {
@Override
public UploadTasksResult tryExecute() throws Exception {
return delegate.upload(tasks);
}

@Override
public void completeWithResult(UploadTasksResult uploadTasksResult) {
uploadTasksResult.complete();
}

@Override
public void discardResult(UploadTasksResult uploadTasksResult) throws Exception {
uploadTasksResult.discard();
}

@Override
public void handleFailure(Throwable throwable) {
tasks.forEach(task -> task.fail(throwable));
}
};
}
}
@@ -19,7 +19,6 @@

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.util.function.RunnableWithException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,7 +30,6 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.flink.util.ExceptionUtils.firstOrSuppressed;
@@ -74,17 +72,11 @@ class RetryingExecutor implements AutoCloseable {
* <p>NOTE: the action must be idempotent because multiple instances of it can be executed
* concurrently (if the policy allows retries).
*/
void execute(
RetryPolicy retryPolicy, RetriableAction action, Consumer<Throwable> failureCallback) {
<T> void execute(RetryPolicy retryPolicy, RetriableAction<T> action) {
LOG.debug("execute with retryPolicy: {}", retryPolicy);
RetriableTask task =
RetriableTask.initialize(
action,
retryPolicy,
blockingExecutor,
attemptsPerTaskHistogram,
timer,
failureCallback);
RetriableActionAttempt<T> task =
RetriableActionAttempt.initialize(
action, retryPolicy, blockingExecutor, attemptsPerTaskHistogram, timer);
blockingExecutor.submit(task);
}

@@ -119,14 +111,43 @@ public void close() throws Exception {
*
* <p>NOTE: the action must be idempotent because of potential concurrent attempts.
*/
interface RetriableAction extends RunnableWithException {}
interface RetriableAction<Result> {
/**
* Make an attempt to execute this action.
*
* @return result of execution to be used in either {@link #completeWithResult(Object)} or
* {@link #discardResult(Object)}.
* @throws Exception if the attempt fails; any intermediate state should be cleaned up inside
this method before the exception is propagated
*/
Result tryExecute() throws Exception;

/**
* Complete the action with the given result, e.g. by notifying waiting parties. Called on
* successful execution once per action, regardless of the number of execution attempts.
*/
void completeWithResult(Result result);

/**
* Discard the execution result, e.g. because another execution attempt has completed
* earlier. This result will not be passed to {@link #completeWithResult(Object)} or
* otherwise used.
*/
void discardResult(Result result) throws Exception;

private static final class RetriableTask implements Runnable {
private final RetriableAction runnable;
private final Consumer<Throwable> failureCallback;
/**
* Handle the failure of this action, which means that an unrecoverable failure has occurred in
{@link #tryExecute()} or the retry limit has been reached. No further execution attempts will
be performed.
*/
void handleFailure(Throwable throwable);
}

private static final class RetriableActionAttempt<Result> implements Runnable {
private final RetriableAction<Result> action;
private final ScheduledExecutorService blockingExecutor;
private final ScheduledExecutorService timer;
private final int current;
private final int attemptNumber;
private final RetryPolicy retryPolicy;
/**
* The flag shared across all attempts to execute the same {@link #action action}
@@ -146,19 +167,17 @@ private static final class RetriableTask implements Runnable {

private final Histogram attemptsPerTaskHistogram;

private RetriableTask(
int current,
private RetriableActionAttempt(
int attemptNumber,
AtomicBoolean actionCompleted,
RetriableAction runnable,
RetriableAction<Result> action,
RetryPolicy retryPolicy,
ScheduledExecutorService blockingExecutor,
ScheduledExecutorService timer,
Consumer<Throwable> failureCallback,
AtomicInteger activeAttempts,
Histogram attemptsPerTaskHistogram) {
this.current = current;
this.runnable = runnable;
this.failureCallback = failureCallback;
this.attemptNumber = attemptNumber;
this.action = action;
this.retryPolicy = retryPolicy;
this.blockingExecutor = blockingExecutor;
this.actionCompleted = actionCompleted;
@@ -170,21 +189,29 @@ private RetriableTask(

@Override
public void run() {
LOG.debug("starting attempt {}", current);
if (!actionCompleted.get()) {
Optional<ScheduledFuture<?>> timeoutFuture = scheduleTimeout();
try {
runnable.run();
if (actionCompleted.compareAndSet(false, true)) {
LOG.debug("succeeded with {} attempts", current);
attemptsPerTaskHistogram.update(current);
LOG.debug("starting attempt {}", attemptNumber);
if (actionCompleted.get()) {
return;
}
Optional<ScheduledFuture<?>> timeoutFuture = scheduleTimeout();
try {
Result result = action.tryExecute();
if (actionCompleted.compareAndSet(false, true)) {
LOG.debug("succeeded with {} attempts", attemptNumber);
action.completeWithResult(result);
attemptsPerTaskHistogram.update(attemptNumber);
} else {
LOG.debug("discard unnecessarily uploaded state, attempt {}", attemptNumber);
try {
action.discardResult(result);
} catch (Exception e) {
LOG.warn("unable to discard execution attempt result", e);
}
attemptCompleted.set(true);
} catch (Exception e) {
handleError(e);
} finally {
timeoutFuture.ifPresent(f -> f.cancel(true));
}
} catch (Exception e) {
handleError(e);
} finally {
timeoutFuture.ifPresent(f -> f.cancel(true));
}
}

@@ -194,61 +221,58 @@ private void handleError(Exception e) {
// or another attempt completed the task
return;
}
LOG.debug("execution attempt {} failed: {}", current, e.getMessage());
long nextAttemptDelay = retryPolicy.retryAfter(current, e);
LOG.debug("execution attempt {} failed: {}", attemptNumber, e.getMessage());
long nextAttemptDelay = retryPolicy.retryAfter(attemptNumber, e);
if (nextAttemptDelay >= 0L) {
activeAttempts.incrementAndGet();
scheduleNext(nextAttemptDelay, next());
}
if (activeAttempts.decrementAndGet() == 0
&& actionCompleted.compareAndSet(false, true)) {
LOG.info("failed with {} attempts: {}", current, e.getMessage());
failureCallback.accept(e);
LOG.info("failed with {} attempts: {}", attemptNumber, e.getMessage());
action.handleFailure(e);
}
}

private void scheduleNext(long nextAttemptDelay, RetriableTask next) {
private void scheduleNext(long nextAttemptDelay, RetriableActionAttempt<Result> next) {
if (nextAttemptDelay == 0L) {
blockingExecutor.submit(next);
} else if (nextAttemptDelay > 0L) {
blockingExecutor.schedule(next, nextAttemptDelay, MILLISECONDS);
}
}

private static RetriableTask initialize(
RetriableAction runnable,
private static <T> RetriableActionAttempt<T> initialize(
RetriableAction<T> runnable,
RetryPolicy retryPolicy,
ScheduledExecutorService blockingExecutor,
Histogram attemptsPerTaskHistogram,
ScheduledExecutorService timer,
Consumer<Throwable> failureCallback) {
return new RetriableTask(
ScheduledExecutorService timer) {
return new RetriableActionAttempt<>(
1,
new AtomicBoolean(false),
runnable,
retryPolicy,
blockingExecutor,
timer,
failureCallback,
new AtomicInteger(1),
attemptsPerTaskHistogram);
}

private RetriableTask next() {
return new RetriableTask(
current + 1,
private RetriableActionAttempt<Result> next() {
return new RetriableActionAttempt<>(
attemptNumber + 1,
actionCompleted,
runnable,
action,
retryPolicy,
blockingExecutor,
timer,
failureCallback,
activeAttempts,
attemptsPerTaskHistogram);
}

private Optional<ScheduledFuture<?>> scheduleTimeout() {
long timeout = retryPolicy.timeoutFor(current);
long timeout = retryPolicy.timeoutFor(attemptNumber);
return timeout <= 0
? Optional.empty()
: Optional.of(
@@ -258,7 +282,7 @@ private Optional<ScheduledFuture<?>> scheduleTimeout() {

private TimeoutException fmtError(long timeout) {
return new TimeoutException(
String.format("Attempt %d timed out after %dms", current, timeout));
String.format("Attempt %d timed out after %dms", attemptNumber, timeout));
}
}
}
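The contract above is easiest to see end to end. Below is a small, self-contained sketch in plain Java (not Flink code; all class and variable names are illustrative) of the attempt lifecycle this change introduces: two concurrent attempts of the same idempotent action race on a shared completion flag, the first to succeed completes the result, and the late duplicate discards its result instead of leaking it.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class DiscardingRetrySketch {

    /** Simplified stand-in for RetryingExecutor.RetriableAction<Result>. */
    interface Action<R> {
        R tryExecute() throws Exception;                 // one attempt; clean up on failure

        void completeWithResult(R result);               // first successful attempt only

        void discardResult(R result) throws Exception;   // late duplicate results

        void handleFailure(Throwable t);                 // no more attempts will follow
    }

    public static void main(String[] args) throws Exception {
        Action<String> upload =
                new Action<String>() {
                    @Override
                    public String tryExecute() throws Exception {
                        // Simulate a slow upload; both attempts will eventually succeed.
                        Thread.sleep(100);
                        return "state-handle-" + Thread.currentThread().getName();
                    }

                    @Override
                    public void completeWithResult(String handle) {
                        System.out.println("completed with " + handle);
                    }

                    @Override
                    public void discardResult(String handle) {
                        System.out.println("discarding duplicate " + handle);
                    }

                    @Override
                    public void handleFailure(Throwable t) {
                        System.out.println("failed: " + t);
                    }
                };

        // Shared across attempts, like actionCompleted in the patch: only the first
        // successful attempt may complete; a later winner must discard its result.
        AtomicBoolean completed = new AtomicBoolean(false);
        ExecutorService pool = Executors.newFixedThreadPool(2);
        Runnable attempt =
                () -> {
                    try {
                        String result = upload.tryExecute();
                        if (completed.compareAndSet(false, true)) {
                            upload.completeWithResult(result);
                        } else {
                            upload.discardResult(result);
                        }
                    } catch (Exception e) {
                        upload.handleFailure(e);
                    }
                };
        pool.submit(attempt); // original attempt
        pool.submit(attempt); // e.g. a retry scheduled after a timeout
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}

This mirrors the actionCompleted.compareAndSet(...) branch added to run() above, where the discarded result is the unnecessarily uploaded state.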
@@ -69,5 +69,9 @@ private List<UploadResult> buildResults(
public long getStateSize() {
return handle.getStateSize();
}

public void discard() throws Exception {
handle.discardState();
}
}
}
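For intuition about what the new discard() hook does: it delegates to the uploaded handle's discardState(), which releases the underlying data, so a duplicate attempt's upload does not leave orphaned files behind. A rough, hypothetical sketch (not the actual handle implementation) of a file-backed handle:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class FileHandleSketch {

    /** Toy stand-in for a state handle whose backing file can be discarded. */
    static final class FileStateHandle {
        private final Path path;

        FileStateHandle(Path path) {
            this.path = path;
        }

        long getStateSize() throws IOException {
            return Files.size(path);
        }

        /** Loosely mirrors StateObject#discardState(): remove the underlying data. */
        void discardState() throws IOException {
            Files.deleteIfExists(path);
        }
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("changelog-upload-", ".bin");
        Files.write(tmp, new byte[] {1, 2, 3});
        FileStateHandle handle = new FileStateHandle(tmp);
        System.out.println("uploaded " + handle.getStateSize() + " bytes");
        // A losing attempt's result is discarded instead of completed:
        handle.discardState();
        System.out.println("still exists? " + Files.exists(tmp));
    }
}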