Skip to content

Commit

Permalink
[FLINK-2825] FlinkClient.killTopology fails due to missing leader ses…
Browse files Browse the repository at this point in the history
…sion ID
  • Loading branch information
mjsax committed Oct 6, 2015
1 parent af1e03e commit b489c36
Showing 1 changed file with 22 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus;
import org.apache.flink.storm.util.StormConfig;

Expand Down Expand Up @@ -211,31 +210,34 @@ public void killTopology(final String name) throws NotAliveException {
public void killTopologyWithOpts(final String name, final KillOptions options) throws NotAliveException {
final JobID jobId = this.getTopologyJobId(name);
if (jobId == null) {
throw new NotAliveException();
throw new NotAliveException("Storm topology with name " + name + " not found.");
}

try {
final ActorRef jobManager = this.getJobManager();

if (options != null) {
try {
Thread.sleep(1000 * options.get_wait_secs());
} catch (final InterruptedException e) {
throw new RuntimeException(e);
}
}

final FiniteDuration askTimeout = this.getTimeout();
final Future<Object> response = Patterns.ask(jobManager, new CancelJob(jobId), new Timeout(askTimeout));
if (options != null) {
try {
Await.result(response, askTimeout);
} catch (final Exception e) {
throw new RuntimeException("Killing topology " + name + " with Flink job ID " + jobId + " failed", e);
Thread.sleep(1000 * options.get_wait_secs());
} catch (final InterruptedException e) {
throw new RuntimeException(e);
}
}

final Configuration configuration = GlobalConfiguration.getConfiguration();
configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, this.jobManagerHost);
configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, this.jobManagerPort);

final Client client;
try {
client = new Client(configuration);
} catch (final IOException e) {
throw new RuntimeException("Could not connect to Flink JobManager with address " + this.jobManagerHost
+ ":" + this.jobManagerPort, e);
throw new RuntimeException("Could not establish a connection to the job manager", e);
}

try {
client.cancel(jobId);
} catch (final Exception e) {
throw new RuntimeException("Cannot stop job.", e);
}

}

// Flink specific additional methods
Expand Down

0 comments on commit b489c36

Please sign in to comment.