Skip to content

Commit

Permalink
[FLINK-7739][tests] Properly close flink mini cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
pnowojski authored and tillrohrmann committed Oct 30, 2017
1 parent 9eb878e commit f30f91a
Show file tree
Hide file tree
Showing 23 changed files with 38 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public static void shutDownServices() throws Exception {
TestStreamEnvironment.unsetAsContext();

if (flink != null) {
flink.shutdown();
flink.stop();
}
kafkaServer.shutdown();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,15 +174,14 @@ protected static void startClusters(boolean secureMode, boolean hideKafkaBehindP
protected static void shutdownClusters() throws Exception {

if (flink != null) {
flink.shutdown();
flink.stop();
}

if (secureProps != null) {
secureProps.clear();
}

kafkaServer.shutdown();

}

// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public static void setup(int proxyPortRangeStart, int serverPortRangeStart) {
@AfterClass
public static void tearDown() {
try {
cluster.shutdown();
cluster.stop();
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public void testStandaloneWebRuntimeMonitor() throws Exception {
}
finally {
if (flink != null) {
flink.shutdown();
flink.stop();
}
}
}
Expand Down Expand Up @@ -411,7 +411,7 @@ public void testNoEscape() throws Exception {
}
} finally {
if (flink != null) {
flink.shutdown();
flink.stop();
}
}
}
Expand Down Expand Up @@ -471,7 +471,7 @@ public void testNoCopyFromJar() throws Exception {
}
} finally {
if (flink != null) {
flink.shutdown();
flink.stop();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ abstract class FlinkMiniCluster(

def stop(): Unit = {
LOG.info("Stopping FlinkMiniCluster.")
shutdown()
startInternalShutdown()
awaitTermination()

jobManagerLeaderRetrievalService.foreach(_.stop())
Expand All @@ -435,7 +435,7 @@ abstract class FlinkMiniCluster(
ioExecutor)
}

protected def shutdown(): Unit = {
protected def startInternalShutdown(): Unit = {
webMonitor foreach {
_.stop()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,7 @@ public void testCoordinatorShutsDownOnFailure() {
}
finally {
if (cluster != null) {
cluster.shutdown();
cluster.awaitTermination();
cluster.stop();
}
}
}
Expand Down Expand Up @@ -186,8 +185,7 @@ public void testCoordinatorShutsDownOnSuccess() {
}
finally {
if (cluster != null) {
cluster.shutdown();
cluster.awaitTermination();
cluster.stop();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public void run() {
// if the job fails then an exception is thrown here
Await.result(promise.future(), deadline.timeLeft());
} finally {
cluster.shutdown();
cluster.stop();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ public boolean accept(File dir, String name) {
fail(e.getMessage());
} finally {
if (cluster != null) {
cluster.shutdown();
cluster.stop();
}
if (tempBlob != null) {
assertTrue(tempBlob.delete());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ protected void run() {
fail(e.getMessage());
} finally {
if (cluster != null) {
cluster.shutdown();
cluster.stop();
}
}
}
Expand Down Expand Up @@ -397,7 +397,7 @@ protected void run() {
fail(e.getMessage());
} finally {
if (cluster != null) {
cluster.shutdown();
cluster.stop();
}
}
}
Expand Down Expand Up @@ -493,7 +493,7 @@ protected void run() {
fail(e.getMessage());
} finally {
if (cluster != null) {
cluster.shutdown();
cluster.stop();
}
}
}
Expand Down Expand Up @@ -545,7 +545,7 @@ protected void run() {
expectMsgClass(JobResultSuccess.class);
} finally {
if (cluster != null) {
cluster.shutdown();
cluster.stop();
}
}
}
Expand Down Expand Up @@ -599,7 +599,7 @@ protected void run() {
expectMsgClass(ExecutionGraphFound.class);
} finally {
if (cluster != null) {
cluster.shutdown();
cluster.stop();
}
}
}
Expand Down Expand Up @@ -1025,7 +1025,7 @@ public void testCancelJobWithSavepointFailurePeriodicCheckpoints() throws Except
fail("Unexpected cancellation response from JobManager: " + cancellationResponse);
}
} finally {
testingCluster.shutdown();
testingCluster.stop();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ public void testCancelAsyncProducerAndConsumer() throws Exception {
assertEquals(IllegalStateException.class, ASYNC_CONSUMER_EXCEPTION.getClass());
} finally {
if (flink != null) {
flink.shutdown();
flink.stop();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public static void startCluster() {

@AfterClass
public static void shutdownCluster() {
cluster.shutdown();
cluster.stop();
cluster = null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ public void setup() throws Exception {
@AfterClass
public static void shutDownExistingCluster() {
if (cluster != null) {
cluster.shutdown();
cluster.stop();
cluster.awaitTermination();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,8 @@ public void testTriggerSavepointAndResumeWithFileBasedCheckpoints() throws Excep

// Shut down the Flink cluster (thereby canceling the job)
LOG.info("Shutting down Flink cluster.");
flink.shutdown();
flink.awaitTermination();
flink.stop();
flink = null;

// - Verification START -------------------------------------------

Expand Down Expand Up @@ -251,6 +251,7 @@ public void testTriggerSavepointAndResumeWithFileBasedCheckpoints() throws Excep

// Restart the cluster
LOG.info("Restarting Flink cluster.");
flink = new TestingCluster(config);
flink.start();

// Retrieve the job manager
Expand Down Expand Up @@ -409,7 +410,7 @@ protected void run() {
// - Verification END ---------------------------------------------
} finally {
if (flink != null) {
flink.shutdown();
flink.stop();
}
}
}
Expand Down Expand Up @@ -472,7 +473,7 @@ public void testSubmitWithUnknownSavepointPath() throws Exception {
}
} finally {
if (flink != null) {
flink.shutdown();
flink.stop();
}
}
}
Expand Down Expand Up @@ -572,6 +573,8 @@ public Integer map(Integer value) throws Exception {
flink = new TestingCluster(config);
try {
LOG.info("Restarting Flink cluster.");
flink = new TestingCluster(config);

flink.start(true);

// Retrieve the job manager
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public static void setUp() throws Exception {
@AfterClass
public static void tearDown() throws Exception {
if (testCluster != null) {
testCluster.shutdown();
testCluster.stop();
}

TestStreamEnvironment.unsetAsContext();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public static void setup() {
@AfterClass
public static void teardown() {
try {
cluster.shutdown();
cluster.stop();
}
catch (Exception e) {
e.printStackTrace();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public static void main(String[] args) {
}
finally {
if (cluster != null) {
cluster.shutdown();
cluster.stop();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public static void main(String[] args) throws Exception {
}
finally {
if (cluster != null) {
cluster.shutdown();
cluster.stop();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public static void startCluster() {

@AfterClass
public static void shutdownCluster() {
cluster.shutdown();
cluster.stop();
cluster = null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public static void startCluster() {

@AfterClass
public static void shutdownCluster() {
cluster.shutdown();
cluster.stop();
cluster = null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public void testSuccessfulProgramAfterFailure() {
}
finally {
if (cluster != null) {
cluster.shutdown();
cluster.stop();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ public void testJobPersistencyWhenJobManagerShutdown() throws Exception {
jobManager, deadline.timeLeft());
}
finally {
flink.shutdown();
flink.stop();
}

// verify that the persisted job data has not been removed from ZooKeeper when the JM has
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws
}
finally {
if (flink != null) {
flink.shutdown();
flink.stop();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public static void startCluster() {

@AfterClass
public static void shutdownCluster() {
cluster.shutdown();
cluster.stop();
cluster = null;

TestStreamEnvironment.unsetAsContext();
Expand Down

0 comments on commit f30f91a

Please sign in to comment.