Skip to content

Commit

Permalink
[FLINK-10819] Replace with the Flink's Deadline implementation
Browse files Browse the repository at this point in the history
Remove Scala's Deadline and replace it with Flink's own version.

This closes apache#10950.
  • Loading branch information
guoweiM authored and tillrohrmann committed Jan 25, 2020
1 parent e7e2447 commit 53f956f
Showing 1 changed file with 4 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
Expand Down Expand Up @@ -70,10 +71,6 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
Expand All @@ -99,7 +96,7 @@ public class JobManagerHAProcessFailureRecoveryITCase extends TestLogger {

private static ZooKeeperTestEnvironment zooKeeper;

private static final FiniteDuration TestTimeOut = new FiniteDuration(5, TimeUnit.MINUTES);
private static final Duration TEST_TIMEOUT = Duration.ofMinutes(5);

@Rule
public final TemporaryFolder temporaryFolder = new TemporaryFolder();
Expand Down Expand Up @@ -258,7 +255,7 @@ public void testDispatcherProcessFailure() throws Exception {
final RpcService rpcService = AkkaRpcServiceUtils.createRpcService("localhost", 0, config);

try {
final Deadline deadline = TestTimeOut.fromNow();
final Deadline deadline = Deadline.fromNow(TEST_TIMEOUT);

// Coordination directory
coordinateTempDir = temporaryFolder.newFolder();
Expand Down Expand Up @@ -394,7 +391,7 @@ public void run() {
}
}

private void waitForTaskManagers(int numberOfTaskManagers, DispatcherGateway dispatcherGateway, FiniteDuration timeLeft) throws ExecutionException, InterruptedException {
private void waitForTaskManagers(int numberOfTaskManagers, DispatcherGateway dispatcherGateway, Duration timeLeft) throws ExecutionException, InterruptedException {
FutureUtils.retrySuccessfulWithDelay(
() -> dispatcherGateway.requestClusterOverview(Time.milliseconds(timeLeft.toMillis())),
Time.milliseconds(50L),
Expand Down

0 comments on commit 53f956f

Please sign in to comment.