Skip to content

Commit

Permalink
[hotfix] Add ThrowingRunnable#unchecked and FunctionUtils#uncheckedCo…
Browse files Browse the repository at this point in the history
…nsumer

ThrowingRunnable#unchecked converts a ThrowingRunnable into a Runnable which throws checked
exceptions as unchecked ones. FunctionUtils#uncheckedConsmer(ThrowingConsumer) converts a
ThrowingConsumer into a Consumer which throws checked exceptions as unchecked ones. This is
necessary because ThrowingConsumer is public and we cannot add new methods to the interface.
  • Loading branch information
tillrohrmann committed Sep 14, 2018
1 parent 6feabbc commit 7be3956
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 51 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.util.ExceptionUtils;

import java.util.function.Consumer;
import java.util.function.Function;

/**
Expand Down Expand Up @@ -50,4 +51,22 @@ public static <A, B> Function<A, B> uncheckedFunction(FunctionWithException<A, B
}
};
}

/**
* Converts a {@link ThrowingConsumer} into a {@link Consumer} which throws checked exceptions
* as unchecked.
*
* @param throwingConsumer to convert into a {@link Consumer}
* @param <A> input type
* @return {@link Consumer} which throws all checked exceptions as unchecked
*/
public static <A> Consumer<A> uncheckedConsumer(ThrowingConsumer<A, ?> throwingConsumer) {
return (A value) -> {
try {
throwingConsumer.accept(value);
} catch (Throwable t) {
ExceptionUtils.rethrow(t);
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.util.function;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.util.ExceptionUtils;

/**
* Similar to a {@link Runnable}, this interface is used to capture a block of code
Expand All @@ -35,4 +36,21 @@ public interface ThrowingRunnable<E extends Throwable> {
* @throws E Exceptions may be thrown.
*/
void run() throws E;

/**
* Converts a {@link ThrowingRunnable} into a {@link Runnable} which throws all checked exceptions
* as unchecked.
*
* @param throwingRunnable to convert into a {@link Runnable}
* @return {@link Runnable} which throws all checked exceptions as unchecked.
*/
static Runnable unchecked(ThrowingRunnable<?> throwingRunnable) {
return () -> {
try {
throwingRunnable.run();
} catch (Throwable t) {
ExceptionUtils.rethrow(t);
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.function.ConsumerWithException;
import org.apache.flink.util.function.ThrowingConsumer;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.utils.ZKPaths;
Expand Down Expand Up @@ -246,7 +246,7 @@ public void addCheckpoint(final CompletedCheckpoint checkpoint) throws Exception
LOG.debug("Added {} to {}.", checkpoint, path);
}

private void tryRemoveCompletedCheckpoint(CompletedCheckpoint completedCheckpoint, ConsumerWithException<CompletedCheckpoint, Exception> discardCallback) {
private void tryRemoveCompletedCheckpoint(CompletedCheckpoint completedCheckpoint, ThrowingConsumer<CompletedCheckpoint, Exception> discardCallback) {
try {
if (tryRemove(completedCheckpoint.getCheckpointID())) {
executor.execute(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,9 @@
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.BiFunctionWithException;
import org.apache.flink.util.function.ConsumerWithException;
import org.apache.flink.util.function.FunctionUtils;
import org.apache.flink.util.function.ThrowingConsumer;
import org.apache.flink.util.function.ThrowingRunnable;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -849,7 +850,7 @@ private CompletableFuture<Boolean> tryAcceptLeadershipAndRunJobs(UUID newLeaderS
}
}

private CompletableFuture<Void> waitForTerminatingJobManager(JobID jobId, JobGraph jobGraph, ConsumerWithException<JobGraph, ?> action) {
private CompletableFuture<Void> waitForTerminatingJobManager(JobID jobId, JobGraph jobGraph, ThrowingConsumer<JobGraph, ?> action) {
final CompletableFuture<Void> jobManagerTerminationFuture = getJobTerminationFuture(jobId)
.exceptionally((Throwable throwable) -> {
throw new CompletionException(
Expand All @@ -858,10 +859,10 @@ private CompletableFuture<Void> waitForTerminatingJobManager(JobID jobId, JobGra
throwable)); });

return jobManagerTerminationFuture.thenRunAsync(
() -> {
ThrowingRunnable.unchecked(() -> {
jobManagerTerminationFutures.remove(jobId);
action.accept(jobGraph);
},
}),
getMainThreadExecutor());
}

Expand Down Expand Up @@ -934,11 +935,11 @@ public void onAddedJobGraph(final JobID jobId) {
final CompletableFuture<Void> submissionFuture = recoveredJob.thenComposeAsync(
(Optional<JobGraph> jobGraphOptional) -> jobGraphOptional.map(
FunctionUtils.uncheckedFunction(jobGraph -> tryRunRecoveredJobGraph(jobGraph, dispatcherId).thenAcceptAsync(
(ConsumerWithException<Boolean, Exception>) (Boolean isRecoveredJobRunning) -> {
FunctionUtils.uncheckedConsumer((Boolean isRecoveredJobRunning) -> {
if (!isRecoveredJobRunning) {
submittedJobGraphStore.releaseJobGraph(jobId);
}
},
}),
getRpcService().getExecutor())))
.orElse(CompletableFuture.completedFuture(null)),
getUnfencedMainThreadExecutor());
Expand Down

0 comments on commit 7be3956

Please sign in to comment.