-
Notifications
You must be signed in to change notification settings - Fork 22
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fixed bug with adding a removed topic #127
Conversation
@Test | ||
public void topicTurnoverTest() throws StartupException, ConnectorException, InterruptedException, ProducerException { | ||
|
||
String topic = "forklift-string-topic"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You declare this "constant" but don't use it
for(int i = 0; i < 10; i++){ | ||
this.controller.addTopic(topic1); | ||
} | ||
assertTrue(true); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems a bit redundant; if you want to clarify this test, an comment would be more expressive
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch. This is a bad test as if it locks you have to kill the test... I'll rework it.
public void addingTheSameTopicRepeatedlyDoesntLock() throws InterruptedException { | ||
|
||
this.controller.start(); | ||
String topic1 = "topic1"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
MInor: could be final
*/ | ||
@Test | ||
public void topicTurnoverTest() throws StartupException, ConnectorException, InterruptedException, ProducerException { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Your use of newlines like this seems inconsistent in this PR; I personally don't like this style, but 🤷♂️
@@ -18,7 +18,7 @@ | |||
* Tests which focus on causing partitions to be rebalanced. | |||
*/ | |||
public class RebalanceTests extends BaseIntegrationTest { | |||
|
|||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: indentation
if (topicToRemove != null) { | ||
topics.remove(topicToRemove); | ||
messageStream.removeTopic(topicToRemove); | ||
Set<TopicPartition> removed = kafkaConsumer.assignment().stream().filter(partition -> !topics.contains(partition.topic())).collect(Collectors.toSet()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it not equivalent to check partition.topic() == topicToRemove
instead of !topics.contains(partition.topic())
here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good catch
acknowledgmentHandler.removePartitions(kafkaConsumer.assignment()); | ||
private boolean processTopicChanges() throws InterruptedException { | ||
boolean topicsChanged = processTopicAdd(); | ||
topicsChanged = processTopicRemoved() || topicsChanged; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see that you're trying to avoid short-circuit evaluation; I think this would look clearer as two variable declarations. E.g.
boolean topicsAdded = processTopicAdd();
boolean topicsRemoved = processTopicRemoved();
if (topicsAdded || topicsRemoved) {
kafkaConsumer.subscribe(topics, new RebalanceListener());
}
At the very least, I think a comment is warranted for clarity, since it isn't really intuitively obvious why the code is structured this way and, the processTopicAdd()
and processTopicRemoved()
methods do have symmetric roles.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was a little concerned at first about the efficiency of the synchronization around topics, but considering that adding and removing topics should be relatively uncommon operations. Even if there is a better synchronization method, it's a very minor issue.
There are only a few minor style-type issues to address. Once those are handled I will approve.
wip/forklift-kafka-incremental-controller removes the need for this bug fix. |
This fixes a bug with adding a topic which was previously removed.