Skip to content

Commit

Permalink
Retrieve controllers according to topic and group name
Browse files Browse the repository at this point in the history
  • Loading branch information
Kuroshii committed Mar 13, 2018
1 parent 77379c9 commit 4f5793a
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 28 deletions.
3 changes: 3 additions & 0 deletions README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ link:doc/forklift.adoc[Documentation]
== Releases
link:doc/prev_releases.adoc[Previous Releases]

* *Unreleased*
** Kafka: Add internal support for separate dedicated consumers on the same topic

* *January 17th 2017* - v3.1
** Upgrade kafka clients to use Kafka 0.11
** Fix bug in the kafka connector that made it not provide heartbeats in certain situations
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,24 +39,24 @@ public class KafkaConnector implements ForkliftConnectorI {

private final String kafkaHosts;
private final String schemaRegistries;
private final String groupId;
private final String defaultGroupId;

private KafkaProducer<?, ?> kafkaProducer;
private KafkaController controller;
private ForkliftSerializer serializer;
private Map<String, KafkaController> controllers = new HashMap<>();
private Map<GroupedTopicSource, KafkaController> controllers = new HashMap<>();

/**
* Constructs a new instance of the KafkaConnector
*
* @param kafkaHosts list of kafka servers in host:port,... format
* @param schemaRegistries list of schema registry servers in https://host:port,... format
* @param groupId the groupId to use when subscribing to topics
* @param defaultGroupId the default groupId to use when subscribing to topics
*/
public KafkaConnector(String kafkaHosts, String schemaRegistries, String groupId) {
public KafkaConnector(String kafkaHosts, String schemaRegistries, String defaultGroupId) {
this.kafkaHosts = kafkaHosts;
this.schemaRegistries = schemaRegistries;
this.groupId = groupId;
this.defaultGroupId = defaultGroupId;
this.serializer = new KafkaSerializer(this, newSerializer(), newDeserializer());
}

Expand Down Expand Up @@ -101,7 +101,7 @@ private KafkaProducer createKafkaProducer() {
return new KafkaProducer(producerProperties);
}

private KafkaController createController(String topicName) {
private KafkaController createController(String topicName, String groupId) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHosts);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
Expand Down Expand Up @@ -137,48 +137,41 @@ public synchronized void stop() throws ConnectorException {

@Override
public ForkliftConsumerI getConsumerForSource(SourceI source) throws ConnectorException {
return source
.apply(QueueSource.class, queue -> getQueue(queue.getName()))
.apply(TopicSource.class, topic -> getTopic(topic.getName()))
.apply(GroupedTopicSource.class, topic -> getGroupedTopic(topic))
.apply(RoleInputSource.class, roleSource -> {
final ForkliftConsumerI rawConsumer = getConsumerForSource(roleSource.getActionSource(this));
return new RoleInputConsumerWrapper(rawConsumer);
})
.elseUnsupportedError();
if (source instanceof RoleInputSource) {
final RoleInputSource roleSource = (RoleInputSource) source;
final ForkliftConsumerI rawConsumer = getConsumerForSource(roleSource.getActionSource(this));
return new RoleInputConsumerWrapper(rawConsumer);
}
return getGroupedTopic(mapToGroupedTopic(source));
}

public synchronized ForkliftConsumerI getGroupedTopic(GroupedTopicSource source) throws ConnectorException {
if (!source.groupSpecified()) {
source.overrideGroup(groupId);
source.overrideGroup(defaultGroupId);
}

if (!source.getGroup().equals(groupId)) { //TODO actually support GroupedTopics
throw new ConnectorException("Unexpected group '" + source.getGroup() + "'; only the connector group '" + groupId + "' is allowed");
}

KafkaController controller = controllers.get(source.getName());
KafkaController controller = controllers.get(source);
if (controller != null && controller.isRunning()) {
log.warn("Consumer for topic already exists under this controller's groupname. Messages will be divided amongst consumers.");
log.warn("Consumer for topic and group already exists. Messages will be divided amongst consumers.");
} else {
controller = createController(source.getName());
this.controllers.put(source.getName(), controller);
controller = createController(source.getName(), source.getGroup());
this.controllers.put(source, controller);
controller.start();
}

return new KafkaTopicConsumer(source.getName(), controller);
}

@Override
public ForkliftConsumerI getQueue(String name) throws ConnectorException {
return getGroupedTopic(new GroupedTopicSource(name, groupId));
return getGroupedTopic(mapToGroupedTopic(new QueueSource(name)));
}

@Override
public ForkliftConsumerI getTopic(String name) throws ConnectorException {
return getGroupedTopic(new GroupedTopicSource(name, groupId));
return getGroupedTopic(mapToGroupedTopic(new TopicSource(name)));
}


@Override
public ForkliftProducerI getQueueProducer(String name) {
return getTopicProducer(name);
Expand All @@ -200,11 +193,28 @@ public ActionSource mapSource(LogicalSource source) {
}

protected GroupedTopicSource mapRoleInputSource(RoleInputSource roleSource) {
return new GroupedTopicSource("forklift-role-" + roleSource.getRole(), groupId);
return new GroupedTopicSource("forklift-role-" + roleSource.getRole(), defaultGroupId);
}

@Override
public boolean supportsResponse() {
return true;
}

/* visible for testing */
protected GroupedTopicSource mapToGroupedTopic(SourceI source) {
return source
.apply(QueueSource.class, queueSource -> new GroupedTopicSource(queueSource.getName(), defaultGroupId))
.apply(TopicSource.class, topicSource -> topicToGroupedTopic(topicSource))
.apply(GroupedTopicSource.class, groupedTopicSource -> groupedTopicSource)
.elseUnsupportedError();
}

private GroupedTopicSource topicToGroupedTopic(TopicSource topicSource) {
if (topicSource.getContextClass() == null) {
return new GroupedTopicSource(topicSource.getName(), defaultGroupId);
}
final String groupName = defaultGroupId + "-" + topicSource.getContextClass().getSimpleName();
return new GroupedTopicSource(topicSource.getName(), groupName);
}
}

0 comments on commit 4f5793a

Please sign in to comment.