Skip to content

Commit

Permalink
[FLINK-15658][table-planner-blink] Fix duplicate field names exceptio…
Browse files Browse the repository at this point in the history
…n when join on the same key multiple times (apache#11011)
  • Loading branch information
wuchong authored and Oleksandr Nitavskyi committed Feb 20, 2020
1 parent 18996f7 commit 4f06ef1
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,20 +44,20 @@ public class KeySelectorUtil {
public static BaseRowKeySelector getBaseRowSelector(int[] keyFields, BaseRowTypeInfo rowType) {
if (keyFields.length > 0) {
LogicalType[] inputFieldTypes = rowType.getLogicalTypes();
String[] inputFieldNames = rowType.getFieldNames();
LogicalType[] keyFieldTypes = new LogicalType[keyFields.length];
String[] keyFieldNames = new String[keyFields.length];
for (int i = 0; i < keyFields.length; ++i) {
keyFieldTypes[i] = inputFieldTypes[keyFields[i]];
keyFieldNames[i] = inputFieldNames[keyFields[i]];
}
RowType returnType = RowType.of(keyFieldTypes, keyFieldNames);
RowType inputType = RowType.of(inputFieldTypes, rowType.getFieldNames());
// do not provide field names for the result key type,
// because we may have duplicate key fields and the field names may conflict
RowType returnType = RowType.of(keyFieldTypes);
RowType inputType = rowType.toRowType();
GeneratedProjection generatedProjection = ProjectionCodeGenerator.generateProjection(
CodeGeneratorContext.apply(new TableConfig()),
"KeyProjection",
inputType,
returnType, keyFields);
returnType,
keyFields);
BaseRowTypeInfo keyRowType = BaseRowTypeInfo.of(returnType);
return new BinaryRowKeySelector(keyRowType, generatedProjection);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,18 @@ class JoinITCase(state: StateBackendMode) extends StreamingWithStateTestBase(sta
assertEquals(expected.sorted, sink.getRetractResults.sorted)
}

@Test
def testInnerJoinWithDuplicateKey(): Unit = {
val query = "SELECT a1, b1, b3 FROM A JOIN B ON a1 = b1 AND a1 = b3"

val sink = new TestingRetractSink
tEnv.sqlQuery(query).toRetractStream[Row].addSink(sink).setParallelism(1)
env.execute()

val expected = Seq("2,2,2", "3,3,3")
assertEquals(expected.sorted, sink.getRetractResults.sorted)
}

@Test
def testInnerJoinWithNonEquiJoinPredicate(): Unit = {
val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e AND a < 6 AND h < b"
Expand Down

0 comments on commit 4f06ef1

Please sign in to comment.