Skip to content

Commit

Permalink
Remove concurrent access to process stdout
Browse files Browse the repository at this point in the history
  • Loading branch information
zentol committed Feb 23, 2020
1 parent 3c58f5a commit ae0fb0b
Showing 1 changed file with 19 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -205,12 +208,20 @@ public JobID submitJob(final JobSubmission jobSubmission) throws IOException {

LOG.info("Running {}.", commands.stream().collect(Collectors.joining(" ")));

try (AutoClosableProcess flink = AutoClosableProcess.runNonBlocking(commands.toArray(new String[0]))) {
final Pattern pattern = jobSubmission.isDetached()
? Pattern.compile("Job has been submitted with JobID (.*)")
: Pattern.compile("Job with JobID (.*) has finished.");

final Pattern pattern = jobSubmission.isDetached()
? Pattern.compile("Job has been submitted with JobID (.*)")
: Pattern.compile("Job with JobID (.*) has finished.");
final CompletableFuture<String> rawJobIdFuture = new CompletableFuture<>();
final Consumer<String> stdoutProcessor = string -> {
LOG.info(string);
Matcher matcher = pattern.matcher(string);
if (matcher.matches()) {
rawJobIdFuture.complete(matcher.group(1));
}
};

try (AutoClosableProcess flink = AutoClosableProcess.create(commands.toArray(new String[0])).setStdoutProcessor(stdoutProcessor).runNonBlocking()) {
if (jobSubmission.isDetached()) {
try {
flink.getProcess().waitFor();
Expand All @@ -219,18 +230,10 @@ public JobID submitJob(final JobSubmission jobSubmission) throws IOException {
}
}

try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(flink.getProcess().getInputStream(), StandardCharsets.UTF_8))) {
final Optional<String> jobId = bufferedReader.lines()
.peek(LOG::info)
.map(pattern::matcher)
.filter(Matcher::matches)
.map(matcher -> matcher.group(1))
.findAny();
if (!jobId.isPresent()) {
throw new IOException("Could not determine Job ID.");
} else {
return JobID.fromHexString(jobId.get());
}
try {
return JobID.fromHexString(rawJobIdFuture.get(1, TimeUnit.MINUTES));
} catch (Exception e) {
throw new IOException("Could not determine Job ID.", e);
}
}
}
Expand Down

0 comments on commit ae0fb0b

Please sign in to comment.