major: #18 Batch support and engine event system improvements #19
bb29be2 to 113c94b
parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumer.java
parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumer.java
...el-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelEoSStreamProcessor.java
113c94b to abe8782
42029e9 to 8fe9766
I think the documentation should clarify the relation between the Consumer property [...]. In addition, it would be useful if the documentation clarified how error handling and offset commits are handled in batch processing.
parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumer.java
good suggestion! Will do so - hold me to it.
What sort of errors are you thinking about? Off the top of my head, by default, if there is an error processing a batch of messages, the same thing would happen as for a single message - the messages will all be returned to the internal queues, and will be retried after the default delay expires.
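To make the behaviour described above concrete, here is a minimal sketch in plain Java. It is not the library's implementation: `BatchRetrySketch`, `Work`, `runOneBatch` and the choice to put failed records back at the front of the queue are all assumptions made for illustration. It only shows the idea that a failure in the user function sends the whole batch back to the queue, and that those records are not offered again until the retry delay has passed.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch only; names and structure are not taken from the library.
class BatchRetrySketch {

    // A queued record plus the earliest time at which it may be (re)tried.
    static final class Work {
        final String record;
        Instant notBefore = Instant.MIN; // new work is due immediately
        int attempts = 0;

        Work(String record) {
            this.record = record;
        }
    }

    final Deque<Work> queue = new ArrayDeque<>();
    final Duration retryDelay;

    BatchRetrySketch(Duration retryDelay) {
        this.retryDelay = retryDelay;
    }

    void add(String record) {
        queue.add(new Work(record));
    }

    // Takes up to batchSize records that are due at 'now' and calls the user
    // function once with the whole batch. Returns the number of records that
    // were processed successfully (0 if nothing was due or the batch failed).
    int runOneBatch(int batchSize, Instant now, Consumer<List<String>> userFunction) {
        List<Work> batch = new ArrayList<>();
        Iterator<Work> it = queue.iterator();
        while (it.hasNext() && batch.size() < batchSize) {
            Work w = it.next();
            if (!w.notBefore.isAfter(now)) { // skip records still waiting for their delay
                batch.add(w);
                it.remove();
            }
        }
        if (batch.isEmpty()) {
            return 0;
        }

        List<String> records = new ArrayList<>();
        for (Work w : batch) {
            records.add(w.record);
        }

        try {
            userFunction.accept(records);
            return batch.size();
        } catch (RuntimeException e) {
            // Any failure fails the whole batch: every record goes back to the
            // queue (front, original order: an assumption) with a retry delay.
            for (int i = batch.size() - 1; i >= 0; i--) {
                Work w = batch.get(i);
                w.attempts++;
                w.notBefore = now.plus(retryDelay);
                queue.addFirst(w);
            }
            return 0;
        }
    }
}
```

In this sketch a user function that can partially succeed would still see every record of the failed batch again, so it would need to be idempotent.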
I was not thinking of specific errors, but in general it would be useful to have a section in the docs that states exactly what you are saying above, so users are aware of their responsibilities in this respect! New feature request - "Retry settings"
Thanks again for your input! We should track this in a different feature, it's something I've been planning. I've created: #65 Enhanced retry epic. Let's continue the conversation there, and let me know if you have any further thoughts! I'll do something basic for the next version (enable/disable and then skip or die probably)...
@ismarslomic @JorgenRingen do you have any ordering (or relative ordering) expectations or requirements for the messages within the batches?
abe8782 to 6d9af27
Ok, the implementation is ready for testing. I'm not entirely satisfied with the interface, but let me know what you think. For example, we could make batching the only interface, and have the default batch size be 1. But then the user function would always need to be a consumer which takes a list of records, which is kind of odd. It depends on whether you think batching or single record processing would be the more common use case.
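As a rough illustration of the trade-off discussed above: if the only interface took a list of records, a per-record function could still be supported by wrapping it. The sketch below uses only `java.util.function.Consumer`; the class name `SingleInterfaceSketch` and the helper `asBatchFunction` are hypothetical and are not part of this PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch: one list-based interface, with per-record functions adapted to it.
class SingleInterfaceSketch {

    // Wraps a per-record function so it can be used where a batch function is expected.
    // With a batch size of 1 the list simply holds a single record.
    static <T> Consumer<List<T>> asBatchFunction(Consumer<T> perRecord) {
        return batch -> batch.forEach(perRecord);
    }

    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        Consumer<String> perRecord = seen::add;

        Consumer<List<String>> batchFunction = asBatchFunction(perRecord);
        batchFunction.accept(List.of("only-record")); // batch size == 1

        System.out.println(seen);
    }
}
```

An adapter like this would keep the engine list-based internally while sparing single-record users from handling a list themselves.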
Does anyone have time to test this branch out? If not that's ok, I'll release it as 3.1.
...el-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelEoSStreamProcessor.java
bf382c4 to 97d09fc
parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java
...el-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/DynamicLoadFactor.java
...el-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/DynamicLoadFactor.java
parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkManager.java
parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/BatchTest.java
4c62a9f to b5b166f
7a40a9f to 851ce29
step
great! ;)
parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java
parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkManager.java
...el-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/DynamicLoadFactor.java
parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java
parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/BatchTest.java
parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/BatchTest.java
Ok, there are still a couple of test failures - I think they need updating. But the batching feature seems feature complete now. Need to implement some tests for the vertx and reactor versions, but should have this merged early next week.
minor
...el-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelEoSStreamProcessor.java
.../src/test/java/io/confluent/parallelconsumer/AbstractParallelEoSStreamProcessorTestBase.java
parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/BatchTestMethods.java
parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/BatchTestMethods.java
...el-consumer-reactor/src/test/java/io/confluent/parallelconsumer/reactor/CopiedBatchTest.java
...l-consumer-reactor/src/test/java/io/confluent/parallelconsumer/reactor/ReactorBatchTest.java
parallel-consumer-vertx/src/test/java/io/confluent/parallelconsumer/vertx/VertxBatchTest.java
parallel-consumer-vertx/src/test/java/io/confluent/parallelconsumer/vertx/VertxBatchTest.java
86ca22d to ad2e0be
getting there... lots of improvements / refactoring in this too
...src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java
...src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java
...el-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/DynamicLoadFactor.java
parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/ShardManager.java
parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/ShardManager.java
parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkContainer.java
...er-vertx/src/main/java/io/confluent/parallelconsumer/vertx/VertxParallelStreamProcessor.java
...l-consumer-reactor/src/main/java/io/confluent/parallelconsumer/reactor/ReactorProcessor.java
2fba344 to cc1167e
Target should be 100% now. Protect for negative delta.
3122d14 to 16d39ec
.../src/test/java/io/confluent/parallelconsumer/AbstractParallelEoSStreamProcessorTestBase.java
96eb5a0 to b7b631c
- test for observing dropping average batch size
- performance: process mailbox timeout calculation not needed as poll box is interrupted either with work results, or with new messages from broker poller
- improvements to event based system and LongPollingMockConsumer
- eventing and timing improvements and simplifications / refactorings, better batch test code reuse
- only sleep for retry delay if starved
- remove clean/dirty state cache - too complex and unnecessary - stay tuned for replacement
- change close ordering of operations to fix final commit execution
- teach ProgressTracker to use time
- batch size request - always request enough to fill all batches

Co-authored-by: Antony Stubbs <[email protected]>
Co-authored-by: Benedikt Linse <[email protected]>
b7b631c to 7e26075
#18 - parallel batching - this PR changes the engine to always use lists of records, where non-batch processing is the case where the batch size is one.
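The idea in the description above can be pictured with a small, self-contained sketch. It is not the engine's code: `BatchPartitionSketch` and `toBatches` are hypothetical names. It only shows that when records are always handed out as lists, a batch size of one gives single-element lists, so single-record processing needs no separate code path.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: records are always grouped into lists of at most batchSize.
class BatchPartitionSketch {

    // Splits the records into consecutive batches; the last batch may be smaller.
    static <T> List<List<T>> toBatches(List<T> records, int batchSize) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be >= 1");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < records.size(); start += batchSize) {
            int end = Math.min(start + batchSize, records.size());
            // Copy the view so each batch is independent of the source list.
            batches.add(new ArrayList<>(records.subList(start, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> records = List.of(1, 2, 3, 4, 5);
        System.out.println(toBatches(records, 2)); // batches of up to two records
        System.out.println(toBatches(records, 1)); // non-batch case: one record per list
    }
}
```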
Add support for parallel processing of batches / bulk sets of messages #18
Vertx
Reactor
Docs
vertx batch test
reactor batch test
Failure tests
ensure loop cycle is good
todos
investigate build speed change - Improve test suite speed to pre-batch merge levels #226
Related: