Skip to content

Commit

Permalink
[FLINK-19863][test] Fix SQLClientHBaseITCase.testHBase failed with pr…
Browse files Browse the repository at this point in the history
…ocess timeout

This closes apache#14274
  • Loading branch information
leonardBang committed Dec 4, 2020
1 parent 3111574 commit 6c18317
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -170,12 +170,14 @@ public void testKafka() throws Exception {

private void executeSqlStatements(ClusterController clusterController, List<String> sqlLines) throws IOException {
LOG.info("Executing Kafka {} end-to-end SQL statements.", kafkaSQLVersion);
clusterController.submitSQLJob(new SQLJobSubmission.SQLJobSubmissionBuilder(sqlLines)
.addJar(sqlAvroJar)
.addJars(apacheAvroJars)
.addJar(sqlConnectorKafkaJar)
.addJar(sqlToolBoxJar)
.build());
clusterController.submitSQLJob(
new SQLJobSubmission.SQLJobSubmissionBuilder(sqlLines)
.addJar(sqlAvroJar)
.addJars(apacheAvroJars)
.addJar(sqlConnectorKafkaJar)
.addJar(sqlToolBoxJar)
.build(),
Duration.ofMinutes(2L));
}

private List<String> initializeSqlLines(Map<String, String> vars) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.slf4j.LoggerFactory;

import java.nio.file.Path;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
Expand Down Expand Up @@ -97,17 +98,19 @@ public void testKafka() throws Exception {
kafka.createTopic(1, 1, outputTopic);

// run the Flink job (detached mode)
clusterController.submitJob(new JobSubmission.JobSubmissionBuilder(kafkaExampleJar)
.setDetached(true)
.addArgument("--input-topic", inputTopic)
.addArgument("--output-topic", outputTopic)
.addArgument("--prefix", "PREFIX")
.addArgument("--bootstrap.servers", kafka.getBootstrapServerAddresses().stream().map(address -> address.getHostString() + ':' + address.getPort()).collect(Collectors.joining(",")))
.addArgument("--group.id", "myconsumer")
.addArgument("--auto.offset.reset", "earliest")
.addArgument("--transaction.timeout.ms", "900000")
.addArgument("--flink.partition-discovery.interval-millis", "1000")
.build());
clusterController.submitJob(
new JobSubmission.JobSubmissionBuilder(kafkaExampleJar)
.setDetached(true)
.addArgument("--input-topic", inputTopic)
.addArgument("--output-topic", outputTopic)
.addArgument("--prefix", "PREFIX")
.addArgument("--bootstrap.servers", kafka.getBootstrapServerAddresses().stream().map(address -> address.getHostString() + ':' + address.getPort()).collect(Collectors.joining(",")))
.addArgument("--group.id", "myconsumer")
.addArgument("--auto.offset.reset", "earliest")
.addArgument("--transaction.timeout.ms", "900000")
.addArgument("--flink.partition-discovery.interval-millis", "1000")
.build(),
Duration.ofMinutes(2L));

LOG.info("Sending messages to Kafka topic [{}] ...", inputTopic);
// send some data to Kafka
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.util.AutoCloseableAsync;

import java.io.IOException;
import java.time.Duration;

/**
* Controller for interacting with a cluster.
Expand All @@ -31,16 +32,18 @@ public interface ClusterController extends AutoCloseableAsync {
* Submits the given job to the cluster.
*
* @param job job to submit
* @param timeout the maximum time to wait.
* @return JobController for the submitted job
* @throws IOException
*/
JobController submitJob(JobSubmission job) throws IOException;
JobController submitJob(JobSubmission job, Duration timeout) throws IOException;

/**
* Submits the given SQL job to the cluster.
*
* @param job job to submit.
* @param timeout the maximum time to wait.
* @throws IOException if any IO error happen.
*/
void submitSQLJob(SQLJobSubmission job) throws IOException;
void submitSQLJob(SQLJobSubmission job, Duration timeout) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
Expand Down Expand Up @@ -151,7 +152,7 @@ public void stopFlinkCluster() throws IOException {
AutoClosableProcess.runBlocking(bin.resolve("stop-cluster.sh").toAbsolutePath().toString());
}

public JobID submitJob(final JobSubmission jobSubmission) throws IOException {
public JobID submitJob(final JobSubmission jobSubmission, Duration timeout) throws IOException {
final List<String> commands = new ArrayList<>(4);
commands.add(bin.resolve("flink").toString());
commands.add("run");
Expand Down Expand Up @@ -190,14 +191,14 @@ public JobID submitJob(final JobSubmission jobSubmission) throws IOException {
}

try {
return JobID.fromHexString(rawJobIdFuture.get(1, TimeUnit.MINUTES));
return JobID.fromHexString(rawJobIdFuture.get(timeout.getSeconds(), TimeUnit.SECONDS));
} catch (Exception e) {
throw new IOException("Could not determine Job ID.", e);
}
}
}

public void submitSQLJob(SQLJobSubmission job) throws IOException {
public void submitSQLJob(SQLJobSubmission job, Duration timeout) throws IOException {
final List<String> commands = new ArrayList<>();
commands.add(bin.resolve("sql-client.sh").toAbsolutePath().toString());
commands.add("embedded");
Expand All @@ -218,7 +219,7 @@ public void submitSQLJob(SQLJobSubmission job) throws IOException {
.create(commands.toArray(new String[0]))
.setStdInputs(job.getSqlLines().toArray(new String[0]))
.setStdoutProcessor(LOG::info) // logging the SQL statements and error message
.runBlocking();
.runBlocking(timeout);
}

public void performJarOperation(JarOperation operation) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.util.ExternalResource;

import java.io.IOException;
import java.time.Duration;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
Expand All @@ -38,7 +39,7 @@ public interface FlinkResource extends ExternalResource {
* <p>The exact constellation of the cluster is undefined.
*
* <p>In the case of per-job clusters this method may not start any Flink processes, deferring this to
* {@link ClusterController#submitJob(JobSubmission)}.
* {@link ClusterController#submitJob(JobSubmission, Duration)}.
*
* @return controller for interacting with the cluster
* @throws IOException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@

import java.io.IOException;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -185,15 +186,15 @@ private static class StandaloneClusterController implements ClusterController {
}

@Override
public JobController submitJob(JobSubmission job) throws IOException {
final JobID run = distribution.submitJob(job);
public JobController submitJob(JobSubmission job, Duration timeout) throws IOException {
final JobID run = distribution.submitJob(job, timeout);

return new StandaloneJobController(run);
}

@Override
public void submitSQLJob(SQLJobSubmission job) throws IOException {
distribution.submitSQLJob(job);
public void submitSQLJob(SQLJobSubmission job, Duration timeout) throws IOException {
distribution.submitSQLJob(job, timeout);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,10 +212,12 @@ private List<String> initializeSqlLines(Map<String, String> vars) throws IOExcep

private void executeSqlStatements(ClusterController clusterController, List<String> sqlLines) throws IOException {
LOG.info("Executing SQL: HBase source table -> HBase sink table");
clusterController.submitSQLJob(new SQLJobSubmission.SQLJobSubmissionBuilder(sqlLines)
.addJar(sqlToolBoxJar)
.addJar(sqlConnectorHBaseJar)
.addJars(hadoopClasspathJars)
.build());
clusterController.submitSQLJob(
new SQLJobSubmission.SQLJobSubmissionBuilder(sqlLines)
.addJar(sqlToolBoxJar)
.addJar(sqlConnectorHBaseJar)
.addJars(hadoopClasspathJars)
.build(),
Duration.ofMinutes(2L));
}
}

0 comments on commit 6c18317

Please sign in to comment.