Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add kafka simple consumer buffer and timeout to stream config #3584

Merged
merged 3 commits into from
Dec 6, 2018

Conversation

jamesyfshao
Copy link
Contributor

When the kafka broker cluster has max message size config bigger than our current hard-coded value of low-level consumer buffer size (512K), pinot kafka consumer could encounter messages from kafka that are bigger than our buffer size. When this happens, kafka low-level consumer will fail silently and returns an empty message result list to low-level message ingester. From pinot users' point of view, pinot kafka low-level consumer stuck at the large message offset. This change aims to allow users to define their kafka ingestor buffer size so it can handle the failure scenarios above.

return _kafkaSocketTimeout;
}

private int getConfigWithDefault(Map<String, String> configMap, String key, int defaultValue) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that it's better to indicate int to the function name since this is specific to getting an int value. e.g. getIntConfigWithDefault

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated in new diff

@@ -45,6 +46,11 @@ public KafkaPartitionLevelConsumer(String clientId, StreamConfig streamConfig, i
super(clientId, streamConfig, partition, kafkaSimpleConsumerFactory);
}

@VisibleForTesting
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that this function should locate in the base class that defines simpleConsumer. Can we move this to KafkaConnectionHandler?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated in new diff

@codecov-io
Copy link

codecov-io commented Dec 4, 2018

Codecov Report

Merging #3584 into master will increase coverage by 0.1%.
The diff coverage is 75%.

Impacted file tree graph

@@             Coverage Diff             @@
##             master    #3584     +/-   ##
===========================================
+ Coverage      70.8%   70.91%   +0.1%     
  Complexity        4        4             
===========================================
  Files          1007     1007             
  Lines         46179    46203     +24     
  Branches       6193     6197      +4     
===========================================
+ Hits          32697    32763     +66     
+ Misses        11334    11287     -47     
- Partials       2148     2153      +5
Impacted Files Coverage Δ Complexity Δ
...altime/impl/kafka/KafkaStreamConfigProperties.java 25% <ø> (ø) 0 <0> (ø) ⬇️
...re/realtime/impl/kafka/KafkaConnectionHandler.java 69.69% <100%> (+0.94%) 0 <0> (ø) ⬇️
...realtime/impl/kafka/KafkaLowLevelStreamConfig.java 61.53% <66.66%> (+11.53%) 0 <0> (ø) ⬇️
...a/manager/realtime/RealtimeSegmentDataManager.java 75% <0%> (-25%) 0% <0%> (ø)
.../impl/dictionary/LongOffHeapMutableDictionary.java 81.81% <0%> (-10.91%) 0% <0%> (ø)
...impl/dictionary/DoubleOnHeapMutableDictionary.java 68.88% <0%> (-6.67%) 0% <0%> (ø)
...e/operator/dociditerators/SortedDocIdIterator.java 54.05% <0%> (-5.41%) 0% <0%> (ø)
...mpl/dictionary/DoubleOffHeapMutableDictionary.java 74.54% <0%> (-3.64%) 0% <0%> (ø)
...inot/core/operator/filter/FilterOperatorUtils.java 84.37% <0%> (-3.13%) 0% <0%> (ø)
...ot/core/query/pruner/ColumnValueSegmentPruner.java 87.32% <0%> (-1.41%) 0% <0%> (ø)
... and 16 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 683b22d...2aaa0dc. Read the comment docs.

Copy link
Contributor

@snleee snleee left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM otherwise

@mcvsubbu @npawar Do you have the context on how we decided the consumer socket buffer size to 512kb? I think that this number can be higher (e.g. 1MB) Can one of you double check this before merging this commit?

Copy link
Contributor

@npawar npawar left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM.
I'm not aware of why that is the default. But this change will now let us increase it in the config if we desire.

@snleee
Copy link
Contributor

snleee commented Dec 6, 2018

@npawar Thanks for the confirmation. I'm merging the pr.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants