[FLINK-9885] [elasticsearch] Major cleanup to finalize Elasticsearch 6.x connector

This closes apache#6391.

tzulitai committed Aug 1, 2018
1 parent 01adb69 commit abbd6b0
Showing 26 changed files with 703 additions and 395 deletions.
192 changes: 166 additions & 26 deletions docs/dev/connectors/elasticsearch.md
@@ -84,6 +84,23 @@ The example below shows how to configure and create a sink:
<div class="codetabs" markdown="1">
<div data-lang="java, Elasticsearch 1.x" markdown="1">
{% highlight java %}
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;

import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;

import java.net.InetAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

DataStream<String> input = ...;

Map<String, String> config = new HashMap<>();
@@ -115,6 +132,22 @@ input.addSink(new ElasticsearchSink<>(config, transportAddresses, new Elasticsea
</div>
<div data-lang="java, Elasticsearch 2.x / 5.x" markdown="1">
{% highlight java %}
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink;

import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

DataStream<String> input = ...;

Map<String, String> config = new HashMap<>();
@@ -145,31 +178,83 @@ input.addSink(new ElasticsearchSink<>(config, transportAddresses, new Elasticsea
</div>
<div data-lang="java, Elasticsearch 6.x" markdown="1">
{% highlight java %}
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;

import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

DataStream<String> input = ...;

List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
httpHosts.add(new HttpHost("10.2.3.1", 9200, "http"));

// use an ElasticsearchSink.Builder to create an ElasticsearchSink
ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
    httpHosts,
    new ElasticsearchSinkFunction<String>() {
        public IndexRequest createIndexRequest(String element) {
            Map<String, String> json = new HashMap<>();
            json.put("data", element);

            return Requests.indexRequest()
                    .index("my-index")
                    .type("my-type")
                    .source(json);
        }

        @Override
        public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
            indexer.add(createIndexRequest(element));
        }
    }
);

// configuration for the bulk requests; this instructs the sink to emit after every element, otherwise they would be buffered
esSinkBuilder.setBulkFlushMaxActions(1);

// provide a RestClientFactory for custom configuration on the internally created REST client
esSinkBuilder.setRestClientFactory(
  restClientBuilder -> {
    restClientBuilder.setDefaultHeaders(...)
    restClientBuilder.setMaxRetryTimeoutMillis(...)
    restClientBuilder.setPathPrefix(...)
    restClientBuilder.setHttpClientConfigCallback(...)
  }
);

// finally, build and add the sink to the job's pipeline
input.addSink(esSinkBuilder.build());
{% endhighlight %}
</div>
<div data-lang="scala, Elasticsearch 1.x" markdown="1">
{% highlight scala %}
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer

import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.Requests
import org.elasticsearch.common.transport.InetSocketTransportAddress
import org.elasticsearch.common.transport.TransportAddress

import java.net.InetAddress
import java.util.ArrayList
import java.util.HashMap
import java.util.List
import java.util.Map

val input: DataStream[String] = ...

val config = new java.util.HashMap[String, String]
@@ -196,6 +281,22 @@ input.addSink(new ElasticsearchSink(config, transportAddresses, new Elasticsearc
</div>
<div data-lang="scala, Elasticsearch 2.x / 5.x" markdown="1">
{% highlight scala %}
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer
import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink

import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.Requests

import java.net.InetAddress
import java.net.InetSocketAddress
import java.util.ArrayList
import java.util.HashMap
import java.util.List
import java.util.Map

val input: DataStream[String] = ...

val config = new java.util.HashMap[String, String]
@@ -222,33 +323,72 @@ input.addSink(new ElasticsearchSink(config, transportAddresses, new Elasticsearc
</div>
<div data-lang="scala, Elasticsearch 6.x" markdown="1">
{% highlight scala %}
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink

import org.apache.http.HttpHost
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.Requests

import java.util.ArrayList
import java.util.List

val input: DataStream[String] = ...

val httpHosts = new java.util.ArrayList[HttpHost]
httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"))
httpHosts.add(new HttpHost("10.2.3.1", 9200, "http"))

val esSinkBuilder = new ElasticsearchSink.Builder[String](
  httpHosts,
  new ElasticsearchSinkFunction[String] {
    def createIndexRequest(element: String): IndexRequest = {
      val json = new java.util.HashMap[String, String]
      json.put("data", element)

      return Requests.indexRequest()
              .index("my-index")
              .type("my-type")
              .source(json)
    }

    override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
      indexer.add(createIndexRequest(element))
    }
  }
)

// configuration for the bulk requests; this instructs the sink to emit after every element, otherwise they would be buffered
esSinkBuilder.setBulkFlushMaxActions(1)

// provide a RestClientFactory for custom configuration on the internally created REST client
esSinkBuilder.setRestClientFactory(
  restClientBuilder => {
    restClientBuilder.setDefaultHeaders(...)
    restClientBuilder.setMaxRetryTimeoutMillis(...)
    restClientBuilder.setPathPrefix(...)
    restClientBuilder.setHttpClientConfigCallback(...)
  }
)

// finally, build and add the sink to the job's pipeline
input.addSink(esSinkBuilder.build)
{% endhighlight %}
</div>
</div>

For Elasticsearch versions that still use the now-deprecated `TransportClient` to communicate
with the Elasticsearch cluster (i.e., versions 5.x and below), note how a `Map` of `String`s
is used to configure the `ElasticsearchSink`. This config map will be directly
forwarded when creating the internally used `TransportClient`.
The configuration keys are documented in the Elasticsearch documentation
[here](https://www.elastic.co/guide/en/elasticsearch/reference/current/index.html).
Especially important is the `cluster.name` parameter that must correspond to
the name of your cluster.
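
For example, a minimal config map might look like the following sketch; the cluster name value is a placeholder, and `bulk.flush.max.actions` is one of the sink's own config keys (consumed by the sink rather than forwarded to the `TransportClient`):

{% highlight java %}
Map<String, String> config = new HashMap<>();
// must match the cluster.name setting of the target Elasticsearch cluster
config.put("cluster.name", "my-cluster-name");
// consumed by the sink itself: flush the internal BulkProcessor after every element
config.put("bulk.flush.max.actions", "1");
{% endhighlight %}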

For Elasticsearch 6.x and above, the `RestHighLevelClient` is used internally for cluster
communication. By default, the connector uses the REST client's default configuration. To
customize the REST client's configuration, users can provide a `RestClientFactory`
implementation when setting up the `ElasticsearchSink.Builder` that builds the sink.
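
As a sketch, a factory that sets a default header and a path prefix could look like the following; the header and prefix values here are made up for illustration, and `RestClientFactory` exposes a single `configureRestClientBuilder` method, so a lambda works as well:

{% highlight java %}
import org.apache.flink.streaming.connectors.elasticsearch6.RestClientFactory;

import org.apache.http.Header;
import org.apache.http.message.BasicHeader;
import org.elasticsearch.client.RestClientBuilder;

RestClientFactory factory = new RestClientFactory() {
    @Override
    public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
        // attach a custom header to every request (illustrative values)
        restClientBuilder.setDefaultHeaders(
            new Header[] { new BasicHeader("X-Application", "my-flink-job") });
        // reach Elasticsearch behind a proxy that serves it under /es
        restClientBuilder.setPathPrefix("/es");
    }
};

esSinkBuilder.setRestClientFactory(factory);
{% endhighlight %}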

Also note that the example only demonstrates performing a single index
request for each incoming element. Generally, the `ElasticsearchSinkFunction`
can be used to perform multiple requests of different types (ex.,
`DeleteRequest`, `UpdateRequest`, etc.).
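
For instance, a sink function could emit an index request and a delete request for each element in the same pass; this is a sketch, and `old-index` plus the id scheme are made up:

{% highlight java %}
new ElasticsearchSinkFunction<String>() {
    @Override
    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
        // index the element under my-index ...
        Map<String, String> json = new HashMap<>();
        json.put("data", element);
        indexer.add(Requests.indexRequest()
            .index("my-index")
            .type("my-type")
            .source(json));

        // ... and remove a stale copy from a hypothetical old index
        indexer.add(Requests.deleteRequest("old-index")
            .type("my-type")
            .id(element));
    }
}
{% endhighlight %}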
ElasticsearchApiCallBridge.java
@@ -25,6 +25,7 @@

import javax.annotation.Nullable;

import java.io.IOException;
import java.io.Serializable;
import java.util.Map;

@@ -36,27 +37,36 @@
* <p>Implementations are allowed to be stateful. For example, for Elasticsearch 1.x, since connecting via an embedded node
* is allowed, the call bridge will hold reference to the created embedded node. Each instance of the sink will hold
* exactly one instance of the call bridge, and state cleanup is performed when the sink is closed.
*
* @param <C> The Elasticsearch client, that implements {@link AutoCloseable}.
*/
@Internal
public abstract class ElasticsearchApiCallBridge implements Serializable {
public interface ElasticsearchApiCallBridge<C extends AutoCloseable> extends Serializable {

/**
* Creates an Elasticsearch client implementing {@link AutoCloseable}.
*
* @param clientConfig The configuration to use when constructing the client.
* @return The created client.
*/
public abstract AutoCloseable createClient(Map<String, String> clientConfig);
C createClient(Map<String, String> clientConfig) throws IOException;

public abstract BulkProcessor.Builder createBulkProcessorBuilder(AutoCloseable client, BulkProcessor.Listener listener);
/**
* Creates a {@link BulkProcessor.Builder} for creating the bulk processor.
*
* @param client the Elasticsearch client.
	 * @param listener the bulk processor listener.
* @return the bulk processor builder.
*/
BulkProcessor.Builder createBulkProcessorBuilder(C client, BulkProcessor.Listener listener);

/**
* Extracts the cause of failure of a bulk item action.
*
* @param bulkItemResponse the bulk item response to extract cause of failure
	 * @return the extracted {@link Throwable} from the response ({@code null} if the response is successful).
*/
public abstract @Nullable Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse);
@Nullable Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse);

/**
* Set backoff-related configurations on the provided {@link BulkProcessor.Builder}.
@@ -65,14 +75,14 @@ public abstract class ElasticsearchApiCallBridge implements Serializable {
* @param builder the {@link BulkProcessor.Builder} to configure.
* @param flushBackoffPolicy user-provided backoff retry settings ({@code null} if the user disabled backoff retries).
*/
public abstract void configureBulkProcessorBackoff(
void configureBulkProcessorBackoff(
BulkProcessor.Builder builder,
@Nullable ElasticsearchSinkBase.BulkFlushBackoffPolicy flushBackoffPolicy);

/**
* Perform any necessary state cleanup.
*/
public void cleanup() {
default void cleanup() {
// nothing to cleanup by default
}

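
To make the reworked interface concrete, here is a hedged sketch of what a 6.x implementation of the bridge could look like, assuming ES 6.3-era client APIs (`RestClient.builder`, `RestHighLevelClient`, `client::bulkAsync`) and eliding the real class's authentication and backoff-policy handling:

{% highlight java %}
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchApiCallBridge;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;

import org.apache.http.HttpHost;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.List;
import java.util.Map;

// Sketch: a 6.x call bridge parameterized on the RestHighLevelClient.
public class Elasticsearch6ApiCallBridge implements ElasticsearchApiCallBridge<RestHighLevelClient> {

    private final List<HttpHost> httpHosts;

    Elasticsearch6ApiCallBridge(List<HttpHost> httpHosts) {
        this.httpHosts = httpHosts;
    }

    @Override
    public RestHighLevelClient createClient(Map<String, String> clientConfig) throws IOException {
        // clientConfig is unused in this sketch; the REST client is built from the hosts
        return new RestHighLevelClient(
            RestClient.builder(httpHosts.toArray(new HttpHost[httpHosts.size()])));
    }

    @Override
    public BulkProcessor.Builder createBulkProcessorBuilder(
            RestHighLevelClient client, BulkProcessor.Listener listener) {
        // bulkAsync is the asynchronous bulk entry point of the high-level REST client
        return BulkProcessor.builder(client::bulkAsync, listener);
    }

    @Override
    @Nullable
    public Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse) {
        return bulkItemResponse.isFailed() ? bulkItemResponse.getFailure().getCause() : null;
    }

    @Override
    public void configureBulkProcessorBackoff(
            BulkProcessor.Builder builder,
            @Nullable ElasticsearchSinkBase.BulkFlushBackoffPolicy flushBackoffPolicy) {
        // simplified: the real implementation maps the Flink policy onto an ES BackoffPolicy
        builder.setBackoffPolicy(flushBackoffPolicy == null
            ? BackoffPolicy.noBackoff()
            : BackoffPolicy.exponentialBackoff());
    }

    // cleanup() falls back to the interface's no-op default method
}
{% endhighlight %}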