[FLINK-13699][hbase] Add integration test for HBase to verify DDL with TIMESTAMP types
wuchong committed Aug 19, 2019
1 parent b837a58 commit d20175e
Showing 3 changed files with 133 additions and 14 deletions.
File 1 of 3:

@@ -21,7 +21,6 @@
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 
-import org.apache.commons.net.ntp.TimeStamp;
 import org.apache.hadoop.hbase.util.Bytes;
 
 import java.math.BigDecimal;
@@ -102,7 +101,7 @@ public static byte[] serializeFromObject(Object value, int typeIdx, Charset stri
 			case 8:
 				return Bytes.toBytes((boolean) value);
 			case 9: // sql.Timestamp encoded to Long
-				return Bytes.toBytes(((TimeStamp) value).getTime());
+				return Bytes.toBytes(((Timestamp) value).getTime());
 			case 10: // sql.Date encoded as long
 				return Bytes.toBytes(((Date) value).getTime());
 			case 11: // sql.Time encoded as long
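Note: the one-line fix above replaces org.apache.commons.net.ntp.TimeStamp (an unrelated NTP utility class) with java.sql.Timestamp, which is what the cast in case 9 actually needs; with the wrong class, serializing a TIMESTAMP field failed with a ClassCastException at runtime. A minimal round-trip sketch of the intended encoding, as a standalone illustration (class and variable names here are ours, not the connector's):

import org.apache.hadoop.hbase.util.Bytes;

import java.sql.Timestamp;

public class TimestampRoundTripSketch {
	public static void main(String[] args) {
		Timestamp original = Timestamp.valueOf("2019-08-18 19:00:00");

		// Serialize: the timestamp is stored as its epoch-millis long, as in case 9.
		byte[] encoded = Bytes.toBytes(original.getTime());

		// Deserialize: read the long back and rebuild the timestamp.
		Timestamp decoded = new Timestamp(Bytes.toLong(encoded));

		System.out.println(original.equals(decoded)); // prints: true
	}
}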
File 2 of 3:

@@ -28,6 +28,7 @@
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableEnvironment;
@@ -38,7 +39,9 @@
 import org.apache.flink.table.descriptors.DescriptorProperties;
 import org.apache.flink.table.factories.TableFactoryService;
 import org.apache.flink.table.functions.ScalarFunction;
-import org.apache.flink.table.planner.runtime.utils.TableUtil;
+import org.apache.flink.table.planner.runtime.utils.BatchTableEnvUtil;
+import org.apache.flink.table.planner.sinks.CollectRowTableSink;
+import org.apache.flink.table.planner.sinks.CollectTableSink;
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
 import org.apache.flink.table.runtime.utils.StreamITCase;
 import org.apache.flink.table.sinks.TableSink;
@@ -53,11 +56,16 @@
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.Test;
 
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import scala.Option;
+
 import static org.apache.flink.addons.hbase.util.PlannerType.OLD_PLANNER;
 import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION;
 import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
@@ -228,18 +236,27 @@ public void testTableInputFormat() throws Exception {
 	// prepare a source collection.
 	private static final List<Row> testData1 = new ArrayList<>();
 	private static final RowTypeInfo testTypeInfo1 = new RowTypeInfo(
-		new TypeInformation[]{Types.INT, Types.INT, Types.STRING, Types.LONG, Types.DOUBLE, Types.BOOLEAN, Types.STRING},
-		new String[]{"rowkey", "f1c1", "f2c1", "f2c2", "f3c1", "f3c2", "f3c3"});
+		new TypeInformation[]{Types.INT, Types.INT, Types.STRING, Types.LONG, Types.DOUBLE,
+			Types.BOOLEAN, Types.STRING, Types.SQL_TIMESTAMP, Types.SQL_DATE, Types.SQL_TIME},
+		new String[]{"rowkey", "f1c1", "f2c1", "f2c2", "f3c1", "f3c2", "f3c3", "f4c1", "f4c2", "f4c3"});
 
 	static {
-		testData1.add(Row.of(1, 10, "Hello-1", 100L, 1.01, false, "Welt-1"));
-		testData1.add(Row.of(2, 20, "Hello-2", 200L, 2.02, true, "Welt-2"));
-		testData1.add(Row.of(3, 30, "Hello-3", 300L, 3.03, false, "Welt-3"));
-		testData1.add(Row.of(4, 40, null, 400L, 4.04, true, "Welt-4"));
-		testData1.add(Row.of(5, 50, "Hello-5", 500L, 5.05, false, "Welt-5"));
-		testData1.add(Row.of(6, 60, "Hello-6", 600L, 6.06, true, "Welt-6"));
-		testData1.add(Row.of(7, 70, "Hello-7", 700L, 7.07, false, "Welt-7"));
-		testData1.add(Row.of(8, 80, null, 800L, 8.08, true, "Welt-8"));
+		testData1.add(Row.of(1, 10, "Hello-1", 100L, 1.01, false, "Welt-1",
+			Timestamp.valueOf("2019-08-18 19:00:00"), Date.valueOf("2019-08-18"), Time.valueOf("19:00:00")));
+		testData1.add(Row.of(2, 20, "Hello-2", 200L, 2.02, true, "Welt-2",
+			Timestamp.valueOf("2019-08-18 19:01:00"), Date.valueOf("2019-08-18"), Time.valueOf("19:01:00")));
+		testData1.add(Row.of(3, 30, "Hello-3", 300L, 3.03, false, "Welt-3",
+			Timestamp.valueOf("2019-08-18 19:02:00"), Date.valueOf("2019-08-18"), Time.valueOf("19:02:00")));
+		testData1.add(Row.of(4, 40, null, 400L, 4.04, true, "Welt-4",
+			Timestamp.valueOf("2019-08-18 19:03:00"), Date.valueOf("2019-08-18"), Time.valueOf("19:03:00")));
+		testData1.add(Row.of(5, 50, "Hello-5", 500L, 5.05, false, "Welt-5",
+			Timestamp.valueOf("2019-08-19 19:10:00"), Date.valueOf("2019-08-19"), Time.valueOf("19:10:00")));
+		testData1.add(Row.of(6, 60, "Hello-6", 600L, 6.06, true, "Welt-6",
+			Timestamp.valueOf("2019-08-19 19:20:00"), Date.valueOf("2019-08-19"), Time.valueOf("19:20:00")));
+		testData1.add(Row.of(7, 70, "Hello-7", 700L, 7.07, false, "Welt-7",
+			Timestamp.valueOf("2019-08-19 19:30:00"), Date.valueOf("2019-08-19"), Time.valueOf("19:30:00")));
+		testData1.add(Row.of(8, 80, null, 800L, 8.08, true, "Welt-8",
+			Timestamp.valueOf("2019-08-19 19:40:00"), Date.valueOf("2019-08-19"), Time.valueOf("19:40:00")));
 	}
 
 	@Test
@@ -320,6 +337,72 @@ public void testTableSink() throws Exception {
 		TestBaseUtils.compareResultAsText(results, expected);
 	}
 
+	@Test
+	public void testTableSourceSinkWithDDL() throws Exception {
+		StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+		StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, streamSettings);
+
+		DataStream<Row> ds = execEnv.fromCollection(testData1).returns(testTypeInfo1);
+		tEnv.registerDataStream("src", ds);
+
+		// register hbase table
+		String quorum = getZookeeperQuorum();
+		String ddl = "CREATE TABLE hbase (\n" +
+			" rowkey INT," +
+			" family1 ROW<col1 INT>,\n" +
+			" family2 ROW<col1 VARCHAR, col2 BIGINT>,\n" +
+			" family3 ROW<col1 DOUBLE, col2 BOOLEAN, col3 VARCHAR>,\n" +
+			" family4 ROW<col1 TIMESTAMP(3), col2 DATE, col3 TIME(3)>\n" +
+			") WITH (\n" +
+			" 'connector.type' = 'hbase',\n" +
+			" 'connector.version' = '1.4.3',\n" +
+			" 'connector.table-name' = 'testTable3',\n" +
+			" 'connector.zookeeper.quorum' = '" + quorum + "',\n" +
+			" 'connector.zookeeper.znode.parent' = '/hbase' " +
+			")";
+		tEnv.sqlUpdate(ddl);
+
+		String query = "INSERT INTO hbase " +
+			"SELECT rowkey, ROW(f1c1), ROW(f2c1, f2c2), ROW(f3c1, f3c2, f3c3), ROW(f4c1, f4c2, f4c3) " +
+			"FROM src";
+		tEnv.sqlUpdate(query);
+
+		// wait to finish
+		tEnv.execute("HBase Job");
+
+		// start a batch scan job to verify contents in HBase table
+		TableEnvironment batchTableEnv = createBatchTableEnv();
+		batchTableEnv.sqlUpdate(ddl);
+
+		Table table = batchTableEnv.sqlQuery(
+			"SELECT " +
+			" h.rowkey, " +
+			" h.family1.col1, " +
+			" h.family2.col1, " +
+			" h.family2.col2, " +
+			" h.family3.col1, " +
+			" h.family3.col2, " +
+			" h.family3.col3, " +
+			" h.family4.col1, " +
+			" h.family4.col2, " +
+			" h.family4.col3 " +
+			"FROM hbase AS h"
+		);
+
+		List<Row> results = collectBatchResult(table);
+		String expected =
+			"1,10,Hello-1,100,1.01,false,Welt-1,2019-08-18 19:00:00.0,2019-08-18,19:00:00\n" +
+			"2,20,Hello-2,200,2.02,true,Welt-2,2019-08-18 19:01:00.0,2019-08-18,19:01:00\n" +
+			"3,30,Hello-3,300,3.03,false,Welt-3,2019-08-18 19:02:00.0,2019-08-18,19:02:00\n" +
+			"4,40,,400,4.04,true,Welt-4,2019-08-18 19:03:00.0,2019-08-18,19:03:00\n" +
+			"5,50,Hello-5,500,5.05,false,Welt-5,2019-08-19 19:10:00.0,2019-08-19,19:10:00\n" +
+			"6,60,Hello-6,600,6.06,true,Welt-6,2019-08-19 19:20:00.0,2019-08-19,19:20:00\n" +
+			"7,70,Hello-7,700,7.07,false,Welt-7,2019-08-19 19:30:00.0,2019-08-19,19:30:00\n" +
+			"8,80,,800,8.08,true,Welt-8,2019-08-19 19:40:00.0,2019-08-19,19:40:00\n";
+
+		TestBaseUtils.compareResultAsText(results, expected);
+	}
+
 
 	// -------------------------------------------------------------------------------------
 	// HBase lookup source tests
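A note on the expected strings above: the collected values are rendered with the java.sql classes' default toString(), which is why every TIMESTAMP cell ends in ".0", and the null f2c1 values of rows 4 and 8 appear as empty columns. A standalone sketch of the formatting, assuming nothing beyond plain JDK behavior (class name is ours):

import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;

public class ExpectedFormatSketch {
	public static void main(String[] args) {
		// Timestamp#toString always prints at least one fractional digit.
		System.out.println(Timestamp.valueOf("2019-08-18 19:00:00")); // 2019-08-18 19:00:00.0
		System.out.println(Date.valueOf("2019-08-18"));               // 2019-08-18
		System.out.println(Time.valueOf("19:00:00"));                 // 19:00:00
	}
}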
@@ -461,7 +544,28 @@ private List<Row> collectBatchResult(Table table) throws Exception {
 			DataSet<Row> resultSet = batchTableEnv.toDataSet(table, Row.class);
 			return resultSet.collect();
 		} else {
-			return JavaScalaConversionUtil.toJava(TableUtil.collect(tableImpl));
+			TableImpl t = (TableImpl) table;
+			TableSchema schema = t.getSchema();
+			List<TypeInformation> types = new ArrayList<>();
+			for (TypeInformation typeInfo : t.getSchema().getFieldTypes()) {
+				// convert LOCAL_DATE_TIME to legacy TIMESTAMP to make the output consistent with the flink batch planner
+				if (typeInfo.equals(Types.LOCAL_DATE_TIME)) {
+					types.add(Types.SQL_TIMESTAMP);
+				} else if (typeInfo.equals(Types.LOCAL_DATE)) {
+					types.add(Types.SQL_DATE);
+				} else if (typeInfo.equals(Types.LOCAL_TIME)) {
+					types.add(Types.SQL_TIME);
+				} else {
+					types.add(typeInfo);
+				}
+			}
+			CollectRowTableSink sink = new CollectRowTableSink();
+			CollectTableSink<Row> configuredSink = (CollectTableSink<Row>) sink.configure(
+				schema.getFieldNames(), types.toArray(new TypeInformation[0]));
+			return JavaScalaConversionUtil.toJava(
+				BatchTableEnvUtil.collect(
+					t.getTableEnvironment(), table, configuredSink, Option.apply("JOB"),
+					EnvironmentSettings.DEFAULT_BUILTIN_CATALOG, EnvironmentSettings.DEFAULT_BUILTIN_DATABASE));
 		}
 	}

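The type remapping above is needed because the blink planner reports the new LocalDateTime/LocalDate/LocalTime type infos, while the expected strings in these tests are written against the legacy java.sql classes; configuring the collect sink with the legacy types keeps the printed output identical across planners. A standalone sketch of the difference, again plain JDK behavior (class name is ours):

import java.sql.Timestamp;
import java.time.LocalDateTime;

public class PlannerTypeDifferenceSketch {
	public static void main(String[] args) {
		Timestamp legacy = Timestamp.valueOf("2019-08-18 19:00:00");
		LocalDateTime modern = legacy.toLocalDateTime();

		// Without the remapping, collected rows would print the LocalDateTime
		// form and no longer match the expected strings.
		System.out.println(legacy); // 2019-08-18 19:00:00.0
		System.out.println(modern); // 2019-08-18T19:00
	}
}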
File 3 of 3:

@@ -44,6 +44,7 @@ public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter {

 	protected static final String TEST_TABLE_1 = "testTable1";
 	protected static final String TEST_TABLE_2 = "testTable2";
+	protected static final String TEST_TABLE_3 = "testTable3";
 
 	protected static final String ROWKEY = "rk";
 	protected static final String FAMILY1 = "family1";
@@ -58,6 +59,8 @@ public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter {
 	protected static final String F3COL2 = "col2";
 	protected static final String F3COL3 = "col3";
 
+	protected static final String FAMILY4 = "family4";
+
 	private static final byte[][] FAMILIES = new byte[][]{
 		Bytes.toBytes(FAMILY1),
 		Bytes.toBytes(FAMILY2),
@@ -100,6 +103,7 @@ public void before() {
 	private static void prepareTables() throws IOException {
 		createHBaseTable1();
 		createHBaseTable2();
+		createHBaseTable3();
 	}
 
 	private static void createHBaseTable1() throws IOException {
@@ -131,6 +135,18 @@ private static void createHBaseTable2() {
 		createTable(tableName, FAMILIES, SPLIT_KEYS);
 	}
 
+	private static void createHBaseTable3() {
+		// create a table
+		byte[][] families = new byte[][]{
+			Bytes.toBytes(FAMILY1),
+			Bytes.toBytes(FAMILY2),
+			Bytes.toBytes(FAMILY3),
+			Bytes.toBytes(FAMILY4),
+		};
+		TableName tableName = TableName.valueOf(TEST_TABLE_3);
+		createTable(tableName, families, SPLIT_KEYS);
+	}
+
 	private static Put putRow(int rowKey, int f1c1, String f2c1, long f2c2, double f3c1, boolean f3c2, String f3c3) {
 		Put put = new Put(Bytes.toBytes(rowKey));
 		// family 1
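Note that putRow itself is unchanged by this commit: testTable3 starts out empty and is populated by the INSERT INTO job in the new ITCase rather than by direct Puts. Purely for illustration, a hedged sketch of how family4's time-typed cells would look if written directly (a hypothetical helper, not part of this commit; the encoding mirrors HBaseTypeUtils above):

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;

public class Family4PutSketch {
	// Hypothetical helper: every java.sql value is stored as the
	// epoch-millis long returned by getTime().
	static Put putFamily4(Put put, Timestamp f4c1, Date f4c2, Time f4c3) {
		byte[] family4 = Bytes.toBytes("family4");
		put.addColumn(family4, Bytes.toBytes("col1"), Bytes.toBytes(f4c1.getTime()));
		put.addColumn(family4, Bytes.toBytes("col2"), Bytes.toBytes(f4c2.getTime()));
		put.addColumn(family4, Bytes.toBytes("col3"), Bytes.toBytes(f4c3.getTime()));
		return put;
	}
}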
