Skip to content

Commit

Permalink
[FLINK-17538][hbase] Refactor flink-hbase connector structure
Browse files Browse the repository at this point in the history
(1) rename flink-hbase module to flink-connector-hbase
(2) Move interfaces and classes to org.apache.flink.connector.hbase
(3) Keep ancient TableInputFormat in old package and mark deprecated
(4) Restructure classes into sub-packages
(5) Update documentation

This closes apache#12102
  • Loading branch information
wuchong committed May 14, 2020
1 parent 3771835 commit 360abcc
Show file tree
Hide file tree
Showing 38 changed files with 156 additions and 84 deletions.
2 changes: 1 addition & 1 deletion docs/dev/table/connect.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ The following tables list all available connectors and formats. Their mutual com
| Apache Kafka | 0.10 | `flink-connector-kafka-0.10` | [Download](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka-0.10{{site.scala_version_suffix}}/{{site.version}}/flink-sql-connector-kafka-0.10{{site.scala_version_suffix}}-{{site.version}}.jar) |
| Apache Kafka | 0.11 | `flink-connector-kafka-0.11` | [Download](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka-0.11{{site.scala_version_suffix}}/{{site.version}}/flink-sql-connector-kafka-0.11{{site.scala_version_suffix}}-{{site.version}}.jar) |
| Apache Kafka | 0.11+ (`universal`) | `flink-connector-kafka` | [Download](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka{{site.scala_version_suffix}}/{{site.version}}/flink-sql-connector-kafka{{site.scala_version_suffix}}-{{site.version}}.jar) |
| HBase | 1.4.3 | `flink-hbase` | [Download](https://repo.maven.apache.org/maven2/org/apache/flink/flink-hbase{{site.scala_version_suffix}}/{{site.version}}/flink-hbase{{site.scala_version_suffix}}-{{site.version}}.jar) |
| Apache HBase | 1.4.3 | `flink-connector-hbase` | [Download](https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-hbase{{site.scala_version_suffix}}/{{site.version}}/flink-connector-hbase{{site.scala_version_suffix}}-{{site.version}}.jar) |
| JDBC | | `flink-connector-jdbc` | [Download](https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-jdbc{{site.scala_version_suffix}}/{{site.version}}/flink-connector-jdbc{{site.scala_version_suffix}}-{{site.version}}.jar) |

### Formats
Expand Down
2 changes: 1 addition & 1 deletion docs/dev/table/connect.zh.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ The following tables list all available connectors and formats. Their mutual com
| Apache Kafka | 0.10 | `flink-connector-kafka-0.10` | [Download](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka-0.10{{site.scala_version_suffix}}/{{site.version}}/flink-sql-connector-kafka-0.10{{site.scala_version_suffix}}-{{site.version}}.jar) |
| Apache Kafka | 0.11 | `flink-connector-kafka-0.11` | [Download](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka-0.11{{site.scala_version_suffix}}/{{site.version}}/flink-sql-connector-kafka-0.11{{site.scala_version_suffix}}-{{site.version}}.jar) |
| Apache Kafka | 0.11+ (`universal`) | `flink-connector-kafka` | [Download](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka{{site.scala_version_suffix}}/{{site.version}}/flink-sql-connector-kafka{{site.scala_version_suffix}}-{{site.version}}.jar) |
| HBase | 1.4.3 | `flink-hbase` | [Download](https://repo.maven.apache.org/maven2/org/apache/flink/flink-hbase{{site.scala_version_suffix}}/{{site.version}}/flink-hbase{{site.scala_version_suffix}}-{{site.version}}.jar) |
| Apache HBase | 1.4.3 | `flink-connector-hbase` | [Download](https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-hbase{{site.scala_version_suffix}}/{{site.version}}/flink-connector-hbase{{site.scala_version_suffix}}-{{site.version}}.jar) |
| JDBC | | `flink-connector-jdbc` | [Download](https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-jdbc{{site.scala_version_suffix}}/{{site.version}}/flink-connector-jdbc{{site.scala_version_suffix}}-{{site.version}}.jar) |

### Formats
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ under the License.
<relativePath>..</relativePath>
</parent>

<artifactId>flink-hbase_${scala.binary.version}</artifactId>
<name>flink-hbase</name>
<artifactId>flink-connector-hbase_${scala.binary.version}</artifactId>
<name>flink-connector-hbase</name>
<packaging>jar</packaging>

<properties>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.addons.hbase;

import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.connector.hbase.source.HBaseInputFormat;

/**
* {@link InputFormat} subclass that wraps the access for HTables.
*
* <p>This class is a compatibility shim kept in the legacy {@code org.apache.flink.addons.hbase}
* package. It declares no behavior of its own; everything is inherited from
* {@link HBaseInputFormat}.
*
* @param <T> the {@link Tuple} type of the records produced by this input format
* @deprecated please use {@link org.apache.flink.connector.hbase.source.HBaseInputFormat}.
*/
@Deprecated
public abstract class TableInputFormat<T extends Tuple> extends HBaseInputFormat<T> {
// Serialization version identifier declared for this class.
private static final long serialVersionUID = 1L;

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,17 @@
* limitations under the License.
*/

package org.apache.flink.addons.hbase;
package org.apache.flink.connector.hbase;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.connector.hbase.options.HBaseOptions;
import org.apache.flink.connector.hbase.options.HBaseWriteOptions;
import org.apache.flink.connector.hbase.sink.HBaseUpsertTableSink;
import org.apache.flink.connector.hbase.source.HBaseTableSource;
import org.apache.flink.connector.hbase.util.HBaseTableSchema;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.descriptors.HBaseValidator;
Expand Down Expand Up @@ -70,6 +76,7 @@
/**
* Factory for creating configured instances of {@link HBaseTableSource} or sink.
*/
@Internal
public class HBaseTableFactory implements StreamTableSourceFactory<Row>, StreamTableSinkFactory<Tuple2<Boolean, Row>> {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
* limitations under the License.
*/

package org.apache.flink.addons.hbase;
package org.apache.flink.connector.hbase.options;

import org.apache.flink.annotation.Internal;

import javax.annotation.Nullable;

Expand All @@ -28,6 +30,7 @@
/**
* Common Options for HBase.
*/
@Internal
public class HBaseOptions {

private final String tableName;
Expand All @@ -40,15 +43,15 @@ private HBaseOptions(String tableName, String zkQuorum, @Nullable String zkNodeP
this.zkNodeParent = zkNodeParent;
}

String getTableName() {
public String getTableName() {
return tableName;
}

String getZkQuorum() {
public String getZkQuorum() {
return zkQuorum;
}

Optional<String> getZkNodeParent() {
public Optional<String> getZkNodeParent() {
return Optional.ofNullable(zkNodeParent);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
* limitations under the License.
*/

package org.apache.flink.addons.hbase;
package org.apache.flink.connector.hbase.options;

import org.apache.flink.annotation.Internal;

import org.apache.hadoop.hbase.client.ConnectionConfiguration;

Expand All @@ -28,6 +30,7 @@
/**
* Options for HBase writing.
*/
@Internal
public class HBaseWriteOptions implements Serializable {

private static final long serialVersionUID = 1L;
Expand All @@ -45,15 +48,15 @@ private HBaseWriteOptions(
this.bufferFlushIntervalMillis = bufferFlushIntervalMillis;
}

long getBufferFlushMaxSizeInBytes() {
public long getBufferFlushMaxSizeInBytes() {
return bufferFlushMaxSizeInBytes;
}

long getBufferFlushMaxRows() {
public long getBufferFlushMaxRows() {
return bufferFlushMaxRows;
}

long getBufferFlushIntervalMillis() {
public long getBufferFlushIntervalMillis() {
return bufferFlushIntervalMillis;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@
* limitations under the License.
*/

package org.apache.flink.addons.hbase;
package org.apache.flink.connector.hbase.sink;

import org.apache.flink.addons.hbase.util.HBaseConfigurationUtil;
import org.apache.flink.addons.hbase.util.HBaseReadWriteHelper;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
import org.apache.flink.connector.hbase.util.HBaseReadWriteHelper;
import org.apache.flink.connector.hbase.util.HBaseTableSchema;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
Expand Down Expand Up @@ -62,6 +64,7 @@
* The buffering strategy can be configured by {@code bufferFlushMaxSizeInBytes},
* {@code bufferFlushMaxMutations} and {@code bufferFlushIntervalMillis}.</p>
*/
@Internal
public class HBaseUpsertSinkFunction
extends RichSinkFunction<Tuple2<Boolean, Row>>
implements CheckpointedFunction, BufferedMutator.ExceptionListener {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,15 @@
* limitations under the License.
*/

package org.apache.flink.addons.hbase;
package org.apache.flink.connector.hbase.sink;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.hbase.options.HBaseOptions;
import org.apache.flink.connector.hbase.options.HBaseWriteOptions;
import org.apache.flink.connector.hbase.util.HBaseTableSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.api.TableSchema;
Expand All @@ -41,6 +45,7 @@
/**
* An upsert {@link UpsertStreamTableSink} for HBase.
*/
@Internal
public class HBaseUpsertTableSink implements UpsertStreamTableSink<Row> {

private final HBaseTableSchema hbaseTableSchema;
Expand Down Expand Up @@ -112,17 +117,17 @@ public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, TypeInform
}

@VisibleForTesting
HBaseTableSchema getHBaseTableSchema() {
public HBaseTableSchema getHBaseTableSchema() {
return hbaseTableSchema;
}

@VisibleForTesting
HBaseOptions getHBaseOptions() {
public HBaseOptions getHBaseOptions() {
return hbaseOptions;
}

@VisibleForTesting
HBaseWriteOptions getWriteOptions() {
public HBaseWriteOptions getWriteOptions() {
return writeOptions;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@
* limitations under the License.
*/

package org.apache.flink.addons.hbase;
package org.apache.flink.connector.hbase.source;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
import org.apache.flink.api.common.io.RichInputFormat;
Expand All @@ -41,9 +42,11 @@
/**
* Abstract {@link InputFormat} to read data from HBase tables.
*/
public abstract class AbstractTableInputFormat<T> extends RichInputFormat<T, TableInputSplit> {
@Internal
abstract class AbstractTableInputFormat<T> extends RichInputFormat<T, TableInputSplit> {

protected static final Logger LOG = LoggerFactory.getLogger(AbstractTableInputFormat.class);
private static final long serialVersionUID = 1L;

// helper variable to decide whether the input is exhausted or not
protected boolean endReached = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@
* limitations under the License.
*/

package org.apache.flink.addons.hbase;
package org.apache.flink.connector.hbase.source;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.Configuration;
Expand All @@ -30,7 +31,8 @@
/**
* {@link InputFormat} subclass that wraps the access for HTables.
*/
public abstract class TableInputFormat<T extends Tuple> extends AbstractTableInputFormat<T> {
@Experimental
public abstract class HBaseInputFormat<T extends Tuple> extends AbstractTableInputFormat<T> {

private static final long serialVersionUID = 1L;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@
* limitations under the License.
*/

package org.apache.flink.addons.hbase;
package org.apache.flink.connector.hbase.source;

import org.apache.flink.addons.hbase.util.HBaseConfigurationUtil;
import org.apache.flink.addons.hbase.util.HBaseReadWriteHelper;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
import org.apache.flink.connector.hbase.util.HBaseReadWriteHelper;
import org.apache.flink.connector.hbase.util.HBaseTableSchema;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
Expand All @@ -45,6 +47,7 @@
* The HBaseLookupFunction is a standard user-defined table function. It can be used in the
* Table API and is also useful for temporal table join plans in SQL.
*/
@Internal
public class HBaseLookupFunction extends TableFunction<Row> {
private static final Logger LOG = LoggerFactory.getLogger(HBaseLookupFunction.class);
private static final long serialVersionUID = 1L;
Expand Down Expand Up @@ -144,7 +147,7 @@ public void close() {
}

@VisibleForTesting
String getHTableName() {
public String getHTableName() {
return hTableName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,16 @@
* limitations under the License.
*/

package org.apache.flink.addons.hbase;
package org.apache.flink.connector.hbase.source;

import org.apache.flink.addons.hbase.util.HBaseReadWriteHelper;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.hbase.util.HBaseReadWriteHelper;
import org.apache.flink.connector.hbase.util.HBaseTableSchema;
import org.apache.flink.types.Row;

import org.apache.hadoop.hbase.HBaseConfiguration;
Expand All @@ -42,6 +44,7 @@
/**
* {@link InputFormat} subclass that wraps the access for HTables. Returns the result as {@link Row}.
*/
@Internal
public class HBaseRowInputFormat extends AbstractTableInputFormat<Row> implements ResultTypeQueryable<Row> {

private static final long serialVersionUID = 1L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@
* limitations under the License.
*/

package org.apache.flink.addons.hbase;
package org.apache.flink.connector.hbase.source;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.connector.hbase.util.HBaseTableSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
Expand Down Expand Up @@ -63,6 +65,7 @@
* }
* </pre>
*/
@Internal
public class HBaseTableSource implements BatchTableSource<Row>, ProjectableTableSource<Row>, StreamTableSource<Row>, LookupableTableSource<Row> {

private final Configuration conf;
Expand All @@ -80,7 +83,7 @@ public HBaseTableSource(Configuration conf, String tableName) {
this(conf, tableName, new HBaseTableSchema(), null);
}

HBaseTableSource(Configuration conf, String tableName, HBaseTableSchema hbaseSchema, int[] projectFields) {
public HBaseTableSource(Configuration conf, String tableName, HBaseTableSchema hbaseSchema, int[] projectFields) {
this.conf = conf;
this.tableName = Preconditions.checkNotNull(tableName, "Table name");
this.hbaseSchema = hbaseSchema;
Expand Down Expand Up @@ -189,7 +192,7 @@ public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
}

@VisibleForTesting
HBaseTableSchema getHBaseTableSchema() {
public HBaseTableSchema getHBaseTableSchema() {
return this.hbaseSchema;
}
}
Loading

0 comments on commit 360abcc

Please sign in to comment.