diff --git a/README.adoc b/README.adoc index 0af3481..a8b9199 100644 --- a/README.adoc +++ b/README.adoc @@ -10,6 +10,9 @@ link:doc/forklift.adoc[Documentation] == Releases link:doc/prev_releases.adoc[Previous Releases] +* *Unreleased* +** Kafka: Add internal support for separate dedicated consumers on the same topic + * *January 17th 2017* - v3.1 ** Upgrade kafka clients to use Kafka 0.11 ** Fix bug in the kafka connector that made it not provide heartbeats in certain situations diff --git a/connectors/kafka/src/main/java/forklift/connectors/KafkaConnector.java b/connectors/kafka/src/main/java/forklift/connectors/KafkaConnector.java index 8044d32..ba292bd 100644 --- a/connectors/kafka/src/main/java/forklift/connectors/KafkaConnector.java +++ b/connectors/kafka/src/main/java/forklift/connectors/KafkaConnector.java @@ -39,24 +39,24 @@ public class KafkaConnector implements ForkliftConnectorI { private final String kafkaHosts; private final String schemaRegistries; - private final String groupId; + private final String defaultGroupId; private KafkaProducer kafkaProducer; private KafkaController controller; private ForkliftSerializer serializer; - private Map controllers = new HashMap<>(); + private Map controllers = new HashMap<>(); /** * Constructs a new instance of the KafkaConnector * * @param kafkaHosts list of kafka servers in host:port,... format * @param schemaRegistries list of schema registry servers in http://host:port,... format - * @param groupId the groupId to use when subscribing to topics + * @param defaultGroupId the default groupId to use when subscribing to topics */ - public KafkaConnector(String kafkaHosts, String schemaRegistries, String groupId) { + public KafkaConnector(String kafkaHosts, String schemaRegistries, String defaultGroupId) { this.kafkaHosts = kafkaHosts; this.schemaRegistries = schemaRegistries; - this.groupId = groupId; + this.defaultGroupId = defaultGroupId; this.serializer = new KafkaSerializer(this, newSerializer(), newDeserializer()); } @@ -101,7 +101,7 @@ private KafkaProducer createKafkaProducer() { return new KafkaProducer(producerProperties); } - private KafkaController createController(String topicName) { + private KafkaController createController(String topicName, String groupId) { Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHosts); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); @@ -137,48 +137,41 @@ public synchronized void stop() throws ConnectorException { @Override public ForkliftConsumerI getConsumerForSource(SourceI source) throws ConnectorException { - return source - .apply(QueueSource.class, queue -> getQueue(queue.getName())) - .apply(TopicSource.class, topic -> getTopic(topic.getName())) - .apply(GroupedTopicSource.class, topic -> getGroupedTopic(topic)) - .apply(RoleInputSource.class, roleSource -> { - final ForkliftConsumerI rawConsumer = getConsumerForSource(roleSource.getActionSource(this)); - return new RoleInputConsumerWrapper(rawConsumer); - }) - .elseUnsupportedError(); + if (source instanceof RoleInputSource) { + final RoleInputSource roleSource = (RoleInputSource) source; + final ForkliftConsumerI rawConsumer = getConsumerForSource(roleSource.getActionSource(this)); + return new RoleInputConsumerWrapper(rawConsumer); + } + return getGroupedTopic(mapToGroupedTopic(source)); } public synchronized ForkliftConsumerI getGroupedTopic(GroupedTopicSource source) throws ConnectorException { if (!source.groupSpecified()) { - source.overrideGroup(groupId); + source.overrideGroup(defaultGroupId); } - if (!source.getGroup().equals(groupId)) { //TODO actually support GroupedTopics - throw new ConnectorException("Unexpected group '" + source.getGroup() + "'; only the connector group '" + groupId + "' is allowed"); - } - - KafkaController controller = controllers.get(source.getName()); + KafkaController controller = controllers.get(source); if (controller != null && controller.isRunning()) { - log.warn("Consumer for topic already exists under this controller's groupname. Messages will be divided amongst consumers."); + log.warn("Consumer for topic and group already exists. Messages will be divided amongst consumers."); } else { - controller = createController(source.getName()); - this.controllers.put(source.getName(), controller); + controller = createController(source.getName(), source.getGroup()); + this.controllers.put(source, controller); controller.start(); } + return new KafkaTopicConsumer(source.getName(), controller); } @Override public ForkliftConsumerI getQueue(String name) throws ConnectorException { - return getGroupedTopic(new GroupedTopicSource(name, groupId)); + return getGroupedTopic(mapToGroupedTopic(new QueueSource(name))); } @Override public ForkliftConsumerI getTopic(String name) throws ConnectorException { - return getGroupedTopic(new GroupedTopicSource(name, groupId)); + return getGroupedTopic(mapToGroupedTopic(new TopicSource(name))); } - @Override public ForkliftProducerI getQueueProducer(String name) { return getTopicProducer(name); @@ -200,11 +193,28 @@ public ActionSource mapSource(LogicalSource source) { } protected GroupedTopicSource mapRoleInputSource(RoleInputSource roleSource) { - return new GroupedTopicSource("forklift-role-" + roleSource.getRole(), groupId); + return new GroupedTopicSource("forklift-role-" + roleSource.getRole(), defaultGroupId); } @Override public boolean supportsResponse() { return true; } + + /* visible for testing */ + protected GroupedTopicSource mapToGroupedTopic(SourceI source) { + return source + .apply(QueueSource.class, queueSource -> new GroupedTopicSource(queueSource.getName(), defaultGroupId)) + .apply(TopicSource.class, topicSource -> topicToGroupedTopic(topicSource)) + .apply(GroupedTopicSource.class, groupedTopicSource -> groupedTopicSource) + .elseUnsupportedError(); + } + + private GroupedTopicSource topicToGroupedTopic(TopicSource topicSource) { + if (topicSource.getContextClass() == null) { + return new GroupedTopicSource(topicSource.getName(), defaultGroupId); + } + final String groupName = defaultGroupId + "-" + topicSource.getContextClass().getSimpleName(); + return new GroupedTopicSource(topicSource.getName(), groupName); + } } diff --git a/connectors/kafka/src/test/java/forklift/connectors/KafkaConnectorTests.java b/connectors/kafka/src/test/java/forklift/connectors/KafkaConnectorTests.java new file mode 100644 index 0000000..6abcdac --- /dev/null +++ b/connectors/kafka/src/test/java/forklift/connectors/KafkaConnectorTests.java @@ -0,0 +1,58 @@ +package forklift.connectors; + +import forklift.source.sources.GroupedTopicSource; +import forklift.source.sources.QueueSource; +import forklift.source.sources.TopicSource; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class KafkaConnectorTests { + private static final String GROUP_ID = "test-default"; + private KafkaConnector connector; + + @Before + public void setup() throws Exception { + connector = new KafkaConnector("blah", "blah", GROUP_ID); + } + + @Test + public void testQueueMapping() { + final String topicName = "test-topic"; + + final GroupedTopicSource mappedSource = connector.mapToGroupedTopic(new QueueSource(topicName)); + Assert.assertEquals(new GroupedTopicSource(topicName, GROUP_ID), mappedSource); + } + + @Test + public void testTopicMapping() { + final String topicName = "test-topic"; + final TopicSource consumerSource = new TopicSource(topicName); + consumerSource.setContextClass(FakeConsumer.class); + + final GroupedTopicSource mappedConsumerSource = connector.mapToGroupedTopic(consumerSource); + Assert.assertEquals(new GroupedTopicSource(topicName, "test-default-FakeConsumer"), mappedConsumerSource); + } + + @Test + public void testDefaultTopicMapping() { + final String topicName = "test-topic"; + final TopicSource anonymousSource = new TopicSource(topicName); + + final GroupedTopicSource mappedConsumerSource = connector.mapToGroupedTopic(anonymousSource); + Assert.assertEquals(new GroupedTopicSource(topicName, GROUP_ID), mappedConsumerSource); + } + + @Test + public void testGroupedTopicMapping() { + final String topicName = "test-topic"; + final String groupId = "test-group"; + final GroupedTopicSource unmappedSource = new GroupedTopicSource(topicName, groupId); + + final GroupedTopicSource mappedSource = connector.mapToGroupedTopic(unmappedSource); + Assert.assertEquals(unmappedSource, mappedSource); + } + + private final class FakeConsumer {} +} diff --git a/core/src/main/java/forklift/source/sources/GroupedTopicSource.java b/core/src/main/java/forklift/source/sources/GroupedTopicSource.java index 9377e92..0d961dc 100644 --- a/core/src/main/java/forklift/source/sources/GroupedTopicSource.java +++ b/core/src/main/java/forklift/source/sources/GroupedTopicSource.java @@ -50,6 +50,11 @@ public void overrideGroup(String group) { this.group = group; } + @Override + public int hashCode() { + return Objects.hash(name, group); + } + @Override public boolean equals(Object o) { if (this == o)