[FLINK-21426][docs] adds English details to jdbc sink connector
This closes apache#14975
sv3ndk authored and sjwiesman committed Feb 22, 2021
1 parent bd91b6c commit e3f053e
Showing 2 changed files with 106 additions and 27 deletions.
129 changes: 104 additions & 25 deletions docs/content/docs/connectors/datastream/jdbc.md
@@ -28,46 +28,125 @@ under the License.

This connector provides a sink that writes data to a JDBC database.

To use it, add the following dependency to your project (along with your JDBC-driver):
To use it, add the following dependency to your project (along with your JDBC driver):

{{< artifact flink-connector-jdbc withScalaVersion >}}

Note that the streaming connectors are currently __NOT__ part of the binary distribution. See how to link with them for cluster execution [here]({{< ref "docs/dev/datastream/project-configuration" >}}).

Created JDBC sink provides at-least-once guarantee.
Effectively exactly-once can be achieved using upsert statements or idempotent updates.

Example usage:
## `JdbcSink.sink`

The JDBC sink provides an at-least-once guarantee.
In effect, though, exactly-once results can be achieved by crafting upsert SQL statements or idempotent SQL updates.
Configuration goes as follows (see also {{< javadoc file="/api/java/org/apache/flink/connector/jdbc/JdbcSink.html" name="JdbcSink javadoc" >}}).

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env
.fromElements(...)
.addSink(JdbcSink.sink(
"insert into books (id, title, author, price, qty) values (?,?,?,?,?)",
(ps, t) -> {
ps.setInt(1, t.id);
ps.setString(2, t.title);
ps.setString(3, t.author);
ps.setDouble(4, t.price);
ps.setInt(5, t.qty);
},
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl(getDbMetadata().getUrl())
.withDriverName(getDbMetadata().getDriverClass())
.build()));
env.execute();
JdbcSink.sink(
sqlDmlStatement, // mandatory
jdbcStatementBuilder, // mandatory
jdbcExecutionOptions, // optional
jdbcConnectionOptions // mandatory
);
```
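
To illustrate why upserts make an at-least-once sink effectively exactly-once, here is a minimal sketch in plain Java with no Flink or database dependency. It assumes a table keyed by `id`; the class name, the helper methods, and the PostgreSQL-flavoured `ON CONFLICT` statement are illustrative assumptions, and other databases use different upsert syntax. A batch that is replayed after a failure produces duplicate rows with a plain insert but leaves the table unchanged with an upsert.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class IdempotentWriteSketch {

    // Assumption: PostgreSQL syntax and a primary key on "id"; adapt to your database.
    static final String UPSERT_SQL =
            "INSERT INTO books (id, title) VALUES (?, ?) "
                    + "ON CONFLICT (id) DO UPDATE SET title = EXCLUDED.title";

    // Models a plain INSERT into a table without a key: every write adds a row.
    static List<String> appendAll(List<Map.Entry<Long, String>> writes) {
        List<String> rows = new ArrayList<>();
        for (Map.Entry<Long, String> write : writes) {
            rows.add(write.getKey() + ":" + write.getValue());
        }
        return rows;
    }

    // Models an upsert: the last write for a given id replaces the previous one.
    static Map<Long, String> upsertAll(List<Map.Entry<Long, String>> writes) {
        Map<Long, String> table = new LinkedHashMap<>();
        for (Map.Entry<Long, String> write : writes) {
            table.put(write.getKey(), write.getValue());
        }
        return table;
    }

    public static void main(String[] args) {
        List<Map.Entry<Long, String>> batch =
                List.of(Map.entry(101L, "Streaming Systems"), Map.entry(102L, "Kafka"));

        // The same batch delivered twice, as can happen after a failure and restart.
        List<Map.Entry<Long, String>> replayed = new ArrayList<>(batch);
        replayed.addAll(batch);

        System.out.println(appendAll(replayed).size()); // prints 4: duplicates
        System.out.println(upsertAll(replayed).size()); // prints 2: same as a single delivery
    }
}
```

In a real job, a statement such as `UPSERT_SQL` would be passed as the SQL DML statement, provided the target table has a suitable unique key.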

### SQL DML statement and JDBC statement builder

The sink builds one [JDBC prepared statement](https://docs.oracle.com/en/java/javase/11/docs/api/java.sql/java/sql/PreparedStatement.html) from a user-provided SQL string, e.g.:

```sql
INSERT INTO some_table (field1, field2) VALUES (?, ?)
```

It then repeatedly calls a user-provided function to update that prepared statement with each value of the stream, e.g.:

```java
(preparedStatement, someRecord) -> { /* update preparedStatement here with values from someRecord */ }
```
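
As a rough, self-contained sketch of what such a function does, the snippet below binds the two `?` placeholders from one record. To stay runnable without Flink or a database, it uses a hypothetical `StatementBuilder` interface that mirrors the shape of the lambda above, a hypothetical `SomeRecord` type, and a `java.lang.reflect.Proxy` that merely records the setter calls made on the `PreparedStatement`. None of these names come from the connector.

```java
import java.lang.reflect.Proxy;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

final class StatementBuilderSketch {

    // Hypothetical stand-in with the same shape as the lambda above (not the Flink interface).
    @FunctionalInterface
    interface StatementBuilder<T> {
        void accept(PreparedStatement statement, T record) throws SQLException;
    }

    // Hypothetical stream element with one field per placeholder.
    static final class SomeRecord {
        final long field1;
        final String field2;

        SomeRecord(long field1, String field2) {
            this.field1 = field1;
            this.field2 = field2;
        }
    }

    // Binds the placeholders in order; JDBC parameter indexes start at 1.
    static final StatementBuilder<SomeRecord> BUILDER = (statement, record) -> {
        statement.setLong(1, record.field1);
        statement.setString(2, record.field2);
    };

    // Returns a PreparedStatement proxy that only records two-argument setter calls.
    static PreparedStatement recordingStatement(List<String> calls) {
        return (PreparedStatement) Proxy.newProxyInstance(
                StatementBuilderSketch.class.getClassLoader(),
                new Class<?>[] {PreparedStatement.class},
                (proxy, method, args) -> {
                    calls.add(method.getName() + "(" + args[0] + ", " + args[1] + ")");
                    return null;
                });
    }

    // Applies the builder to one record and returns the recorded calls.
    static List<String> bind(SomeRecord record) throws SQLException {
        List<String> calls = new ArrayList<>();
        BUILDER.accept(recordingStatement(calls), record);
        return calls;
    }

    public static void main(String[] args) throws SQLException {
        // prints [setLong(1, 42), setString(2, flink)]
        System.out.println(bind(new SomeRecord(42L, "flink")));
    }
}
```

The sink would call the function once per stream element against the same prepared statement, so the function should set every placeholder each time.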

Please refer to the [API documentation]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/connector/jdbc/JdbcSink.html) for more details.
### JDBC execution options

The SQL DML statements are executed in batches, which can optionally be configured with the following instance (see also {{< javadoc name="JdbcExecutionOptions javadoc" file="/api/java/org/apache/flink/connector/jdbc/JdbcExecutionOptions.html" >}}):

```java
JdbcExecutionOptions.builder()
        .withBatchIntervalMs(200)             // optional: default = 0, meaning no time-based execution is done
        .withBatchSize(1000)                  // optional: default = 5000 values
        .withMaxRetries(5)                    // optional: default = 3
        .build()
```

A JDBC batch is executed as soon as one of the following conditions is true:

* the configured batch interval time has elapsed
* the maximum batch size is reached
* a Flink checkpoint has started
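
The three triggers above can be sketched as a small buffer in plain Java. This is only an illustration of the described behaviour, not the connector's implementation: the class and method names are hypothetical, and the timer and checkpoint events are modelled as explicit method calls instead of Flink callbacks.

```java
import java.util.ArrayList;
import java.util.List;

final class BatchTriggerSketch<T> {

    private final int batchSize;
    private final List<T> buffer = new ArrayList<>();
    private final List<List<T>> executedBatches = new ArrayList<>();

    BatchTriggerSketch(int batchSize) {
        this.batchSize = batchSize;
    }

    // Trigger 2: the maximum batch size is reached.
    void add(T record) {
        buffer.add(record);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    // Trigger 1: the configured batch interval has elapsed (called by a timer in this model).
    void onIntervalElapsed() {
        flush();
    }

    // Trigger 3: a checkpoint has started, so pending records must be written out.
    void onCheckpoint() {
        flush();
    }

    // Stands in for executing the JDBC batch; empty buffers are skipped.
    private void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        executedBatches.add(new ArrayList<>(buffer));
        buffer.clear();
    }

    List<List<T>> executedBatches() {
        return executedBatches;
    }

    public static void main(String[] args) {
        BatchTriggerSketch<Integer> sink = new BatchTriggerSketch<>(2);
        sink.add(1);
        sink.add(2);              // size trigger fires
        sink.add(3);
        sink.onCheckpoint();      // checkpoint trigger fires
        sink.onIntervalElapsed(); // nothing buffered, no batch
        System.out.println(sink.executedBatches()); // prints [[1, 2], [3]]
    }
}
```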

### JDBC connection parameters

The connection to the database is configured with a `JdbcConnectionOptions` instance.
Please see {{< javadoc name="JdbcConnectionOptions javadoc" file="/api/java/org/apache/flink/connector/jdbc/JdbcConnectionOptions.html" >}} for details.

### Full example

```java
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class JdbcSinkExample {

    static class Book {
        public Book(Long id, String title, String authors, Integer year) {
            this.id = id;
            this.title = title;
            this.authors = authors;
            this.year = year;
        }
        final Long id;
        final String title;
        final String authors;
        final Integer year;
    }

    public static void main(String[] args) throws Exception {
        var env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(
                new Book(101L, "Stream Processing with Apache Flink", "Fabian Hueske, Vasiliki Kalavri", 2019),
                new Book(102L, "Streaming Systems", "Tyler Akidau, Slava Chernyak, Reuven Lax", 2018),
                new Book(103L, "Designing Data-Intensive Applications", "Martin Kleppmann", 2017),
                new Book(104L, "Kafka: The Definitive Guide", "Gwen Shapira, Neha Narkhede, Todd Palino", 2017)
        ).addSink(
                JdbcSink.sink(
                        "insert into books (id, title, authors, year) values (?, ?, ?, ?)",
                        // Bind one Book to the four placeholders, in order.
                        (statement, book) -> {
                            statement.setLong(1, book.id);
                            statement.setString(2, book.title);
                            statement.setString(3, book.authors);
                            statement.setInt(4, book.year);
                        },
                        JdbcExecutionOptions.builder()
                                .withBatchSize(1000)
                                .withBatchIntervalMs(200)
                                .withMaxRetries(5)
                                .build(),
                        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                                .withUrl("jdbc:postgresql://dbhost:5432/postgresdb")
                                .withDriverName("org.postgresql.Driver")
                                .withUsername("someUser")
                                .withPassword("somePassword")
                                .build()
                ));

        env.execute();
    }
}
```

## Exactly-once
## `JdbcSink.exactlyOnceSink`

Since 1.13, the Flink JDBC sink supports exactly-once mode. The implementation relies on the JDBC driver's support for the XA [standard](https://pubs.opengroup.org/onlinepubs/009680699/toc.pdf).

To use it, create a sink using the `exactlyOnceSink()` method as above and additionally provide:
- [exactly-once options]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/connector/jdbc/JdbcExactlyOnceOptions.html)
- [execution options]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/connector/jdbc/JdbcExecutionOptions.html)
- {{< javadoc name="exactly-once options" file="/api/java/org/apache/flink/connector/jdbc/JdbcExactlyOnceOptions.html" >}}
- {{< javadoc name="execution options" file="/api/java/org/apache/flink/connector/jdbc/JdbcExecutionOptions.html" >}}
- [XA DataSource](https://docs.oracle.com/javase/8/docs/api/javax/sql/XADataSource.html) Supplier

```java
4 changes: 2 additions & 2 deletions docs/layouts/shortcodes/javadoc.html
@@ -21,6 +21,6 @@
Parameters:
- name: The rendered link name (required)
*/}}
<a href="{{ .Site.Params.JavaDoc }}">
<a href="{{ .Site.Params.JavaDoc }}/{{ .Get "file" }}">
{{ .Get "name" }}
</a>
</a>
