[FLINK-3875] [connectors] Add an upsert table sink factory for Elasticsearch

This commit adds full support for using Elasticsearch with the Table & SQL API as well as the SQL Client.

It includes:
- Elasticsearch 6 upsert table sink (for append-only and updating queries)
- Elasticsearch 6 table factory
- Elasticsearch table descriptors & validators
- Unit tests, SQL Client end-to-end test
- Website documentation

This closes apache#6611.
twalthr committed Oct 1, 2018
1 parent d4129c5 commit 10f9f1d
Showing 26 changed files with 2,795 additions and 28 deletions.
106 changes: 106 additions & 0 deletions docs/dev/table/connect.md
@@ -43,6 +43,7 @@ The following table lists all available connectors and formats. Their mutual comp
| Name | Version | Maven dependency | SQL Client JAR |
| :---------------- | :------------ | :--------------------------- | :----------------------|
| Filesystem | | Built-in | Built-in |
| Elasticsearch | 6 | `flink-connector-elasticsearch6` | [Download](https://central.maven.org/maven2/org/apache/flink/flink-connector-elasticsearch6{{site.scala_version_suffix}}/{{site.version}}/flink-connector-elasticsearch6{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar) |
| Apache Kafka | 0.8 | `flink-connector-kafka-0.8` | Not available |
| Apache Kafka | 0.9 | `flink-connector-kafka-0.9` | [Download](https://central.maven.org/maven2/org/apache/flink/flink-connector-kafka-0.9{{site.scala_version_suffix}}/{{site.version}}/flink-connector-kafka-0.9{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar) |
| Apache Kafka | 0.10 | `flink-connector-kafka-0.10` | [Download](https://central.maven.org/maven2/org/apache/flink/flink-connector-kafka-0.10{{site.scala_version_suffix}}/{{site.version}}/flink-connector-kafka-0.10{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar) |
@@ -588,6 +589,111 @@ Make sure to add the version-specific Kafka dependency. In addition, a correspon

{% top %}

### Elasticsearch Connector

<span class="label label-primary">Sink: Streaming Append Mode</span>
<span class="label label-primary">Sink: Streaming Upsert Mode</span>
<span class="label label-info">Format: JSON-only</span>

The Elasticsearch connector allows for writing into an index of the Elasticsearch search engine.

The connector can operate in [upsert mode](#update-modes) for exchanging UPSERT/DELETE messages with the external system using a [key defined by the query](streaming.html#table-to-stream-conversion).

For append-only queries, the connector can also operate in [append mode](#update-modes) for exchanging only INSERT messages with the external system. If no key is defined by the query, a key is automatically generated by Elasticsearch.

The connector can be defined as follows:

<div class="codetabs" markdown="1">
<div data-lang="Java/Scala" markdown="1">
{% highlight java %}
.connect(
  new Elasticsearch()
    .version("6")                      // required: valid connector versions are "6"
    .host("localhost", 9200, "http")   // required: one or more Elasticsearch hosts to connect to
    .index("MyUsers")                  // required: Elasticsearch index
    .documentType("user")              // required: Elasticsearch document type

    .keyDelimiter("$")        // optional: delimiter for composite keys ("_" by default)
                              //   e.g., "$" would result in IDs "KEY1$KEY2$KEY3"
    .keyNullLiteral("n/a")    // optional: representation for null fields in keys ("null" by default)

    // optional: failure handling strategy in case a request to Elasticsearch fails (fail by default)
    .failureHandlerFail()              // optional: throws an exception if a request fails and causes a job failure
    .failureHandlerIgnore()            //   or ignores failures and drops the request
    .failureHandlerRetryRejected()     //   or re-adds requests that have failed due to queue capacity saturation
    .failureHandlerCustom(...)         //   or custom failure handling with an ActionRequestFailureHandler subclass

    // optional: configure how to buffer elements before sending them in bulk to the cluster for efficiency
    .disableFlushOnCheckpoint()        // optional: disables flushing on checkpoint (see notes below!)
    .bulkFlushMaxActions(42)           // optional: maximum number of actions to buffer for each bulk request
    .bulkFlushMaxSize("42 mb")         // optional: maximum size of buffered actions in bytes per bulk request
                                       //   (only MB granularity is supported)
    .bulkFlushInterval(60000L)         // optional: bulk flush interval (in milliseconds)

    .bulkFlushBackoffConstant()        // optional: use a constant backoff type
    .bulkFlushBackoffExponential()     //   or use an exponential backoff type
    .bulkFlushBackoffMaxRetries(3)     // optional: maximum number of retries
    .bulkFlushBackoffDelay(30000L)     // optional: delay between each backoff attempt (in milliseconds)

    // optional: connection properties to be used during REST communication to Elasticsearch
    .connectionMaxRetryTimeout(3)      // optional: maximum timeout (in milliseconds) between retries
    .connectionPathPrefix("/v1")       // optional: prefix string to be added to every REST communication
)
{% endhighlight %}
</div>

<div data-lang="YAML" markdown="1">
{% highlight yaml %}
connector:
  type: elasticsearch
  version: 6                  # required: valid connector versions are "6"
  hosts:                      # required: one or more Elasticsearch hosts to connect to
    - hostname: "localhost"
      port: 9200
      protocol: "http"
  index: "MyUsers"            # required: Elasticsearch index
  document-type: "user"       # required: Elasticsearch document type

  key-delimiter: "$"          # optional: delimiter for composite keys ("_" by default)
                              #   e.g., "$" would result in IDs "KEY1$KEY2$KEY3"
  key-null-literal: "n/a"     # optional: representation for null fields in keys ("null" by default)

  # optional: failure handling strategy in case a request to Elasticsearch fails ("fail" by default)
  failure-handler: ...        # valid strategies are "fail" (throws an exception if a request fails and
                              #   thus causes a job failure), "ignore" (ignores failures and drops the request),
                              #   "retry-rejected" (re-adds requests that have failed due to queue capacity
                              #   saturation), or "custom" for failure handling with an
                              #   ActionRequestFailureHandler subclass

  # optional: configure how to buffer elements before sending them in bulk to the cluster for efficiency
  flush-on-checkpoint: true   # optional: set to "false" to disable flushing on checkpoint (see notes below!)
                              #   ("true" by default)
  bulk-flush:
    max-actions: 42           # optional: maximum number of actions to buffer for each bulk request
    max-size: 42 mb           # optional: maximum size of buffered actions in bytes per bulk request
                              #   (only MB granularity is supported)
    interval: 60000           # optional: bulk flush interval (in milliseconds)
    back-off:                 # optional: backoff strategy ("disabled" by default)
      type: ...               # valid strategies are "disabled", "constant", or "exponential"
      max-retries: 3          # optional: maximum number of retries
      delay: 30000            # optional: delay between each backoff attempt (in milliseconds)

  # optional: connection properties to be used during REST communication to Elasticsearch
  connection-max-retry-timeout: 3   # optional: maximum timeout (in milliseconds) between retries
  connection-path-prefix: "/v1"     # optional: prefix string to be added to every REST communication
{% endhighlight %}
</div>
</div>
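
For reference, here is a minimal sketch of how the descriptor above could be combined with a JSON format and a schema and registered as an upsert sink. The table name, schema fields, and host are placeholders, and the surrounding registration methods are assumed from the Table API connector descriptor interface of this Flink version rather than spelled out in this section:

{% highlight java %}
tableEnvironment
  .connect(
    new Elasticsearch()
      .version("6")
      .host("localhost", 9200, "http")
      .index("MyUsers")
      .documentType("user"))
  .withFormat(
    new Json()
      .deriveSchema())                  // derive the JSON format from the schema below
  .withSchema(
    new Schema()
      .field("user_name", Types.STRING)
      .field("cnt", Types.LONG))
  .inUpsertMode()                       // exchange UPSERT/DELETE messages with Elasticsearch
  .registerTableSink("MyUserTable");    // make the sink available, e.g., for INSERT INTO statements
{% endhighlight %}

An updating query such as `SELECT user_name, COUNT(*) AS cnt FROM users GROUP BY user_name` could then be written into `MyUserTable` via an `INSERT INTO` statement.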

**Bulk flushing:** For more information about the characteristics of the optional flushing parameters, see the [corresponding low-level documentation]({{ site.baseurl }}/dev/connectors/elasticsearch.html).

**Disabling flushing on checkpoint:** When disabled, a sink will not wait for all pending action requests to be acknowledged by Elasticsearch on checkpoints. Thus, a sink does NOT provide any strong guarantees for at-least-once delivery of action requests.

**Key extraction:** Flink automatically extracts valid keys from a query. For example, a query `SELECT a, b, c FROM t GROUP BY a, b` defines a composite key of the fields `a` and `b`. The Elasticsearch connector generates a document ID string for every row by concatenating all key fields in the order defined in the query using a key delimiter. A custom representation of null literals for key fields can be defined.
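
To make the ID composition concrete, the following small sketch mirrors the documented behavior of deriving a document ID from the key fields, the key delimiter, and the null key literal; it is illustrative only and not the connector's internal code:

{% highlight java %}
// Illustrative sketch only: mirrors the documented ID composition,
// not the connector's actual implementation.
static String documentId(Object[] keyFields, String delimiter, String nullLiteral) {
  StringBuilder id = new StringBuilder();
  for (int i = 0; i < keyFields.length; i++) {
    if (i > 0) {
      id.append(delimiter);            // separate key fields with the configured delimiter
    }
    Object field = keyFields[i];
    id.append(field == null ? nullLiteral : field.toString());
  }
  return id.toString();
}

// documentId(new Object[] {"KEY1", null, "KEY3"}, "$", "n/a") returns "KEY1$n/a$KEY3"
{% endhighlight %}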

<span class="label label-danger">Attention</span> A JSON format defines how to encode documents for the external system; therefore, it must be added as a [dependency](connect.html#formats).

{% top %}

Table Formats
-------------

27 changes: 27 additions & 0 deletions flink-connectors/flink-connector-elasticsearch-base/pom.xml
@@ -70,6 +70,16 @@ under the License.
</exclusions>
</dependency>

<!-- Used for the Elasticsearch table sink. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
<!-- Projects depending on this project won't depend on flink-table. -->
<optional>true</optional>
</dependency>

<!-- test dependencies -->

<dependency>
@@ -95,6 +105,23 @@ under the License.
<type>test-jar</type>
</dependency>

<!-- Elasticsearch table descriptor testing -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<!-- Elasticsearch table sink factory testing -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>

</dependencies>

<build>
