
Wip/forklift kafka incremental controller #132

Merged
16 commits merged into develop from wip/forklift-kafka-incremental-controller on Jun 2, 2017

Conversation

@AFrieze (Collaborator) commented May 24, 2017

Overview
Bridger discovered that calls to subscribe reset the fetch positions of topics. Additionally, there were concerns that the topic partitions assigned to topics which do not change from subscription to subscription may be modified if the subscription occurs during some form of rebalance event. Although it should be possible to keep the existing approach, it was much simpler to move to a controller-per-topic setup. We may wish to revisit this in the future should performance become an issue.
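The controller-per-topic approach described above can be sketched roughly as follows. This is a hypothetical illustration, not the actual PR code; ControllerRegistry and TopicController are made-up names standing in for the real classes.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch: one controller per topic, so subscribing to a new
// topic never touches the fetch positions of already-subscribed topics.
public class ControllerRegistry {
    private final Map<String, TopicController> controllers = new ConcurrentHashMap<>();

    // Reuse a running controller for the topic, or create a fresh one.
    public TopicController controllerFor(String topic) {
        TopicController controller = controllers.get(topic);
        if (controller == null || !controller.isRunning()) {
            controller = new TopicController(topic);
            controllers.put(topic, controller);
        }
        return controller;
    }

    static class TopicController {
        private final String topic;
        private volatile boolean running = true;

        TopicController(String topic) {
            this.topic = topic;
        }

        boolean isRunning() {
            return running;
        }
    }
}
```

Because each topic's controller is independent, a rebalance on one topic cannot disturb the partition assignments of another.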

@AFrieze AFrieze assigned AFrieze, Kuroshii and dcshock and unassigned AFrieze May 24, 2017
if (controller == null || !controller.isRunning()) {
controller = createController();
KafkaController controller = controllers.get(name);
if(controller != null && controller.isRunning()){
Collaborator:
Nit: spacing

});
// Start the consumer.
c1.listen();
Thread.sleep(3000);
Collaborator:
Do you need this sleep? I'm fairly certain that Consumer.listen is blocking.

Collaborator Author:

Yes, the sleep is to ensure that the consumer shuts down before the next consumer starts up.

@Kuroshii (Collaborator) May 24, 2017:

I see... the consumer is done processing messages, but you want to make sure that close() has finished being called on the previous KafkaTopicConsumer before getting a new KafkaTopicConsumer to read from, because otherwise you could end up creating the new consumer with the old controller (and old MessageStream)?

Collaborator Author:

Yes. Now that we can't add and remove from the same controller this test is a little less relevant although I think it's worth leaving it in.

Collaborator:

Thinking some more, I guess I'd blame this weirdness on Consumer.shutdown being a signal rather than a synchronous action that completes when the consumer is truly "shutdown", or an asynchronous task with a CompletableFuture. A better name for what it's doing is signalShutdown. 🤷‍♂️
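For illustration, a shutdown exposed as an asynchronous task (as suggested above) might look like the following. This is a hedged sketch; AsyncConsumer and all its members are hypothetical names, not Forklift's actual API.

```java
import java.util.concurrent.CompletableFuture;

// Sketch: shutdown returns a CompletableFuture that completes only once
// the consumer loop has truly stopped, so callers can wait on it instead
// of sleeping. Names here are illustrative, not the real Consumer API.
class AsyncConsumer {
    private volatile boolean running = true;
    private final CompletableFuture<Void> terminated = new CompletableFuture<>();

    // The signal-only part (what the review calls signalShutdown).
    void signalShutdown() {
        running = false;
    }

    // Asynchronous shutdown: signal, then hand back a future that
    // completes when run() actually exits.
    CompletableFuture<Void> shutdownAsync() {
        signalShutdown();
        return terminated;
    }

    void run() {
        try {
            while (running) {
                // poll and process messages...
            }
        } finally {
            // The consumer is truly "shutdown" only at this point.
            terminated.complete(null);
        }
    }
}
```

A caller would then do `consumer.shutdownAsync().join()` rather than a fixed Thread.sleep, removing the timing guesswork discussed above.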

@Kuroshii (Collaborator):

Assuming this works, this should remove the need for your other bugfix PR?

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Collaborator:

Nit: the added imports don't appear to be used

running = false;
kafkaConsumer.wakeup();
// executor.shutdownNow();
executor.shutdown();
Collaborator:

Do we even need shutdown here? The kafkaConsumer.wakeup() call should be sufficient to make the message loop thread terminate.

The difference between the two shutdown methods is that shutdown prevents incoming tasks from being queued on the executor (which shouldn't matter here since you only call start once on a KafkaController), while shutdown sends a Thread.interrupt signal to its threads to let them know that they should quit soon.

Collaborator Author:

I think you meant that shutdownNow causes an interrupt? I originally thought that I wouldn't need either of the shutdown calls when I did this refactor, but discovered that awaitTermination requires a shutdown call of some sort in order to function. I'll remove the commented-out shutdownNow.

Collaborator:

Yeah, whoops.

Hmm, maybe the designers wanted to be sure that when awaitTermination is called, no new tasks are going to be executed (because that makes the check more difficult than just a bunch of Thread.join calls). Seems good then.
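The awaitTermination behavior discussed in this thread can be demonstrated with a minimal standalone sketch (ShutdownDemo is an illustrative name, not part of the PR):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Sketch: awaitTermination only returns true after shutdown() (or
// shutdownNow()) has been called, even if every submitted task has
// already finished running.
public class ShutdownDemo {
    public static boolean stop(ExecutorService executor) throws InterruptedException {
        // Without this shutdown() call, awaitTermination below would simply
        // time out, because the executor never enters the TERMINATED state.
        executor.shutdown();
        return executor.awaitTermination(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.submit(() -> { /* the message loop would run here */ });
        System.out.println(stop(executor)); // expect: true
    }
}
```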

@@ -184,6 +141,10 @@ private void controlLoop() {
throw t;
} finally {
running = false;
synchronized (this) {
//wake up any threads waiting on a control loop operation
this.notifyAll();
Collaborator:

I don't see any waits to be woken up by this notify?

Collaborator Author:

Good catch, with the removal of the add/remove topic methods this is no longer needed.
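For reference, this is roughly what the missing wait() side would have looked like had the notifyAll been kept. An illustrative sketch only; ControlLoop and awaitStopped are hypothetical names, not the PR's code.

```java
// Sketch: a notifyAll() in the control loop's finally block only matters
// if some other thread is blocked in wait() on the same monitor.
class ControlLoop {
    private volatile boolean running = true;

    // The wait() side that the review comment found to be absent:
    // a thread blocking here until the control loop has stopped.
    synchronized void awaitStopped() throws InterruptedException {
        while (running) {
            wait(); // released when controlLoop() calls notifyAll()
        }
    }

    void controlLoop() {
        try {
            // process control events...
        } finally {
            synchronized (this) {
                running = false;
                // wake up any threads waiting on a control loop operation
                notifyAll();
            }
        }
    }
}
```

With no thread ever calling awaitStopped (or any other wait on the monitor), the notifyAll is dead code, which is why removing it along with the add/remove topic methods is safe.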

sentMessageIds.add(producer.send("message1"));
final Consumer c1 = new Consumer(StringConsumer.class, forklift);
// Shutdown the consumer after all the messages have been processed.
c1.setOutOfMessages((listener) -> {
@Kuroshii (Collaborator) May 24, 2017:

Hopefully sending just one message doesn't need timeouts for CI... 🤞

@Kuroshii (Collaborator) left a comment:

Looks good; much simpler than it was before 👍

@Kuroshii (Collaborator) commented May 25, 2017:

Wait, what about supporting the new GroupedTopic stuff? Make another PR after we merge both PRs?

The difference is that you can only reuse the given KafkaController if it has the same consumer group and the same topic as the GroupedTopic being requested. In essence, I think you'd need a map from GroupedTopicSource to KafkaController (you'd probably need to add a hashCode to be able to do that, though).
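A sketch of that keying scheme, assuming the key must match on both consumer group and topic. GroupedTopicKey here is an illustrative stand-in for GroupedTopicSource, which would need equivalent equals and hashCode implementations to work as a map key.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Sketch: a value-type key over (group, topic) so controllers are only
// reused when both fields match. Stand-in for GroupedTopicSource.
class GroupedTopicKey {
    final String group;
    final String topic;

    GroupedTopicKey(String group, String topic) {
        this.group = group;
        this.topic = topic;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof GroupedTopicKey)) return false;
        GroupedTopicKey other = (GroupedTopicKey) o;
        return group.equals(other.group) && topic.equals(other.topic);
    }

    // Required for HashMap lookups to find logically-equal keys.
    @Override
    public int hashCode() {
        return Objects.hash(group, topic);
    }
}
```

With equals and hashCode over both fields, a `Map<GroupedTopicKey, KafkaController>` lookup reuses a controller only for an exact group-and-topic match, which is the reuse rule described above.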

@Kuroshii Kuroshii merged commit b4eb361 into develop Jun 2, 2017
@Kuroshii Kuroshii deleted the wip/forklift-kafka-incremental-controller branch June 9, 2017 21:40