[FLINK-5487] [elasticsearch] At-least-once ElasticsearchSink #3358

Closed
wants to merge 12 commits

Conversation

Contributor

@tzulitai tzulitai commented Feb 20, 2017

This PR adds proper support for an at-least-once ElasticsearchSink. This is based on the pluggable error handling strategy functionality added in #3246, so only the last commit is relevant.

Like the Kafka producer, the way it works is that pending requests not yet acknowledged by Elasticsearch need to be flushed before proceeding with the next record from upstream.
A slight difference is that for the ElasticsearchSink, since we allow re-adding failed requests to the internal BulkProcessor (as part of #3246), we also need to wait for the re-added requests. The docs warn that re-adding requests may lead to longer checkpoints, since those must be waited for as well.

Flushing is enabled by default, but we provide a disableFlushOnCheckpoint method to switch it off. The docs and the Javadoc of the method warn the user about how this affects at-least-once delivery.
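
For illustration, a rough usage sketch (assuming the ES 2.x connector constructor and a DataStream<String> named stream; the config values, index, and type below are made up, and only disableFlushOnCheckpoint() is new in this PR):

Map<String, String> config = new HashMap<>();
config.put("cluster.name", "my-cluster");            // hypothetical cluster name
config.put("bulk.flush.max.actions", "1000");        // flush a bulk every 1000 requests

List<InetSocketAddress> transportAddresses = new ArrayList<>();
transportAddresses.add(new InetSocketAddress("127.0.0.1", 9300));

ElasticsearchSink<String> sink = new ElasticsearchSink<>(config, transportAddresses,
    new ElasticsearchSinkFunction<String>() {
        @Override
        public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
            indexer.add(Requests.indexRequest()
                .index("my-index")
                .type("my-type")
                .source(Collections.singletonMap("data", element)));
        }
    });

// Flushing pending requests on checkpoints is on by default (at-least-once).
// Opting out trades the delivery guarantee for shorter checkpoints:
// sink.disableFlushOnCheckpoint();

stream.addSink(sink);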

static-max and others added 3 commits February 20, 2017 11:49
…rors.

Covered exceptions are: Timeouts, No Master, UnavailableShardsException, bulk queue on node full
…csearchSink

This commit fixes both FLINK-5353 and FLINK-5122. It allows users to implement a
failure handler to control how failed action requests are dealt with.

The commit also includes general improvements to FLINK-5122:
1. Use the built-in backoff functionality in the Elasticsearch BulkProcessor (not
available for Elasticsearch 1.x)
2. Migrate the `checkErrorAndRetryBulk` functionality to the new failure handler
Contributor

@rmetzger rmetzger left a comment

I've done an initial 80% pass over the code and left some comments we need to address before we can merge the change.

need to wait for the re-added requests to be flushed when checkpointing.
This also means that if re-added requests never succeed, the checkpoint will
never finish.
</p>
Contributor

I think the docs should mention the NoOpActionRequestFailureHandler.

Also I wonder if we should offer a default RetryActionRequestFailureHandler. I suspect that many users will need that. What do you think?

Contributor Author

Good point. I'll add a mention that, by default, the sink uses the NoOpActionRequestFailureHandler.

Contributor Author

I think a pre-implemented ActionRequestFailureHandler that re-adds requests for full-queue exceptions would be nice, and useful out of the box for a large portion of users. Great idea!

Contributor Author

I wouldn't suggest adding an ActionRequestFailureHandler that retries all exceptions out of the box, though. That could let users easily overlook exceptions that simply cannot be retried without custom logic (for example, malformed documents with wrong field types).

Contributor

I agree that we should provide a reasonable default behavior, instead of just retrying.

More information about Elasticsearch can be found [here](https://elastic.co).

#### Packaging the Elasticsearch Connector into an Uber-Jar
## Packaging the Elasticsearch Connector into an Uber-Jar
Contributor

I like the reworked documentation page a lot!


/**
* An implementation of {@link ActionRequestFailureHandler} is provided by the user to define how failed
* {@link ActionRequest ActionRequests} should be handled, ex. dropping them, reprocessing malformed documents, or

}

public void setMaxRetryCount(int maxRetryCount) {
checkArgument(maxRetryCount > 0);
Contributor

Isn't 0 also an acceptable value here? If users want to disable retries entirely?

Contributor Author

Yup, 0 should be acceptable.

}

public void setDelayMillis(long delayMillis) {
checkArgument(delayMillis > 0);
Contributor

We should accept 0 here as well, if users want to retry immediately (for whatever reason :) )

Contributor Author

True, 0 should be acceptable. Nice catches.

throw new IllegalArgumentException(
"The implementation of the provided ActionRequestFailureHandler is not serializable. " +
"The object probably contains or references non serializable fields.");
}
Contributor

This looks a bit like duplicate code. I think adding a utility called isSerializable() to InstantiationUtil would be cleaner and save some LOC.
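
For reference, a minimal sketch of what such a utility could look like (hypothetical; the eventual InstantiationUtil method may differ):

public static boolean isSerializable(Object obj) {
    try {
        // a full serialization attempt surfaces any non-serializable field as an exception
        ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream());
        out.writeObject(obj);
        out.close();
        return true;
    } catch (IOException e) {
        return false;
    }
}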

failureThrowable.compareAndSet(null, failure);
LOG.error("Failed Elasticsearch item request: {}", itemResponse.getFailureMessage(), failure);

if (failureHandler.onFailure(request.requests().get(i), failure, requestIndexer)) {
Contributor

I wonder if it would be better if the onFailure method did not return a boolean but instead threw a Throwable.
This way users have more flexibility in implementing their failure handler.

For example, if a failure handler does three retries and fails afterwards, the original exception will be thrown. If the onFailure() method can throw its own exception, you can throw a custom exception that tells the user about the three retries.

We can definitely discuss this, because this change is annoying to do (docs & Javadocs need to be updated).

Contributor Author

That's actually a good idea, I hadn't thought of it that way. I would like to change it to throw a Throwable instead.

Contributor Author

For the use case you mentioned, that would mean the user implements a stateful ActionRequestFailureHandler, with its state being the number of failures so far, correct?

I haven't thought about this too much, but I don't see a problem with it.

Contributor

Hmm, I don't know if the use case I mentioned makes a lot of sense. Probably most users just want custom logic to decide how to do the retries / discards.

I think we shouldn't do complicated things like checkpointing the state of the failure handler. It's good enough if the user keeps it locally (and loses it on failure).

Contributor Author

@tzulitai tzulitai Feb 21, 2017

Alright, then I'll simply change the boolean return to throwing a Throwable, and add some Javadoc stating that any state in the failure handler is volatile, i.e. lost on failure.
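
A rough sketch of what a handler with such local, non-checkpointed retry state could look like under the throwing contract (class name is illustrative, and the three-argument signature shown here predates the status-code argument added later in this PR; simplified to count failures across all requests rather than per request):

public class RetryingFailureHandler implements ActionRequestFailureHandler {

    // kept locally per subtask and lost on failure/restart, as discussed above
    private int failedAttempts = 0;

    @Override
    public void onFailure(ActionRequest action, Throwable failure, RequestIndexer indexer) throws Throwable {
        if (failedAttempts < 3) {
            failedAttempts++;
            indexer.add(action);   // re-add the request for another attempt
        } else {
            // surface a custom exception that mentions the retries instead of the raw failure
            throw new RuntimeException("Request still failing after 3 retries", failure);
        }
    }
}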

// the Elasticsearch side; simply retry all action requests in the bulk
for (ActionRequest action : request.requests()) {
requestIndexer.add(action);
}
Contributor

I wonder if we should provide a custom, pluggable retry logic here as well. If you are sure that only connection issues cause this, we can leave it as is.

Contributor Author

I'll need to double-check this. The ES docs don't say much about it.

Contributor

Thank you. I'm undecided whether we want to add this here or not.
Just based on my experience with the Kafka connector, at some point there is a user who wants a very specific custom behavior :) But we can also keep it as is and fix it if a user needs it (worst case: they have to override our implementation).

Contributor Author

@tzulitai tzulitai Feb 22, 2017

It seems like we will need to use the failure handler here too.
Any exception that the Elasticsearch client throws while issuing the bulk request can appear here too. So exceptions like an unreachable node can pop up here as well, and I don't think we should implicitly treat them as temporary.

requestIndexer.add(action);
}

numPendingRequests.getAndAdd(-request.numberOfActions());
Contributor

Let's say a bulk with 500 actions fails, so we re-add the bulk again, but subtract 500 actions from the pending requests.

Now the bulk succeeds and we subtract 500 actions again. Wouldn't that make the number of pending requests negative and void the at-least-once guarantees?

Am I overlooking something here?

Contributor Author

@tzulitai tzulitai Feb 21, 2017

The BulkProcessorIndexer (the implementation of the RequestIndexer provided to the user) will increment numPendingRequests whenever the user calls add(ActionRequest). So, in your description, when the user re-adds the 500 requests, numPendingRequests first becomes 500+500=1000. Then, we consider the failed 500 requests to have completed when this line is reached, so numPendingRequests becomes 1000-500=500.
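
A simplified sketch of that accounting (not the actual implementation):

// BulkProcessorIndexer (the RequestIndexer handed to the user and failure handler):
public void add(ActionRequest... actionRequests) {
    for (ActionRequest actionRequest : actionRequests) {
        numPendingRequests.incrementAndGet();   // +1 for every (re-)added request
        bulkProcessor.add(actionRequest);
    }
}

// BulkProcessorListener#afterBulk: any re-adds have already incremented the counter
// above, so subtracting the finished bulk's size never undercounts pending work
numPendingRequests.getAndAdd(-request.numberOfActions());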

Contributor

Phew, that's good :) Thanks for the explanation.
I didn't look closely enough at your changes.

@rmetzger
Contributor

I finished checking the code.
The only thing I'm missing from the change is a test case ensuring that the implementation works.

I think we can build a test similar to what we did with Kafka. (With a mock producer)

@tzulitai
Contributor Author

Yes, I overlooked adding tests for this.

Thanks a lot for the reviews @rmetzger! I'll address your comments and add tests for the additional features.
I'll ping you once it's ready for another review.

@tzulitai
Contributor Author

tzulitai commented Feb 22, 2017

@rmetzger All of your comments have been addressed, and tests for the new features have been added (in ElasticsearchSinkBaseTest). Can you take another look? Thanks a lot!

Some notes on changes I made that weren't previously discussed:

  1. Renamed NoOpActionRequestFailureHandler to just NoOpFailureHandler - less of a mouthful ;-)
  2. I added the response's REST status code to the failure handler's onFailure(...) callback. The reason for this is explained in the doc / Javadoc changes of the last follow-up commit (c594523).

Contributor

@rmetzger rmetzger left a comment

Thanks a lot for addressing my comments.
I know a lot of work went into it.

I had some questions about the change; otherwise, good to go.


checkArgument(InstantiationUtil.isSerializable(failureHandler),
"The implementation of the provided ActionRequestFailureHandler is not serializable. " +
"The object probably contains or references non-serializable fields.");
Contributor

That's so much nicer now :)


if (flushOnCheckpoint) {
do {
bulkProcessor.flush();
Contributor

This flush() might be a noop if bulkRequest.numberOfActions() == 0 in the bulkProcessor implementation.
If so, this loop turns into a busy loop wasting CPU cycles.
I wonder if we should wait on the numPendingRequests and notify on it once we update it?

(Sorry that I bring this up in the second review)
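
Roughly what waiting on the counter could look like (illustrative only; pendingRequestsLock is a made-up monitor object and the timeout is arbitrary):

// snapshotState(): block until all pending requests have been acknowledged
if (flushOnCheckpoint) {
    synchronized (pendingRequestsLock) {
        while (numPendingRequests.get() > 0) {
            bulkProcessor.flush();
            pendingRequestsLock.wait(100);   // woken up by the bulk listener below
        }
    }
}

// afterBulk(): wake up a waiting checkpoint once the counter has been updated
synchronized (pendingRequestsLock) {
    numPendingRequests.getAndAdd(-request.numberOfActions());
    pendingRequestsLock.notifyAll();
}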

Contributor Author

Ah, I see the problem here ...
The bulk processor's internal bulkRequest.numberOfActions() == 0 becomes true as soon as it starts executing the flush, not only after afterBulk is invoked.

So, since our numPendingRequests implementation relies on the afterBulk callback, we might busy-loop on bulkProcessor.flush() while we wait for numPendingRequests to become 0.

This is quite a nice catch actually! So no worries about bringing it up now.

Contributor Author

Waiting on numPendingRequests makes sense, I'll try and see if it works out.

Contributor Author

@tzulitai tzulitai Feb 23, 2017

On a second look, I think my previous statement is incorrect.

To elaborate, this is the way the BulkProcessor's flush is implemented:

if(this.bulkRequest.numberOfActions() > 0) {
    this.execute();
}

execute() doesn't return until afterBulk is called on the listener (we're hard-configuring the bulk processor to not allow concurrent requests, so the process is blocking; see line 335 of ElasticsearchSinkBase).
Since we can re-add requests to the bulk processor within afterBulk, the bulkRequest.numberOfActions() > 0 condition will be true again and the flush re-enters the loop.

Therefore, the bulkProcessor.flush() can actually just be called once, and will work with our failure-handler re-adding strategy so that the flush also waits for re-added requests before returning. We can just check once on numPendingRequests after the flush to make sure the flush works as expected.

So, to sum up, we can just do:

if (flushOnCheckpoint) {
    bulkProcessor.flush();
    if (numPendingRequests.get() > 0) {
        throw new IllegalStateException(...);
    }
}

and it will work with the failure handler.

Do you think this makes sense?

Contributor Author

@tzulitai tzulitai Feb 23, 2017

Following my arguments above, I think the busy loop you mentioned shouldn't happen, because the bulk processor's internal bulkRequest.numberOfActions() should always be in sync with our numPendingRequests (i.e., there should be no point in time at which bulkRequest.numberOfActions() == 0 but our own numPendingRequests != 0).

So in that case, if bulkRequest.numberOfActions() == 0, my original loop implementation just falls back to a single pass with two condition checks.

To a certain extent, I think it might be better to stick with the original loop implementation, so that we're not locked in to how the BulkProcessor's flush is implemented. As you can see from a commit I just pushed (2956f99), which modifies the mock bulk processor in the tests to correctly mimic the flushing behaviour described above, the loop implementation still passes the tests.

Contributor

Thanks a lot for looking into this in detail.
I think calling flush() this way is okay then.

@Override
public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable {
if (ExceptionUtils.containsThrowable(failure, EsRejectedExecutionException.class)) {
indexer.add(action);
Contributor

Do you think this is worth a LOG.debug statement?
Or would it happen too often / be too uninformative?

I wonder if we could use the metrics system for exposing stuff like error rate, retry rate etc. (Maybe we should file a JIRA for the ElasticSearch connectors to "metricify" them)
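
For example, a rough sketch of how such counters could be exposed via Flink's metric system (the metric names here are made up):

private transient Counter numFailedRequests;
private transient Counter numRetriedRequests;

@Override
public void open(Configuration parameters) throws Exception {
    numFailedRequests = getRuntimeContext().getMetricGroup().counter("numFailedRequests");
    numRetriedRequests = getRuntimeContext().getMetricGroup().counter("numRetriedRequests");
    // ... existing open() logic ...
}

// then, in the bulk listener / failure handler code paths:
// numFailedRequests.inc();
// numRetriedRequests.inc();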

Contributor Author

@tzulitai tzulitai Feb 23, 2017

The BulkProcessorListener actually logs them as LOG.error before they are processed by the failure handler (line 374 and line 398 of ElasticsearchSinkBase). So, these failures are always logged regardless of whether the failure handler chooses to log them. Do you think that's ok?

Contributor Author

Regarding the frequency of EsRejectedExecutionException: from my past experience with ES, it pops up a lot on under-resourced or poorly configured ES clusters.

It can flood the logs if it isn't handled accordingly, but not logging it at all can be bad too, because you'd know nothing about it unless the sink eventually fails with it.

We could also remove the failure logging from the ElasticsearchSinkBase and let the user be responsible for that, but I'm a bit undecided here. Open to suggestions for this!

Contributor Author

Metricifying the ES connectors seems like a good idea, especially given their growing popularity. I'll think about it and file a JIRA with some initial proposals.

@rmetzger
Contributor

+1 to merge.

Thank you for answering all my comments in such detail!
