Skip to content

Commit

Permalink
update obkv-test
Browse files Browse the repository at this point in the history
  • Loading branch information
whhe committed Jan 4, 2024
1 parent 3bbba60 commit 53e38d7
Showing 1 changed file with 48 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,12 @@
import com.alipay.oceanbase.hbase.constants.OHConstants;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
Expand Down Expand Up @@ -59,6 +62,32 @@ protected String getTestTable() {
return "htable";
}

private OHTableClient client;

@Before
public void before() throws Exception {
Configuration conf = new Configuration();
conf.set(OHConstants.HBASE_OCEANBASE_PARAM_URL, getUrl());
conf.set(OHConstants.HBASE_OCEANBASE_FULL_USER_NAME, getUsername());
conf.set(OHConstants.HBASE_OCEANBASE_PASSWORD, getPassword());
conf.set(OHConstants.HBASE_OCEANBASE_SYS_USER_NAME, OB_SERVER.getSysUsername());
conf.set(OHConstants.HBASE_OCEANBASE_SYS_PASSWORD, OB_SERVER.getSysPassword());
client = new OHTableClient(getTestTable(), conf);
client.init();
}

@After
public void after() throws Exception {
client.delete(
Arrays.asList(
new Delete(Bytes.toBytes("1")),
new Delete(Bytes.toBytes("2")),
new Delete(Bytes.toBytes("3")),
new Delete(Bytes.toBytes("4"))));
client.close();
client = null;
}

@Test
public void testSink() throws Exception {
StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
Expand All @@ -67,24 +96,19 @@ public void testSink() throws Exception {
StreamTableEnvironment.create(
execEnv, EnvironmentSettings.newInstance().inStreamingMode().build());

String family1 = "family1";
String family2 = "family2";

tEnv.executeSql(
String.format(
"CREATE TEMPORARY TABLE target ("
+ " rowkey STRING,"
+ " %s ROW<q1 INT>,"
+ " %s ROW<q2 STRING, q3 INT>,"
+ " family1 ROW<q1 INT>,"
+ " family2 ROW<q2 STRING, q3 INT>,"
+ " PRIMARY KEY (rowkey) NOT ENFORCED"
+ ") with ("
+ " 'connector'='obkv-hbase',"
+ " 'sys.username'='%s',"
+ " 'sys.password'='%s',"
+ getCommonOptionsString()
+ ");",
family1,
family2,
OB_SERVER.getSysUsername(),
OB_SERVER.getSysPassword()));

Expand All @@ -97,16 +121,10 @@ public void testSink() throws Exception {
row("4", 4, "4", null)))
.await();

Configuration conf = new Configuration();
conf.set(OHConstants.HBASE_OCEANBASE_PARAM_URL, getUrl());
conf.set(OHConstants.HBASE_OCEANBASE_FULL_USER_NAME, getUsername());
conf.set(OHConstants.HBASE_OCEANBASE_PASSWORD, getPassword());
conf.set(OHConstants.HBASE_OCEANBASE_SYS_USER_NAME, OB_SERVER.getSysUsername());
conf.set(OHConstants.HBASE_OCEANBASE_SYS_PASSWORD, OB_SERVER.getSysPassword());

OHTableClient client = new OHTableClient(getTestTable(), conf);
client.init();
validateSinkResults();
}

private void validateSinkResults() throws Exception {
Function<KeyValue, String> valueFunc =
kv -> {
String column = Bytes.toString(kv.getQualifier());
Expand All @@ -118,29 +136,25 @@ public void testSink() throws Exception {
};

assertEqualsInAnyOrder(
Collections.singletonList("1,q1,1"), queryHTable(client, family1, "1", valueFunc));
assertTrue(queryHTable(client, family1, "2", valueFunc).isEmpty());
Collections.singletonList("1,q1,1"), queryHTable(client, "family1", "1"));
assertTrue(queryHTable(client, "family1", "2").isEmpty());
assertEqualsInAnyOrder(
Collections.singletonList("3,q1,3"), queryHTable(client, family1, "3", valueFunc));
Collections.singletonList("3,q1,3"), queryHTable(client, "family1", "3"));
assertEqualsInAnyOrder(
Collections.singletonList("4,q1,4"), queryHTable(client, family1, "4", valueFunc));
Collections.singletonList("4,q1,4"), queryHTable(client, "family1", "4"));

assertEqualsInAnyOrder(
Arrays.asList("1,q2,1", "1,q3,1"), queryHTable(client, family2, "1", valueFunc));
Arrays.asList("1,q2,1", "1,q3,1"), queryHTable(client, "family2", "1"));
assertEqualsInAnyOrder(
Collections.singletonList("2,q2,2"), queryHTable(client, family2, "2", valueFunc));
assertTrue(queryHTable(client, family2, "3", valueFunc).isEmpty());
Collections.singletonList("2,q2,2"), queryHTable(client, "family2", "2"));
assertTrue(queryHTable(client, "family2", "3").isEmpty());
assertEqualsInAnyOrder(
Collections.singletonList("4,q2,4"), queryHTable(client, family2, "4", valueFunc));
Collections.singletonList("4,q2,4"), queryHTable(client, "family2", "4"));

client.close();
}

private List<String> queryHTable(
OHTableClient client,
String family,
String rowKey,
Function<KeyValue, String> valueStringFunction)
private List<String> queryHTable(OHTableClient client, String family, String rowKey)
throws IOException {
List<String> result = new ArrayList<>();
Get get = new Get(Bytes.toBytes(rowKey));
Expand All @@ -150,12 +164,15 @@ private List<String> queryHTable(
return result;
}
for (KeyValue kv : r.list()) {
String column = Bytes.toString(kv.getQualifier());
result.add(
String.format(
"%s,%s,%s",
rowKey,
Bytes.toString(kv.getQualifier()),
valueStringFunction.apply(kv)));
column,
"q2".equals(column)
? Bytes.toString(kv.getValue())
: String.valueOf(Bytes.toInt(kv.getValue()))));
}
return result;
}
Expand Down

0 comments on commit 53e38d7

Please sign in to comment.