Skip to content

Commit

Permalink
[FLINK-5122] [elasticsearch] Retry temporary Elasticsearch request er…
Browse files Browse the repository at this point in the history
…rors.

Covered exceptions are: Timeouts, No Master, UnavailableShardsException, bulk queue on node full
  • Loading branch information
static-max authored and tzulitai committed Feb 24, 2017
1 parent 0ba08b4 commit aaac7c2
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 4 deletions.
5 changes: 5 additions & 0 deletions docs/dev/connectors/elasticsearch.md
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,11 @@ input.addSink(new ElasticsearchSink(config, new ElasticsearchSinkFunction[String
The difference is that now we do not need to provide a list of addresses
of Elasticsearch nodes.

Optionally, the sink can try to re-execute the bulk request when the error
message matches certain patterns indicating a timeout or a overloaded cluster.
This behaviour is disabled by default and can be enabled by setting `checkErrorAndRetryBulk(true)`.


More information about Elasticsearch can be found [here](https://elastic.co).

#### Packaging the Elasticsearch Connector into an Uber-Jar
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,15 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.util.InstantiationUtil;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
Expand Down Expand Up @@ -85,6 +88,13 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> {
/** Provided to the user via the {@link ElasticsearchSinkFunction} to add {@link ActionRequest ActionRequests}. */
private transient BulkProcessorIndexer requestIndexer;

/**
* When set to <code>true</code> and the bulk action fails, the error message will be checked for
* common patterns like <i>timeout</i>, <i>UnavailableShardsException</i> or a full buffer queue on the node.
* When a matching pattern is found, the bulk will be retried.
*/
protected boolean checkErrorAndRetryBulk = false;

// ------------------------------------------------------------------------
// Internals for the Flink Elasticsearch Sink
// ------------------------------------------------------------------------
Expand Down Expand Up @@ -165,20 +175,49 @@ public void beforeBulk(long executionId, BulkRequest request) { }
@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
if (response.hasFailures()) {
boolean allRequestsRepeatable = true;

for (BulkItemResponse itemResp : response.getItems()) {
Throwable failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResp);
if (failure != null) {
LOG.error("Failed Elasticsearch item request: {}", failure.getMessage(), failure);
failureThrowable.compareAndSet(null, failure);
String failureMessageLowercase = itemResp.getFailureMessage().toLowerCase();

// Check if index request can be retried
if (checkErrorAndRetryBulk && (
failureMessageLowercase.contains("timeout") ||
failureMessageLowercase.contains("timed out") || // Generic timeout errors
failureMessageLowercase.contains("UnavailableShardsException".toLowerCase()) || // Shard not available due to rebalancing or node down
(failureMessageLowercase.contains("data/write/bulk") && failureMessageLowercase.contains("bulk")))) // Bulk index queue on node full
{
LOG.debug("Retry bulk: {}", itemResp.getFailureMessage());
} else {
// Cannot retry action
allRequestsRepeatable = false;
LOG.error("Failed Elasticsearch item request: {}", failure.getMessage(), failure);
failureThrowable.compareAndSet(null, failure);
}
}
}

if (allRequestsRepeatable) {
reAddBulkRequest(request);
}
}
}

@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
LOG.error("Failed Elasticsearch bulk request: {}", failure.getMessage(), failure);
failureThrowable.compareAndSet(null, failure);
if (checkErrorAndRetryBulk && (
failure instanceof ClusterBlockException // Examples: "no master"
|| failure instanceof ElasticsearchTimeoutException) // ElasticsearchTimeoutException sounded good, not seen in stress tests yet
)
{
LOG.debug("Retry bulk on throwable: {}", failure.getMessage());
reAddBulkRequest(request);
} else {
LOG.error("Failed Elasticsearch bulk request: {}", failure.getMessage(), failure);
failureThrowable.compareAndSet(null, failure);
}
}
}
);
Expand Down Expand Up @@ -228,6 +267,21 @@ public void close() throws Exception {
checkErrorAndRethrow();
}

/**
* Adds all requests of the bulk to the BulkProcessor. Used when trying again.
* @param bulkRequest
*/
public void reAddBulkRequest(BulkRequest bulkRequest) {
//TODO Check what happens when bulk contains a DeleteAction and IndexActions and the DeleteAction fails because the document already has been deleted. This may not happen in typical Flink jobs.

for (IndicesRequest req : bulkRequest.subRequests()) {
if (req instanceof ActionRequest) {
// There is no waiting time between index requests, so this may produce additional pressure on cluster
bulkProcessor.add((ActionRequest<?>) req);
}
}
}

private void checkErrorAndRethrow() {
Throwable cause = failureThrowable.get();
if (cause != null) {
Expand Down

0 comments on commit aaac7c2

Please sign in to comment.