Skip to content

Commit

Permalink
[FLINK-21851][table-runtime] Refactor BinaryRowDataKeySelector in tes…
Browse files Browse the repository at this point in the history
…ting

This closes apache#15256
  • Loading branch information
JingsongLi committed Mar 19, 2021
1 parent 5bea459 commit c07cda7
Show file tree
Hide file tree
Showing 14 changed files with 155 additions and 120 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,20 @@
import org.apache.flink.table.runtime.dataview.StateDataViewStore;
import org.apache.flink.table.runtime.generated.GeneratedNamespaceAggsHandleFunction;
import org.apache.flink.table.runtime.generated.NamespaceAggsHandleFunction;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.window.slicing.SliceAssigner;
import org.apache.flink.table.runtime.operators.window.slicing.SliceAssigners;
import org.apache.flink.table.runtime.operators.window.slicing.SlicingWindowOperator;
import org.apache.flink.table.runtime.typeutils.PagedTypeSerializer;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.runtime.util.BinaryRowDataKeySelector;
import org.apache.flink.table.runtime.util.GenericRowRecordSortComparator;
import org.apache.flink.table.runtime.util.RowDataHarnessAssertor;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.utils.HandwrittenSelectorUtil;

import org.junit.Test;

Expand Down Expand Up @@ -80,8 +81,8 @@ public class SlicingWindowAggOperatorTest {
new BigIntType()
};

private static final BinaryRowDataKeySelector KEY_SELECTOR =
new BinaryRowDataKeySelector(
private static final RowDataKeySelector KEY_SELECTOR =
HandwrittenSelectorUtil.getRowDataSelector(
new int[] {0}, INPUT_ROW_TYPE.getChildren().toArray(new LogicalType[0]));

private static final PagedTypeSerializer<RowData> KEY_SER =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,15 @@
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
import org.apache.flink.table.runtime.generated.RecordEqualiser;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.util.BinaryRowDataKeySelector;
import org.apache.flink.table.runtime.util.GenericRowRecordSortComparator;
import org.apache.flink.table.runtime.util.RowDataHarnessAssertor;
import org.apache.flink.table.runtime.util.RowDataRecordEqualiser;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.utils.HandwrittenSelectorUtil;

/** Base class of tests for all kinds of processing-time DeduplicateFunction. */
abstract class ProcTimeDeduplicateFunctionTestBase {
Expand All @@ -40,8 +41,9 @@ abstract class ProcTimeDeduplicateFunctionTestBase {
new VarCharType(VarCharType.MAX_LENGTH), new BigIntType(), new IntType());

int rowKeyIdx = 1;
BinaryRowDataKeySelector rowKeySelector =
new BinaryRowDataKeySelector(new int[] {rowKeyIdx}, inputRowType.toRowFieldTypes());
RowDataKeySelector rowKeySelector =
HandwrittenSelectorUtil.getRowDataSelector(
new int[] {rowKeyIdx}, inputRowType.toRowFieldTypes());

RowDataHarnessAssertor assertor =
new RowDataHarnessAssertor(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,16 @@
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.bundle.KeyedMapBundleOperator;
import org.apache.flink.table.runtime.operators.bundle.trigger.CountBundleTrigger;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.util.BinaryRowDataKeySelector;
import org.apache.flink.table.runtime.util.GenericRowRecordSortComparator;
import org.apache.flink.table.runtime.util.RowDataHarnessAssertor;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.utils.HandwrittenSelectorUtil;
import org.apache.flink.types.RowKind;

import org.junit.Test;
Expand Down Expand Up @@ -64,8 +65,9 @@ public class RowTimeDeduplicateFunctionTest {
private TypeSerializer<RowData> serializer = inputRowType.toSerializer();
private int rowTimeIndex = 2;
private int rowKeyIndex = 0;
private BinaryRowDataKeySelector rowKeySelector =
new BinaryRowDataKeySelector(new int[] {rowKeyIndex}, inputRowType.toRowFieldTypes());
private RowDataKeySelector rowKeySelector =
HandwrittenSelectorUtil.getRowDataSelector(
new int[] {rowKeyIndex}, inputRowType.toRowFieldTypes());
private RowDataHarnessAssertor assertor =
new RowDataHarnessAssertor(
inputRowType.toRowFieldTypes(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@
import org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator;
import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.util.BinaryRowDataKeySelector;
import org.apache.flink.table.utils.HandwrittenSelectorUtil;

import org.junit.Test;

Expand All @@ -38,8 +39,9 @@
public class ProcTimeIntervalJoinTest extends TimeIntervalStreamJoinTestBase {

private int keyIdx = 0;
private BinaryRowDataKeySelector keySelector =
new BinaryRowDataKeySelector(new int[] {keyIdx}, rowType.toRowFieldTypes());
private RowDataKeySelector keySelector =
HandwrittenSelectorUtil.getRowDataSelector(
new int[] {keyIdx}, rowType.toRowFieldTypes());
private TypeInformation<RowData> keyType = InternalTypeInfo.ofFields();

/** a.proctime >= b.proctime - 10 and a.proctime <= b.proctime + 20. * */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
import org.apache.flink.table.runtime.operators.join.KeyedCoProcessOperatorWithWatermarkDelay;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.util.BinaryRowDataKeySelector;
import org.apache.flink.table.utils.HandwrittenSelectorUtil;

import org.junit.Test;

Expand All @@ -40,8 +41,9 @@
public class RowTimeIntervalJoinTest extends TimeIntervalStreamJoinTestBase {

private int keyIdx = 1;
private BinaryRowDataKeySelector keySelector =
new BinaryRowDataKeySelector(new int[] {keyIdx}, rowType.toRowFieldTypes());
private RowDataKeySelector keySelector =
HandwrittenSelectorUtil.getRowDataSelector(
new int[] {keyIdx}, rowType.toRowFieldTypes());
private TypeInformation<RowData> keyType = InternalTypeInfo.ofFields();

/** a.rowtime >= b.rowtime - 10 and a.rowtime <= b.rowtime + 20. * */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.util.BinaryRowDataKeySelector;
import org.apache.flink.table.runtime.util.RowDataHarnessAssertor;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.utils.HandwrittenSelectorUtil;

import org.junit.Test;

Expand All @@ -43,8 +44,9 @@ public class TemporalProcessTimeJoinOperatorTest extends TemporalTimeJoinOperato
private int keyIdx = 0;
private InternalTypeInfo<RowData> rowType =
InternalTypeInfo.ofFields(new BigIntType(), new VarCharType(VarCharType.MAX_LENGTH));
private BinaryRowDataKeySelector keySelector =
new BinaryRowDataKeySelector(new int[] {keyIdx}, rowType.toRowFieldTypes());
private RowDataKeySelector keySelector =
HandwrittenSelectorUtil.getRowDataSelector(
new int[] {keyIdx}, rowType.toRowFieldTypes());
private TypeInformation<RowData> keyType = keySelector.getProducedType();
private InternalTypeInfo<RowData> outputRowType =
InternalTypeInfo.ofFields(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.util.BinaryRowDataKeySelector;
import org.apache.flink.table.runtime.util.RowDataHarnessAssertor;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.utils.HandwrittenSelectorUtil;

/** Base test class for TemporalJoinOperator. */
abstract class TemporalTimeJoinOperatorTestBase {
Expand Down Expand Up @@ -59,7 +60,8 @@ abstract class TemporalTimeJoinOperatorTestBase {
protected RowDataHarnessAssertor assertor =
new RowDataHarnessAssertor(outputRowType.toRowFieldTypes());
protected int keyIdx = 1;
protected BinaryRowDataKeySelector keySelector =
new BinaryRowDataKeySelector(new int[] {keyIdx}, rowType.toRowFieldTypes());
protected RowDataKeySelector keySelector =
HandwrittenSelectorUtil.getRowDataSelector(
new int[] {keyIdx}, rowType.toRowFieldTypes());
protected TypeInformation<RowData> keyType = keySelector.getProducedType();
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,11 @@
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.generated.AggsHandleFunction;
import org.apache.flink.table.runtime.generated.GeneratedAggsHandleFunction;
import org.apache.flink.table.runtime.util.BinaryRowDataKeySelector;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.utils.HandwrittenSelectorUtil;

import org.junit.Test;

Expand All @@ -53,8 +54,8 @@ public AggsHandleFunction newInstance(ClassLoader classLoader) {
};
private LogicalType[] accTypes = new LogicalType[] {new BigIntType()};

private BinaryRowDataKeySelector keySelector =
new BinaryRowDataKeySelector(new int[] {0}, inputFieldTypes);
private RowDataKeySelector keySelector =
HandwrittenSelectorUtil.getRowDataSelector(new int[] {0}, inputFieldTypes);
private TypeInformation<RowData> keyType = keySelector.getProducedType();

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,11 @@
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.generated.AggsHandleFunction;
import org.apache.flink.table.runtime.generated.GeneratedAggsHandleFunction;
import org.apache.flink.table.runtime.util.BinaryRowDataKeySelector;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.utils.HandwrittenSelectorUtil;

/** Base class for row-time over window test. */
public class RowTimeOverWindowTestBase {
Expand All @@ -46,8 +47,8 @@ public AggsHandleFunction newInstance(ClassLoader classLoader) {
};
protected LogicalType[] accTypes = new LogicalType[] {new BigIntType()};

protected BinaryRowDataKeySelector keySelector =
new BinaryRowDataKeySelector(new int[] {0}, inputFieldTypes);
protected RowDataKeySelector keySelector =
HandwrittenSelectorUtil.getRowDataSelector(new int[] {0}, inputFieldTypes);
protected TypeInformation<RowData> keyType = keySelector.getProducedType();

protected OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,17 @@
import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
import org.apache.flink.table.runtime.generated.RecordComparator;
import org.apache.flink.table.runtime.generated.RecordEqualiser;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.sort.IntRecordComparator;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.util.BinaryRowDataKeySelector;
import org.apache.flink.table.runtime.util.GenericRowRecordSortComparator;
import org.apache.flink.table.runtime.util.RowDataHarnessAssertor;
import org.apache.flink.table.runtime.util.RowDataRecordEqualiser;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.utils.HandwrittenSelectorUtil;

import org.junit.Test;

Expand Down Expand Up @@ -81,8 +82,9 @@ public RecordComparator newInstance(ClassLoader classLoader) {

private int sortKeyIdx = 2;

BinaryRowDataKeySelector sortKeySelector =
new BinaryRowDataKeySelector(new int[] {sortKeyIdx}, inputRowType.toRowFieldTypes());
RowDataKeySelector sortKeySelector =
HandwrittenSelectorUtil.getRowDataSelector(
new int[] {sortKeyIdx}, inputRowType.toRowFieldTypes());

static GeneratedRecordEqualiser generatedEqualiser =
new GeneratedRecordEqualiser("", "", new Object[0]) {
Expand All @@ -97,8 +99,8 @@ public RecordEqualiser newInstance(ClassLoader classLoader) {

private int partitionKeyIdx = 0;

private BinaryRowDataKeySelector keySelector =
new BinaryRowDataKeySelector(
private RowDataKeySelector keySelector =
HandwrittenSelectorUtil.getRowDataSelector(
new int[] {partitionKeyIdx}, inputRowType.toRowFieldTypes());

private InternalTypeInfo<RowData> outputTypeWithoutRowNumber = inputRowType;
Expand All @@ -124,8 +126,9 @@ public RecordEqualiser newInstance(ClassLoader classLoader) {

// rowKey only used in UpdateRankFunction
private int rowKeyIdx = 1;
BinaryRowDataKeySelector rowKeySelector =
new BinaryRowDataKeySelector(new int[] {rowKeyIdx}, inputRowType.toRowFieldTypes());
RowDataKeySelector rowKeySelector =
HandwrittenSelectorUtil.getRowDataSelector(
new int[] {rowKeyIdx}, inputRowType.toRowFieldTypes());

/** RankEnd column must be long, int or short type, but could not be string type yet. */
@Test(expected = UnsupportedOperationException.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,15 @@
import org.apache.flink.table.runtime.generated.NamespaceAggsHandleFunctionBase;
import org.apache.flink.table.runtime.generated.NamespaceTableAggsHandleFunction;
import org.apache.flink.table.runtime.generated.RecordEqualiser;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.window.assigners.MergingWindowAssigner;
import org.apache.flink.table.runtime.operators.window.assigners.WindowAssigner;
import org.apache.flink.table.runtime.operators.window.triggers.Trigger;
import org.apache.flink.table.runtime.util.BinaryRowDataKeySelector;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.utils.HandwrittenSelectorUtil;

import org.junit.Rule;
import org.junit.Test;
Expand Down Expand Up @@ -194,8 +195,8 @@ KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> createWindowOp

LogicalType[] inputTypes =
new LogicalType[] {new VarCharType(VarCharType.MAX_LENGTH), new IntType()};
BinaryRowDataKeySelector keySelector =
new BinaryRowDataKeySelector(new int[] {0}, inputTypes);
RowDataKeySelector keySelector =
HandwrittenSelectorUtil.getRowDataSelector(new int[] {0}, inputTypes);
TypeInformation<RowData> keyType = keySelector.getProducedType();
LogicalType[] accTypes = new LogicalType[] {new BigIntType(), new BigIntType()};
LogicalType[] windowTypes = new LogicalType[] {new BigIntType(), new BigIntType()};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,21 +33,22 @@
import org.apache.flink.table.runtime.generated.NamespaceAggsHandleFunctionBase;
import org.apache.flink.table.runtime.generated.NamespaceTableAggsHandleFunction;
import org.apache.flink.table.runtime.generated.RecordEqualiser;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.window.assigners.SessionWindowAssigner;
import org.apache.flink.table.runtime.operators.window.assigners.TumblingWindowAssigner;
import org.apache.flink.table.runtime.operators.window.assigners.WindowAssigner;
import org.apache.flink.table.runtime.operators.window.triggers.ElementTriggers;
import org.apache.flink.table.runtime.operators.window.triggers.EventTimeTriggers;
import org.apache.flink.table.runtime.operators.window.triggers.ProcessingTimeTriggers;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.util.BinaryRowDataKeySelector;
import org.apache.flink.table.runtime.util.GenericRowRecordSortComparator;
import org.apache.flink.table.runtime.util.RowDataHarnessAssertor;
import org.apache.flink.table.runtime.util.RowDataTestUtil;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.utils.HandwrittenSelectorUtil;
import org.apache.flink.util.Collector;

import org.junit.Test;
Expand Down Expand Up @@ -128,8 +129,8 @@ private NamespaceAggsHandleFunctionBase getCountWindowAggFunction() {
private LogicalType[] windowTypes =
new LogicalType[] {new BigIntType(), new BigIntType(), new BigIntType()};
private GenericRowEqualiser equaliser = new GenericRowEqualiser(accTypes, windowTypes);
private BinaryRowDataKeySelector keySelector =
new BinaryRowDataKeySelector(new int[] {0}, inputFieldTypes);
private RowDataKeySelector keySelector =
HandwrittenSelectorUtil.getRowDataSelector(new int[] {0}, inputFieldTypes);
private TypeInformation<RowData> keyType = keySelector.getProducedType();
private RowDataHarnessAssertor assertor =
new RowDataHarnessAssertor(
Expand Down
Loading

0 comments on commit c07cda7

Please sign in to comment.