add kafka simple consumer buffer and timeout to stream config #3584

Merged (3 commits, Dec 6, 2018)
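This change replaces KafkaConnectionHandler's hard-coded socket timeout (10000 ms) and socket buffer size (512000 bytes) for the Kafka simple consumer with two optional stream config properties, kafka.socket.timeout and kafka.buffer.size, which KafkaLowLevelStreamConfig reads with the old constants as defaults.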
Changes from 1 commit
KafkaConnectionHandler.java
@@ -42,8 +42,6 @@
*/
public class KafkaConnectionHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConnectionHandler.class);
- private static final int SOCKET_TIMEOUT_MILLIS = 10000;
- private static final int SOCKET_BUFFER_SIZE = 512000;

enum ConsumerState {
CONNECTING_TO_BOOTSTRAP_NODE,
@@ -65,6 +63,8 @@ enum ConsumerState {
KafkaBrokerWrapper _leader;
String _currentHost;
int _currentPort;
int _bufferSize;
int _socketTimeout;

final KafkaSimpleConsumerFactory _simpleConsumerFactory;
SimpleConsumer _simpleConsumer;
@@ -110,6 +110,8 @@ public KafkaConnectionHandler(String clientId, StreamConfig streamConfig,
isPartitionProvided = false;
_partition = Integer.MIN_VALUE;

_bufferSize = kafkaLowLevelStreamConfig.getKafkaBufferSize();
_socketTimeout = kafkaLowLevelStreamConfig.getKafkaSocketTimeout();
initializeBootstrapNodeList(kafkaLowLevelStreamConfig.getBootstrapHosts());
setCurrentState(new ConnectingToBootstrapNode());
}
@@ -133,6 +135,8 @@ public KafkaConnectionHandler(String clientId, StreamConfig streamConfig, int pa
isPartitionProvided = true;
_partition = partition;

_bufferSize = kafkaLowLevelStreamConfig.getKafkaBufferSize();
_socketTimeout = kafkaLowLevelStreamConfig.getKafkaSocketTimeout();
initializeBootstrapNodeList(kafkaLowLevelStreamConfig.getBootstrapHosts());
setCurrentState(new ConnectingToBootstrapNode());
}
@@ -216,8 +220,8 @@ public void process() {

try {
LOGGER.info("Connecting to bootstrap host {}:{} for topic {}", _currentHost, _currentPort, _topic);
- _simpleConsumer = _simpleConsumerFactory.buildSimpleConsumer(_currentHost, _currentPort, SOCKET_TIMEOUT_MILLIS,
- SOCKET_BUFFER_SIZE, _clientId);
+ _simpleConsumer = _simpleConsumerFactory.buildSimpleConsumer(_currentHost, _currentPort, _socketTimeout,
+ _bufferSize, _clientId);
setCurrentState(new ConnectedToBootstrapNode());
} catch (Exception e) {
handleConsumerException(e);
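
For context, both values end up on the underlying kafka.javaapi.consumer.SimpleConsumer. A minimal sketch of what a KafkaSimpleConsumerFactory implementation presumably does with them (the body below is an assumption; the signature matches the mock factory in the tests further down):

  public SimpleConsumer buildSimpleConsumer(String host, int port, int soTimeout, int bufferSize, String clientId) {
    // soTimeout is the socket timeout in milliseconds and bufferSize the socket
    // receive buffer in bytes; both are passed straight to the consumer.
    return new SimpleConsumer(host, port, soTimeout, bufferSize, clientId);
  }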
@@ -326,8 +330,8 @@ void process() {
// Connect to the partition leader
try {
_simpleConsumer =
- _simpleConsumerFactory.buildSimpleConsumer(_leader.host(), _leader.port(), SOCKET_TIMEOUT_MILLIS,
- SOCKET_BUFFER_SIZE, _clientId);
+ _simpleConsumerFactory.buildSimpleConsumer(_leader.host(), _leader.port(), _socketTimeout,
+ _bufferSize, _clientId);

setCurrentState(new ConnectedToPartitionLeader());
} catch (Exception e) {
KafkaLowLevelStreamConfig.java
@@ -18,6 +18,8 @@
import com.google.common.base.Preconditions;
import com.linkedin.pinot.common.utils.EqualityUtils;
import com.linkedin.pinot.core.realtime.stream.StreamConfig;
import org.apache.commons.lang.StringUtils;

import java.util.Map;


@@ -28,6 +30,8 @@ public class KafkaLowLevelStreamConfig {

private String _kafkaTopicName;
private String _bootstrapHosts;
private int _kafkaBufferSize;
private int _kafkaSocketTimeout;

/**
* Builds a wrapper around {@link StreamConfig} to fetch kafka partition level consumer related configs
@@ -40,7 +44,15 @@ public KafkaLowLevelStreamConfig(StreamConfig streamConfig) {

String llcBrokerListKey =
KafkaStreamConfigProperties.constructStreamProperty(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_BROKER_LIST);
String llcBufferKey =
KafkaStreamConfigProperties.constructStreamProperty(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE);
String llcTimeoutKey =
KafkaStreamConfigProperties.constructStreamProperty(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_SOCKET_TIMEOUT);
_bootstrapHosts = streamConfigMap.get(llcBrokerListKey);
_kafkaBufferSize = getConfigWithDefault(streamConfigMap, llcBufferKey,
KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE_DEFAULT);
_kafkaSocketTimeout = getConfigWithDefault(streamConfigMap, llcTimeoutKey,
KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_SOCKET_TIMEOUT_DEFAULT);
Preconditions.checkNotNull(_bootstrapHosts,
"Must specify kafka brokers list " + llcBrokerListKey + " in case of low level kafka consumer");
}
@@ -53,12 +65,36 @@ public String getBootstrapHosts() {
return _bootstrapHosts;
}

public int getKafkaBufferSize() {
return _kafkaBufferSize;
}

public int getKafkaSocketTimeout() {
return _kafkaSocketTimeout;
}

private int getConfigWithDefault(Map<String, String> configMap, String key, int defaultValue) {
Review comment (Contributor): I think it's better to indicate int in the function name, since this is specific to getting an int value, e.g. getIntConfigWithDefault.

Reply (Contributor Author): updated in new diff
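
A minimal sketch of the suggested rename (hypothetical; per the reply it landed in a later commit that this page does not show):

  // Parses an int-valued config entry, falling back to the default when the
  // key is absent, empty, or not a parseable integer.
  private int getIntConfigWithDefault(Map<String, String> configMap, String key, int defaultValue) {
    String stringValue = configMap.get(key);
    try {
      return StringUtils.isNotEmpty(stringValue) ? Integer.parseInt(stringValue) : defaultValue;
    } catch (NumberFormatException e) {
      return defaultValue;
    }
  }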

String stringValue = configMap.get(key);
try {
if (StringUtils.isNotEmpty(stringValue)) {
return Integer.parseInt(stringValue);
}
return defaultValue;
} catch (NumberFormatException ex) {
return defaultValue;
}
}

@Override
public String toString() {
return "KafkaLowLevelStreamConfig{" + "_kafkaTopicName='" + _kafkaTopicName + '\'' + ", _bootstrapHosts='"
+ _bootstrapHosts + '\'' + '}';
return "KafkaLowLevelStreamConfig{" + "_kafkaTopicName='" + _kafkaTopicName + '\''
+ ", _bootstrapHosts='" + _bootstrapHosts + '\''
+ ", _kafkaBufferSize='" + _kafkaBufferSize + '\''
+ ", _kafkaSocketTimeout='" + _kafkaSocketTimeout + '\''
+ '}';
}


@Override
public boolean equals(Object o) {
if (EqualityUtils.isSameReference(this, o)) {
@@ -71,14 +107,18 @@ public boolean equals(Object o) {

KafkaLowLevelStreamConfig that = (KafkaLowLevelStreamConfig) o;

- return EqualityUtils.isEqual(_kafkaTopicName, that._kafkaTopicName) && EqualityUtils.isEqual(_bootstrapHosts,
-     that._bootstrapHosts);
+ return EqualityUtils.isEqual(_kafkaTopicName, that._kafkaTopicName)
+     && EqualityUtils.isEqual(_bootstrapHosts, that._bootstrapHosts)
+     && EqualityUtils.isEqual(_kafkaBufferSize, that._kafkaBufferSize)
+     && EqualityUtils.isEqual(_kafkaSocketTimeout, that._kafkaSocketTimeout);
}

@Override
public int hashCode() {
int result = EqualityUtils.hashCodeOf(_kafkaTopicName);
result = EqualityUtils.hashCodeOf(result, _bootstrapHosts);
result = EqualityUtils.hashCodeOf(result, _kafkaBufferSize);
result = EqualityUtils.hashCodeOf(result, _kafkaSocketTimeout);
return result;
}
}
KafkaPartitionLevelConsumer.java
@@ -23,6 +23,7 @@
import java.io.IOException;
import kafka.api.FetchRequestBuilder;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.MessageAndOffset;
import org.slf4j.Logger;
@@ -45,6 +46,11 @@ public KafkaPartitionLevelConsumer(String clientId, StreamConfig streamConfig, i
super(clientId, streamConfig, partition, kafkaSimpleConsumerFactory);
}

@VisibleForTesting
Review comment (Contributor): I think this function should live in the base class that defines _simpleConsumer. Can we move this to KafkaConnectionHandler?

Reply (Contributor Author): updated in new diff
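
A minimal sketch of the suggested move (hypothetical; per the reply it landed in a later commit). The accessor would live in KafkaConnectionHandler, which declares the _simpleConsumer field, and subclasses such as KafkaPartitionLevelConsumer would inherit it:

  @VisibleForTesting
  public SimpleConsumer getSimpleConsumer() {
    return _simpleConsumer;
  }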

public SimpleConsumer getSimpleConsumer() {
return _simpleConsumer;
}

/**
* Fetch messages and the per-partition high watermark from Kafka between the specified offsets.
*
KafkaStreamConfigProperties.java
@@ -39,6 +39,10 @@ public static class HighLevelConsumer {

public static class LowLevelConsumer {
public static final String KAFKA_BROKER_LIST = "kafka.broker.list";
public static final String KAFKA_BUFFER_SIZE = "kafka.buffer.size";
public static final int KAFKA_BUFFER_SIZE_DEFAULT = 512000;
public static final String KAFKA_SOCKET_TIMEOUT = "kafka.socket.timeout";
public static final int KAFKA_SOCKET_TIMEOUT_DEFAULT = 10000;
}

public static final String KAFKA_CONSUMER_PROP_PREFIX = "kafka.consumer.prop";
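For reference, a sketch of how the two new knobs are set in a stream config once the stream. prefix is applied (key names match the tests below; broker list and values are illustrative):

  Map<String, String> streamConfigMap = new HashMap<>();
  streamConfigMap.put("stream.kafka.broker.list", "broker1:1234");  // required for the low level consumer
  streamConfigMap.put("stream.kafka.buffer.size", "1048576");       // bytes, defaults to 512000
  streamConfigMap.put("stream.kafka.socket.timeout", "30000");      // milliseconds, defaults to 10000

Both properties are optional: a missing, empty, or unparseable value falls back to the default.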
KafkaLowLevelStreamConfigTest.java (new file)
@@ -0,0 +1,89 @@
package com.linkedin.pinot.core.realtime.impl.kafka;

import com.google.common.collect.ImmutableMap;
import com.linkedin.pinot.core.realtime.stream.StreamConfig;
import com.linkedin.pinot.core.realtime.stream.StreamConfigProperties;
import org.testng.Assert;
import org.testng.annotations.Test;

import java.util.HashMap;
import java.util.Map;

import static org.testng.Assert.*;
import static com.linkedin.pinot.core.realtime.impl.kafka.KafkaStreamConfigProperties.LowLevelConsumer.*;

public class KafkaLowLevelStreamConfigTest {

private KafkaLowLevelStreamConfig getStreamConfig(String topic, String bootstrapHosts, String buffer, String socketTimeout) {
Map<String, String> streamConfigMap = new HashMap<>();
String streamType = "kafka";
String consumerType = StreamConfig.ConsumerType.LOWLEVEL.toString();
String consumerFactoryClassName = KafkaConsumerFactory.class.getName();
String decoderClass = KafkaAvroMessageDecoder.class.getName();
streamConfigMap.put(StreamConfigProperties.STREAM_TYPE, streamType);
streamConfigMap.put(
StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_TOPIC_NAME), topic);
streamConfigMap.put(
StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES),
consumerType);
streamConfigMap.put(StreamConfigProperties.constructStreamProperty(streamType,
StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS), consumerFactoryClassName);
streamConfigMap.put(
StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_DECODER_CLASS),
decoderClass);
streamConfigMap.put("stream.kafka.broker.list", bootstrapHosts);
if (buffer != null) {
streamConfigMap.put("stream.kafka.buffer.size", buffer);
}
if (socketTimeout != null) {
streamConfigMap.put("stream.kafka.socket.timeout", String.valueOf(socketTimeout));
}
return new KafkaLowLevelStreamConfig(new StreamConfig(streamConfigMap));
}

@Test
public void testGetKafkaTopicName() {
KafkaLowLevelStreamConfig config = getStreamConfig("topic", "", "", "");
Assert.assertEquals("topic", config.getKafkaTopicName());
}

@Test
public void testGetBootstrapHosts() {
KafkaLowLevelStreamConfig config = getStreamConfig("topic", "host1", "", "");
Assert.assertEquals("host1", config.getBootstrapHosts());
}

@Test
public void testGetKafkaBufferSize() {
// test default
KafkaLowLevelStreamConfig config = getStreamConfig("topic", "host1", null, "");
Assert.assertEquals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE_DEFAULT, config.getKafkaBufferSize());

config = getStreamConfig("topic", "host1", "", "");
Assert.assertEquals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE_DEFAULT, config.getKafkaBufferSize());

config = getStreamConfig("topic", "host1", "bad value", "");
Assert.assertEquals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE_DEFAULT, config.getKafkaBufferSize());

// correct config
config = getStreamConfig("topic", "host1", "100", "");
Assert.assertEquals(100, config.getKafkaBufferSize());
}

@Test
public void testGetKafkaSocketTimeout() {
// test default
KafkaLowLevelStreamConfig config = getStreamConfig("topic", "host1", "",null);
Assert.assertEquals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_SOCKET_TIMEOUT_DEFAULT, config.getKafkaSocketTimeout());

config = getStreamConfig("topic", "host1", "","");
Assert.assertEquals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_SOCKET_TIMEOUT_DEFAULT, config.getKafkaSocketTimeout());

config = getStreamConfig("topic", "host1", "","bad value");
Assert.assertEquals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_SOCKET_TIMEOUT_DEFAULT, config.getKafkaSocketTimeout());

// correct config
config = getStreamConfig("topic", "host1", "", "100");
Assert.assertEquals(100, config.getKafkaSocketTimeout());
}
}
@@ -18,6 +18,7 @@
import com.google.common.base.Preconditions;
import com.linkedin.pinot.core.realtime.impl.kafka.KafkaPartitionLevelConsumer;
import com.linkedin.pinot.core.realtime.impl.kafka.KafkaSimpleConsumerFactory;
import com.linkedin.pinot.core.realtime.impl.kafka.KafkaStreamConfigProperties;
import com.linkedin.pinot.core.realtime.impl.kafka.KafkaStreamMetadataProvider;
import com.linkedin.pinot.core.realtime.stream.OffsetCriteria;
import com.linkedin.pinot.core.realtime.stream.StreamConfig;
@@ -208,6 +209,55 @@ public SimpleConsumer buildSimpleConsumer(String host, int port, int soTimeout,
}
}

@Test
public void testBuildConsumer() throws Exception {
String streamType = "kafka";
String streamKafkaTopicName = "theTopic";
String streamKafkaBrokerList = "abcd:1234,bcde:2345";
String streamKafkaConsumerType = "simple";
String clientId = "clientId";

MockKafkaSimpleConsumerFactory mockKafkaSimpleConsumerFactory = new MockKafkaSimpleConsumerFactory(
new String[] { "abcd", "bcde" },
new int[] { 1234, 2345 },
new long[] { 12345L, 23456L },
new long[] { 23456L, 34567L },
new int[] { 0, 1 },
streamKafkaTopicName
);

Map<String, String> streamConfigMap = new HashMap<>();
streamConfigMap.put("streamType", streamType);
streamConfigMap.put("stream.kafka.topic.name", streamKafkaTopicName);
streamConfigMap.put("stream.kafka.broker.list", streamKafkaBrokerList);
streamConfigMap.put("stream.kafka.consumer.type", streamKafkaConsumerType);
streamConfigMap.put("stream.kafka.consumer.factory.class.name", mockKafkaSimpleConsumerFactory.getClass().getName());
streamConfigMap.put("stream.kafka.decoder.class.name", "decoderClass");
StreamConfig streamConfig = new StreamConfig(streamConfigMap);

KafkaStreamMetadataProvider streamMetadataProvider =
new KafkaStreamMetadataProvider(clientId, streamConfig, mockKafkaSimpleConsumerFactory);

// test default value
KafkaPartitionLevelConsumer kafkaSimpleStreamConsumer =
new KafkaPartitionLevelConsumer(clientId, streamConfig, 0, mockKafkaSimpleConsumerFactory);
kafkaSimpleStreamConsumer.fetchMessages(12345L, 23456L, 10000);
Assert.assertEquals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE_DEFAULT,
kafkaSimpleStreamConsumer.getSimpleConsumer().bufferSize());
Assert.assertEquals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_SOCKET_TIMEOUT_DEFAULT,
kafkaSimpleStreamConsumer.getSimpleConsumer().soTimeout());

// test user defined values
streamConfigMap.put("stream.kafka.buffer.size", "100");
streamConfigMap.put("stream.kafka.socket.timeout", "1000");
streamConfig = new StreamConfig(streamConfigMap);
kafkaSimpleStreamConsumer =
new KafkaPartitionLevelConsumer(clientId, streamConfig, 0, mockKafkaSimpleConsumerFactory);
kafkaSimpleStreamConsumer.fetchMessages(12345L, 23456L, 10000);
Assert.assertEquals(100, kafkaSimpleStreamConsumer.getSimpleConsumer().bufferSize());
Assert.assertEquals(1000, kafkaSimpleStreamConsumer.getSimpleConsumer().soTimeout());
}

@Test
public void testGetPartitionCount() {
String streamType = "kafka";