Skip to content

Commit

Permalink
[FLINK-18086][tests] Support to set standard inputs for AutoClosableP…
Browse files Browse the repository at this point in the history
…rocess
  • Loading branch information
wuchong committed Jun 16, 2020
1 parent 4a775af commit 7ce45b5
Showing 1 changed file with 45 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,14 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintStream;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
Expand All @@ -37,6 +41,9 @@
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* Utility class to terminate a given {@link Process} when exiting a try-with-resources statement.
*/
Expand Down Expand Up @@ -74,6 +81,7 @@ public static final class AutoClosableProcessBuilder {
private final String[] commands;
private Consumer<String> stdoutProcessor = LOG::debug;
private Consumer<String> stderrProcessor = LOG::debug;
private @Nullable String[] stdInputs;

AutoClosableProcessBuilder(final String... commands) {
this.commands = commands;
Expand All @@ -89,6 +97,13 @@ public AutoClosableProcessBuilder setStderrProcessor(final Consumer<String> stde
return this;
}

public AutoClosableProcessBuilder setStdInputs(final String... inputLines) {
checkNotNull(inputLines);
checkArgument(inputLines.length >= 1);
this.stdInputs = inputLines;
return this;
}

public void runBlocking() throws IOException {
runBlocking(Duration.ofSeconds(30));
}
Expand All @@ -97,9 +112,10 @@ public void runBlocking(final Duration timeout) throws IOException {
final StringWriter sw = new StringWriter();
try (final PrintWriter printer = new PrintWriter(sw)) {
final Process process = createProcess(commands, stdoutProcessor, line -> {
stderrProcessor.accept(line);
printer.println(line);
});
stderrProcessor.accept(line);
printer.println(line);
},
stdInputs);

try (AutoClosableProcess autoProcess = new AutoClosableProcess(process)) {
final boolean success = process.waitFor(timeout.toMillis(), TimeUnit.MILLISECONDS);
Expand Down Expand Up @@ -136,24 +152,31 @@ public void runBlockingWithRetry(final int maxRetries, final Duration attemptTim
}

public AutoClosableProcess runNonBlocking() throws IOException {
return new AutoClosableProcess(createProcess(commands, stdoutProcessor, stderrProcessor));
return new AutoClosableProcess(createProcess(commands, stdoutProcessor, stderrProcessor, stdInputs));
}
}

private static Process createProcess(final String[] commands, Consumer<String> stdoutProcessor, Consumer<String> stderrProcessor) throws IOException {
private static Process createProcess(
final String[] commands,
Consumer<String> stdoutProcessor,
Consumer<String> stderrProcessor,
@Nullable String[] stdInputs) throws IOException {
final ProcessBuilder processBuilder = new ProcessBuilder();
LOG.debug("Creating process: {}", Arrays.toString(commands));
processBuilder.command(commands);

final Process process = processBuilder.start();

processStream(process.getInputStream(), stdoutProcessor);
processStream(process.getErrorStream(), stderrProcessor);
consumeOutput(process.getInputStream(), stdoutProcessor);
consumeOutput(process.getErrorStream(), stderrProcessor);
if (stdInputs != null) {
produceInput(process.getOutputStream(), stdInputs);
}

return process;
}

private static void processStream(final InputStream stream, final Consumer<String> streamConsumer) {
private static void consumeOutput(final InputStream stream, final Consumer<String> streamConsumer) {
new Thread(() -> {
try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8))) {
String line;
Expand All @@ -167,6 +190,20 @@ private static void processStream(final InputStream stream, final Consumer<Strin
).start();
}

private static void produceInput(final OutputStream stream, final String[] inputLines) {
new Thread(() -> {
// try with resource will close the OutputStream automatically,
// usually the process terminal will also be finished then.
try (PrintStream printStream = new PrintStream(stream, true, StandardCharsets.UTF_8.name())) {
for (String line : inputLines) {
printStream.println(line);
}
} catch (IOException e) {
LOG.error("Failure while processing process stdin.", e);
}
}).start();
}

@Override
public void close() throws IOException {
if (process.isAlive()) {
Expand Down

0 comments on commit 7ce45b5

Please sign in to comment.