Skip to content

Commit

Permalink
[FLINK-10245][hbase] Add an upsert table sink factory for HBase
Browse files Browse the repository at this point in the history
This commit adds full support for HBase to be used with Table & SQL API.

It includes:
- HBase upsert table sink (for append-only and updating queries)
- HBase table factory
- HBase table descriptors & validators
- Unit tests

This closes apache#9075
  • Loading branch information
Clarkkkkk authored and wuchong committed Jul 11, 2019
1 parent 1aa43fc commit 01031ad
Show file tree
Hide file tree
Showing 14 changed files with 1,068 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package org.apache.flink.addons.hbase;

import org.apache.flink.addons.hbase.util.HBaseConfigurationUtil;
import org.apache.flink.addons.hbase.util.HBaseReadHelper;
import org.apache.flink.addons.hbase.util.HBaseReadWriteHelper;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.functions.FunctionContext;
Expand Down Expand Up @@ -53,7 +53,7 @@ public class HBaseLookupFunction extends TableFunction<Row> {
private final byte[] serializedConfig;
private final HBaseTableSchema hbaseTableSchema;

private transient HBaseReadHelper readHelper;
private transient HBaseReadWriteHelper readHelper;
private transient Connection hConnection;
private transient HTable table;

Expand Down Expand Up @@ -115,7 +115,7 @@ public void open(FunctionContext context) {
LOG.error("Exception while creating connection to HBase.", ioe);
throw new RuntimeException("Cannot create connection to HBase.", ioe);
}
this.readHelper = new HBaseReadHelper(hbaseTableSchema);
this.readHelper = new HBaseReadWriteHelper(hbaseTableSchema);
LOG.info("end open.");
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.addons.hbase;

import javax.annotation.Nullable;

import java.util.Objects;
import java.util.Optional;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* Common Options for HBase.
*/
/**
 * Common Options for HBase.
 *
 * <p>Instances are immutable and are created through {@link #builder()}. The options
 * describe how to reach an HBase table: its name, the ZooKeeper quorum, and an
 * optional ZooKeeper parent znode for the HBase cluster.
 */
public class HBaseOptions {

	private final String tableName;
	private final String zkQuorum;
	@Nullable private final String zkNodeParent;

	private HBaseOptions(String tableName, String zkQuorum, @Nullable String zkNodeParent) {
		this.tableName = tableName;
		this.zkQuorum = zkQuorum;
		this.zkNodeParent = zkNodeParent;
	}

	/**
	 * Creates a builder of {@link HBaseOptions}.
	 */
	public static Builder builder() {
		return new Builder();
	}

	// Returns the HBase table name. Never null once built.
	String getTableName() {
		return tableName;
	}

	// Returns the ZooKeeper quorum string. Never null once built.
	String getZkQuorum() {
		return zkQuorum;
	}

	// Returns the optional ZK parent znode; empty when the cluster default applies.
	Optional<String> getZkNodeParent() {
		return Optional.ofNullable(zkNodeParent);
	}

	@Override
	public String toString() {
		// Keep the exact historical format, including quoting of a null zkNodeParent.
		return String.format(
			"HBaseOptions{tableName='%s', zkQuorum='%s', zkNodeParent='%s'}",
			tableName, zkQuorum, zkNodeParent);
	}

	@Override
	public boolean equals(Object obj) {
		if (obj == this) {
			return true;
		}
		if (obj == null || obj.getClass() != getClass()) {
			return false;
		}
		HBaseOptions other = (HBaseOptions) obj;
		return Objects.equals(tableName, other.tableName)
			&& Objects.equals(zkQuorum, other.zkQuorum)
			&& Objects.equals(zkNodeParent, other.zkNodeParent);
	}

	@Override
	public int hashCode() {
		return Objects.hash(tableName, zkQuorum, zkNodeParent);
	}

	/**
	 * Builder for {@link HBaseOptions}.
	 *
	 * <p>{@code tableName} and {@code zkQuorum} are required; {@code zkNodeParent}
	 * is optional but must not be set to {@code null}.
	 */
	public static class Builder {

		private String tableName;
		private String zkQuorum;
		private String zkNodeParent;

		/**
		 * Required. Sets the HBase table name.
		 */
		public Builder setTableName(String tableName) {
			checkNotNull(tableName);
			this.tableName = tableName;
			return this;
		}

		/**
		 * Required. Sets the HBase ZooKeeper quorum configuration.
		 */
		public Builder setZkQuorum(String zkQuorum) {
			checkNotNull(zkQuorum);
			this.zkQuorum = zkQuorum;
			return this;
		}

		/**
		 * Optional. Sets the root dir in ZK for the HBase cluster. Default is "/hbase".
		 */
		public Builder setZkNodeParent(String zkNodeParent) {
			checkNotNull(zkNodeParent);
			this.zkNodeParent = zkNodeParent;
			return this;
		}

		/**
		 * Creates an instance of {@link HBaseOptions}.
		 *
		 * @throws NullPointerException if a required option was not set
		 */
		public HBaseOptions build() {
			// Validation order is kept: quorum first, then table name.
			checkNotNull(zkQuorum, "Zookeeper quorum is not set.");
			checkNotNull(tableName, "TableName is not set.");
			return new HBaseOptions(tableName, zkQuorum, zkNodeParent);
		}
	}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

package org.apache.flink.addons.hbase;

import org.apache.flink.addons.hbase.util.HBaseReadHelper;
import org.apache.flink.addons.hbase.util.HBaseReadWriteHelper;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
Expand Down Expand Up @@ -52,7 +52,7 @@ public class HBaseRowInputFormat extends AbstractTableInputFormat<Row> implement
private final HBaseTableSchema schema;

private transient org.apache.hadoop.conf.Configuration conf;
private transient HBaseReadHelper readHelper;
private transient HBaseReadWriteHelper readHelper;

public HBaseRowInputFormat(org.apache.hadoop.conf.Configuration conf, String tableName, HBaseTableSchema schema) {
this.tableName = tableName;
Expand All @@ -64,7 +64,7 @@ public HBaseRowInputFormat(org.apache.hadoop.conf.Configuration conf, String tab
public void configure(Configuration parameters) {
LOG.info("Initializing HBase configuration.");
// prepare hbase read helper
this.readHelper = new HBaseReadHelper(schema);
this.readHelper = new HBaseReadWriteHelper(schema);
connectToTable();
if (table != null) {
scan = getScanner();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@
package org.apache.flink.addons.hbase;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.descriptors.HBaseValidator;
import org.apache.flink.table.factories.StreamTableSinkFactory;
import org.apache.flink.table.factories.StreamTableSourceFactory;
import org.apache.flink.table.sinks.StreamTableSink;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;

Expand All @@ -39,32 +42,71 @@
import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION;
import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_VERSION;
import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_HBASE_TABLE_NAME;
import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_HBASE_ZK_QUORUM;
import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_TABLE_NAME;
import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_TYPE_VALUE_HBASE;
import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_VERSION_VALUE_143;
import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_WRITE_BUFFER_FLUSH_INTERVAL;
import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_WRITE_BUFFER_FLUSH_MAX_ROWS;
import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_WRITE_BUFFER_FLUSH_MAX_SIZE;
import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_ZK_NODE_PARENT;
import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_ZK_QUORUM;
import static org.apache.flink.table.descriptors.Schema.SCHEMA;
import static org.apache.flink.table.descriptors.Schema.SCHEMA_NAME;
import static org.apache.flink.table.descriptors.Schema.SCHEMA_TYPE;

/**
* Factory for creating configured instances of {@link HBaseTableSource} or sink.
*/
public class HBaseTableFactory implements StreamTableSourceFactory<Row> {
public class HBaseTableFactory implements StreamTableSourceFactory<Row>, StreamTableSinkFactory<Tuple2<Boolean, Row>> {

@Override
public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
final DescriptorProperties descriptorProperties = getValidatedProperties(properties);
// create default configuration from current runtime env (`hbase-site.xml` in classpath) first,
Configuration hbaseClientConf = HBaseConfiguration.create();
String hbaseZk = properties.get(CONNECTOR_HBASE_ZK_QUORUM);
String hbaseZk = descriptorProperties.getString(CONNECTOR_ZK_QUORUM);
hbaseClientConf.set(HConstants.ZOOKEEPER_QUORUM, hbaseZk);
String hTableName = descriptorProperties.getString(CONNECTOR_HBASE_TABLE_NAME);
descriptorProperties
.getOptionalString(CONNECTOR_ZK_NODE_PARENT)
.ifPresent(v -> hbaseClientConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, v));

String hTableName = descriptorProperties.getString(CONNECTOR_TABLE_NAME);
TableSchema tableSchema = descriptorProperties.getTableSchema(SCHEMA);
HBaseTableSchema hbaseSchema = validateTableSchema(tableSchema);
return new HBaseTableSource(hbaseClientConf, hTableName, hbaseSchema, null);
}

	@Override
	public StreamTableSink<Tuple2<Boolean, Row>> createStreamTableSink(Map<String, String> properties) {
		// Validate the raw property map and wrap it for typed access.
		final DescriptorProperties descriptorProperties = getValidatedProperties(properties);

		// Connection options: ZK quorum and table name are required;
		// the ZK parent znode is optional (HBase defaults to "/hbase").
		HBaseOptions.Builder hbaseOptionsBuilder = HBaseOptions.builder();
		hbaseOptionsBuilder.setZkQuorum(descriptorProperties.getString(CONNECTOR_ZK_QUORUM));
		hbaseOptionsBuilder.setTableName(descriptorProperties.getString(CONNECTOR_TABLE_NAME));
		descriptorProperties
			.getOptionalString(CONNECTOR_ZK_NODE_PARENT)
			.ifPresent(hbaseOptionsBuilder::setZkNodeParent);

		// Derive the HBase-specific schema (row key + column families) from the table schema.
		TableSchema tableSchema = descriptorProperties.getTableSchema(SCHEMA);
		HBaseTableSchema hbaseSchema = validateTableSchema(tableSchema);

		// Write/buffering options are all optional; unset ones fall back to the
		// builder's defaults. Memory size is converted to bytes, duration to millis.
		HBaseWriteOptions.Builder writeBuilder = HBaseWriteOptions.builder();
		descriptorProperties
			.getOptionalInt(CONNECTOR_WRITE_BUFFER_FLUSH_MAX_ROWS)
			.ifPresent(writeBuilder::setBufferFlushMaxRows);
		descriptorProperties
			.getOptionalMemorySize(CONNECTOR_WRITE_BUFFER_FLUSH_MAX_SIZE)
			.ifPresent(v -> writeBuilder.setBufferFlushMaxSizeInBytes(v.getBytes()));
		descriptorProperties
			.getOptionalDuration(CONNECTOR_WRITE_BUFFER_FLUSH_INTERVAL)
			.ifPresent(v -> writeBuilder.setBufferFlushIntervalMillis(v.toMillis()));

		// The sink consumes Tuple2<Boolean, Row> upsert streams (flag = insert/delete).
		return new HBaseUpsertTableSink(
			hbaseSchema,
			hbaseOptionsBuilder.build(),
			writeBuilder.build()
		);
	}

private HBaseTableSchema validateTableSchema(TableSchema schema) {
HBaseTableSchema hbaseSchema = new HBaseTableSchema();
String[] fieldNames = schema.getFieldNames();
Expand Down Expand Up @@ -106,8 +148,12 @@ public Map<String, String> requiredContext() {
public List<String> supportedProperties() {
List<String> properties = new ArrayList<>();

properties.add(CONNECTOR_HBASE_TABLE_NAME);
properties.add(CONNECTOR_HBASE_ZK_QUORUM);
properties.add(CONNECTOR_TABLE_NAME);
properties.add(CONNECTOR_ZK_QUORUM);
properties.add(CONNECTOR_ZK_NODE_PARENT);
properties.add(CONNECTOR_WRITE_BUFFER_FLUSH_MAX_SIZE);
properties.add(CONNECTOR_WRITE_BUFFER_FLUSH_MAX_ROWS);
properties.add(CONNECTOR_WRITE_BUFFER_FLUSH_INTERVAL);

// schema
properties.add(SCHEMA + ".#." + SCHEMA_TYPE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public class HBaseTableSchema implements Serializable {
* @param qualifier the qualifier name
* @param clazz the data type of the qualifier
*/
void addColumn(String family, String qualifier, Class<?> clazz) {
public void addColumn(String family, String qualifier, Class<?> clazz) {
Preconditions.checkNotNull(family, "family name");
Preconditions.checkNotNull(qualifier, "qualifier name");
Preconditions.checkNotNull(clazz, "class type");
Expand All @@ -78,7 +78,7 @@ void addColumn(String family, String qualifier, Class<?> clazz) {
* @param rowKeyName the row key field name
* @param clazz the data type of the row key
*/
void setRowKey(String rowKeyName, Class<?> clazz) {
public void setRowKey(String rowKeyName, Class<?> clazz) {
Preconditions.checkNotNull(rowKeyName, "row key field name");
Preconditions.checkNotNull(clazz, "row key class type");
if (!HBaseTypeUtils.isSupportedType(clazz)) {
Expand All @@ -97,7 +97,7 @@ void setRowKey(String rowKeyName, Class<?> clazz) {
*
* @param charset the charset for value strings and HBase identifiers.
*/
void setCharset(String charset) {
public void setCharset(String charset) {
this.charset = charset;
}

Expand Down
Loading

0 comments on commit 01031ad

Please sign in to comment.