diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java index 797bca6859c9c..76039f8e3e635 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java @@ -512,21 +512,35 @@ private void callSelect(SqlCommandCall cmdCall) { printExecutionException(e); return; } - final CliResultView view; - if (resultDesc.isMaterialized()) { - view = new CliTableResultView(this, resultDesc); + + if (resultDesc.isTableauMode()) { + try (CliTableauResultView tableauResultView = new CliTableauResultView( + terminal, executor, sessionId, resultDesc)) { + if (resultDesc.isMaterialized()) { + tableauResultView.displayBatchResults(); + } else { + tableauResultView.displayStreamResults(); + } + } catch (SqlExecutionException e) { + printExecutionException(e); + } } else { - view = new CliChangelogResultView(this, resultDesc); - } + final CliResultView view; + if (resultDesc.isMaterialized()) { + view = new CliTableResultView(this, resultDesc); + } else { + view = new CliChangelogResultView(this, resultDesc); + } - // enter view - try { - view.open(); + // enter view + try { + view.open(); - // view left - printInfo(CliStrings.MESSAGE_RESULT_QUIT); - } catch (SqlExecutionException e) { - printExecutionException(e); + // view left + printInfo(CliStrings.MESSAGE_RESULT_QUIT); + } catch (SqlExecutionException e) { + printExecutionException(e); + } } } diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliTableauResultView.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliTableauResultView.java new file mode 100644 index 0000000000000..e82a026b1d839 --- /dev/null +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliTableauResultView.java @@ -0,0 +1,404 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.client.cli; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.table.api.TableColumn; +import org.apache.flink.table.client.gateway.Executor; +import org.apache.flink.table.client.gateway.ResultDescriptor; +import org.apache.flink.table.client.gateway.SqlExecutionException; +import org.apache.flink.table.client.gateway.TypedResult; +import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.LocalZonedTimestampType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.SmallIntType; +import org.apache.flink.table.types.logical.TimeType; +import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.table.types.logical.TinyIntType; +import org.apache.flink.types.Row; + +import org.apache.commons.lang3.StringUtils; +import org.jline.terminal.Terminal; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Stream; + +import static org.apache.flink.table.client.cli.CliUtils.rowToString; + +/** + * Print result in tableau mode. + */ +public class CliTableauResultView implements AutoCloseable { + + private static final int NULL_COLUMN_WIDTH = CliStrings.NULL_COLUMN.length(); + private static final int MAX_COLUMN_WIDTH = 30; + private static final int DEFAULT_COLUMN_WIDTH = 20; + private static final String COLUMN_TRUNCATED_FLAG = "..."; + private static final String CHANGEFLAG_COLUMN_NAME = "+/-"; + + private final Terminal terminal; + private final Executor sqlExecutor; + private final String sessionId; + private final ResultDescriptor resultDescriptor; + private final ExecutorService displayResultExecutorService; + + public CliTableauResultView( + final Terminal terminal, + final Executor sqlExecutor, + final String sessionId, + final ResultDescriptor resultDescriptor) { + this.terminal = terminal; + this.sqlExecutor = sqlExecutor; + this.sessionId = sessionId; + this.resultDescriptor = resultDescriptor; + this.displayResultExecutorService = Executors.newSingleThreadExecutor(); + } + + public void displayStreamResults() throws SqlExecutionException { + final AtomicInteger receivedRowCount = new AtomicInteger(0); + Future resultFuture = displayResultExecutorService.submit(() -> { + printStreamResults(receivedRowCount); + }); + + // capture CTRL-C + terminal.handle(Terminal.Signal.INT, signal -> { + resultFuture.cancel(true); + }); + + boolean cleanUpQuery = true; + try { + resultFuture.get(); + cleanUpQuery = false; // job finished successfully + } catch (CancellationException e) { + terminal.writer().println("Query terminated, received a total of " + receivedRowCount.get() + " rows"); + terminal.flush(); + } catch (ExecutionException e) { + if (e.getCause() instanceof SqlExecutionException) { + throw (SqlExecutionException) e.getCause(); + } + throw new SqlExecutionException("unknown exception", e.getCause()); + } catch (InterruptedException e) { + throw new SqlExecutionException("Query interrupted", e); + } finally { + checkAndCleanUpQuery(cleanUpQuery); + } + } + + public void displayBatchResults() throws SqlExecutionException { + Future resultFuture = displayResultExecutorService.submit(() -> { + final List resultRows = waitBatchResults(); + printBatchResults(resultRows); + }); + + // capture CTRL-C + terminal.handle(Terminal.Signal.INT, signal -> { + resultFuture.cancel(true); + }); + + boolean cleanUpQuery = true; + try { + resultFuture.get(); + cleanUpQuery = false; // job finished successfully + } catch (CancellationException e) { + terminal.writer().println("Query terminated"); + terminal.flush(); + } catch (ExecutionException e) { + if (e.getCause() instanceof SqlExecutionException) { + throw (SqlExecutionException) e.getCause(); + } + throw new SqlExecutionException("unknown exception", e.getCause()); + } catch (InterruptedException e) { + throw new SqlExecutionException("Query interrupted", e); + } finally { + checkAndCleanUpQuery(cleanUpQuery); + } + } + + @Override + public void close() { + this.displayResultExecutorService.shutdown(); + } + + private void checkAndCleanUpQuery(boolean cleanUpQuery) { + if (cleanUpQuery) { + try { + sqlExecutor.cancelQuery(sessionId, resultDescriptor.getResultId()); + } catch (SqlExecutionException e) { + // ignore further exceptions + } + } + } + + private List waitBatchResults() { + List resultRows; + // take snapshot and make all results in one page + do { + try { + Thread.sleep(50); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + TypedResult result = sqlExecutor.snapshotResult( + sessionId, + resultDescriptor.getResultId(), + Integer.MAX_VALUE); + + if (result.getType() == TypedResult.ResultType.EOS) { + resultRows = Collections.emptyList(); + break; + } else if (result.getType() == TypedResult.ResultType.PAYLOAD) { + resultRows = sqlExecutor.retrieveResultPage(resultDescriptor.getResultId(), 1); + break; + } else { + // result not retrieved yet + } + } while (true); + + return resultRows; + } + + private void printStreamResults(AtomicInteger receivedRowCount) { + List columns = resultDescriptor.getResultSchema().getTableColumns(); + String[] fieldNames = + Stream.concat( + Stream.of("+/-"), + columns.stream().map(TableColumn::getName) + ).toArray(String[]::new); + + int[] colWidths = columnWidthsByType(columns, true); + String borderline = genBorderLine(colWidths); + + // print filed names + terminal.writer().println(borderline); + printSingleRow(colWidths, fieldNames); + terminal.writer().println(borderline); + terminal.flush(); + + while (true) { + final TypedResult>> result = + sqlExecutor.retrieveResultChanges(sessionId, resultDescriptor.getResultId()); + + switch (result.getType()) { + case EMPTY: + // do nothing + break; + case EOS: + if (receivedRowCount.get() > 0) { + terminal.writer().println(borderline); + } + terminal.writer().println("Received a total of " + receivedRowCount.get() + " rows"); + terminal.flush(); + return; + case PAYLOAD: + List> changes = result.getPayload(); + for (Tuple2 change : changes) { + final String[] cols = rowToString(change.f1); + String[] row = new String[cols.length + 1]; + row[0] = change.f0 ? "+" : "-"; + System.arraycopy(cols, 0, row, 1, cols.length); + printSingleRow(colWidths, row); + receivedRowCount.incrementAndGet(); + } + break; + default: + throw new SqlExecutionException("Unknown result type: " + result.getType()); + } + } + } + + private void printBatchResults(List resultRows) { + List rows = new ArrayList<>(resultRows.size() + 1); + + // fill field names first + List columns = resultDescriptor.getResultSchema().getTableColumns(); + rows.add(columns.stream().map(TableColumn::getName).toArray(String[]::new)); + resultRows.forEach(row -> rows.add(rowToString(row))); + + int[] colWidths = columnWidthsByContent(columns, rows); + String borderline = genBorderLine(colWidths); + + // print field names + terminal.writer().println(borderline); + printSingleRow(colWidths, rows.get(0)); + terminal.writer().println(borderline); + + // print content + rows.subList(1, rows.size()).forEach(row -> printSingleRow(colWidths, row)); + if (!resultRows.isEmpty()) { + terminal.writer().println(borderline); + } + + // print footer + terminal.writer().println(resultRows.size() + " row in set"); + terminal.flush(); + } + + private String genBorderLine(int[] colWidths) { + StringBuilder sb = new StringBuilder(); + sb.append("+"); + for (int width : colWidths) { + sb.append(StringUtils.repeat('-', width + 1)); + sb.append("-+"); + } + return sb.toString(); + } + + private void printSingleRow(int[] colWidths, String[] cols) { + StringBuilder sb = new StringBuilder(); + sb.append("|"); + int idx = 0; + for (String col : cols) { + sb.append(" "); + if (col.length() <= colWidths[idx]) { + sb.append(StringUtils.repeat(' ', colWidths[idx] - col.length())); + sb.append(col); + } else { + sb.append(col, 0, colWidths[idx] - COLUMN_TRUNCATED_FLAG.length()); + sb.append(COLUMN_TRUNCATED_FLAG); + } + sb.append(" |"); + idx++; + } + terminal.writer().println(sb.toString()); + terminal.flush(); + } + + /** + * Try to infer column width based on column types. In streaming case, we will have an + * endless result set, thus couldn't determine column widths based on column values. + */ + private int[] columnWidthsByType(List columns, boolean includeChangeflag) { + // fill width with field names first + int[] colWidths = columns.stream() + .mapToInt(col -> col.getName().length()) + .toArray(); + + // determine proper column width based on types + for (int i = 0; i < columns.size(); ++i) { + LogicalType type = columns.get(i).getType().getLogicalType(); + int len; + switch (type.getTypeRoot()) { + case TINYINT: + len = TinyIntType.PRECISION + 1; // extra for negative value + break; + case SMALLINT: + len = SmallIntType.PRECISION + 1; // extra for negative value + break; + case INTEGER: + len = IntType.PRECISION + 1; // extra for negative value + break; + case BIGINT: + len = BigIntType.PRECISION + 1; // extra for negative value + break; + case DECIMAL: + len = ((DecimalType) type).getPrecision() + 2; // extra for negative value and decimal point + break; + case BOOLEAN: + len = 5; // "true" or "false" + break; + case DATE: + len = 10; // e.g. 9999-12-31 + break; + case TIME_WITHOUT_TIME_ZONE: + int precision = ((TimeType) type).getPrecision(); + len = precision == 0 ? 8 : precision + 9; // 23:59:59[.999999999] + break; + case TIMESTAMP_WITHOUT_TIME_ZONE: + precision = ((TimestampType) type).getPrecision(); + len = timestampTypeColumnWidth(precision); + break; + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + precision = ((LocalZonedTimestampType) type).getPrecision(); + len = timestampTypeColumnWidth(precision); + break; + default: + len = DEFAULT_COLUMN_WIDTH; + } + + // adjust column width with potential null values + colWidths[i] = Math.max(colWidths[i], Math.max(len, NULL_COLUMN_WIDTH)); + } + + // add an extra column for change flag if necessary + if (includeChangeflag) { + int[] ret = new int[columns.size() + 1]; + ret[0] = CHANGEFLAG_COLUMN_NAME.length(); + System.arraycopy(colWidths, 0, ret, 1, columns.size()); + return ret; + } else { + return colWidths; + } + } + + /** + * Here we consider two popular class for timestamp: LocalDateTime and java.sql.Timestamp. + * + *

According to LocalDateTime's comment, the string output will be one of the following + * ISO-8601 formats: + *

  • {@code uuuu-MM-dd'T'HH:mm:ss}
  • + *
  • {@code uuuu-MM-dd'T'HH:mm:ss.SSS}
  • + *
  • {@code uuuu-MM-dd'T'HH:mm:ss.SSSSSS}
  • + *
  • {@code uuuu-MM-dd'T'HH:mm:ss.SSSSSSSSS}
  • + * + *

    And for java.sql.Timestamp, the number of digits after point will be precision except + * when precision is 0. In that case, the format would be 'uuuu-MM-dd HH:mm:ss.0' + */ + int timestampTypeColumnWidth(int precision) { + int base = 19; // length of uuuu-MM-dd HH:mm:ss + if (precision == 0) { + return base + 2; // consider java.sql.Timestamp + } else if (precision <= 3) { + return base + 4; + } else if (precision <= 6) { + return base + 7; + } else { + return base + 10; + } + } + + private int[] columnWidthsByContent(List columns, List rows) { + // fill width with field names first + int[] colWidths = columns.stream().mapToInt(col -> col.getName().length()).toArray(); + + // fill column width with real data + for (String[] row : rows) { + for (int i = 0; i < row.length; ++i) { + colWidths[i] = Math.max(colWidths[i], row[i].length()); + } + } + + // adjust column width with maximum length + for (int i = 0; i < colWidths.length; ++i) { + colWidths[i] = Math.min(colWidths[i], MAX_COLUMN_WIDTH); + } + + return colWidths; + } + +} diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/entries/ExecutionEntry.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/entries/ExecutionEntry.java index a65642bb89b5e..6e60c9594ab66 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/entries/ExecutionEntry.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/entries/ExecutionEntry.java @@ -84,6 +84,8 @@ public class ExecutionEntry extends ConfigEntry { private static final String EXECUTION_RESULT_MODE_VALUE_TABLE = "table"; + private static final String EXECUTION_RESULT_MODE_VALUE_TABLEAU = "tableau"; + private static final String EXECUTION_MAX_TABLE_RESULT_ROWS = "max-table-result-rows"; private static final String EXECUTION_RESTART_STRATEGY_TYPE = "restart-strategy.type"; @@ -322,6 +324,12 @@ public boolean isTableMode() { .orElse(false); } + public boolean isTableauMode() { + return properties.getOptionalString(EXECUTION_RESULT_MODE) + .map((v) -> v.equals(EXECUTION_RESULT_MODE_VALUE_TABLEAU)) + .orElse(false); + } + public Map asTopLevelMap() { return properties.asPrefixedMap(EXECUTION_ENTRY + '.'); } diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ResultDescriptor.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ResultDescriptor.java index 3cfaa6f882d6e..265e5c482561e 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ResultDescriptor.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ResultDescriptor.java @@ -31,10 +31,17 @@ public class ResultDescriptor { private final boolean isMaterialized; - public ResultDescriptor(String resultId, TableSchema resultSchema, boolean isMaterialized) { + private final boolean isTableauMode; + + public ResultDescriptor( + String resultId, + TableSchema resultSchema, + boolean isMaterialized, + boolean isTableauMode) { this.resultId = resultId; this.resultSchema = resultSchema; this.isMaterialized = isMaterialized; + this.isTableauMode = isTableauMode; } public String getResultId() { @@ -48,4 +55,8 @@ public TableSchema getResultSchema() { public boolean isMaterialized() { return isMaterialized; } + + public boolean isTableauMode() { + return isTableauMode; + } } diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java index b1445cbd9d5da..3035d91acf0b0 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java @@ -663,7 +663,8 @@ private ResultDescriptor executeQueryInternal(String sessionId, ExecutionCon return new ResultDescriptor( resultId, removeTimeAttributes(table.getSchema()), - result.isMaterialized()); + result.isMaterialized(), + context.getEnvironment().getExecution().isTableauMode()); } /** diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ResultStore.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ResultStore.java index 2cea4aa797386..476296f5a305b 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ResultStore.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ResultStore.java @@ -67,7 +67,7 @@ public DynamicResult createResult( final InetAddress gatewayAddress = getGatewayAddress(env.getDeployment()); final int gatewayPort = getGatewayPort(env.getDeployment()); - if (env.getExecution().isChangelogMode()) { + if (env.getExecution().isChangelogMode() || env.getExecution().isTableauMode()) { return new ChangelogCollectStreamResult<>( schema, config, @@ -86,10 +86,12 @@ public DynamicResult createResult( } else { // Batch Execution - if (!env.getExecution().isTableMode()) { - throw new SqlExecutionException("Results of batch queries can only be served in table mode."); + if (env.getExecution().isTableMode() || env.getExecution().isTableauMode()) { + return new MaterializedCollectBatchResult<>(schema, config, classLoader); + } else { + throw new SqlExecutionException( + "Results of batch queries can only be served in table or tableau mode."); } - return new MaterializedCollectBatchResult<>(schema, config, classLoader); } } diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliResultViewTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliResultViewTest.java index 29ac7a1bb038d..7d5bca0dc9aa2 100644 --- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliResultViewTest.java +++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliResultViewTest.java @@ -84,9 +84,10 @@ private void testResultViewClearResult(TypedResult typedResult, boolean isTab final MockExecutor executor = new MockExecutor(typedResult, cancellationCounterLatch); String sessionId = executor.openSession(session); final ResultDescriptor descriptor = new ResultDescriptor( - "result-id", - TableSchema.builder().field("Null Field", Types.STRING()).build(), - false); + "result-id", + TableSchema.builder().field("Null Field", Types.STRING()).build(), + false, + false); Thread resultViewRunner = null; CliClient cli = null; diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliTableauResultViewTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliTableauResultViewTest.java new file mode 100644 index 0000000000000..3ab8ae3bf9dff --- /dev/null +++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliTableauResultViewTest.java @@ -0,0 +1,373 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.client.cli; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.client.cli.utils.TerminalUtils; +import org.apache.flink.table.client.gateway.Executor; +import org.apache.flink.table.client.gateway.ResultDescriptor; +import org.apache.flink.table.client.gateway.SqlExecutionException; +import org.apache.flink.table.client.gateway.TypedResult; +import org.apache.flink.types.Row; + +import org.jline.terminal.Terminal; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.ByteArrayOutputStream; +import java.math.BigDecimal; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * Tests for CliTableauResultView. + */ +public class CliTableauResultViewTest { + + private ByteArrayOutputStream terminalOutput; + private Terminal terminal; + private TableSchema schema; + private List data; + private List> streamingData; + + @Before + public void setUp() { + terminalOutput = new ByteArrayOutputStream(); + terminal = TerminalUtils.createDummyTerminal(terminalOutput); + + schema = TableSchema.builder() + .field("boolean", DataTypes.BOOLEAN()) + .field("int", DataTypes.INT()) + .field("bigint", DataTypes.BIGINT()) + .field("varchar", DataTypes.STRING()) + .field("decimal(10, 5)", DataTypes.DECIMAL(10, 5)) + .field("timestamp", DataTypes.TIMESTAMP(6)) + .build(); + + data = new ArrayList<>(); + data.add( + Row.of( + null, + 1, + 2, + "abc", + BigDecimal.valueOf(1.23), + Timestamp.valueOf("2020-03-01 18:39:14")) + ); + data.add( + Row.of( + false, + null, + 0, + "", + BigDecimal.valueOf(1), + Timestamp.valueOf("2020-03-01 18:39:14.1")) + ); + data.add( + Row.of( + true, + Integer.MAX_VALUE, + null, + "abcdefg", + BigDecimal.valueOf(1234567890), + Timestamp.valueOf("2020-03-01 18:39:14.12")) + ); + data.add( + Row.of( + false, + Integer.MIN_VALUE, + Long.MAX_VALUE, + null, + BigDecimal.valueOf(12345.06789), + Timestamp.valueOf("2020-03-01 18:39:14.123")) + ); + data.add( + Row.of( + true, + 100, + Long.MIN_VALUE, + "abcdefg111", + null, + Timestamp.valueOf("2020-03-01 18:39:14.123456")) + ); + data.add( + Row.of( + null, + -1, + -1, + "abcdefghijklmnopqrstuvwxyz", + BigDecimal.valueOf(-12345.06789), + null) + ); + + streamingData = new ArrayList<>(); + for (int i = 0; i < data.size(); ++i) { + streamingData.add(new Tuple2<>(i % 2 == 0, data.get(i))); + } + } + + @Test + public void testBatchResult() { + ResultDescriptor resultDescriptor = new ResultDescriptor("", schema, true, true); + Executor mockExecutor = mock(Executor.class); + CliTableauResultView view = new CliTableauResultView( + terminal, mockExecutor, "session", resultDescriptor); + + when(mockExecutor.snapshotResult(anyString(), anyString(), anyInt())) + .thenReturn(TypedResult.payload(1)) + .thenReturn(TypedResult.endOfStream()); + when(mockExecutor.retrieveResultPage(anyString(), anyInt())) + .thenReturn(data); + + view.displayBatchResults(); + view.close(); + + Assert.assertEquals( + "+---------+-------------+----------------------+----------------------------+----------------+----------------------------+\n" + + "| boolean | int | bigint | varchar | decimal(10, 5) | timestamp |\n" + + "+---------+-------------+----------------------+----------------------------+----------------+----------------------------+\n" + + "| (NULL) | 1 | 2 | abc | 1.23 | 2020-03-01 18:39:14.0 |\n" + + "| false | (NULL) | 0 | | 1 | 2020-03-01 18:39:14.1 |\n" + + "| true | 2147483647 | (NULL) | abcdefg | 1234567890 | 2020-03-01 18:39:14.12 |\n" + + "| false | -2147483648 | 9223372036854775807 | (NULL) | 12345.06789 | 2020-03-01 18:39:14.123 |\n" + + "| true | 100 | -9223372036854775808 | abcdefg111 | (NULL) | 2020-03-01 18:39:14.123456 |\n" + + "| (NULL) | -1 | -1 | abcdefghijklmnopqrstuvwxyz | -12345.06789 | (NULL) |\n" + + "+---------+-------------+----------------------+----------------------------+----------------+----------------------------+\n" + + "6 row in set\n", + terminalOutput.toString()); + verify(mockExecutor, times(0)).cancelQuery(anyString(), anyString()); + } + + @Test + public void testCancelBatchResult() throws InterruptedException, ExecutionException, TimeoutException { + ResultDescriptor resultDescriptor = new ResultDescriptor("", schema, true, true); + Executor mockExecutor = mock(Executor.class); + CliTableauResultView view = new CliTableauResultView( + terminal, mockExecutor, "session", resultDescriptor); + + when(mockExecutor.snapshotResult(anyString(), anyString(), anyInt())) + .thenReturn(TypedResult.empty()); + + // submit result display in another thread + ExecutorService executorService = Executors.newSingleThreadExecutor(); + Future furture = executorService.submit(view::displayBatchResults); + + // wait until we trying to get batch result + verify(mockExecutor, timeout(5000).atLeast(1)).snapshotResult(anyString(), anyString(), anyInt()); + + // send signal to cancel + terminal.raise(Terminal.Signal.INT); + furture.get(5, TimeUnit.SECONDS); + + Assert.assertEquals("Query terminated\n", terminalOutput.toString()); + // didn't have a chance to read page + verify(mockExecutor, times(0)).retrieveResultPage(anyString(), anyInt()); + // tried to cancel query + verify(mockExecutor, times(1)).cancelQuery(anyString(), anyString()); + + view.close(); + } + + @Test + public void testEmptyBatchResult() { + ResultDescriptor resultDescriptor = new ResultDescriptor("", schema, true, true); + Executor mockExecutor = mock(Executor.class); + CliTableauResultView view = new CliTableauResultView( + terminal, mockExecutor, "session", resultDescriptor); + + when(mockExecutor.snapshotResult(anyString(), anyString(), anyInt())) + .thenReturn(TypedResult.payload(1)) + .thenReturn(TypedResult.endOfStream()); + when(mockExecutor.retrieveResultPage(anyString(), anyInt())) + .thenReturn(Collections.emptyList()); + + view.displayBatchResults(); + view.close(); + + Assert.assertEquals( + "+---------+-----+--------+---------+----------------+-----------+\n" + + "| boolean | int | bigint | varchar | decimal(10, 5) | timestamp |\n" + + "+---------+-----+--------+---------+----------------+-----------+\n" + + "0 row in set\n", + terminalOutput.toString()); + verify(mockExecutor, times(0)).cancelQuery(anyString(), anyString()); + } + + @Test + public void testFailedBatchResult() { + ResultDescriptor resultDescriptor = new ResultDescriptor("", schema, true, true); + Executor mockExecutor = mock(Executor.class); + CliTableauResultView view = new CliTableauResultView( + terminal, mockExecutor, "session", resultDescriptor); + + when(mockExecutor.snapshotResult(anyString(), anyString(), anyInt())) + .thenReturn(TypedResult.payload(1)) + .thenReturn(TypedResult.endOfStream()); + when(mockExecutor.retrieveResultPage(anyString(), anyInt())) + .thenThrow(new SqlExecutionException("query failed")); + + try { + view.displayBatchResults(); + Assert.fail("Shouldn't get here"); + } catch (SqlExecutionException e) { + Assert.assertEquals("query failed", e.getMessage()); + } + view.close(); + + verify(mockExecutor, times(1)).cancelQuery(anyString(), anyString()); + } + + @Test + public void testStreamingResult() { + ResultDescriptor resultDescriptor = new ResultDescriptor("", schema, true, true); + Executor mockExecutor = mock(Executor.class); + CliTableauResultView view = new CliTableauResultView( + terminal, mockExecutor, "session", resultDescriptor); + + when(mockExecutor.retrieveResultChanges(anyString(), anyString())) + .thenReturn(TypedResult.payload(streamingData.subList(0, streamingData.size() / 2))) + .thenReturn(TypedResult.payload(streamingData.subList(streamingData.size() / 2, streamingData.size()))) + .thenReturn(TypedResult.endOfStream()); + + view.displayStreamResults(); + view.close(); + + Assert.assertEquals( + "+-----+---------+-------------+----------------------+----------------------+----------------+----------------------------+\n" + + "| +/- | boolean | int | bigint | varchar | decimal(10, 5) | timestamp |\n" + + "+-----+---------+-------------+----------------------+----------------------+----------------+----------------------------+\n" + + "| + | (NULL) | 1 | 2 | abc | 1.23 | 2020-03-01 18:39:14.0 |\n" + + "| - | false | (NULL) | 0 | | 1 | 2020-03-01 18:39:14.1 |\n" + + "| + | true | 2147483647 | (NULL) | abcdefg | 1234567890 | 2020-03-01 18:39:14.12 |\n" + + "| - | false | -2147483648 | 9223372036854775807 | (NULL) | 12345.06789 | 2020-03-01 18:39:14.123 |\n" + + "| + | true | 100 | -9223372036854775808 | abcdefg111 | (NULL) | 2020-03-01 18:39:14.123456 |\n" + + "| - | (NULL) | -1 | -1 | abcdefghijklmnopq... | -12345.06789 | (NULL) |\n" + + "+-----+---------+-------------+----------------------+----------------------+----------------+----------------------------+\n" + + "Received a total of 6 rows\n", + terminalOutput.toString()); + verify(mockExecutor, times(0)).cancelQuery(anyString(), anyString()); + } + + @Test + public void testEmptyStreamingResult() { + ResultDescriptor resultDescriptor = new ResultDescriptor("", schema, true, true); + Executor mockExecutor = mock(Executor.class); + CliTableauResultView view = new CliTableauResultView( + terminal, mockExecutor, "session", resultDescriptor); + + when(mockExecutor.retrieveResultChanges(anyString(), anyString())) + .thenReturn(TypedResult.endOfStream()); + + view.displayStreamResults(); + view.close(); + + Assert.assertEquals( + "+-----+---------+-------------+----------------------+----------------------+----------------+----------------------------+\n" + + "| +/- | boolean | int | bigint | varchar | decimal(10, 5) | timestamp |\n" + + "+-----+---------+-------------+----------------------+----------------------+----------------+----------------------------+\n" + + "Received a total of 0 rows\n", + terminalOutput.toString()); + verify(mockExecutor, times(0)).cancelQuery(anyString(), anyString()); + } + + @Test + public void testCancelStreamingResult() throws InterruptedException, ExecutionException, TimeoutException { + ResultDescriptor resultDescriptor = new ResultDescriptor("", schema, true, true); + Executor mockExecutor = mock(Executor.class); + CliTableauResultView view = new CliTableauResultView( + terminal, mockExecutor, "session", resultDescriptor); + + when(mockExecutor.retrieveResultChanges(anyString(), anyString())) + .thenReturn(TypedResult.payload(streamingData.subList(0, streamingData.size() / 2))) + .thenReturn(TypedResult.empty()); + + // submit result display in another thread + ExecutorService executorService = Executors.newSingleThreadExecutor(); + Future furture = executorService.submit(view::displayStreamResults); + + // wait until we processed first result + verify(mockExecutor, timeout(5000).atLeast(2)).retrieveResultChanges(anyString(), anyString()); + + // send signal to cancel + terminal.raise(Terminal.Signal.INT); + furture.get(5, TimeUnit.SECONDS); + view.close(); + + Assert.assertEquals( + "+-----+---------+-------------+----------------------+----------------------+----------------+----------------------------+\n" + + "| +/- | boolean | int | bigint | varchar | decimal(10, 5) | timestamp |\n" + + "+-----+---------+-------------+----------------------+----------------------+----------------+----------------------------+\n" + + "| + | (NULL) | 1 | 2 | abc | 1.23 | 2020-03-01 18:39:14.0 |\n" + + "| - | false | (NULL) | 0 | | 1 | 2020-03-01 18:39:14.1 |\n" + + "| + | true | 2147483647 | (NULL) | abcdefg | 1234567890 | 2020-03-01 18:39:14.12 |\n" + + "Query terminated, received a total of 3 rows\n", + terminalOutput.toString()); + + verify(mockExecutor, times(1)).cancelQuery(anyString(), anyString()); + } + + @Test + public void testFailedStreamingResult() { + ResultDescriptor resultDescriptor = new ResultDescriptor("", schema, true, true); + Executor mockExecutor = mock(Executor.class); + CliTableauResultView view = new CliTableauResultView( + terminal, mockExecutor, "session", resultDescriptor); + + when(mockExecutor.retrieveResultChanges(anyString(), anyString())) + .thenReturn(TypedResult.payload(streamingData.subList(0, streamingData.size() / 2))) + .thenThrow(new SqlExecutionException("query failed")); + + try { + view.displayStreamResults(); + Assert.fail("Shouldn't get here"); + } catch (SqlExecutionException e) { + Assert.assertEquals("query failed", e.getMessage()); + } + view.close(); + + Assert.assertEquals( + "+-----+---------+-------------+----------------------+----------------------+----------------+----------------------------+\n" + + "| +/- | boolean | int | bigint | varchar | decimal(10, 5) | timestamp |\n" + + "+-----+---------+-------------+----------------------+----------------------+----------------+----------------------------+\n" + + "| + | (NULL) | 1 | 2 | abc | 1.23 | 2020-03-01 18:39:14.0 |\n" + + "| - | false | (NULL) | 0 | | 1 | 2020-03-01 18:39:14.1 |\n" + + "| + | true | 2147483647 | (NULL) | abcdefg | 1234567890 | 2020-03-01 18:39:14.12 |\n", + terminalOutput.toString()); + verify(mockExecutor, times(1)).cancelQuery(anyString(), anyString()); + } +} diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/utils/TerminalUtils.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/utils/TerminalUtils.java index 772fb013e3c3c..0ee65cc4aa740 100644 --- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/utils/TerminalUtils.java +++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/utils/TerminalUtils.java @@ -35,8 +35,12 @@ private TerminalUtils() { } public static Terminal createDummyTerminal() { + return createDummyTerminal(new MockOutputStream()); + } + + public static Terminal createDummyTerminal(OutputStream out) { try { - return new DumbTerminal(new MockInputStream(), new MockOutputStream()); + return new DumbTerminal(new MockInputStream(), out); } catch (IOException e) { throw new RuntimeException("Unable to create dummy terminal."); }