Skip to content

Commit

Permalink
add data stream test
Browse files Browse the repository at this point in the history
  • Loading branch information
whhe committed Jan 4, 2024
1 parent cd35671 commit 963a936
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,21 @@

package com.oceanbase.connector.flink;

import com.oceanbase.connector.flink.connection.OBKVHBaseConnectionProvider;
import com.oceanbase.connector.flink.connection.OBKVHBaseTableSchema;
import com.oceanbase.connector.flink.sink.OBKVHBaseStatementExecutor;
import com.oceanbase.connector.flink.sink.OceanBaseSink;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;

import com.alipay.oceanbase.hbase.OHTableClient;
import com.alipay.oceanbase.hbase.constants.OHConstants;
Expand All @@ -37,6 +49,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import static org.junit.Assert.assertTrue;

Expand All @@ -46,6 +59,11 @@ public class OBKVHBaseConnectorITCase extends OceanBaseTestBase {
public static final String CONFIG_URL =
"http:https://127.0.0.1:8080/services?Action=ObRootServiceInfo&ObCluster=" + CLUSTER_NAME;

@Override
protected String getTestTable() {
return "htable";
}

@Override
protected String getUrl() {
return String.format("%s&database=%s", CONFIG_URL, OB_SERVER.getDatabaseName());
Expand All @@ -57,8 +75,11 @@ protected String getUsername() {
}

@Override
protected String getTestTable() {
return "htable";
protected Map<String, String> getOptions() {
Map<String, String> options = super.getOptions();
options.put("sys.username", OB_SERVER.getSysUsername());
options.put("sys.password", OB_SERVER.getSysPassword());
return options;
}

private OHTableClient client;
Expand Down Expand Up @@ -93,6 +114,60 @@ private Delete deleteFamily(String rowKey, String family) {
return new Delete(Bytes.toBytes(rowKey)).deleteFamily(Bytes.toBytes(family));
}

@Test
public void testDataStreamSink() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

ResolvedSchema physicalSchema =
new ResolvedSchema(
Arrays.asList(
Column.physical("rowkey", DataTypes.STRING().notNull()),
Column.physical(
"family1",
DataTypes.ROW(
DataTypes.FIELD(
"q1", DataTypes.INT().nullable()))
.notNull()),
Column.physical(
"family2",
DataTypes.ROW(
DataTypes.FIELD(
"q2",
DataTypes.STRING().nullable()),
DataTypes.FIELD(
"q3", DataTypes.INT().nullable()))
.notNull())),
Collections.emptyList(),
UniqueConstraint.primaryKey("pk", Collections.singletonList("rowkey")));

List<RowData> dataSet =
Arrays.asList(
rowData("1", 1, "1", 1),
rowData("2", null, "2", null),
rowData("3", 3, null, null),
rowData("4", 4, "4", null));

OBKVHBaseConnectorOptions connectorOptions = new OBKVHBaseConnectorOptions(getOptions());
OBKVHBaseConnectionProvider connectionProvider =
new OBKVHBaseConnectionProvider(connectorOptions.getConnectionOptions());
OBKVHBaseStatementExecutor statementExecutor =
new OBKVHBaseStatementExecutor(
connectorOptions.getStatementOptions(),
new OBKVHBaseTableSchema(physicalSchema),
connectionProvider);
OceanBaseSink sink =
new OceanBaseSink(connectorOptions.getWriterOptions(), statementExecutor);
env.fromCollection(dataSet).sinkTo(sink);
env.execute();

validateSinkResults();
}

private RowData rowData(String rowKey, Integer q1, String q2, Integer q3) {
return GenericRowData.of(StringData.fromString(rowKey), q1, StringData.fromString(q2), q3);
}

@Test
public void testSink() throws Exception {
StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
Expand All @@ -102,20 +177,15 @@ public void testSink() throws Exception {
execEnv, EnvironmentSettings.newInstance().inStreamingMode().build());

tEnv.executeSql(
String.format(
"CREATE TEMPORARY TABLE target ("
+ " rowkey STRING,"
+ " family1 ROW<q1 INT>,"
+ " family2 ROW<q2 STRING, q3 INT>,"
+ " PRIMARY KEY (rowkey) NOT ENFORCED"
+ ") with ("
+ " 'connector'='obkv-hbase',"
+ " 'sys.username'='%s',"
+ " 'sys.password'='%s',"
+ getCommonOptionsString()
+ ");",
OB_SERVER.getSysUsername(),
OB_SERVER.getSysPassword()));
"CREATE TEMPORARY TABLE target ("
+ " rowkey STRING,"
+ " family1 ROW<q1 INT>,"
+ " family2 ROW<q2 STRING, q3 INT>,"
+ " PRIMARY KEY (rowkey) NOT ENFORCED"
+ ") with ("
+ " 'connector'='obkv-hbase',"
+ getOptionsString()
+ ");");

tEnv.executeSql(
String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ protected String getPassword() {
return OB_SERVER.getPassword();
}

protected Map<String, String> getCommonOptions() {
protected Map<String, String> getOptions() {
Map<String, String> options = new HashMap<>();
options.put("url", getUrl());
options.put("table-name", getTestTable());
Expand All @@ -72,8 +72,8 @@ protected Map<String, String> getCommonOptions() {
return options;
}

protected String getCommonOptionsString() {
return getCommonOptions().entrySet().stream()
protected String getOptionsString() {
return getOptions().entrySet().stream()
.map(e -> String.format("'%s'='%s'", e.getKey(), e.getValue()))
.collect(Collectors.joining(","));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ protected String getTestTable() {
}

@Override
protected Map<String, String> getCommonOptions() {
Map<String, String> options = super.getCommonOptions();
protected Map<String, String> getOptions() {
Map<String, String> options = super.getOptions();
options.put("compatible-mode", "mysql");
options.put("schema-name", "test");
options.put("connection-pool-properties", "druid.initialSize=4;druid.maxActive=20;");
Expand Down Expand Up @@ -105,15 +105,14 @@ public void testDataStreamSink() throws Exception {
rowData(108, "jacket", "water resistent black wind breaker", 0.1),
rowData(109, "spare tire", "24 inch spare tire", 22.2));

OceanBaseConnectorOptions connectorOptions =
new OceanBaseConnectorOptions(getCommonOptions());

OceanBaseConnectorOptions connectorOptions = new OceanBaseConnectorOptions(getOptions());
OceanBaseConnectionProvider connectionProvider =
new OceanBaseConnectionPool(connectorOptions.getConnectionOptions());
OceanBaseTableSchema tableSchema = new OceanBaseTableSchema(physicalSchema);
OceanBaseStatementExecutor statementExecutor =
new OceanBaseStatementExecutor(
connectorOptions.getStatementOptions(), tableSchema, connectionProvider);
connectorOptions.getStatementOptions(),
new OceanBaseTableSchema(physicalSchema),
connectionProvider);
OceanBaseSink sink =
new OceanBaseSink(connectorOptions.getWriterOptions(), statementExecutor);
env.fromCollection(dataSet).sinkTo(sink);
Expand Down Expand Up @@ -147,7 +146,7 @@ public void testSink() throws Exception {
+ " PRIMARY KEY (`id`) NOT ENFORCED"
+ ") with ("
+ " 'connector'='oceanbase',"
+ getCommonOptionsString()
+ getOptionsString()
+ ");");

tEnv.executeSql(
Expand Down

0 comments on commit 963a936

Please sign in to comment.