Skip to content

Commit

Permalink
[FLINK-18604][hbase] HBase ConnectorDescriptor cannot work in the Table API
Browse files Browse the repository at this point in the history
This closes apache#13276
  • Loading branch information
pyscala committed Sep 16, 2020
1 parent c60aaff commit 97e7c73
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public class HBase extends ConnectorDescriptor {
private DescriptorProperties properties = new DescriptorProperties();

public HBase() {
    // formatNeeded = false: the HBase connector derives its serialization from the
    // table schema itself, so callers must not be forced to supply a separate
    // FormatDescriptor (the previous 'true' made tEnv.connect(new HBase()) fail).
    super(CONNECTOR_TYPE_VALUE_HBASE, 1, false);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,15 @@
import org.apache.flink.connector.hbase.util.PlannerType;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.internal.TableEnvironmentInternal;
import org.apache.flink.table.descriptors.HBase;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.types.Row;
Expand Down Expand Up @@ -235,7 +238,36 @@ public void testTableSourceFieldOrder() {
}

@Test
public void testTableSourceReadAsByteArray() {
public void testTableSourceWithTableAPI() throws Exception {
StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, streamSettings);
tEnv.connect(new HBase()
.version("1.4.3")
.tableName(TEST_TABLE_1)
.zookeeperQuorum(getZookeeperQuorum()))
.withSchema(new Schema()
.field("rowkey", DataTypes.INT())
.field("family2", DataTypes.ROW(DataTypes.FIELD("col1", DataTypes.STRING()), DataTypes.FIELD("col2", DataTypes.BIGINT())))
.field("family3", DataTypes.ROW(DataTypes.FIELD("col1", DataTypes.DOUBLE()), DataTypes.FIELD("col2", DataTypes.BOOLEAN()), DataTypes.FIELD("col3", DataTypes.STRING())))
.field("family1", DataTypes.ROW(DataTypes.FIELD("col1", DataTypes.INT()))))
.createTemporaryTable("hTable");
Table table = tEnv.sqlQuery("SELECT * FROM hTable AS h");
List<Row> results = CollectionUtil.iteratorToList(table.execute().collect());
String expected =
"1,Hello-1,100,1.01,false,Welt-1,10\n" +
"2,Hello-2,200,2.02,true,Welt-2,20\n" +
"3,Hello-3,300,3.03,false,Welt-3,30\n" +
"4,null,400,4.04,true,Welt-4,40\n" +
"5,Hello-5,500,5.05,false,Welt-5,50\n" +
"6,Hello-6,600,6.06,true,Welt-6,60\n" +
"7,Hello-7,700,7.07,false,Welt-7,70\n" +
"8,null,800,8.08,true,Welt-8,80\n";

TestBaseUtils.compareResultAsText(results, expected);
}

@Test
public void testTableSourceReadAsByteArray() throws Exception {
TableEnvironment tEnv = createBatchTableEnv();

if (isLegacyConnector) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,21 @@

package org.apache.flink.connector.hbase;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.internal.Registration;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.descriptors.ConnectTableDescriptor;
import org.apache.flink.table.descriptors.Descriptor;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.descriptors.DescriptorTestBase;
import org.apache.flink.table.descriptors.DescriptorValidator;
import org.apache.flink.table.descriptors.FormatDescriptor;
import org.apache.flink.table.descriptors.HBase;
import org.apache.flink.table.descriptors.HBaseValidator;
import org.apache.flink.table.descriptors.Rowtime;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.table.descriptors.StreamTableDescriptor;

import org.junit.Assert;
import org.junit.Test;
Expand All @@ -33,6 +41,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

/**
* Test case for {@link HBase} descriptor.
Expand Down Expand Up @@ -117,4 +126,30 @@ public void testRequiredFields() {
Assert.assertTrue("The case#" + i + " didn't get the expected error", caughtExpectedException);
}
}

@Test
public void testFormatNeed(){
// Verifies that the HBase descriptor rejects a format: because HBase() now
// passes formatNeeded=false to ConnectorDescriptor, attaching a
// FormatDescriptor must make toProperties() fail with this message.
// NOTE(review): "$1" names the anonymous FormatDescriptor below — adding or
// reordering anonymous classes in this test class would change it.
String expected = "The connector org.apache.flink.table.descriptors.HBase does not require a format description but org.apache.flink.connector.hbase.HBaseDescriptorTest$1 found.";
// Captures the table handed to the Registration; unused afterwards because
// registration is expected to fail before reaching it.
AtomicReference<CatalogTableImpl> reference = new AtomicReference<>();
HBase hBase = new HBase();
Registration registration = (path, table) -> reference.set((CatalogTableImpl) table);
ConnectTableDescriptor descriptor = new StreamTableDescriptor(
registration, hBase)
// Deliberately attach a (dummy) format that the connector must reject.
.withFormat(new FormatDescriptor("myFormat", 1) {
@Override
protected Map<String, String> toFormatProperties() {
return new HashMap<>();
}
})
.withSchema(new Schema()
.field("f0", DataTypes.INT())
.rowtime(new Rowtime().timestampsFromField("f0")));
String actual = null;
// The validation error is expected to surface here; capture its message.
try {
descriptor.toProperties();
} catch (Exception e) {
actual = e.getMessage();
}
// actual stays null if no exception was thrown, which also fails the assert.
Assert.assertEquals(expected, actual);
}
}

0 comments on commit 97e7c73

Please sign in to comment.