Skip to content

Commit

Permalink
[FLINK-10541][tests] Removed unused legacy methods in TestBaseUtils
Browse files Browse the repository at this point in the history
  • Loading branch information
tisonkun authored and zentol committed Oct 13, 2018
1 parent 599b74b commit e2a5f15
Showing 1 changed file with 0 additions and 152 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,13 @@

import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.runtime.messages.TaskManagerMessages;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.util.TestLogger;

import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;

import akka.actor.ActorRef;
import akka.dispatch.Futures;
import akka.pattern.Patterns;
import akka.util.Timeout;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.junit.Assert;
Expand All @@ -52,13 +41,10 @@
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Method;
import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
Expand All @@ -73,10 +59,8 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import scala.concurrent.Await;
import scala.concurrent.ExecutionContext;
import scala.concurrent.ExecutionContext$;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;

import static org.apache.flink.util.Preconditions.checkArgument;
Expand All @@ -103,9 +87,6 @@ public class TestBaseUtils extends TestLogger {

public static final Time DEFAULT_HTTP_TIMEOUT = Time.seconds(10L);

static final String NEW_CODEBASE = "new";
static final String CODEBASE_KEY = "codebase";

// ------------------------------------------------------------------------

protected static File logDir;
Expand All @@ -120,139 +101,6 @@ private static void verifyJvmOptions() {
+ "m", heap > MINIMUM_HEAP_SIZE_MB - 50);
}

public static LocalFlinkMiniCluster startCluster(
int numTaskManagers,
int taskManagerNumSlots,
boolean startWebserver,
boolean startZooKeeper,
boolean singleActorSystem) throws Exception {

Configuration config = new Configuration();

config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers);
config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, taskManagerNumSlots);

config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, startWebserver);

if (startZooKeeper) {
config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 3);
config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
}

return startCluster(config, singleActorSystem);
}

public static LocalFlinkMiniCluster startCluster(
Configuration config,
boolean singleActorSystem) throws Exception {

if (!config.contains(WebOptions.LOG_PATH) || !config.containsKey(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY)) {
logDir = File.createTempFile("TestBaseUtils-logdir", null);
Assert.assertTrue("Unable to delete temp file", logDir.delete());
Assert.assertTrue("Unable to create temp directory", logDir.mkdir());
Path logFile = Files.createFile(new File(logDir, "jobmanager.log").toPath());
Files.createFile(new File(logDir, "jobmanager.out").toPath());

if (!config.contains(WebOptions.LOG_PATH)) {
config.setString(WebOptions.LOG_PATH, logFile.toString());
}

if (!config.containsKey(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY)) {
config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logFile.toString());
}
}

if (!config.contains(WebOptions.PORT)) {
config.setInteger(WebOptions.PORT, 8081);
}

if (!config.contains(AkkaOptions.ASK_TIMEOUT)) {
config.setString(AkkaOptions.ASK_TIMEOUT, DEFAULT_AKKA_ASK_TIMEOUT + "s");
}

if (!config.contains(AkkaOptions.STARTUP_TIMEOUT)) {
config.setString(AkkaOptions.STARTUP_TIMEOUT, DEFAULT_AKKA_STARTUP_TIMEOUT);
}

if (!config.contains(CoreOptions.FILESYTEM_DEFAULT_OVERRIDE)) {
config.setBoolean(CoreOptions.FILESYTEM_DEFAULT_OVERRIDE, true);
}

if (!config.contains(TaskManagerOptions.MANAGED_MEMORY_SIZE)) {
config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, TASK_MANAGER_MEMORY_SIZE);
}

LocalFlinkMiniCluster cluster = new LocalFlinkMiniCluster(config, singleActorSystem);

cluster.start();

return cluster;
}

public static void stopCluster(LocalFlinkMiniCluster executor, FiniteDuration timeout) throws Exception {
if (logDir != null) {
FileUtils.deleteDirectory(logDir);
}
if (executor != null) {
int numUnreleasedBCVars = 0;
int numActiveConnections = 0;

if (executor.running()) {
List<ActorRef> tms = executor.getTaskManagersAsJava();
List<Future<Object>> bcVariableManagerResponseFutures = new ArrayList<>();
List<Future<Object>> numActiveConnectionsResponseFutures = new ArrayList<>();

for (ActorRef tm : tms) {
bcVariableManagerResponseFutures.add(Patterns.ask(
tm,
TaskManagerMessages.getRequestBroadcastVariablesWithReferences(),
new Timeout(timeout)));

numActiveConnectionsResponseFutures.add(Patterns.ask(
tm,
TaskManagerMessages.getRequestNumActiveConnections(),
new Timeout(timeout)));
}

Future<Iterable<Object>> bcVariableManagerFutureResponses = Futures.sequence(
bcVariableManagerResponseFutures, defaultExecutionContext());

Iterable<Object> responses = Await.result(bcVariableManagerFutureResponses, timeout);

for (Object response : responses) {
numUnreleasedBCVars += ((TaskManagerMessages.ResponseBroadcastVariablesWithReferences) response).number();
}

Future<Iterable<Object>> numActiveConnectionsFutureResponses = Futures.sequence(
numActiveConnectionsResponseFutures, defaultExecutionContext());

responses = Await.result(numActiveConnectionsFutureResponses, timeout);

for (Object response : responses) {
numActiveConnections += ((TaskManagerMessages.ResponseNumActiveConnections) response).number();
}
}

executor.stop();
try {
Class<?> hadoopFileSystemClass = Class.forName(
"org.apache.hadoop.fs.FileSystem",
true,
TestBaseUtils.class.getClassLoader());

Method closeAllMethod = hadoopFileSystemClass.getMethod("closeAll");
closeAllMethod.invoke(null);
} catch (Throwable e) {
// ignore
}
System.gc();

Assert.assertEquals("Not all broadcast variables were released.", 0, numUnreleasedBCVars);
Assert.assertEquals("Not all TCP connections were released.", 0, numActiveConnections);
}

}

// --------------------------------------------------------------------------------------------
// Result Checking
// --------------------------------------------------------------------------------------------
Expand Down

0 comments on commit e2a5f15

Please sign in to comment.