Skip to content

Commit

Permalink
[FLINK-21466][sql-client] Add unit tests for SqlClient
Browse files Browse the repository at this point in the history
This closes apache#15255
  • Loading branch information
wuchong committed Mar 20, 2021
1 parent 7fad74d commit baa2723
Show file tree
Hide file tree
Showing 14 changed files with 356 additions and 19 deletions.
2 changes: 1 addition & 1 deletion docs/content.zh/docs/dev/table/sql/gettingStarted.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ Flink SQL 使得使用标准 SQL 开发流应用程序变的简单。如果你
在安装文件夹中运行 `sql-client` 脚本来启动 SQL 客户端。

```bash
./bin/sql-client.sh embedded
./bin/sql-client.sh
```

### Hello World
Expand Down
10 changes: 8 additions & 2 deletions docs/content.zh/docs/dev/table/sqlClient.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@ SQL Client 脚本也位于 Flink 的 bin 目录中。[将来](sqlClient.html#lim
./bin/sql-client.sh
```

或者显式使用 `embedded` 模式:

```bash
./bin/sql-client.sh embedded
```

默认情况下,SQL 客户端将从 `./conf/sql-client-defaults.yaml` 中读取配置。有关环境配置文件结构的更多信息,请参见[配置部分](sqlClient.html#environment-files)

### 执行 SQL 查询
Expand Down Expand Up @@ -160,9 +166,9 @@ SQL 客户端启动时可以添加 CLI 选项,具体如下。
```text
./bin/sql-client.sh --help
Mode "embedded" submits Flink jobs from the local machine.
Mode "embedded" (default) submits Flink jobs from the local machine.
Syntax: embedded [OPTIONS]
Syntax: [embedded] [OPTIONS]
"embedded" mode options:
-d,--defaults <environment file> The environment properties with which
every new session is initialized.
Expand Down
2 changes: 1 addition & 1 deletion docs/content/docs/dev/table/sql/gettingStarted.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ The [SQL Client]({{< ref "docs/dev/table/sqlClient" >}}) is an interactive clien
To start the SQL client, run the `sql-client` script from the installation folder.

```bash
./bin/sql-client.sh embedded
./bin/sql-client.sh
```

### Hello World
Expand Down
12 changes: 9 additions & 3 deletions docs/content/docs/dev/table/sqlClient.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ The SQL Client scripts are also located in the binary directory of Flink. [In th
./bin/sql-client.sh
```

or explicitly use `embedded` mode:

```bash
./bin/sql-client.sh embedded
```

By default, the SQL Client will read its configuration from the environment file located in `./conf/sql-client-defaults.yaml`. See the [configuration part](sqlClient.html#environment-files) for more information about the structure of environment files.

### Running SQL Queries
Expand Down Expand Up @@ -157,11 +163,11 @@ Configuration
The SQL Client can be started with the following optional CLI commands. They are discussed in detail in the subsequent paragraphs.

```text
./bin/sql-client.sh embedded --help
./bin/sql-client.sh --help
Mode "embedded" submits Flink jobs from the local machine.
Mode "embedded" (default) submits Flink jobs from the local machine.
Syntax: embedded [OPTIONS]
Syntax: [embedded] [OPTIONS]
"embedded" mode options:
-d,--defaults <environment file> The environment properties with which
every new session is initialized.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,6 @@ public void submitSQLJob(SQLJobSubmission job) throws IOException, InterruptedEx
copyFileToContainer(MountableFile.forHostPath(script), "/tmp/script.sql");
commands.add("cat /tmp/script.sql | ");
commands.add(FLINK_BIN + "/sql-client.sh");
commands.add("embedded");
job.getDefaultEnvFile()
.ifPresent(
defaultEnvFile -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,6 @@ public JobID submitJob(final JobSubmission jobSubmission, Duration timeout) thro
public void submitSQLJob(SQLJobSubmission job, Duration timeout) throws IOException {
final List<String> commands = new ArrayList<>();
commands.add(bin.resolve("sql-client.sh").toAbsolutePath().toString());
commands.add("embedded");
job.getDefaultEnvFile()
.ifPresent(
defaultEnvFile -> {
Expand Down
2 changes: 1 addition & 1 deletion flink-end-to-end-tests/test-scripts/test_sql_client.sh
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ INSERT INTO ElasticsearchAppendSinkTable
EOF
)

JOB_ID=$($FLINK_DIR/bin/sql-client.sh embedded \
JOB_ID=$($FLINK_DIR/bin/sql-client.sh \
--jar $KAFKA_SQL_JAR \
--jar $ELASTICSEARCH_SQL_JAR \
--jar $SQL_TOOLBOX_JAR \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@
* <p>- In future versions: In gateway mode, the SQL CLI client connects to the REST API of the
* gateway and allows for managing queries via console.
*
* <p>For debugging in an IDE you can execute the main method of this class using: "embedded
* --defaults /path/to/sql-client-defaults.yaml --jar /path/to/target/flink-sql-client-*.jar"
* <p>For debugging in an IDE you can execute the main method of this class using: "--defaults
* /path/to/sql-client-defaults.yaml --jar /path/to/target/flink-sql-client-*.jar"
*
* <p>Make sure that the FLINK_CONF_DIR environment variable is set.
*/
Expand Down Expand Up @@ -125,20 +125,20 @@ private void openCli(String sessionId, Executor executor) {
// --------------------------------------------------------------------------------------------

public static void main(String[] args) {
final String model;
final String mode;
final String[] modeArgs;
if (args.length < 1 || args[0].startsWith("-")) {
// mode is not specified, use the default `embedded` mode
model = MODE_EMBEDDED;
mode = MODE_EMBEDDED;
modeArgs = args;
} else {
// mode is specified, extract the mode value and reaming args
model = args[0];
mode = args[0];
// remove mode
modeArgs = Arrays.copyOfRange(args, 1, args.length);
}

switch (model) {
switch (mode) {
case MODE_EMBEDDED:
final CliOptions options = CliOptionsParser.parseEmbeddedModeClient(modeArgs);
if (options.isPrintHelp()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.client.SqlClient;
import org.apache.flink.table.client.SqlClientException;
import org.apache.flink.table.client.cli.SqlCommandParser.SqlCommandCall;
import org.apache.flink.table.client.config.YamlConfigUtils;
Expand Down Expand Up @@ -684,9 +685,23 @@ private void printInfo(String message) {

// --------------------------------------------------------------------------------------------

/**
* Internal flag to use {@link System#in} and {@link System#out} stream to construct {@link
* Terminal} for tests. This allows tests can easily mock input stream when startup {@link
* SqlClient}.
*/
protected static boolean useSystemInOutStream = false;

private static Terminal createDefaultTerminal() {
try {
return TerminalBuilder.builder().name(CliStrings.CLI_NAME).build();
if (useSystemInOutStream) {
return TerminalBuilder.builder()
.name(CliStrings.CLI_NAME)
.streams(System.in, System.out)
.build();
} else {
return TerminalBuilder.builder().name(CliStrings.CLI_NAME).build();
}
} catch (IOException e) {
throw new SqlClientException("Error opening command line interface.", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,9 @@ public static void printHelpEmbeddedModeClient() {
formatter.setLeftPadding(5);
formatter.setWidth(80);

System.out.println("\nMode \"embedded\" submits Flink jobs from the local machine.");
System.out.println("\n Syntax: embedded [OPTIONS]");
System.out.println(
"\nMode \"embedded\" (default) submits Flink jobs from the local machine.");
System.out.println("\n Syntax: [embedded] [OPTIONS]");
formatter.setSyntaxPrefix(" \"embedded\" mode options:");
formatter.printHelp(" ", EMBEDDED_MODE_CLIENT_OPTIONS);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.client;

import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.table.client.cli.TerminalStreamsResource;
import org.apache.flink.util.FileUtils;

import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintStream;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_CONF_DIR;
import static org.hamcrest.CoreMatchers.containsString;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;

/** Tests for {@link SqlClient}. */
public class SqlClientTest {

@Rule public ExpectedException thrown = ExpectedException.none();

@Rule public TemporaryFolder tempFolder = new TemporaryFolder();

@Rule public final TerminalStreamsResource useSystemStream = TerminalStreamsResource.INSTANCE;

private PrintStream originalPrintStream;

private InputStream originalInputStream;

private ByteArrayOutputStream testOutputStream;

private Map<String, String> originalEnv;

private String historyPath;

@Before
public void before() throws IOException {
originalEnv = System.getenv();
originalPrintStream = System.out;
originalInputStream = System.in;
testOutputStream = new ByteArrayOutputStream();
System.setOut(new PrintStream(testOutputStream, true));
// send "QUIT;" command to gracefully shutdown the terminal
System.setIn(new ByteArrayInputStream("QUIT;".getBytes(StandardCharsets.UTF_8)));

// prepare conf dir
File confFolder = tempFolder.newFolder("conf");
File confYaml = new File(confFolder, "flink-conf.yaml");
if (!confYaml.createNewFile()) {
throw new IOException("Can't create testing flink-conf.yaml file.");
}

// adjust the test environment for the purposes of this test
Map<String, String> map = new HashMap<>(System.getenv());
map.put(ENV_FLINK_CONF_DIR, confFolder.getAbsolutePath());
CommonTestUtils.setEnv(map);

historyPath = tempFolder.newFile("history").toString();
}

@After
public void after() throws InterruptedException {
System.setOut(originalPrintStream);
System.setIn(originalInputStream);
CommonTestUtils.setEnv(originalEnv);
}

private String getStdoutString() {
return testOutputStream.toString();
}

@Test(timeout = 20000)
public void testEmbeddedWithOptions() throws InterruptedException {
String[] args = new String[] {"embedded", "-hist", historyPath};
SqlClient.main(args);
assertThat(getStdoutString(), containsString("Command history file path: " + historyPath));
}

@Test(timeout = 20000)
public void testEmbeddedWithLongOptions() throws InterruptedException {
String[] args = new String[] {"embedded", "--history", historyPath};
SqlClient.main(args);
assertThat(getStdoutString(), containsString("Command history file path: " + historyPath));
}

@Test(timeout = 20000)
public void testEmbeddedWithoutOptions() throws InterruptedException {
String[] args = new String[] {"embedded"};
SqlClient.main(args);
assertThat(getStdoutString(), containsString("Command history file path"));
}

@Test(timeout = 20000)
public void testEmptyOptions() throws InterruptedException {
String[] args = new String[] {};
SqlClient.main(args);
assertThat(getStdoutString(), containsString("Command history file path"));
}

@Test(timeout = 20000)
public void testUnsupportedGatewayMode() throws Exception {
String[] args = new String[] {"gateway"};
thrown.expect(SqlClientException.class);
thrown.expectMessage("Gateway mode is not supported yet.");
SqlClient.main(args);
}

@Test(timeout = 20000)
public void testPrintHelpForUnknownMode() throws IOException {
String[] args = new String[] {"unknown"};
SqlClient.main(args);
final URL url = getClass().getClassLoader().getResource("sql-client-help.out");
Objects.requireNonNull(url);
final String help = FileUtils.readFileUtf8(new File(url.getFile()));
assertEquals(help, getStdoutString());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.client.cli;

import org.junit.rules.ExternalResource;

/**
* Enables {@link org.apache.flink.table.client.SqlClient} to create a default terminal using {@link
* System#in} and {@link System#out} as the input and output stream. This can allows tests to easily
* mock input stream of the SqlClient by hijacking the standard stream.
*/
public class TerminalStreamsResource extends ExternalResource {

public static final TerminalStreamsResource INSTANCE = new TerminalStreamsResource();

private TerminalStreamsResource() {
// singleton
}

@Override
protected void before() throws Throwable {
CliClient.useSystemInOutStream = true;
}

@Override
protected void after() {
CliClient.useSystemInOutStream = false;
}
}
Loading

0 comments on commit baa2723

Please sign in to comment.