Skip to content

Commit

Permalink
[FLINK-17944][sql-client] Wrong output in SQL Client's table mode
Browse files Browse the repository at this point in the history
This is a temporary workaround until we don't use Tuple2<Boolean, Row>
to represent changelogs anymore.

This closes apache#12346.
  • Loading branch information
zjffdu authored and twalthr committed Jun 10, 2020
1 parent 1839fa5 commit 7143e6a
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.table.client.gateway.SqlExecutionException;
import org.apache.flink.table.client.gateway.TypedResult;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

import java.net.InetAddress;
import java.util.ArrayList;
Expand Down Expand Up @@ -186,6 +187,10 @@ public List<Row> retrievePage(int page) {
@Override
protected void processRecord(Tuple2<Boolean, Row> change) {
synchronized (resultLock) {
// Always set the RowKind to INSERT, so that we can compare rows correctly (RowKind will be ignored),
// just use the Boolean of Tuple2<Boolean, Row> to figure out whether it is insert or delete.
change.f1.setKind(RowKind.INSERT);

// insert
if (change.f0) {
processInsert(change.f1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.table.client.gateway.TypedResult;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

import org.junit.Test;

Expand Down Expand Up @@ -57,10 +58,10 @@ public void testSnapshot() throws UnknownHostException {

result.isRetrieving = true;

result.processRecord(Tuple2.of(true, Row.of("A", 1)));
result.processRecord(Tuple2.of(true, Row.of("B", 1)));
result.processRecord(Tuple2.of(true, Row.of("A", 1)));
result.processRecord(Tuple2.of(true, Row.of("C", 2)));
result.processRecord(Tuple2.of(true, Row.ofKind(RowKind.INSERT, "A", 1)));
result.processRecord(Tuple2.of(true, Row.ofKind(RowKind.INSERT, "B", 1)));
result.processRecord(Tuple2.of(true, Row.ofKind(RowKind.INSERT, "A", 1)));
result.processRecord(Tuple2.of(true, Row.ofKind(RowKind.INSERT, "C", 2)));

assertEquals(TypedResult.payload(4), result.snapshot(1));

Expand All @@ -69,16 +70,16 @@ public void testSnapshot() throws UnknownHostException {
assertEquals(Collections.singletonList(Row.of("A", 1)), result.retrievePage(3));
assertEquals(Collections.singletonList(Row.of("C", 2)), result.retrievePage(4));

result.processRecord(Tuple2.of(false, Row.of("A", 1)));
result.processRecord(Tuple2.of(false, Row.ofKind(RowKind.UPDATE_BEFORE, "A", 1)));

assertEquals(TypedResult.payload(3), result.snapshot(1));

assertEquals(Collections.singletonList(Row.of("A", 1)), result.retrievePage(1));
assertEquals(Collections.singletonList(Row.of("B", 1)), result.retrievePage(2));
assertEquals(Collections.singletonList(Row.of("C", 2)), result.retrievePage(3));

result.processRecord(Tuple2.of(false, Row.of("C", 2)));
result.processRecord(Tuple2.of(false, Row.of("A", 1)));
result.processRecord(Tuple2.of(false, Row.ofKind(RowKind.UPDATE_BEFORE, "C", 2)));
result.processRecord(Tuple2.of(false, Row.ofKind(RowKind.UPDATE_BEFORE, "A", 1)));

assertEquals(TypedResult.payload(1), result.snapshot(1));

Expand Down

0 comments on commit 7143e6a

Please sign in to comment.