[FLINK-5487] [elasticsearch] At-least-once ElasticsearchSink #3358
Conversation
…rors. Covered exceptions are: Timeouts, No Master, UnavailableShardsException, bulk queue on node full
…csearchSink This commit fixes both FLINK-5353 and FLINK-5122. It allows users to implement a failure handler to control how failed action requests are dealt with. The commit also includes general improvements to FLINK-5122: 1. Use the built-in backoff functionality in the Elasticsearch BulkProcessor (not available for Elasticsearch 1.x) 2. Migrate the `checkErrorAndRetryBulk` functionality to the new failure handler
I've done an initial 80% pass over the code and left some comments we need to address before we can merge the change.
docs/dev/connectors/elasticsearch.md
need to wait for the re-added requests to be flushed when checkpointing.
This also means that if re-added requests never succeed, the checkpoint will
never finish.
</p>
I think the docs should mention the `NoOpActionRequestFailureHandler`.
Also, I wonder if we should offer a default `RetryActionRequestFailureHandler`. I suspect that many users will need that. What do you think?
Good point. I'll add a mention that by default, the sink uses the `NoOpActionRequestFailureHandler`.
I think a pre-implemented `ActionRequestFailureHandler` that re-adds requests for full-queue exceptions would be nice, and useful out of the box for a large portion of users. Great idea!
I wouldn't suggest adding an `ActionRequestFailureHandler` that retries all exceptions out of the box, though. That could let users easily overlook exceptions that simply cannot be retried without custom logic (for example, malformed documents with wrong field types).
I agree that we should provide a reasonable default behavior, instead of just retrying.
More information about Elasticsearch can be found [here](https://elastic.co).

#### Packaging the Elasticsearch Connector into an Uber-Jar
## Packaging the Elasticsearch Connector into an Uber-Jar
I like the reworked documentation page a lot!
/**
 * An implementation of {@link ActionRequestFailureHandler} is provided by the user to define how failed
 * {@link ActionRequest ActionRequests} should be handled, ex. dropping them, reprocessing malformed documents, or
I'm not sure if the "ex." is correct here: https://english.stackexchange.com/questions/16197/whats-the-difference-between-e-g-and-ex
}
public void setMaxRetryCount(int maxRetryCount) {
	checkArgument(maxRetryCount > 0);
Isn't 0 also an acceptable value here? If users want to disable retries entirely?
Yup, 0 should be acceptable.
}
public void setDelayMillis(long delayMillis) {
	checkArgument(delayMillis > 0);
We should accept 0 here as well, if users want to retry immediately (for whatever reason :) )
True, 0 should be acceptable. Nice catches.
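A sketch of how the relaxed checks could look; the class name here is illustrative rather than the actual Flink code, and plain `if` checks stand in for Flink's `checkArgument`:

```java
// Illustrative config holder -- not the actual Flink class.
class BulkFlushBackoffPolicy {
    private int maxRetryCount;
    private long delayMillis;

    // 0 is now allowed: it disables retries entirely.
    void setMaxRetryCount(int maxRetryCount) {
        if (maxRetryCount < 0) {
            throw new IllegalArgumentException("maxRetryCount must be >= 0");
        }
        this.maxRetryCount = maxRetryCount;
    }

    // 0 is now allowed: it retries immediately, with no delay.
    void setDelayMillis(long delayMillis) {
        if (delayMillis < 0) {
            throw new IllegalArgumentException("delayMillis must be >= 0");
        }
        this.delayMillis = delayMillis;
    }
}
```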
throw new IllegalArgumentException(
	"The implementation of the provided ActionRequestFailureHandler is not serializable. " +
	"The object probably contains or references non serializable fields.");
}
This looks a bit like duplicate code. I think adding a utility into the `InstantiationUtil` that is called `isSerializable()` would be cleaner and save some LOC.
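For reference, such a helper can be implemented with a throwaway `ObjectOutputStream`. This is a minimal sketch of the idea (the class name is hypothetical), not the actual `InstantiationUtil` code:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;

// Hypothetical helper class sketching the isSerializable() idea.
class SerializableCheck {
    // An object is Java-serializable iff writing it to an ObjectOutputStream
    // succeeds, i.e. it and everything it references implement Serializable.
    static boolean isSerializable(Object obj) {
        try (ObjectOutputStream oos = new ObjectOutputStream(new ByteArrayOutputStream())) {
            oos.writeObject(obj);
            return true;
        } catch (IOException e) {
            return false;
        }
    }
}
```

With a helper like this, the sink constructor can reduce the serializability check to a single `checkArgument` call (e.g. a `String` handler field passes, while an object holding a plain non-`Serializable` reference fails).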
failureThrowable.compareAndSet(null, failure);
LOG.error("Failed Elasticsearch item request: {}", itemResponse.getFailureMessage(), failure);

if (failureHandler.onFailure(request.requests().get(i), failure, requestIndexer)) {
I wonder if it would be better if the `onFailure` method would not return a boolean but throw a Throwable?
This way users have more flexibility in implementing their failure handler.
For example, if a failure handler is doing three retries and fails afterwards, the original exception will be thrown. If the `onFailure()` method can throw its own exception, you can throw a custom exception that tells the user about the three retries.
We can definitely discuss this, because this change is annoying to do (docs & javadocs need to be updated).
That's actually a good idea, I didn't think of it that way. I would like to change it to throw a `Throwable` instead.
For the use case you mentioned, that would mean the user implements a stateful `ActionRequestFailureHandler`, with its state being the number of failures so far, correct?
I didn't think about this too much, but I guess there shouldn't be a problem with this.
Mh. I don't know if the use case I've mentioned makes a lot of sense. Probably most users just want custom logic to decide how to do the retries / discards.
I think we shouldn't do complicated things like checkpointing the state of the failure handler. It's good enough if the user keeps it locally (and loses it on failure).
Alright, then I'll simply change the `boolean` return usage to throwing a `Throwable`, and add some Javadoc stating that any state in the failure handler is volatile.
// the Elasticsearch side; simply retry all action requests in the bulk
for (ActionRequest action : request.requests()) {
	requestIndexer.add(action);
}
I wonder if we should provide a custom, pluggable retry logic here as well. If you are sure that only connection issues cause this, we can leave it as is.
I'll need to double-check this. The ES docs don't say much about this.
Thank you. I'm undecided if we want to add this here or not.
Just based on my experience with the Kafka connector, at some point there is a user who wants to have a very specific custom behavior :) But we can also keep it as is and fix it if a user needs it (worst case: they have to override our implementation)
It seems like we will need to use the failure handler here too.
Any exception that the Elasticsearch `Client` throws while issuing the bulk request can appear here too. So, exceptions like an unreachable node can pop up here as well, and I don't think we should implicitly treat them as temporary.
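The idea of routing a whole-bulk failure through the failure handler could be sketched like this (hypothetical stand-in types using plain strings as requests, not the actual connector code):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simplified stand-in for the failure handler: it decides per request whether
// to re-add it (via the consumer), swallow it, or rethrow to fail the sink.
interface BulkFailureHandler {
    void onFailure(String request, Throwable failure, Consumer<String> reAdder) throws Throwable;
}

// Sketch of the suggestion: when the whole bulk fails with a single Throwable,
// offer every request of that bulk to the failure handler instead of
// unconditionally re-adding all of them.
class FailureAwareBulkListener {
    final List<String> reAddBuffer = new ArrayList<>();
    private final BulkFailureHandler handler;

    FailureAwareBulkListener(BulkFailureHandler handler) {
        this.handler = handler;
    }

    // mirrors afterBulk(long, BulkRequest, Throwable): the Throwable applies
    // to every request in the failed bulk
    void afterBulk(List<String> bulkRequests, Throwable failure) throws Throwable {
        for (String request : bulkRequests) {
            handler.onFailure(request, failure, reAddBuffer::add);
        }
    }
}
```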
requestIndexer.add(action);
}

numPendingRequests.getAndAdd(-request.numberOfActions());
Let's say a bulk with 500 actions fails, so we re-add the bulk again, but subtract 500 actions from the pending requests.
Now the bulk succeeds and we subtract 500 actions again. Wouldn't that make the number of pending requests negative, and void the at-least-once guarantees?
Am I overlooking something here?
The `BulkProcessorIndexer` (the implementation of the `RequestIndexer` provided to the user) will increment `numPendingRequests` whenever the user calls `add(ActionRequest)`. So, in your description, when the user re-adds the 500 requests, `numPendingRequests` first becomes 500+500=1000. Then, we consider the failed 500 requests to have completed when this line is reached, so `numPendingRequests` becomes 1000-500=500.
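A toy model of this accounting (illustrative names, not the Flink code) shows why the counter stays non-negative even when a whole failed bulk is re-added:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Toy model of the accounting described above. add() increments the counter,
// and completing a bulk of n requests decrements it by n -- even if some
// requests were re-added inside the completion callback, because those
// re-adds incremented the counter again first.
class PendingRequestCounter {
    final AtomicLong numPendingRequests = new AtomicLong();
    final List<String> buffer = new ArrayList<>();

    void add(String request) {              // mirrors BulkProcessorIndexer.add(ActionRequest)
        numPendingRequests.incrementAndGet();
        buffer.add(request);
    }

    // mirrors afterBulk: optionally re-add failed requests, then settle the bulk
    void completeBulk(int bulkSize, List<String> failedToReAdd) {
        for (String r : failedToReAdd) {
            add(r);                         // each re-add bumps the counter back up
        }
        numPendingRequests.getAndAdd(-bulkSize);
    }
}
```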
Puh, that's good :) Thx for the explanation.
I didn't look close enough on your changes.
I finished checking the code. I think we can build a test similar to what we did with Kafka (with a mock producer).
Yes, I overlooked adding tests for this. Thanks a lot for the reviews @rmetzger! I'll address your comments and add tests for the additional features.
…edExecutionFailureHandler
@rmetzger All of your comments have been addressed, and tests for the new features have been added. Some notes on changes I made that weren't previously discussed:
Thanks a lot for addressing my comments.
I know a lot of work went into it.
I had some questions on the change; otherwise, good to go.
checkArgument(InstantiationUtil.isSerializable(failureHandler),
	"The implementation of the provided ActionRequestFailureHandler is not serializable. " +
	"The object probably contains or references non-serializable fields.");
That's so much nicer now :)
if (flushOnCheckpoint) {
	do {
		bulkProcessor.flush();
This `flush()` might be a no-op if `bulkRequest.numberOfActions() == 0` in the bulkProcessor implementation.
If so, this loop turns into a busy loop wasting CPU cycles.
I wonder if we should wait on the `numPendingRequests` and notify on it once we update it?
(Sorry that I bring this up in the second review)
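The wait/notify suggestion could be sketched like this (a hypothetical, self-contained model; the conversation further down concludes the existing flush call is fine, so this is only an illustration of the alternative):

```java
// Sketch of waiting on the pending-request count instead of busy-looping on
// bulkProcessor.flush(). Names are illustrative, not the actual Flink code.
class PendingFlush {
    private long numPendingRequests;
    private final Object lock = new Object();

    void requestAdded() {
        synchronized (lock) { numPendingRequests++; }
    }

    void requestCompleted() {
        synchronized (lock) {
            numPendingRequests--;
            if (numPendingRequests == 0) {
                lock.notifyAll();   // wake up a checkpointing thread waiting in awaitFlush()
            }
        }
    }

    void awaitFlush() throws InterruptedException {
        synchronized (lock) {
            while (numPendingRequests > 0) {
                lock.wait();        // block instead of spinning on flush()
            }
        }
    }
}
```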
Ah, I see the problem here ...
The bulk processor's internal `bulkRequest.numberOfActions() == 0` will become `true` as soon as it starts executing the flush, and not after `afterBulk` is invoked.
So, since our `numPendingRequests` implementation relies on the `afterBulk` callback, we might have busy loops on `bulkProcessor.flush()` while we wait for `numPendingRequests` to become 0.
This is quite a nice catch actually! So no worries about bringing it up now.
Waiting on `numPendingRequests` makes sense; I'll try it and see if it works out.
On a second look, I think my previous statement is incorrect.
To elaborate, this is the way the `BulkProcessor`'s `flush` is implemented:

if (this.bulkRequest.numberOfActions() > 0) {
    this.execute();
}

`execute()` doesn't return until `afterBulk` is called on the listener (we're hard-configuring the bulk processor to not allow concurrent requests, so the process is blocking; see line 335 of `ElasticsearchSinkBase`).
Since we can re-add requests to the bulk processor within `afterBulk`, `bulkRequest.numberOfActions() > 0` will be true again and the flush re-enters the loop.
Therefore, `bulkProcessor.flush()` can actually just be called once, and will work with our failure-handler re-adding strategy so that the flush also waits for re-added requests before returning. We can just check `numPendingRequests` once after the flush to make sure the flush worked as expected.
So, to sum up, we can just do:

if (flushOnCheckpoint) {
    bulkProcessor.flush();
    if (numPendingRequests.get() > 0) {
        throw new IllegalStateException(...);
    }
}

and it will work with the failure handler.
Do you think this makes sense?
Following my arguments above, I think the busy loop you mentioned shouldn't happen, because the bulk processor's internal `bulkRequest.numberOfActions()` should always be in sync with our `numPendingRequests` (i.e., it should not occur at any point in time that `bulkRequest.numberOfActions() == 0` but our own `numPendingRequests != 0`).
So in that case, if `bulkRequest.numberOfActions() == 0`, then my original loop implementation just falls back to a single pass with 2 condition checks.
To a certain extent, I think it might be better to stick with the original loop implementation, so that we're not locked in to how the `BulkProcessor`'s flush is implemented. As you can see from a commit I just pushed (2956f99), which modifies the mock bulk processor in tests to correctly mimic the flushing behaviour I described above, the loop implementation still passes the tests.
Thanks a lot for looking into this in detail.
I think calling flush() this way is okay then.
@Override
public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable {
	if (ExceptionUtils.containsThrowable(failure, EsRejectedExecutionException.class)) {
		indexer.add(action);
Do you think this is worth a LOG.debug statement?
Or will it happen too often / is too uninformative?
I wonder if we could use the metrics system for exposing stuff like error rate, retry rate etc. (Maybe we should file a JIRA for the ElasticSearch connectors to "metricify" them)
The `BulkProcessorListener` actually logs them with LOG.error before they are processed by the failure handler (line 374 and line 398 of `ElasticsearchSinkBase`). So, these failures are always logged regardless of whether the failure handler chooses to log them. Do you think that's ok?
Regarding the frequency of `EsRejectedExecutionException`: from my experience with ES, they pop up a lot with under-resourced / misconfigured ES clusters.
They can flood the logs if not treated accordingly, but not logging them can be bad too, because you'll know nothing about it unless the sink eventually fails with it.
We could also remove the failure logging from the `ElasticsearchSinkBase` and let the user be responsible for that, but I'm a bit undecided here. Open to suggestions for this!
Metricifying the ES connectors seems like a good idea, especially with their growing popularity. I'll think about it and file a JIRA with some initial proposals.
+1 to merge. Thank you for answering all my comments in such detail!
This PR adds proper support for an at-least-once `ElasticsearchSink`. This is based on the pluggable error handling strategy functionality added in #3246, so only the last commit is relevant.

Like the Kafka producer, the way it works is that pending requests not yet acknowledged by Elasticsearch need to be flushed before proceeding with the next record from upstream.

A slight difference is that for the `ElasticsearchSink`, since we're allowing re-adding failed requests back to the internal `BulkProcessor` (as part of #3246), we'll also need to wait for the re-added requests. The docs warn that if requests are re-added, it may lead to longer checkpoints, since we need to wait for those too.

Flushing is enabled by default, but we provide a `disableFlushOnCheckpoint` method to switch it off. The docs and Javadoc of the method warn the user how this would affect at-least-once delivery.