Skip to content

Commit

Permalink
test: add data stream test case and update java demo (#47)
Browse files Browse the repository at this point in the history
* fix(docs): use the same configs in demo

* add datastream demo

* update obkv-test

* fix delete

* add data stream test

* update docs

* fix row type field
  • Loading branch information
whhe committed Jan 4, 2024
1 parent e39c6e0 commit 775bc52
Show file tree
Hide file tree
Showing 9 changed files with 333 additions and 134 deletions.
2 changes: 2 additions & 0 deletions docs/sink/flink-connector-obkv-hbase.md
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ public class Main {

Once executed, the records should have been written to OceanBase.

For more information please refer to [OBKVHBaseConnectorITCase.java](../../flink-connector-obkv-hbase/src/test/java/com/oceanbase/connector/flink/OBKVHBaseConnectorITCase.java).

#### Flink SQL Demo

Put the JAR files of dependencies to the 'lib' directory of Flink, and then create the destination table with Flink SQL through the sql client.
Expand Down
2 changes: 2 additions & 0 deletions docs/sink/flink-connector-obkv-hbase_cn.md
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ public class Main {

执行完成后,即可在 OceanBase 中检索验证。

更多信息请参考 [OBKVHBaseConnectorITCase.java](../../flink-connector-obkv-hbase/src/test/java/com/oceanbase/connector/flink/OBKVHBaseConnectorITCase.java)

#### Flink SQL 示例

将需要用到的依赖的 JAR 文件放到 Flink 的 lib 目录下,之后通过 SQL Client 在 Flink 中创建目的表。
Expand Down
8 changes: 5 additions & 3 deletions docs/sink/flink-connector-oceanbase.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,9 @@ public class Main {
+ ") with ("
+ " 'connector' = 'oceanbase',"
+ " 'url' = 'jdbc:oceanbase:https://127.0.0.1:2881/test',"
+ " 'schema-name'= 'test',"
+ " 'table-name' = 't_sink',"
+ " 'username' = 'root@test',"
+ " 'username' = 'root@test#obcluster',"
+ " 'password' = 'pswd',"
+ " 'compatible-mode' = 'mysql',"
+ " 'connection-pool-properties' = 'druid.initialSize=10;druid.maxActive=100',"
Expand All @@ -115,10 +116,13 @@ public class Main {
.await();
}
}

```

Once executed, the records should have been written to OceanBase.

For more information please refer to [OceanBaseConnectorITCase.java](../../flink-connector-oceanbase/src/test/java/com/oceanbase/connector/flink/OceanBaseConnectorITCase.java).

#### Flink SQL Demo

Put the JAR files of dependencies to the 'lib' directory of Flink, and then create the destination table with Flink SQL through the sql client.
Expand All @@ -133,8 +137,6 @@ CREATE TABLE t_sink
) with (
'connector' = 'oceanbase',
'url' = 'jdbc:oceanbase:https://127.0.0.1:2881/test',
'cluster-name' = 'obcluster',
'tenant-name' = 'test',
'schema-name' = 'test',
'table-name' = 't_sink',
'username' = 'root@test#obcluster',
Expand Down
8 changes: 5 additions & 3 deletions docs/sink/flink-connector-oceanbase_cn.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,9 @@ public class Main {
+ ") with ("
+ " 'connector' = 'oceanbase',"
+ " 'url' = 'jdbc:oceanbase:https://127.0.0.1:2881/test',"
+ " 'schema-name'= 'test',"
+ " 'table-name' = 't_sink',"
+ " 'username' = 'root@test',"
+ " 'username' = 'root@test#obcluster',"
+ " 'password' = 'pswd',"
+ " 'compatible-mode' = 'mysql',"
+ " 'connection-pool-properties' = 'druid.initialSize=10;druid.maxActive=100',"
Expand All @@ -116,10 +117,13 @@ public class Main {
.await();
}
}

```

执行完成后,即可在 OceanBase 中检索验证。

更多信息请参考 [OceanBaseConnectorITCase.java](../../flink-connector-oceanbase/src/test/java/com/oceanbase/connector/flink/OceanBaseConnectorITCase.java)

#### Flink SQL 示例

将需要用到的依赖的 JAR 文件放到 Flink 的 lib 目录下,之后通过 SQL Client 在 Flink 中创建目的表。
Expand All @@ -135,8 +139,6 @@ CREATE TABLE t_sink
with (
'connector' = 'oceanbase',
'url' = 'jdbc:oceanbase:https://127.0.0.1:2881/test',
'cluster-name' = 'obcluster',
'tenant-name' = 'test',
'schema-name' = 'test',
'table-name' = 't_sink',
'username' = 'root@test#obcluster',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,40 @@

package com.oceanbase.connector.flink;

import com.oceanbase.connector.flink.connection.OBKVHBaseConnectionProvider;
import com.oceanbase.connector.flink.connection.OBKVHBaseTableSchema;
import com.oceanbase.connector.flink.sink.OBKVHBaseStatementExecutor;
import com.oceanbase.connector.flink.sink.OceanBaseSink;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;

import com.alipay.oceanbase.hbase.OHTableClient;
import com.alipay.oceanbase.hbase.constants.OHConstants;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;
import java.util.Map;

import static org.junit.Assert.assertTrue;

Expand All @@ -44,6 +59,118 @@ public class OBKVHBaseConnectorITCase extends OceanBaseTestBase {
public static final String CONFIG_URL =
"http:https://127.0.0.1:8080/services?Action=ObRootServiceInfo&ObCluster=" + CLUSTER_NAME;

@Override
protected String getTestTable() {
return "htable";
}

@Override
protected String getUrl() {
return String.format("%s&database=%s", CONFIG_URL, OB_SERVER.getDatabaseName());
}

@Override
protected String getUsername() {
return OB_SERVER.getUsername() + "#" + CLUSTER_NAME;
}

@Override
protected Map<String, String> getOptions() {
Map<String, String> options = super.getOptions();
options.put("sys.username", OB_SERVER.getSysUsername());
options.put("sys.password", OB_SERVER.getSysPassword());
return options;
}

private OHTableClient client;

@Before
public void before() throws Exception {
Configuration conf = new Configuration();
conf.set(OHConstants.HBASE_OCEANBASE_PARAM_URL, getUrl());
conf.set(OHConstants.HBASE_OCEANBASE_FULL_USER_NAME, getUsername());
conf.set(OHConstants.HBASE_OCEANBASE_PASSWORD, getPassword());
conf.set(OHConstants.HBASE_OCEANBASE_SYS_USER_NAME, OB_SERVER.getSysUsername());
conf.set(OHConstants.HBASE_OCEANBASE_SYS_PASSWORD, OB_SERVER.getSysPassword());
client = new OHTableClient(getTestTable(), conf);
client.init();
}

@After
public void after() throws Exception {
client.delete(
Arrays.asList(
deleteFamily("1", "family1"),
deleteFamily("1", "family2"),
deleteFamily("2", "family2"),
deleteFamily("3", "family1"),
deleteFamily("4", "family1"),
deleteFamily("4", "family2")));
client.close();
client = null;
}

private Delete deleteFamily(String rowKey, String family) {
return new Delete(Bytes.toBytes(rowKey)).deleteFamily(Bytes.toBytes(family));
}

@Test
public void testDataStreamSink() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

ResolvedSchema physicalSchema =
new ResolvedSchema(
Arrays.asList(
Column.physical("rowkey", DataTypes.STRING().notNull()),
Column.physical(
"family1",
DataTypes.ROW(
DataTypes.FIELD(
"q1", DataTypes.INT().nullable()))
.notNull()),
Column.physical(
"family2",
DataTypes.ROW(
DataTypes.FIELD(
"q2",
DataTypes.STRING().nullable()),
DataTypes.FIELD(
"q3", DataTypes.INT().nullable()))
.notNull())),
Collections.emptyList(),
UniqueConstraint.primaryKey("pk", Collections.singletonList("rowkey")));

List<RowData> dataSet =
Arrays.asList(
rowData("1", 1, "1", 1),
rowData("2", null, "2", null),
rowData("3", 3, null, null),
rowData("4", 4, "4", null));

OBKVHBaseConnectorOptions connectorOptions = new OBKVHBaseConnectorOptions(getOptions());
OBKVHBaseConnectionProvider connectionProvider =
new OBKVHBaseConnectionProvider(connectorOptions.getConnectionOptions());
OBKVHBaseStatementExecutor statementExecutor =
new OBKVHBaseStatementExecutor(
connectorOptions.getStatementOptions(),
new OBKVHBaseTableSchema(physicalSchema),
connectionProvider);
OceanBaseSink sink =
new OceanBaseSink(connectorOptions.getWriterOptions(), statementExecutor);
env.fromCollection(dataSet).sinkTo(sink);
env.execute();

validateSinkResults();
}

private RowData rowData(String rowKey, Integer q1, String q2, Integer q3) {
return GenericRowData.of(
StringData.fromString(rowKey),
GenericRowData.of(q1),
GenericRowData.of(StringData.fromString(q2), q3));
}

@Test
public void testSink() throws Exception {
StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
Expand All @@ -52,37 +179,16 @@ public void testSink() throws Exception {
StreamTableEnvironment.create(
execEnv, EnvironmentSettings.newInstance().inStreamingMode().build());

String hTable = "htable";
String family1 = "family1";
String family2 = "family2";

String url = String.format("%s&database=%s", CONFIG_URL, obServer.getDatabaseName());
String fullUsername = obServer.getUsername() + "#" + CLUSTER_NAME;

tEnv.executeSql(
String.format(
"CREATE TEMPORARY TABLE target ("
+ " rowkey STRING,"
+ " %s ROW<q1 INT>,"
+ " %s ROW<q2 STRING, q3 INT>,"
+ " PRIMARY KEY (rowkey) NOT ENFORCED"
+ ") with ("
+ " 'connector'='obkv-hbase',"
+ " 'url'='%s',"
+ " 'table-name'='%s',"
+ " 'username'='%s',"
+ " 'password'='%s',"
+ " 'sys.username'='%s',"
+ " 'sys.password'='%s'"
+ ");",
family1,
family2,
url,
hTable,
fullUsername,
obServer.getPassword(),
obServer.getSysUsername(),
obServer.getSysPassword()));
"CREATE TEMPORARY TABLE target ("
+ " rowkey STRING,"
+ " family1 ROW<q1 INT>,"
+ " family2 ROW<q2 STRING, q3 INT>,"
+ " PRIMARY KEY (rowkey) NOT ENFORCED"
+ ") with ("
+ " 'connector'='obkv-hbase',"
+ getOptionsString()
+ ");");

tEnv.executeSql(
String.format(
Expand All @@ -93,50 +199,28 @@ public void testSink() throws Exception {
row("4", 4, "4", null)))
.await();

Configuration conf = new Configuration();
conf.set(OHConstants.HBASE_OCEANBASE_PARAM_URL, url);
conf.set(OHConstants.HBASE_OCEANBASE_FULL_USER_NAME, fullUsername);
conf.set(OHConstants.HBASE_OCEANBASE_PASSWORD, obServer.getPassword());
conf.set(OHConstants.HBASE_OCEANBASE_SYS_USER_NAME, obServer.getSysUsername());
conf.set(OHConstants.HBASE_OCEANBASE_SYS_PASSWORD, obServer.getSysPassword());

OHTableClient client = new OHTableClient(hTable, conf);
client.init();

Function<KeyValue, String> valueFunc =
kv -> {
String column = Bytes.toString(kv.getQualifier());
if ("q2".equals(column)) {
return Bytes.toString(kv.getValue());
} else {
return String.valueOf(Bytes.toInt(kv.getValue()));
}
};
validateSinkResults();
}

private void validateSinkResults() throws Exception {
assertEqualsInAnyOrder(
Collections.singletonList("1,q1,1"), queryHTable(client, family1, "1", valueFunc));
assertTrue(queryHTable(client, family1, "2", valueFunc).isEmpty());
Collections.singletonList("1,q1,1"), queryHTable(client, "family1", "1"));
assertTrue(queryHTable(client, "family1", "2").isEmpty());
assertEqualsInAnyOrder(
Collections.singletonList("3,q1,3"), queryHTable(client, family1, "3", valueFunc));
Collections.singletonList("3,q1,3"), queryHTable(client, "family1", "3"));
assertEqualsInAnyOrder(
Collections.singletonList("4,q1,4"), queryHTable(client, family1, "4", valueFunc));
Collections.singletonList("4,q1,4"), queryHTable(client, "family1", "4"));

assertEqualsInAnyOrder(
Arrays.asList("1,q2,1", "1,q3,1"), queryHTable(client, family2, "1", valueFunc));
Arrays.asList("1,q2,1", "1,q3,1"), queryHTable(client, "family2", "1"));
assertEqualsInAnyOrder(
Collections.singletonList("2,q2,2"), queryHTable(client, family2, "2", valueFunc));
assertTrue(queryHTable(client, family2, "3", valueFunc).isEmpty());
Collections.singletonList("2,q2,2"), queryHTable(client, "family2", "2"));
assertTrue(queryHTable(client, "family2", "3").isEmpty());
assertEqualsInAnyOrder(
Collections.singletonList("4,q2,4"), queryHTable(client, family2, "4", valueFunc));

client.close();
Collections.singletonList("4,q2,4"), queryHTable(client, "family2", "4"));
}

private List<String> queryHTable(
OHTableClient client,
String family,
String rowKey,
Function<KeyValue, String> valueStringFunction)
private List<String> queryHTable(OHTableClient client, String family, String rowKey)
throws IOException {
List<String> result = new ArrayList<>();
Get get = new Get(Bytes.toBytes(rowKey));
Expand All @@ -146,12 +230,15 @@ private List<String> queryHTable(
return result;
}
for (KeyValue kv : r.list()) {
String column = Bytes.toString(kv.getQualifier());
result.add(
String.format(
"%s,%s,%s",
rowKey,
Bytes.toString(kv.getQualifier()),
valueStringFunction.apply(kv)));
column,
"q2".equals(column)
? Bytes.toString(kv.getValue())
: String.valueOf(Bytes.toInt(kv.getValue()))));
}
return result;
}
Expand Down
3 changes: 2 additions & 1 deletion flink-connector-obkv-hbase/src/test/resources/sql/init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
-- specific language governing permissions and limitations
-- under the License.

use test;
CREATE DATABASE IF NOT EXISTS test;
USE test;

CREATE TABLE `htable$family1`
(
Expand Down
Loading

0 comments on commit 775bc52

Please sign in to comment.