[FLINK-5487] [elasticsearch] At-least-once Elasticsearch Sink
This closes apache#3358.
tzulitai committed Feb 24, 2017
1 parent 3743e89 commit 2437da6
Showing 13 changed files with 935 additions and 117 deletions.
94 changes: 78 additions & 16 deletions docs/dev/connectors/elasticsearch.md
@@ -209,6 +209,41 @@ This will buffer elements before sending them in bulk to the cluster. The `BulkP
executes bulk requests one at a time, i.e. there will be no two concurrent
flushes of the buffered actions in progress.

### Elasticsearch Sinks and Fault Tolerance

With Flink’s checkpointing enabled, the Flink Elasticsearch Sink guarantees
at-least-once delivery of action requests to Elasticsearch clusters. It does
so by waiting for all pending action requests in the `BulkProcessor` to be
acknowledged at the time of each checkpoint. This ensures that every request
issued before the checkpoint was triggered has been successfully acknowledged by
Elasticsearch before the sink proceeds to process more records.
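
The waiting behaviour described above can be pictured with a small counter-based sketch. This is only an illustration under stated assumptions: `PendingRequestTracker` and its methods are hypothetical names, the flush action stands in for `BulkProcessor` flushing, and nothing here depends on Flink or Elasticsearch.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the sink's bookkeeping; not the connector's actual class.
class PendingRequestTracker {
    private final AtomicLong numPendingRequests = new AtomicLong(0);

    // Called whenever a request is handed to the bulk processor.
    void requestAdded() {
        numPendingRequests.incrementAndGet();
    }

    // Called when Elasticsearch acknowledges a request.
    void requestAcknowledged() {
        numPendingRequests.decrementAndGet();
    }

    long pending() {
        return numPendingRequests.get();
    }

    // Called at checkpoint time: keep flushing until nothing is pending.
    // The flush action is expected to trigger acknowledgements.
    void awaitAllAcknowledged(Runnable flush) {
        while (numPendingRequests.get() != 0) {
            flush.run();
        }
    }

    public static void main(String[] args) {
        PendingRequestTracker tracker = new PendingRequestTracker();
        tracker.requestAdded();
        tracker.requestAdded();
        // In this demo each flush acknowledges exactly one request.
        tracker.awaitAllAcknowledged(tracker::requestAcknowledged);
        System.out.println("pending after checkpoint: " + tracker.pending());
    }
}
```

The checkpoint can only complete once the counter reaches zero, which is what gives the at-least-once guarantee: no record is considered checkpointed while its request is still in flight.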

More details on checkpoints and fault tolerance are in the [fault tolerance docs]({{site.baseurl}}/internals/stream_checkpointing.html).

To use fault-tolerant Elasticsearch Sinks, checkpointing of the topology needs to be enabled on the execution environment:

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // checkpoint every 5000 msecs
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
{% highlight scala %}
val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.enableCheckpointing(5000) // checkpoint every 5000 msecs
{% endhighlight %}
</div>
</div>

<p style="border-radius: 5px; padding: 5px" class="bg-danger">
<b>NOTE</b>: Users can disable flushing on checkpoints by calling
<b>disableFlushOnCheckpoint()</b> on the created <b>ElasticsearchSink</b>. Be aware
that the sink will then no longer provide any strong
delivery guarantees, even with checkpointing enabled for the topology.
</p>

### Communication using Embedded Node (only for Elasticsearch 1.x)

For Elasticsearch versions 1.x, communication using an embedded node is
@@ -293,19 +328,20 @@ input.addSink(new ElasticsearchSink<>(
new ElasticsearchSinkFunction<String>() {...},
new ActionRequestFailureHandler() {
@Override
boolean onFailure(ActionRequest action, Throwable failure, RequestIndexer indexer) {
// this example uses Apache Commons to search for nested exceptions

if (ExceptionUtils.indexOfThrowable(failure, EsRejectedExecutionException.class) >= 0) {
void onFailure(ActionRequest action,
Throwable failure,
int restStatusCode,
RequestIndexer indexer) throws Throwable {

if (ExceptionUtils.containsThrowable(failure, EsRejectedExecutionException.class)) {
// full queue; re-add document for indexing
indexer.add(action);
return false;
} else if (ExceptionUtils.indexOfThrowable(failure, ElasticsearchParseException.class) >= 0) {
} else if (ExceptionUtils.containsThrowable(failure, ElasticsearchParseException.class)) {
// malformed document; simply drop request without failing sink
return false;
} else {
// for all other failures, fail the sink
return true;
// here the failure is simply rethrown, but users can also choose to throw custom exceptions
throw failure;
}
}
}));
@@ -319,19 +355,21 @@ input.addSink(new ElasticsearchSink(
config, transportAddresses,
new ElasticsearchSinkFunction[String] {...},
new ActionRequestFailureHandler {
override def onFailure(ActionRequest action, Throwable failure, RequestIndexer indexer) {
// this example uses Apache Commons to search for nested exceptions
@throws(classOf[Throwable])
override def onFailure(action: ActionRequest,
failure: Throwable,
restStatusCode: Int,
indexer: RequestIndexer): Unit = {

if (ExceptionUtils.indexOfThrowable(failure, EsRejectedExecutionException.class) >= 0) {
if (ExceptionUtils.containsThrowable(failure, classOf[EsRejectedExecutionException])) {
// full queue; re-add document for indexing
indexer.add(action)
return false
} else if (ExceptionUtils.indexOfThrowable(failure, ElasticsearchParseException.class) {
} else if (ExceptionUtils.containsThrowable(failure, classOf[ElasticsearchParseException])) {
// malformed document; simply drop request without failing sink
return false
} else {
// for all other failures, fail the sink
return true
// here the failure is simply rethrown, but users can also choose to throw custom exceptions
throw failure
}
}
}))
@@ -349,7 +387,31 @@ Note that `onFailure` is called for failures that still occur only after the
By default, the `BulkProcessor` retries to a maximum of 8 attempts with
an exponential backoff. For more information on the behaviour of the
internal `BulkProcessor` and how to configure it, please see the following section.


By default, if a failure handler is not provided, the sink uses a
`NoOpFailureHandler` that simply fails for all kinds of exceptions. The
connector also provides a `RetryRejectedExecutionFailureHandler` implementation
that always re-adds requests that have failed due to queue capacity saturation.

<p style="border-radius: 5px; padding: 5px" class="bg-danger">
<b>IMPORTANT</b>: Re-adding requests back to the internal <b>BulkProcessor</b>
on failures will lead to longer checkpoints, as the sink will also
need to wait for the re-added requests to be flushed when checkpointing.
For example, when using <b>RetryRejectedExecutionFailureHandler</b>, checkpoints
will need to wait until Elasticsearch node queues have enough capacity for
all the pending requests. This also means that if re-added requests never
succeed, the checkpoint will never finish.
</p>
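
To make the effect on checkpoint duration concrete, here is a minimal simulation, not the connector's code. The class `ReAddingFlushSimulation`, its method and the `maxRounds` guard are hypothetical; each request is modelled only by the number of times it will still be rejected before it succeeds.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical simulation of a flush loop in which rejected requests are re-added.
class ReAddingFlushSimulation {

    // rejectionsPerRequest[i] is how often request i is rejected before it succeeds.
    // Returns the number of flush rounds needed, or -1 if maxRounds is exhausted
    // (a stand-in for a checkpoint that never finishes).
    static int roundsUntilCheckpointCompletes(int[] rejectionsPerRequest, int maxRounds) {
        Deque<Integer> pending = new ArrayDeque<>();
        for (int rejections : rejectionsPerRequest) {
            pending.add(rejections);
        }

        int rounds = 0;
        while (!pending.isEmpty()) {
            if (rounds == maxRounds) {
                return -1;
            }
            rounds++;
            int batchSize = pending.size();
            for (int i = 0; i < batchSize; i++) {
                int remaining = pending.poll();
                if (remaining > 0) {
                    // rejected: re-add, so the checkpoint must wait for another round
                    pending.add(remaining - 1);
                }
            }
        }
        return rounds;
    }

    public static void main(String[] args) {
        System.out.println(roundsUntilCheckpointCompletes(new int[] {0, 2, 1}, 100));
    }
}
```

A request that is rejected on every attempt keeps the pending queue non-empty forever, which corresponds to the never-finishing checkpoint mentioned above.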

<p style="border-radius: 5px; padding: 5px" class="bg-warning">
<b>Failure handling for Elasticsearch 1.x</b>: For Elasticsearch 1.x, it
is not feasible to match on the type of the failure, because the exact type
cannot be retrieved through the older Java client APIs (the failures
are all general <b>Exception</b>s that differ only in their
failure message). In this case, it is recommended to match on the
provided REST status code.
</p>
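
Matching on the REST status code could look roughly like the sketch below. It is deliberately free of Flink and Elasticsearch types: the generic parameter `R` stands in for `ActionRequest`, the `retryQueue` list stands in for `RequestIndexer`, and the class name is hypothetical. It also assumes that a saturated queue is reported as HTTP 429 and a malformed document as HTTP 400; verify these codes against the cluster version in use.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, dependency-free sketch of status-code based failure handling.
class StatusCodeFailureHandler {
    // Assumed status codes; check them against your Elasticsearch version.
    static final int TOO_MANY_REQUESTS = 429;
    static final int BAD_REQUEST = 400;

    static <R> void onFailure(R action, Throwable failure, int restStatusCode, List<R> retryQueue)
            throws Throwable {
        if (restStatusCode == TOO_MANY_REQUESTS) {
            // full queue; re-add the request for indexing
            retryQueue.add(action);
        } else if (restStatusCode == BAD_REQUEST) {
            // malformed document; drop the request without failing the sink
        } else {
            // all other failures (including -1, i.e. no status code available) fail the sink
            throw failure;
        }
    }

    public static void main(String[] args) throws Throwable {
        List<String> retryQueue = new ArrayList<>();
        onFailure("request-1", new RuntimeException("rejected"), TOO_MANY_REQUESTS, retryQueue);
        System.out.println("queued for retry: " + retryQueue);
    }
}
```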

### Configuring the Internal Bulk Processor

The internal `BulkProcessor` can be further configured for its behaviour
8 changes: 8 additions & 0 deletions flink-connectors/flink-connector-elasticsearch-base/pom.xml
@@ -66,6 +66,14 @@ under the License.
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime_2.10</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.10</artifactId>
@@ -23,7 +23,7 @@

/**
* An implementation of {@link ActionRequestFailureHandler} is provided by the user to define how failed
* {@link ActionRequest ActionRequests} should be handled, ex. dropping them, reprocessing malformed documents, or
* {@link ActionRequest ActionRequests} should be handled, e.g. dropping them, reprocessing malformed documents, or
* simply requesting them to be sent to Elasticsearch again if the failure is only temporary.
*
* <p>
@@ -34,19 +34,16 @@
* private static class ExampleActionRequestFailureHandler implements ActionRequestFailureHandler {
*
* @Override
* boolean onFailure(ActionRequest action, Throwable failure, RequestIndexer indexer) {
* // this example uses Apache Commons to search for nested exceptions
*
* if (ExceptionUtils.indexOfThrowable(failure, EsRejectedExecutionException.class) >= 0) {
* void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable {
* if (ExceptionUtils.containsThrowable(failure, EsRejectedExecutionException.class)) {
* // full queue; re-add document for indexing
* indexer.add(action);
* return false;
* } else if (ExceptionUtils.indexOfThrowable(failure, ElasticsearchParseException.class) {
* } else if (ExceptionUtils.containsThrowable(failure, ElasticsearchParseException.class)) {
* // malformed document; simply drop request without failing sink
* return false;
* } else {
* // for all other failures, fail the sink
* return true;
* // for all other failures, fail the sink;
* // here the failure is simply rethrown, but users can also choose to throw custom exceptions
* throw failure;
* }
* }
* }
@@ -56,6 +53,11 @@
* <p>
* The above example will let the sink re-add requests that failed due to queue capacity saturation and drop requests
* with malformed documents, without failing the sink. For all other failures, the sink will fail.
*
* <p>
* Note: For Elasticsearch 1.x, it is not feasible to match on the type of the failure, because the exact type
* cannot be retrieved through the older Java client APIs (the failures are all general {@link Exception}s
* that differ only in their failure message). In this case, it is recommended to match on the provided REST status code.
*/
public interface ActionRequestFailureHandler extends Serializable {

@@ -64,9 +66,12 @@ public interface ActionRequestFailureHandler extends Serializable {
*
* @param action the {@link ActionRequest} that failed due to the failure
* @param failure the cause of failure
* @param restStatusCode the REST status code of the failure (-1 if none can be retrieved)
* @param indexer request indexer to re-add the failed action, if intended to do so
* @return the implementation should return {@code true} if the sink should fail due to this failure, and {@code false} otherwise
*
* @throws Throwable if the sink should fail on this failure, the implementation should rethrow
* the exception or a custom one
*/
boolean onFailure(ActionRequest action, Throwable failure, RequestIndexer indexer);
void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable;

}
@@ -21,6 +21,10 @@
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.bulk.BulkProcessor;

import java.util.concurrent.atomic.AtomicLong;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* Implementation of a {@link RequestIndexer}, using a {@link BulkProcessor}.
* {@link ActionRequest ActionRequests} will be buffered before sending a bulk request to the Elasticsearch cluster.
@@ -30,14 +34,21 @@ class BulkProcessorIndexer implements RequestIndexer {
private static final long serialVersionUID = 6841162943062034253L;

private final BulkProcessor bulkProcessor;
private final boolean flushOnCheckpoint;
private final AtomicLong numPendingRequestsRef;

BulkProcessorIndexer(BulkProcessor bulkProcessor) {
this.bulkProcessor = bulkProcessor;
BulkProcessorIndexer(BulkProcessor bulkProcessor, boolean flushOnCheckpoint, AtomicLong numPendingRequestsRef) {
this.bulkProcessor = checkNotNull(bulkProcessor);
this.flushOnCheckpoint = flushOnCheckpoint;
this.numPendingRequestsRef = checkNotNull(numPendingRequestsRef);
}

@Override
public void add(ActionRequest... actionRequests) {
for (ActionRequest actionRequest : actionRequests) {
if (flushOnCheckpoint) {
numPendingRequestsRef.getAndIncrement();
}
this.bulkProcessor.add(actionRequest);
}
}