[FLINK-17537][jdbc] Refactor flink-jdbc connector structure
(1) Use Jdbc instead of JDBC in class names.
(2) Move interfaces and classes to org.apache.flink.connector.jdbc (see the import sketch below).
(3) Keep the legacy JDBCOutputFormat, JDBCInputFormat, and ParameterValuesProvider in the old package.
(4) Add tests/ITCases for both the legacy classes and the new classes.
(5) Rename the flink-jdbc module to flink-connector-jdbc.
(6) Update the docs.

This closes apache#12036
leonardBang committed May 13, 2020
1 parent 09495d1 commit 6a6a439
Showing 97 changed files with 2,192 additions and 1,396 deletions.
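
Before the per-file diffs, a minimal sketch of what items (1)–(3) mean for user code. This is an editor's illustration, not part of the commit: the old package is the one in the javadoc path replaced in docs/dev/connectors/jdbc.md below, the new package comes from the commit message, and the sketch class itself is hypothetical.

{% highlight java %}
// Legacy name, kept in the old package per item (3):
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
// New home and casing per items (1) and (2):
import org.apache.flink.connector.jdbc.JdbcInputFormat;

/** Nothing to run here: for users, only the package and the JDBC -> Jdbc casing change. */
class JdbcPackageMoveSketch {
    Class<?> legacy = JDBCInputFormat.class;
    Class<?> moved = JdbcInputFormat.class;
}
{% endhighlight %}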
4 changes: 2 additions & 2 deletions docs/dev/batch/index.md
@@ -1112,7 +1112,7 @@ DataSet<Long> numbers = env.generateSequence(1, 10000000);
// Read data from a relational database using the JDBC input format
DataSet<Tuple2<String, Integer>> dbData =
env.createInput(
JDBCInputFormat.buildJDBCInputFormat()
JdbcInputFormat.buildJdbcInputFormat()
.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
.setDBUrl("jdbc:derby:memory:persons")
.setQuery("select name, age from persons")
@@ -1414,7 +1414,7 @@ DataSet<Tuple3<String, Integer, Double>> myResult = [...]
// write Tuple DataSet to a relational database
myResult.output(
// build and configure OutputFormat
JDBCOutputFormat.buildJDBCOutputFormat()
JdbcOutputFormat.buildJdbcOutputFormat()
.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
.setDBUrl("jdbc:derby:memory:persons")
.setQuery("insert into persons (name, age, height) values (?,?,?)")
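
The hunks above only show the renamed builder calls. Below is a hedged, self-contained sketch of the read side with the new class name; the Derby settings mirror the docs example, while the `RowTypeInfo` wiring and the wrapper class are illustrative assumptions.

{% highlight java %}
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.connector.jdbc.JdbcInputFormat;
import org.apache.flink.types.Row;

public class JdbcBatchReadSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // JdbcInputFormat produces Row records; declare the column types up front.
        DataSet<Row> persons = env.createInput(
                JdbcInputFormat.buildJdbcInputFormat()
                        .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
                        .setDBUrl("jdbc:derby:memory:persons")
                        .setQuery("select name, age from persons")
                        .setRowTypeInfo(new RowTypeInfo(
                                BasicTypeInfo.STRING_TYPE_INFO,
                                BasicTypeInfo.INT_TYPE_INFO))
                        .finish());

        persons.print();
    }
}
{% endhighlight %}

The write side mirrors the second hunk above: `JdbcOutputFormat.buildJdbcOutputFormat()` with the same driver/URL setters and an `insert` query, passed to `DataSet#output`.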
4 changes: 2 additions & 2 deletions docs/dev/batch/index.zh.md
@@ -887,7 +887,7 @@ DataSet<Long> numbers = env.generateSequence(1, 10000000);
// Read data from a relational database using the JDBC input format
DataSet<Tuple2<String, Integer>> dbData =
env.createInput(
JDBCInputFormat.buildJDBCInputFormat()
JdbcInputFormat.buildJdbcInputFormat()
.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
.setDBUrl("jdbc:derby:memory:persons")
.setQuery("select name, age from persons")
@@ -1189,7 +1189,7 @@ DataSet<Tuple3<String, Integer, Double>> myResult = [...]
// write Tuple DataSet to a relational database
myResult.output(
// build and configure OutputFormat
JDBCOutputFormat.buildJDBCOutputFormat()
JdbcOutputFormat.buildJdbcOutputFormat()
.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
.setDBUrl("jdbc:derby:memory:persons")
.setQuery("insert into persons (name, age, height) values (?,?,?)")
6 changes: 3 additions & 3 deletions docs/dev/connectors/jdbc.md
@@ -63,7 +63,7 @@ To use it, add the following dependency to your project (along with your JDBC-dr
{% highlight xml %}
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-jdbc{{ site.scala_version_suffix }}</artifactId>
<artifactId>flink-connector-jdbc{{ site.scala_version_suffix }}</artifactId>
<version>{{site.version }}</version>
</dependency>
{% endhighlight %}
@@ -78,7 +78,7 @@ Example usage:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env
.fromElements(...)
.addSink(JdbcFacade.sink(
.addSink(JdbcSink.sink(
"insert into books (id, title, author, price, qty) values (?,?,?,?,?)",
(ps, t) -> {
ps.setInt(1, t.id);
@@ -94,4 +94,4 @@ env
env.execute();
{% endhighlight %}

Please refer to the [API documentation]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/api/java/io/jdbc/JdbcSink.html) for more details.
Please refer to the [API documentation]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/connector/jdbc/JdbcSink.html) for more details.
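
Because the hunk above cuts the example off mid-lambda, here is a hedged, self-contained version of the new `JdbcSink` usage. The `Book` POJO, the sample element, and the Derby connection settings are assumptions for illustration; the connection-options builder follows the connector's `JdbcConnectionOptions` API.

{% highlight java %}
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class JdbcSinkSketch {

    /** Hypothetical record type standing in for the "books" rows of the docs example. */
    public static class Book {
        public int id;
        public String title;
        public String author;
        public double price;
        public int qty;

        public Book() {}

        public Book(int id, String title, String author, double price, int qty) {
            this.id = id; this.title = title; this.author = author;
            this.price = price; this.qty = qty;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(new Book(1, "Stream Processing", "n/a", 30.0, 5))
           .addSink(JdbcSink.sink(
                   "insert into books (id, title, author, price, qty) values (?,?,?,?,?)",
                   (ps, book) -> {
                       // Bind each field to its placeholder, 1-indexed as in plain JDBC.
                       ps.setInt(1, book.id);
                       ps.setString(2, book.title);
                       ps.setString(3, book.author);
                       ps.setDouble(4, book.price);
                       ps.setInt(5, book.qty);
                   },
                   new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                           .withUrl("jdbc:derby:memory:ebookshop")
                           .withDriverName("org.apache.derby.jdbc.EmbeddedDriver")
                           .build()));

        env.execute("jdbc sink sketch");
    }
}
{% endhighlight %}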
6 changes: 3 additions & 3 deletions docs/dev/connectors/jdbc.zh.md
@@ -63,7 +63,7 @@ To use it, add the following dependency to your project (along with your JDBC-dr
{% highlight xml %}
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-jdbc{{ site.scala_version_suffix }}</artifactId>
<artifactId>flink-connector-jdbc{{ site.scala_version_suffix }}</artifactId>
<version>{{site.version }}</version>
</dependency>
{% endhighlight %}
@@ -78,7 +78,7 @@ Example usage:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env
.fromElements(...)
.addSink(JdbcFacade.sink(
.addSink(JdbcSink.sink(
"insert into books (id, title, author, price, qty) values (?,?,?,?,?)",
(ps, t) -> {
ps.setInt(1, t.id);
@@ -94,4 +94,4 @@ env
env.execute();
{% endhighlight %}

Please refer to the [API documentation]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/api/java/io/jdbc/JdbcSink.html) for more details.
Please refer to the [API documentation]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/connector/jdbc/JdbcSink.html) for more details.
18 changes: 9 additions & 9 deletions docs/dev/table/catalogs.md
@@ -41,17 +41,17 @@ Catalog greatly simplifies steps required to get started with Flink with users'

The `GenericInMemoryCatalog` is an in-memory implementation of a catalog. All objects will be available only for the lifetime of the session.

### JDBCCatalog
### JdbcCatalog

The `JDBCCatalog` enables users to connect Flink to relational databases over JDBC protocol.
The `JdbcCatalog` enables users to connect Flink to relational databases over JDBC protocol.

#### PostgresCatalog

`PostgresCatalog` is the only implementation of JDBC Catalog at the moment.

#### Usage of JDBCCatalog
#### Usage of JdbcCatalog

Set a `JDBCatalog` with the following parameters:
Set a `JdbcCatalog` with the following parameters:

- name: required, name of the catalog
- default database: required, default database to connect to
@@ -72,10 +72,10 @@ String username = "...";
String password = "...";
String baseUrl = "...";

JDBCCatalog catalog = new JDBCCatalog(name, defaultDatabase, username, password, baseUrl);
JdbcCatalog catalog = new JdbcCatalog(name, defaultDatabase, username, password, baseUrl);
tableEnv.registerCatalog("mypg", catalog);

// set the JDBCCatalog as the current catalog of the session
// set the JdbcCatalog as the current catalog of the session
tableEnv.useCatalog("mypg");
{% endhighlight %}
</div>
@@ -91,10 +91,10 @@ val username = "..."
val password = "..."
val baseUrl = "..."

val catalog = new JDBCCatalog(name, defaultDatabase, username, password, baseUrl)
val catalog = new JdbcCatalog(name, defaultDatabase, username, password, baseUrl)
tableEnv.registerCatalog("mypg", catalog)

// set the JDBCCatalog as the current catalog of the session
// set the JdbcCatalog as the current catalog of the session
tableEnv.useCatalog("mypg")
{% endhighlight %}
</div>
@@ -117,7 +117,7 @@ USE CATALOG mypg;
execution:
planner: blink
...
current-catalog: mypg # set the JDBCCatalog as the current catalog of the session
current-catalog: mypg # set the JdbcCatalog as the current catalog of the session
current-database: mydb

catalogs:
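
As a hedged follow-up to the Java snippet above, this is what registering the catalog buys you; `tableEnv` and `catalog` are the variables from that snippet, the table name is hypothetical, and the exact (possibly schema-qualified) names under which `PostgresCatalog` exposes tables are not covered by this diff.

{% highlight java %}
// Continues the Java example above: metadata now comes from Postgres,
// without any CREATE TABLE statements in Flink.
tableEnv.registerCatalog("mypg", catalog);
tableEnv.useCatalog("mypg");
tableEnv.useDatabase("mydb");

String[] databases = tableEnv.listDatabases();
String[] tables = tableEnv.listTables();

// Hypothetical table name; PostgresCatalog determines the name under which
// a Postgres table is actually exposed.
Table result = tableEnv.sqlQuery("SELECT * FROM some_table");
{% endhighlight %}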
18 changes: 9 additions & 9 deletions docs/dev/table/catalogs.zh.md
@@ -37,17 +37,17 @@ Catalogs provide metadata, such as databases, tables, partitions, views, and

`GenericInMemoryCatalog` is an in-memory implementation of a catalog; all metadata is available only for the lifetime of the session.

### JDBCCatalog
### JdbcCatalog

The `JDBCCatalog` enables users to connect Flink to relational databases over JDBC protocol.
The `JdbcCatalog` enables users to connect Flink to relational databases over JDBC protocol.

#### PostgresCatalog

`PostgresCatalog` is the only implementation of JDBC Catalog at the moment.

#### Usage of JDBCCatalog
#### Usage of JdbcCatalog

Set a `JDBCatalog` with the following parameters:
Set a `JdbcCatalog` with the following parameters:

- name: required, name of the catalog
- default database: required, default database to connect to
@@ -68,10 +68,10 @@ String username = "...";
String password = "...";
String baseUrl = "...";

JDBCCatalog catalog = new JDBCCatalog(name, defaultDatabase, username, password, baseUrl);
JdbcCatalog catalog = new JdbcCatalog(name, defaultDatabase, username, password, baseUrl);
tableEnv.registerCatalog("mypg", catalog);

// set the JDBCCatalog as the current catalog of the session
// set the JdbcCatalog as the current catalog of the session
tableEnv.useCatalog("mypg");
{% endhighlight %}
</div>
@@ -87,10 +87,10 @@ val username = "..."
val password = "..."
val baseUrl = "..."

val catalog = new JDBCCatalog(name, defaultDatabase, username, password, baseUrl)
val catalog = new JdbcCatalog(name, defaultDatabase, username, password, baseUrl)
tableEnv.registerCatalog("mypg", catalog)

// set the JDBCCatalog as the current catalog of the session
// set the JdbcCatalog as the current catalog of the session
tableEnv.useCatalog("mypg")
{% endhighlight %}
</div>
@@ -113,7 +113,7 @@ USE CATALOG mypg;
execution:
planner: blink
...
current-catalog: mypg # set the JDBCCatalog as the current catalog of the session
current-catalog: mypg # set the JdbcCatalog as the current catalog of the session
current-database: mydb

catalogs:
62 changes: 2 additions & 60 deletions docs/dev/table/connect.md
@@ -49,7 +49,7 @@ The following tables list all available connectors and formats. Their mutual com
| Apache Kafka | 0.11 | `flink-connector-kafka-0.11` | [Download](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka-0.11{{site.scala_version_suffix}}/{{site.version}}/flink-sql-connector-kafka-0.11{{site.scala_version_suffix}}-{{site.version}}.jar) |
| Apache Kafka | 0.11+ (`universal`) | `flink-connector-kafka` | [Download](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka{{site.scala_version_suffix}}/{{site.version}}/flink-sql-connector-kafka{{site.scala_version_suffix}}-{{site.version}}.jar) |
| HBase | 1.4.3 | `flink-hbase` | [Download](https://repo.maven.apache.org/maven2/org/apache/flink/flink-hbase{{site.scala_version_suffix}}/{{site.version}}/flink-hbase{{site.scala_version_suffix}}-{{site.version}}.jar) |
| JDBC | | `flink-jdbc` | [Download](https://repo.maven.apache.org/maven2/org/apache/flink/flink-jdbc{{site.scala_version_suffix}}/{{site.version}}/flink-jdbc{{site.scala_version_suffix}}-{{site.version}}.jar) |
| JDBC | | `flink-connector-jdbc` | [Download](https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-jdbc{{site.scala_version_suffix}}/{{site.version}}/flink-connector-jdbc{{site.scala_version_suffix}}-{{site.version}}.jar) |

### Formats

@@ -1281,7 +1281,7 @@ To use JDBC connector, need to choose an actual driver to use. Here are drivers

**Catalog**

JDBC Connector can be used together with [`JDBCCatalog`]({{ site.baseurl }}/dev/table/catalogs.html#jdbccatalog) to greatly simplify development effort and improve user experience.
JDBC Connector can be used together with [`JdbcCatalog`]({{ site.baseurl }}/dev/table/catalogs.html#jdbccatalog) to greatly simplify development effort and improve user experience.

<br/>

@@ -2004,7 +2004,6 @@ These are the additional `TableSink`s which are provided with Flink:

| **Class name** | **Maven dependency** | **Batch?** | **Streaming?** | **Description**
| `CsvTableSink` | `flink-table` | Y | Append | A simple sink for CSV files.
| `JDBCAppendTableSink` | `flink-jdbc` | Y | Append | Writes a Table to a JDBC table.
| `CassandraAppendTableSink` | `flink-connector-cassandra` | N | Append | Writes a Table to a Cassandra table.

### OrcTableSource
@@ -2130,63 +2129,6 @@ table.insert_into("csvOutputTable")
</div>
</div>

### JDBCAppendTableSink

The `JDBCAppendTableSink` emits a `Table` to a JDBC connection. The sink only supports append-only streaming tables. It cannot be used to emit a `Table` that is continuously updated. See the [documentation on Table to Stream conversions](./streaming/dynamic_tables.html#table-to-stream-conversion) for details.

The `JDBCAppendTableSink` inserts each `Table` row at least once into the database table (if checkpointing is enabled). However, you can specify the insertion query using <code>REPLACE</code> or <code>INSERT OVERWRITE</code> to perform upsert writes to the database.

To use the JDBC sink, you have to add the JDBC connector dependency (<code>flink-jdbc</code>) to your project. Then you can create the sink using <code>JDBCAppendSinkBuilder</code>:

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}

JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
.setDBUrl("jdbc:derby:memory:ebookshop")
.setQuery("INSERT INTO books (id) VALUES (?)")
.setParameterTypes(INT_TYPE_INFO)
.build();

tableEnv.registerTableSink(
"jdbcOutputTable",
// specify table schema
new String[]{"id"},
new TypeInformation[]{Types.INT},
sink);

Table table = ...
table.insertInto("jdbcOutputTable");
{% endhighlight %}
</div>

<div data-lang="scala" markdown="1">
{% highlight scala %}
val sink: JDBCAppendTableSink = JDBCAppendTableSink.builder()
.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
.setDBUrl("jdbc:derby:memory:ebookshop")
.setQuery("INSERT INTO books (id) VALUES (?)")
.setParameterTypes(INT_TYPE_INFO)
.build()

tableEnv.registerTableSink(
"jdbcOutputTable",
// specify table schema
Array[String]("id"),
Array[TypeInformation[_]](Types.INT),
sink)

val table: Table = ???
table.insertInto("jdbcOutputTable")
{% endhighlight %}
</div>
</div>

Similar to using <code>JDBCOutputFormat</code>, you have to explicitly specify the name of the JDBC driver, the JDBC URL, the query to be executed, and the field types of the JDBC table.

{% top %}

### CassandraAppendTableSink

The `CassandraAppendTableSink` emits a `Table` to a Cassandra table. The sink only supports append-only streaming tables. It cannot be used to emit a `Table` that is continuously updated. See the [documentation on Table to Stream conversions](./streaming/dynamic_tables.html#table-to-stream-conversion) for details.
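
The `JDBCAppendTableSink` section removed above gets no in-place replacement in this diff. Below is a hedged sketch of the DDL route this connect.md page describes for the JDBC connector; the table names, Derby URL, and the 1.11-era `executeSql` calls are assumptions, and the `connector.*` keys follow the page's legacy connector property format.

{% highlight java %}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class JdbcDdlSinkSketch {
    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().build());

        // Register a JDBC sink table using the legacy connector.* properties
        // documented for the JDBC connector on this page.
        tableEnv.executeSql(
                "CREATE TABLE jdbcOutputTable (" +
                "  id INT" +
                ") WITH (" +
                "  'connector.type' = 'jdbc'," +
                "  'connector.url' = 'jdbc:derby:memory:ebookshop'," +
                "  'connector.driver' = 'org.apache.derby.jdbc.EmbeddedDriver'," +
                "  'connector.table' = 'books'" +
                ")");

        // Assumes a previously registered table named sourceTable with an INT id column.
        tableEnv.executeSql("INSERT INTO jdbcOutputTable SELECT id FROM sourceTable");
    }
}
{% endhighlight %}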
62 changes: 2 additions & 60 deletions docs/dev/table/connect.zh.md
@@ -49,7 +49,7 @@ The following tables list all available connectors and formats. Their mutual com
| Apache Kafka | 0.11 | `flink-connector-kafka-0.11` | [Download](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka-0.11{{site.scala_version_suffix}}/{{site.version}}/flink-sql-connector-kafka-0.11{{site.scala_version_suffix}}-{{site.version}}.jar) |
| Apache Kafka | 0.11+ (`universal`) | `flink-connector-kafka` | [Download](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka{{site.scala_version_suffix}}/{{site.version}}/flink-sql-connector-kafka{{site.scala_version_suffix}}-{{site.version}}.jar) |
| HBase | 1.4.3 | `flink-hbase` | [Download](https://repo.maven.apache.org/maven2/org/apache/flink/flink-hbase{{site.scala_version_suffix}}/{{site.version}}/flink-hbase{{site.scala_version_suffix}}-{{site.version}}.jar) |
| JDBC | | `flink-jdbc` | [Download](https://repo.maven.apache.org/maven2/org/apache/flink/flink-jdbc{{site.scala_version_suffix}}/{{site.version}}/flink-jdbc{{site.scala_version_suffix}}-{{site.version}}.jar) |
| JDBC | | `flink-connector-jdbc` | [Download](https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-jdbc{{site.scala_version_suffix}}/{{site.version}}/flink-connector-jdbc{{site.scala_version_suffix}}-{{site.version}}.jar) |

### Formats

@@ -1258,7 +1258,7 @@ To use JDBC connector, need to choose an actual driver to use. Here are drivers

**Catalog**

JDBC Connector can be used together with [`JDBCCatalog`]({{ site.baseurl }}/dev/table/catalogs.html#jdbccatalog) to greatly simplify development effort and improve user experience.
JDBC Connector can be used together with [`JdbcCatalog`]({{ site.baseurl }}/dev/table/catalogs.html#jdbccatalog) to greatly simplify development effort and improve user experience.

<br/>

@@ -1982,7 +1982,6 @@ These are the additional `TableSink`s which are provided with Flink:

| **Class name** | **Maven dependency** | **Batch?** | **Streaming?** | **Description**
| `CsvTableSink` | `flink-table` | Y | Append | A simple sink for CSV files.
| `JDBCAppendTableSink` | `flink-jdbc` | Y | Append | Writes a Table to a JDBC table.
| `CassandraAppendTableSink` | `flink-connector-cassandra` | N | Append | Writes a Table to a Cassandra table.

### OrcTableSource
@@ -2108,63 +2107,6 @@ table.insert_into("csvOutputTable")
</div>
</div>

### JDBCAppendTableSink

The `JDBCAppendTableSink` emits a `Table` to a JDBC connection. The sink only supports append-only streaming tables. It cannot be used to emit a `Table` that is continuously updated. See the [documentation on Table to Stream conversions](./streaming/dynamic_tables.html#table-to-stream-conversion) for details.

The `JDBCAppendTableSink` inserts each `Table` row at least once into the database table (if checkpointing is enabled). However, you can specify the insertion query using <code>REPLACE</code> or <code>INSERT OVERWRITE</code> to perform upsert writes to the database.

To use the JDBC sink, you have to add the JDBC connector dependency (<code>flink-jdbc</code>) to your project. Then you can create the sink using <code>JDBCAppendSinkBuilder</code>:

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}

JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
.setDBUrl("jdbc:derby:memory:ebookshop")
.setQuery("INSERT INTO books (id) VALUES (?)")
.setParameterTypes(INT_TYPE_INFO)
.build();

tableEnv.registerTableSink(
"jdbcOutputTable",
// specify table schema
new String[]{"id"},
new TypeInformation[]{Types.INT},
sink);

Table table = ...
table.insertInto("jdbcOutputTable");
{% endhighlight %}
</div>

<div data-lang="scala" markdown="1">
{% highlight scala %}
val sink: JDBCAppendTableSink = JDBCAppendTableSink.builder()
.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
.setDBUrl("jdbc:derby:memory:ebookshop")
.setQuery("INSERT INTO books (id) VALUES (?)")
.setParameterTypes(INT_TYPE_INFO)
.build()

tableEnv.registerTableSink(
"jdbcOutputTable",
// specify table schema
Array[String]("id"),
Array[TypeInformation[_]](Types.INT),
sink)

val table: Table = ???
table.insertInto("jdbcOutputTable")
{% endhighlight %}
</div>
</div>

Similar to using <code>JDBCOutputFormat</code>, you have to explicitly specify the name of the JDBC driver, the JDBC URL, the query to be executed, and the field types of the JDBC table.

{% top %}

### CassandraAppendTableSink

The `CassandraAppendTableSink` emits a `Table` to a Cassandra table. The sink only supports append-only streaming tables. It cannot be used to emit a `Table` that is continuously updated. See the [documentation on Table to Stream conversions](./streaming/dynamic_tables.html#table-to-stream-conversion) for details.
flink-connector-jdbc/pom.xml (module renamed from flink-jdbc)
@@ -30,8 +30,8 @@ under the License.
<relativePath>..</relativePath>
</parent>

<artifactId>flink-jdbc_${scala.binary.version}</artifactId>
<name>flink-jdbc</name>
<artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
<name>flink-connector-jdbc</name>

<packaging>jar</packaging>
