Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kafka Connector: Consumer Topic + Grouped Topic Support #139

Open
wants to merge 3 commits into
base: develop
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Prev Previous commit
Add tests for that sources are mapped to GroupedTopics correctly
  • Loading branch information
Kuroshii committed Mar 13, 2018
commit 20b3900e123f5031a3dda0333dfc7b844e5114cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package forklift.connectors;

import forklift.source.sources.GroupedTopicSource;
import forklift.source.sources.QueueSource;
import forklift.source.sources.TopicSource;

import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class KafkaConnectorTests {
private static final String GROUP_ID = "test-default";
private KafkaConnector connector;

@Before
public void setup() throws Exception {
connector = new KafkaConnector("blah", "blah", GROUP_ID);
}

@Test
public void testQueueMapping() {
final String topicName = "test-topic";

final GroupedTopicSource mappedSource = connector.mapToGroupedTopic(new QueueSource(topicName));
Assert.assertEquals(new GroupedTopicSource(topicName, GROUP_ID), mappedSource);
}

@Test
public void testTopicMapping() {
final String topicName = "test-topic";
final TopicSource consumerSource = new TopicSource(topicName);
consumerSource.setContextClass(FakeConsumer.class);

final GroupedTopicSource mappedConsumerSource = connector.mapToGroupedTopic(consumerSource);
Assert.assertEquals(new GroupedTopicSource(topicName, "test-default-FakeConsumer"), mappedConsumerSource);
}

@Test
public void testDefaultTopicMapping() {
final String topicName = "test-topic";
final TopicSource anonymousSource = new TopicSource(topicName);

final GroupedTopicSource mappedConsumerSource = connector.mapToGroupedTopic(anonymousSource);
Assert.assertEquals(new GroupedTopicSource(topicName, GROUP_ID), mappedConsumerSource);
}

@Test
public void testGroupedTopicMapping() {
final String topicName = "test-topic";
final String groupId = "test-group";
final GroupedTopicSource unmappedSource = new GroupedTopicSource(topicName, groupId);

final GroupedTopicSource mappedSource = connector.mapToGroupedTopic(unmappedSource);
Assert.assertEquals(unmappedSource, mappedSource);
}

private final class FakeConsumer {}
}