Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed bug with adding a removed topic #127

Closed
wants to merge 6 commits into from

Conversation

AFrieze
Copy link
Collaborator

@AFrieze AFrieze commented Apr 19, 2017

This fixes a bug with adding a topic which was previously removed.

@Test
public void topicTurnoverTest() throws StartupException, ConnectorException, InterruptedException, ProducerException {

String topic = "forklift-string-topic";
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You declare this "constant" but don't use it

for(int i = 0; i < 10; i++){
this.controller.addTopic(topic1);
}
assertTrue(true);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems a bit redundant; if you want to clarify this test, an comment would be more expressive

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. This is a bad test as if it locks you have to kill the test... I'll rework it.

public void addingTheSameTopicRepeatedlyDoesntLock() throws InterruptedException {

this.controller.start();
String topic1 = "topic1";
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MInor: could be final

*/
@Test
public void topicTurnoverTest() throws StartupException, ConnectorException, InterruptedException, ProducerException {

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Your use of newlines like this seems inconsistent in this PR; I personally don't like this style, but 🤷‍♂️

@@ -18,7 +18,7 @@
* Tests which focus on causing partitions to be rebalanced.
*/
public class RebalanceTests extends BaseIntegrationTest {

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: indentation

if (topicToRemove != null) {
topics.remove(topicToRemove);
messageStream.removeTopic(topicToRemove);
Set<TopicPartition> removed = kafkaConsumer.assignment().stream().filter(partition -> !topics.contains(partition.topic())).collect(Collectors.toSet());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it not equivalent to check partition.topic() == topicToRemove instead of !topics.contains(partition.topic()) here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch

acknowledgmentHandler.removePartitions(kafkaConsumer.assignment());
private boolean processTopicChanges() throws InterruptedException {
boolean topicsChanged = processTopicAdd();
topicsChanged = processTopicRemoved() || topicsChanged;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see that you're trying to avoid short-circuit evaluation; I think this would look clearer as two variable declarations. E.g.

boolean topicsAdded = processTopicAdd();
boolean topicsRemoved = processTopicRemoved();

if (topicsAdded || topicsRemoved) {
    kafkaConsumer.subscribe(topics, new RebalanceListener());
}

At the very least, I think a comment is warranted for clarity, since it isn't really intuitively obvious why the code is structured this way and, the processTopicAdd() and processTopicRemoved() methods do have symmetric roles.

Copy link
Collaborator

@Kuroshii Kuroshii left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was a little concerned at first about the efficiency of the synchronization around topics, but considering that adding and removing topics should be relatively uncommon operations. Even if there is a better synchronization method, it's a very minor issue.

There are only a few minor style-type issues to address. Once those are handled I will approve.

@AFrieze
Copy link
Collaborator Author

AFrieze commented May 25, 2017

wip/forklift-kafka-incremental-controller removes the need for this bug fix.

@AFrieze AFrieze closed this May 25, 2017
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants