Skip to content

Commit

Permalink
[FLINK-11069][core] Merge FutureUtil into FutureUtils
Browse files Browse the repository at this point in the history
  • Loading branch information
tisonkun authored and zentol committed Jan 17, 2019
1 parent 3d25080 commit 322e479
Show file tree
Hide file tree
Showing 9 changed files with 74 additions and 124 deletions.
85 changes: 0 additions & 85 deletions flink-core/src/main/java/org/apache/flink/util/FutureUtil.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -349,6 +351,27 @@ public static <T> CompletableFuture<T> orTimeout(CompletableFuture<T> future, lo
// Future actions
// ------------------------------------------------------------------------

/**
* Run the given {@code RunnableFuture} if it is not done, and then retrieves its result.
* @param future to run if not done and get
* @param <T> type of the result
* @return the result after running the future
* @throws ExecutionException if a problem occurred
* @throws InterruptedException if the current thread has been interrupted
*/
public static <T> T runIfNotDoneAndGet(RunnableFuture<T> future) throws ExecutionException, InterruptedException {

if (null == future) {
return null;
}

if (!future.isDone()) {
future.run();
}

return future.get();
}

/**
* Run the given action after the completion of the given future. The given future can be
* completed normally or exceptionally. In case of an exceptional completion the, the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

package org.apache.flink.runtime.state;

import org.apache.flink.util.FutureUtil;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.util.LambdaUtil;

import org.slf4j.Logger;
Expand Down Expand Up @@ -73,7 +73,7 @@ public static void discardStateFuture(RunnableFuture<? extends StateObject> stat

try {
// We attempt to get a result, in case the future completed before cancellation.
StateObject stateObject = FutureUtil.runIfNotDoneAndGet(stateFuture);
StateObject stateObject = FutureUtils.runIfNotDoneAndGet(stateFuture);

if (null != stateObject) {
stateObject.discardState();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
Expand All @@ -30,18 +31,21 @@
import org.apache.flink.runtime.io.network.util.TestProducerSource;
import org.apache.flink.runtime.io.network.util.TestSubpartitionConsumer;
import org.apache.flink.runtime.io.network.util.TestSubpartitionProducer;
import org.apache.flink.util.function.CheckedSupplier;

import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Test;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;


import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferConsumer;
import static org.apache.flink.util.FutureUtil.waitForAll;
import static org.apache.flink.util.Preconditions.checkState;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
Expand Down Expand Up @@ -199,15 +203,18 @@ public void onEvent(AbstractEvent event) {

final PipelinedSubpartition subpartition = createSubpartition();

TestSubpartitionProducer producer = new TestSubpartitionProducer(subpartition, isSlowProducer, producerSource);
TestSubpartitionConsumer consumer = new TestSubpartitionConsumer(isSlowConsumer, consumerCallback);
final PipelinedSubpartitionView view = subpartition.createReadView(consumer);
consumer.setSubpartitionView(view);

Future<Boolean> producerResult = executorService.submit(
new TestSubpartitionProducer(subpartition, isSlowProducer, producerSource));
Future<Boolean> consumerResult = executorService.submit(consumer);
CompletableFuture<Boolean> producerResult = CompletableFuture.supplyAsync(
CheckedSupplier.unchecked(producer::call), executorService);
CompletableFuture<Boolean> consumerResult = CompletableFuture.supplyAsync(
CheckedSupplier.unchecked(consumer::call), executorService);

waitForAll(60_000L, producerResult, consumerResult);
FutureUtils.waitForAll(Arrays.asList(producerResult, consumerResult))
.get(60_000L, TimeUnit.MILLISECONDS);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.runtime.io.network.partition.consumer;

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
Expand All @@ -42,6 +43,7 @@
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.taskmanager.TaskActions;
import org.apache.flink.util.function.CheckedSupplier;

import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;

Expand All @@ -57,13 +59,13 @@
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import scala.Tuple2;

import static org.apache.flink.util.FutureUtil.waitForAll;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;
Expand Down Expand Up @@ -158,27 +160,30 @@ public void testConcurrentConsumeMultiplePartitions() throws Exception {
// Test
try {
// Submit producer tasks
List<Future<?>> results = Lists.newArrayListWithCapacity(
List<CompletableFuture<?>> results = Lists.newArrayListWithCapacity(
parallelism + 1);

for (int i = 0; i < parallelism; i++) {
results.add(executor.submit(partitionProducers[i]));
results.add(CompletableFuture.supplyAsync(
CheckedSupplier.unchecked(partitionProducers[i]::call), executor));
}

// Submit consumer
for (int i = 0; i < parallelism; i++) {
results.add(executor.submit(
new TestLocalInputChannelConsumer(
i,
parallelism,
numberOfBuffersPerChannel,
networkBuffers.createBufferPool(parallelism, parallelism),
partitionManager,
new TaskEventDispatcher(),
partitionIds)));
final TestLocalInputChannelConsumer consumer = new TestLocalInputChannelConsumer(
i,
parallelism,
numberOfBuffersPerChannel,
networkBuffers.createBufferPool(parallelism, parallelism),
partitionManager,
new TaskEventDispatcher(),
partitionIds);

results.add(CompletableFuture.supplyAsync(CheckedSupplier.unchecked(consumer::call), executor));
}

waitForAll(60_000L, results);
FutureUtils.waitForAll(results)
.get(60_000L, TimeUnit.MILLISECONDS);
}
finally {
networkBuffers.destroyAllBufferPools();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.StateObjectCollection;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.testutils.ArtificialCNFExceptionThrowingClassLoader;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FutureUtil;

import org.junit.Assert;
import org.junit.Ignore;
Expand Down Expand Up @@ -121,7 +121,7 @@ public void testOperatorStateRestoreFailsIfSerializerDeserializationFails() thro
RunnableFuture<SnapshotResult<OperatorStateHandle>> runnableFuture =
operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());

SnapshotResult<OperatorStateHandle> snapshotResult = FutureUtil.runIfNotDoneAndGet(runnableFuture);
SnapshotResult<OperatorStateHandle> snapshotResult = FutureUtils.runIfNotDoneAndGet(runnableFuture);
OperatorStateHandle stateHandle = snapshotResult.getJobManagerOwnedSnapshot();

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.StateObjectCollection;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.state.DefaultOperatorStateBackend.PartitionableListState;
Expand All @@ -43,7 +44,6 @@
import org.apache.flink.runtime.util.BlockingCheckpointOutputStream;
import org.apache.flink.testutils.ArtificialCNFExceptionThrowingClassLoader;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FutureUtil;
import org.apache.flink.util.Preconditions;

import org.junit.Assert;
Expand Down Expand Up @@ -252,7 +252,7 @@ public void testCorrectClassLoaderUsedOnSnapshot() throws Exception {
CheckpointStreamFactory streamFactory = new MemCheckpointStreamFactory(4096);
RunnableFuture<SnapshotResult<OperatorStateHandle>> runnableFuture =
operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
FutureUtil.runIfNotDoneAndGet(runnableFuture);
FutureUtils.runIfNotDoneAndGet(runnableFuture);

// make sure that the copy method has been called
assertTrue(copyCounter.get() > 0);
Expand Down Expand Up @@ -374,7 +374,7 @@ public void testSnapshotEmpty() throws Exception {
RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot =
operatorStateBackend.snapshot(0L, 0L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());

SnapshotResult<OperatorStateHandle> snapshotResult = FutureUtil.runIfNotDoneAndGet(snapshot);
SnapshotResult<OperatorStateHandle> snapshotResult = FutureUtils.runIfNotDoneAndGet(snapshot);
OperatorStateHandle stateHandle = snapshotResult.getJobManagerOwnedSnapshot();
assertNull(stateHandle);
}
Expand Down Expand Up @@ -404,7 +404,7 @@ public void testSnapshotBroadcastStateWithEmptyOperatorState() throws Exception
RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot =
operatorStateBackend.snapshot(0L, 0L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());

SnapshotResult<OperatorStateHandle> snapshotResult = FutureUtil.runIfNotDoneAndGet(snapshot);
SnapshotResult<OperatorStateHandle> snapshotResult = FutureUtils.runIfNotDoneAndGet(snapshot);
stateHandle = snapshotResult.getJobManagerOwnedSnapshot();
assertNotNull(stateHandle);

Expand All @@ -422,7 +422,7 @@ public void testSnapshotBroadcastStateWithEmptyOperatorState() throws Exception
expected.remove(1);

snapshot = operatorStateBackend.snapshot(1L, 1L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
snapshotResult = FutureUtil.runIfNotDoneAndGet(snapshot);
snapshotResult = FutureUtils.runIfNotDoneAndGet(snapshot);

stateHandle.discardState();
stateHandle = snapshotResult.getJobManagerOwnedSnapshot();
Expand All @@ -440,7 +440,7 @@ public void testSnapshotBroadcastStateWithEmptyOperatorState() throws Exception
expected.clear();

snapshot = operatorStateBackend.snapshot(2L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
snapshotResult = FutureUtil.runIfNotDoneAndGet(snapshot);
snapshotResult = FutureUtils.runIfNotDoneAndGet(snapshot);
if (stateHandle != null) {
stateHandle.discardState();
}
Expand Down Expand Up @@ -509,7 +509,7 @@ public void testSnapshotRestoreSync() throws Exception {
RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot =
operatorStateBackend.snapshot(1L, 1L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());

SnapshotResult<OperatorStateHandle> snapshotResult = FutureUtil.runIfNotDoneAndGet(snapshot);
SnapshotResult<OperatorStateHandle> snapshotResult = FutureUtils.runIfNotDoneAndGet(snapshot);
OperatorStateHandle stateHandle = snapshotResult.getJobManagerOwnedSnapshot();

try {
Expand Down Expand Up @@ -873,7 +873,7 @@ public void testRestoreFailsIfSerializerDeserializationFails() throws Exception
RunnableFuture<SnapshotResult<OperatorStateHandle>> runnableFuture =
operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());

SnapshotResult<OperatorStateHandle> snapshotResult = FutureUtil.runIfNotDoneAndGet(runnableFuture);
SnapshotResult<OperatorStateHandle> snapshotResult = FutureUtils.runIfNotDoneAndGet(runnableFuture);
OperatorStateHandle stateHandle = snapshotResult.getJobManagerOwnedSnapshot();

try {
Expand Down
Loading

0 comments on commit 322e479

Please sign in to comment.