add kafka simple consumer buffer and timeout to stream config #3584
Conversation
...src/test/java/com/linkedin/pinot/core/realtime/impl/kafka/KafkaLowLevelStreamConfigTest.java
    return _kafkaSocketTimeout;
  }

  private int getConfigWithDefault(Map<String, String> configMap, String key, int defaultValue) {
I think it's better to indicate `int` in the function name, since this is specific to getting an int value, e.g. `getIntConfigWithDefault`.
updated in new diff
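For illustration, the renamed helper suggested above could look like the following minimal sketch. The class name `StreamConfigHelper` and the choice to fall back to the default on an unparsable value are assumptions for this example, not necessarily what the PR implements.

```java
import java.util.Map;

public class StreamConfigHelper {
  /**
   * Returns the value mapped to {@code key} parsed as an int, or
   * {@code defaultValue} if the key is absent or the value is not a
   * valid integer. (Falling back on a parse failure is an assumption
   * made for this sketch.)
   */
  public static int getIntConfigWithDefault(Map<String, String> configMap, String key, int defaultValue) {
    String value = configMap.get(key);
    if (value == null) {
      return defaultValue;
    }
    try {
      return Integer.parseInt(value);
    } catch (NumberFormatException e) {
      return defaultValue;
    }
  }
}
```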
@@ -45,6 +46,11 @@ public KafkaPartitionLevelConsumer(String clientId, StreamConfig streamConfig, i
    super(clientId, streamConfig, partition, kafkaSimpleConsumerFactory);
  }

  @VisibleForTesting
I think this function should be located in the base class that defines `simpleConsumer`. Can we move it to `KafkaConnectionHandler`?
updated in new diff
Codecov Report
@@ Coverage Diff @@
## master #3584 +/- ##
===========================================
+ Coverage 70.8% 70.91% +0.1%
Complexity 4 4
===========================================
Files 1007 1007
Lines 46179 46203 +24
Branches 6193 6197 +4
===========================================
+ Hits 32697 32763 +66
+ Misses 11334 11287 -47
- Partials 2148 2153 +5
Continue to review full report at Codecov.
LGTM.
I'm not aware of why that is the default. But this change will now let us increase it in the config if we desire.
@npawar Thanks for the confirmation. I'm merging the PR.
When the Kafka broker cluster is configured with a max message size larger than our current hard-coded low-level consumer buffer size (512K), the Pinot Kafka consumer can receive messages from Kafka that are bigger than its buffer. When this happens, the Kafka low-level consumer fails silently and returns an empty message list to the low-level message ingester, so from a Pinot user's point of view the low-level consumer appears stuck at the offset of the large message. This change allows users to configure the buffer size of their Kafka ingestion so the consumer can handle the failure scenario above.
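As a sketch of how a user might override the new settings, the stream config could carry entries along these lines. The exact property key names below are hypothetical placeholders chosen for this example; consult the PR diff for the actual keys the change introduces.

```
# Hypothetical key names for illustration only
stream.kafka.buffer.size=1048576      # raise consumer buffer above the 512K default
stream.kafka.socket.timeout=10000     # socket timeout in milliseconds
```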