Skip to content

Commit

Permalink
[FLINK-22988][tests] Port TestingUtils to java
Browse files Browse the repository at this point in the history
  • Loading branch information
zentol authored Jun 21, 2021
1 parent f46cfec commit 28d146a
Show file tree
Hide file tree
Showing 71 changed files with 152 additions and 304 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.runtime.testutils.TestingUtils;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.TestLogger;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.TestingUtils;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.QueryableStateStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.messages.MessageParameter;
import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.TestingUtils;
import org.apache.flink.runtime.util.BlobServerResource;
import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobPlanInfo;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.TestingUtils;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.TestingUtils;
import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.runtime.webmonitor.testutils.ParameterProgram;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.flink.core.testutils.FlinkMatchers;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorResource;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
Expand Down Expand Up @@ -220,7 +220,7 @@ public void testRetryWithDelayFixedArgsFailure() throws Throwable {
TestingUtils.defaultScheduledExecutor());

try {
retryFuture.get(TestingUtils.TIMEOUT().toMilliseconds(), TimeUnit.MILLISECONDS);
retryFuture.get(TestingUtils.TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
} catch (ExecutionException ee) {
throw ExceptionUtils.stripExecutionException(ee);
}
Expand All @@ -238,7 +238,7 @@ public void testRetryWithDelayRetryStrategyFailure() throws Throwable {
TestingUtils.defaultScheduledExecutor());

try {
retryFuture.get(TestingUtils.TIMEOUT().toMilliseconds(), TimeUnit.MILLISECONDS);
retryFuture.get(TestingUtils.TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
} catch (ExecutionException ee) {
throw ExceptionUtils.stripExecutionException(ee);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.TestingUtils;
import org.apache.flink.runtime.util.ManualTicker;
import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
import org.apache.flink.util.Preconditions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.TestingRpcServiceResource;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.TestingJobGraphStore;
import org.apache.flink.runtime.testutils.TestingUtils;
import org.apache.flink.runtime.util.BlobServerResource;
import org.apache.flink.runtime.util.LeaderConnectionInfo;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.rpc.TestingRpcServiceResource;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.TestingUtils;
import org.apache.flink.runtime.util.LeaderConnectionInfo;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.runtime.util.ZooKeeperUtils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
import org.apache.flink.runtime.testutils.TestingUtils;

import javax.annotation.Nullable;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
import org.apache.flink.runtime.scheduler.VertexParallelismStore;
import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.TestingUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.TestingUtils;
import org.apache.flink.util.TestLogger;

import org.hamcrest.Matcher;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package org.apache.flink.runtime.highavailability.nonha.embedded;

import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.TestingUtils;
import org.apache.flink.util.TestLogger;

import org.junit.Assert;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.TestingUtils;

import org.apache.flink.shaded.netty4.io.netty.channel.Channel;

Expand Down Expand Up @@ -109,10 +109,10 @@ public ResultSubpartitionView answer(
.await();

// Wait for the notification
if (!sync.await(TestingUtils.TESTING_DURATION().toMillis(), TimeUnit.MILLISECONDS)) {
if (!sync.await(TestingUtils.TESTING_DURATION.toMillis(), TimeUnit.MILLISECONDS)) {
fail(
"Timed out after waiting for "
+ TestingUtils.TESTING_DURATION().toMillis()
+ TestingUtils.TESTING_DURATION.toMillis()
+ " ms to be notified about cancelled partition.");
}

Expand Down Expand Up @@ -168,10 +168,10 @@ public ResultSubpartitionView answer(
.await();

// Wait for the notification
if (!sync.await(TestingUtils.TESTING_DURATION().toMillis(), TimeUnit.MILLISECONDS)) {
if (!sync.await(TestingUtils.TESTING_DURATION.toMillis(), TimeUnit.MILLISECONDS)) {
fail(
"Timed out after waiting for "
+ TestingUtils.TESTING_DURATION().toMillis()
+ TestingUtils.TESTING_DURATION.toMillis()
+ " ms to be notified about cancelled partition.");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.TestingUtils;

import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
Expand Down Expand Up @@ -148,10 +148,10 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
requestClient.requestSubpartition(new ResultPartitionID(), 0, rich[1], 0);

// Wait for the notification and it could confirm all the request operations are done
if (!sync.await(TestingUtils.TESTING_DURATION().toMillis(), TimeUnit.MILLISECONDS)) {
if (!sync.await(TestingUtils.TESTING_DURATION.toMillis(), TimeUnit.MILLISECONDS)) {
fail(
"Timed out after waiting for "
+ TestingUtils.TESTING_DURATION().toMillis()
+ TestingUtils.TESTING_DURATION.toMillis()
+ " ms to be notified about the channel error.");
}

Expand Down Expand Up @@ -275,10 +275,10 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
ch.writeAndFlush(Unpooled.buffer().writerIndex(16));

// Wait for the notification
if (!sync.await(TestingUtils.TESTING_DURATION().toMillis(), TimeUnit.MILLISECONDS)) {
if (!sync.await(TestingUtils.TESTING_DURATION.toMillis(), TimeUnit.MILLISECONDS)) {
fail(
"Timed out after waiting for "
+ TestingUtils.TESTING_DURATION().toMillis()
+ TestingUtils.TESTING_DURATION.toMillis()
+ " ms to be notified about remote connection close.");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.TestingUtils;

import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
Expand Down Expand Up @@ -111,10 +111,10 @@ public void channelRead(ChannelHandlerContext ctx, Object msg)
new ResultPartitionID(), 0, new InputChannelID(), Integer.MAX_VALUE));

// Wait for the notification
if (!sync.await(TestingUtils.TESTING_DURATION().toMillis(), TimeUnit.MILLISECONDS)) {
if (!sync.await(TestingUtils.TESTING_DURATION.toMillis(), TimeUnit.MILLISECONDS)) {
fail(
"Timed out after waiting for "
+ TestingUtils.TESTING_DURATION().toMillis()
+ TestingUtils.TESTING_DURATION.toMillis()
+ " ms to be notified about released partition.");
}
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.MiniClusterResource;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.runtime.testutils.TestingUtils;
import org.apache.flink.util.TestLogger;

import org.junit.ClassRule;
Expand Down Expand Up @@ -63,7 +63,7 @@ public class PartialConsumePipelinedResultTest extends TestLogger {

private static Configuration getFlinkConfiguration() {
final Configuration config = new Configuration();
config.setString(AkkaOptions.ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
config.setString(AkkaOptions.ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT);
config.setInteger(
NettyShuffleEnvironmentOptions.NETWORK_NUM_BUFFERS, NUMBER_OF_NETWORK_BUFFERS);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@
import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.MiniClusterResource;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.runtime.testutils.TestingUtils;
import org.apache.flink.types.IntValue;
import org.apache.flink.util.TestLogger;

Expand Down Expand Up @@ -68,7 +68,7 @@ public class SlotCountExceedingParallelismTest extends TestLogger {

private static Configuration getFlinkConfiguration() {
final Configuration config = new Configuration();
config.setString(AkkaOptions.ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
config.setString(AkkaOptions.ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT);

return config;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.flink.runtime.blob.VoidBlobWriter;
import org.apache.flink.runtime.execution.librarycache.ContextClassLoaderLibraryCacheManager;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.TestingUtils;

import java.util.concurrent.ScheduledExecutorService;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.TestingUtils;
import org.apache.flink.util.clock.Clock;
import org.apache.flink.util.clock.SystemClock;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@
import org.apache.flink.runtime.minicluster.TestingMiniCluster;
import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration;
import org.apache.flink.runtime.resourcemanager.ResourceManagerServiceImpl;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.TestingUtils;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.flink.util.TestLogger;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.TestingUtils;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.util.TestLogger;

Expand Down Expand Up @@ -210,8 +210,7 @@ private static final class EmbeddedServiceClass implements ServiceClass {

@Override
public void setup() {
embeddedLeaderService =
new EmbeddedLeaderService(TestingUtils.defaultExecutionContext());
embeddedLeaderService = new EmbeddedLeaderService(TestingUtils.defaultExecutor());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.apache.flink.runtime.jobmaster.JobMaster;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.TestingUtils;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.util.TestLogger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.TestingUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import org.apache.flink.runtime.taskexecutor.TaskExecutorMemoryConfiguration;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.runtime.taskexecutor.partition.ClusterPartitionReport;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.TestingUtils;
import org.apache.flink.util.TestLogger;

import org.junit.After;
Expand Down Expand Up @@ -195,7 +195,7 @@ public static void registerTaskExecutor(
ResourceProfile.ZERO);
final CompletableFuture<RegistrationResponse> registrationFuture =
resourceManagerGateway.registerTaskExecutor(
taskExecutorRegistration, TestingUtils.TIMEOUT());
taskExecutorRegistration, TestingUtils.TIMEOUT);

assertThat(registrationFuture.get(), instanceOf(RegistrationResponse.Success.class));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.TestingUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
Expand Down
Loading

0 comments on commit 28d146a

Please sign in to comment.