Skip to content

Commit

Permalink
[FLINK-13794][client] Remove unused logic of printStatusDuringExecution
Browse files Browse the repository at this point in the history
  • Loading branch information
tisonkun authored and mxm committed Aug 27, 2019
1 parent c51b19c commit c30e84f
Show file tree
Hide file tree
Showing 68 changed files with 60 additions and 295 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,6 @@ public void start() throws Exception {
synchronized (lock) {
if (client == null) {
client = new RestClusterClient<>(clientConfiguration, "RemoteExecutor");
client.setPrintStatusDuringExecution(isPrintingStatusDuringExecution());
}
else {
throw new IllegalStateException("The remote executor was already started.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,6 @@ private <T> void runProgram(
}

try {
client.setPrintStatusDuringExecution(runOptions.getStdoutLogging());
client.setDetached(runOptions.getDetachedMode());

LOG.debug("{}", runOptions.getSavepointRestoreSettings());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import static org.apache.flink.client.cli.CliFrontendParser.CLASS_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.DETACHED_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.JAR_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.LOGGING_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.PARALLELISM_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.PYFILES_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.PYMODULE_OPTION;
Expand All @@ -57,8 +56,6 @@ public abstract class ProgramOptions extends CommandLineOptions {

private final int parallelism;

private final boolean stdoutLogging;

private final boolean detachedMode;

private final boolean shutdownOnAttachedExit;
Expand Down Expand Up @@ -171,7 +168,6 @@ protected ProgramOptions(CommandLine line) throws CliArgsException {
parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
}

stdoutLogging = !line.hasOption(LOGGING_OPTION.getOpt());
detachedMode = line.hasOption(DETACHED_OPTION.getOpt()) || line.hasOption(
YARN_DETACHED_OPTION.getOpt());
shutdownOnAttachedExit = line.hasOption(SHUTDOWN_IF_ATTACHED_OPTION.getOpt());
Expand Down Expand Up @@ -199,10 +195,6 @@ public int getParallelism() {
return parallelism;
}

public boolean getStdoutLogging() {
return stdoutLogging;
}

public boolean getDetachedMode() {
return detachedMode;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,6 @@ public abstract class ClusterClient<T> {

private final boolean sharedHaServices;

/** Flag indicating whether to sysout print execution updates. */
private boolean printStatusDuringExecution = true;

/**
* For interactive invocations, the job results are only available after the ContextEnvironment has
* been run inside the user JAR. We pass the Client to every instance of the ContextEnvironment
Expand Down Expand Up @@ -163,23 +160,6 @@ public void shutdown() throws Exception {
// Configuration
// ------------------------------------------------------------------------

/**
* Configures whether the client should print progress updates during the execution to {@code System.out}.
* All updates are logged via the SLF4J loggers regardless of this setting.
*
* @param print True to print updates to standard out during execution, false to not print them.
*/
public void setPrintStatusDuringExecution(boolean print) {
this.printStatusDuringExecution = print;
}

/**
* @return whether the client will print progress updates during the execution to {@code System.out}
*/
public boolean getPrintStatusDuringExecution() {
return this.printStatusDuringExecution;
}

/**
* Gets the current cluster connection info (may change in case of a HA setup).
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,25 +55,25 @@ public void testRun() throws Exception {
// test without parallelism
{
String[] parameters = {"-v", getTestJarPath()};
verifyCliFrontend(getCli(configuration), parameters, 1, true, false);
verifyCliFrontend(getCli(configuration), parameters, 1, false);
}

// test configure parallelism
{
String[] parameters = {"-v", "-p", "42", getTestJarPath()};
verifyCliFrontend(getCli(configuration), parameters, 42, true, false);
verifyCliFrontend(getCli(configuration), parameters, 42, false);
}

// test configure sysout logging
{
String[] parameters = {"-p", "2", "-q", getTestJarPath()};
verifyCliFrontend(getCli(configuration), parameters, 2, false, false);
verifyCliFrontend(getCli(configuration), parameters, 2, false);
}

// test detached mode
{
String[] parameters = {"-p", "2", "-d", getTestJarPath()};
verifyCliFrontend(getCli(configuration), parameters, 2, true, true);
verifyCliFrontend(getCli(configuration), parameters, 2, true);
}

// test configure savepoint path (no ignore flag)
Expand Down Expand Up @@ -145,40 +145,34 @@ public void testParallelismWithOverflow() throws Exception {
// --------------------------------------------------------------------------------------------

public static void verifyCliFrontend(
AbstractCustomCommandLine<?> cli,
String[] parameters,
int expectedParallelism,
boolean logging,
boolean isDetached) throws Exception {
AbstractCustomCommandLine<?> cli,
String[] parameters,
int expectedParallelism,
boolean isDetached) throws Exception {
RunTestingCliFrontend testFrontend =
new RunTestingCliFrontend(cli, expectedParallelism, logging,
isDetached);
new RunTestingCliFrontend(cli, expectedParallelism, isDetached);
testFrontend.run(parameters); // verifies the expected values (see below)
}

private static final class RunTestingCliFrontend extends CliFrontend {

private final int expectedParallelism;
private final boolean sysoutLogging;
private final boolean isDetached;

private RunTestingCliFrontend(
AbstractCustomCommandLine<?> cli,
int expectedParallelism,
boolean logging,
boolean isDetached) throws Exception {
boolean isDetached) {
super(
cli.getConfiguration(),
Collections.singletonList(cli));
this.expectedParallelism = expectedParallelism;
this.sysoutLogging = logging;
this.isDetached = isDetached;
}

@Override
protected void executeProgram(PackagedProgram program, ClusterClient client, int parallelism) {
assertEquals(isDetached, client.isDetached());
assertEquals(sysoutLogging, client.getPrintStatusDuringExecution());
assertEquals(expectedParallelism, parallelism);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ public class ExecutionPlanAfterExecutionTest extends TestLogger implements Seria
@Test
public void testExecuteAfterGetExecutionPlan() {
ExecutionEnvironment env = new LocalEnvironment();
env.getConfig().disableSysoutLogging();

DataSet<Integer> baseSet = env.fromElements(1, 2);

Expand Down Expand Up @@ -87,7 +86,6 @@ public void testCreatePlanAfterGetExecutionPlan() {
@Test
public void testGetExecutionPlanOfRangePartition() {
ExecutionEnvironment env = new LocalEnvironment();
env.getConfig().disableSysoutLogging();

DataSet<Integer> baseSet = env.fromElements(1, 2);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,7 @@ public void testTimestamps() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
env.getConfig().disableSysoutLogging();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream<Long> streamWithTimestamps = env.addSource(new SourceFunction<Long>() {
private static final long serialVersionUID = -2255105836471289626L;
Expand Down Expand Up @@ -224,8 +223,7 @@ public int partition(Long next, byte[] key, byte[] value, String targetTopic, in
env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
env.getConfig().disableSysoutLogging();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

FlinkKafkaConsumer010<Long> kafkaSource = new FlinkKafkaConsumer010<>(topic, new LimitedLongDeserializer(), standardProps);
kafkaSource.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Long>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,7 @@ public void testTimestamps() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
env.getConfig().disableSysoutLogging();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream<Long> streamWithTimestamps = env.addSource(new SourceFunction<Long>() {
private static final long serialVersionUID = -2255115836471289626L;
Expand Down Expand Up @@ -233,8 +232,7 @@ public int partition(Long next, byte[] key, byte[] value, String targetTopic, in
env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
env.getConfig().disableSysoutLogging();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

FlinkKafkaConsumer011<Long> kafkaSource = new FlinkKafkaConsumer011<>(topic, new LimitedLongDeserializer(), standardProps);
kafkaSource.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Long>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ public void testInvalidOffset() throws Exception {
final int startFrom = 0;

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().disableSysoutLogging();

readSequence(env, StartupMode.GROUP_OFFSETS, null, null, standardProps, parallelism, topic, valuesCount, startFrom);

Expand Down Expand Up @@ -191,8 +190,7 @@ public void testOffsetAutocommitTest() throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// NOTE: We are not enabling the checkpointing!
env.getConfig().disableSysoutLogging();
env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
env.setParallelism(parallelism);

// the readSequence operation sleeps for 20 ms between each record.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,6 @@ public void testRateLimitedConsumer() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
env.getConfig().disableSysoutLogging();

DataStream<String> stream = env.addSource(new SourceFunction<String>() {
private static final long serialVersionUID = 1L;
Expand Down Expand Up @@ -216,7 +215,6 @@ public void cancel() {
env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
env.getConfig().disableSysoutLogging();

// ---------- RateLimiter config -------------
final long globalRate = 10; // bytes/second
Expand Down
Loading

0 comments on commit c30e84f

Please sign in to comment.