Skip to content

Commit

Permalink
[FLINK-11146][clients] Remove legacy code code in ClusterClient
Browse files Browse the repository at this point in the history
  • Loading branch information
tisonkun authored and zentol committed Mar 28, 2019
1 parent 2f80c8e commit 8438506
Show file tree
Hide file tree
Showing 14 changed files with 47 additions and 765 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.PlanExecutor;
import org.apache.flink.api.common.Program;
Expand Down Expand Up @@ -248,11 +247,6 @@ public String getOptimizerPlanAsJSON(Plan plan) throws Exception {
return new PlanJSONDumpGenerator().getOptimizerPlanAsJSON(op);
}

@Override
public void endSession(JobID jobID) throws Exception {
// no op
}

private Configuration createConfiguration() {
Configuration newConfiguration = new Configuration();
newConfiguration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, getTaskManagerNumSlots());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.flink.client;

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.PlanExecutor;
import org.apache.flink.client.program.ClusterClient;
Expand Down Expand Up @@ -224,34 +223,4 @@ public String getOptimizerPlanAsJSON(Plan plan) throws Exception {
return new PlanJSONDumpGenerator().getOptimizerPlanAsJSON(optPlan);
}

@Override
public void endSession(JobID jobID) throws Exception {
if (jobID == null) {
throw new NullPointerException("The supplied jobID must not be null.");
}

synchronized (this.lock) {
// check if we start a session dedicated for this execution
final boolean shutDownAtEnd;

if (client == null) {
shutDownAtEnd = true;
// start the executor for us
start();
}
else {
// we use the existing session
shutDownAtEnd = false;
}

try {
client.endSession(jobID);
}
finally {
if (shutDownAtEnd) {
stop();
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@

import static org.apache.flink.client.cli.CliFrontendParser.HELP_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.MODIFY_PARALLELISM_OPTION;
import static org.apache.flink.client.program.ClusterClient.MAX_SLOTS_UNKNOWN;

/**
* Implementation of a simple command line frontend for executing programs.
Expand Down Expand Up @@ -269,18 +268,12 @@ private <T> void runProgram(
try {
client.setPrintStatusDuringExecution(runOptions.getStdoutLogging());
client.setDetached(runOptions.getDetachedMode());
LOG.debug("Client slots is set to {}", client.getMaxSlots());

LOG.debug("{}", runOptions.getSavepointRestoreSettings());

int userParallelism = runOptions.getParallelism();
LOG.debug("User parallelism is set to {}", userParallelism);
if (client.getMaxSlots() != MAX_SLOTS_UNKNOWN && userParallelism == -1) {
logAndSysout("Using the parallelism provided by the remote cluster ("
+ client.getMaxSlots() + "). "
+ "To use another parallelism, set it at the ./bin/flink client.");
userParallelism = client.getMaxSlots();
} else if (ExecutionConfig.PARALLELISM_DEFAULT == userParallelism) {
if (ExecutionConfig.PARALLELISM_DEFAULT == userParallelism) {
userParallelism = defaultParallelism;
}

Expand Down
Loading

0 comments on commit 8438506

Please sign in to comment.