
Cannot exit and close when there are rebalancing storm #787

Closed
sangreal opened this issue Jun 4, 2024 · 6 comments

Comments

@sangreal
Contributor

sangreal commented Jun 4, 2024

Version: 0.5.2.5 (+ stale container fix (#623))

Scenario:
In our setup, some of the health-check logic on our end triggers a rebalancing storm by re-initializing the consumer. Closing is then repeatedly interrupted by wake-up interrupts from the poller, and we hit the exception below.

java.lang.InterruptedException: null
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2088)
    at java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1475)
    at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.doClose(AbstractParallelEoSStreamProcessor.java:523)
    at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.controlLoop(AbstractParallelEoSStreamProcessor.java:706)
    at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.lambda$supervisorLoop$5(AbstractParallelEoSStreamProcessor.java:648)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
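To make the failure mode concrete, here is a minimal standalone sketch (plain JDK, not Parallel Consumer code) of what the trace shows: a thread blocked in ThreadPoolExecutor.awaitTermination() receives an interrupt from another thread, and the wait ends early with InterruptedException. The class InterruptedCloseRepro and its notifier thread are hypothetical stand-ins for the control thread and the poller's wake-up.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class InterruptedCloseRepro {

    /**
     * Returns true if awaitTermination() was cut short by an interrupt sent from
     * another thread, which is the situation shown in the stack trace above.
     */
    public static boolean closeIsInterrupted() {
        ExecutorService workers = Executors.newSingleThreadExecutor();
        // A slow in-flight job, so the pool cannot terminate quickly
        workers.submit(() -> {
            Thread.sleep(500);
            return null;
        });
        workers.shutdown();

        Thread closingThread = Thread.currentThread();
        // Hypothetical stand-in for the poller waking up the control thread mid-close
        Thread notifier = new Thread(() -> {
            try {
                Thread.sleep(50);
            } catch (InterruptedException ignored) {
                return;
            }
            closingThread.interrupt();
        });
        notifier.start();

        boolean interrupted;
        try {
            workers.awaitTermination(5, TimeUnit.SECONDS);
            interrupted = false;
        } catch (InterruptedException e) {
            // Same exception type as in the trace: the wait ended early
            interrupted = true;
        }

        workers.shutdownNow();
        try {
            notifier.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return interrupted;
    }

    public static void main(String[] args) {
        System.out.println("awaitTermination interrupted: " + closeIsInterrupted());
    }
}
```

The 50 ms interrupt versus the 500 ms job is just a timing choice to make the interrupt land while the wait is still in progress.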

Solution:

  1. We will improve the health check and eliminate the rebalancing storm.
  2. Meanwhile, the exit logic should also be improved to handle this kind of scenario: it should calculate the remaining wait time for shutting down the workerThreadPool rather than just resetting the flag,
    as in https://github.com/confluentinc/parallel-consumer/blob/master/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java#L6573.
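A minimal sketch of what point 2 could look like, assuming the goal is to wait for the worker pool against a single fixed deadline, so that each interrupt only resumes the wait with the time that is left instead of ending it or restarting the full timeout. The helper name awaitTerminationUninterruptibly and the class around it are hypothetical, not part of Parallel Consumer's API.

```java
import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class AwaitWithDeadline {

    /**
     * Waits up to {@code timeout} in total for {@code pool} to terminate. An interrupt
     * does not end the wait early, and it does not restart the full timeout either:
     * the next attempt only waits for whatever time is left before the deadline.
     *
     * @return true if the pool terminated before the deadline
     */
    public static boolean awaitTerminationUninterruptibly(ExecutorService pool, Duration timeout) {
        final long deadline = System.nanoTime() + timeout.toNanos();
        boolean interrupted = false;
        try {
            while (true) {
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0) {
                    return pool.isTerminated();
                }
                try {
                    return pool.awaitTermination(remaining, TimeUnit.NANOSECONDS);
                } catch (InterruptedException e) {
                    // Remember the interrupt, then go round again with the remaining time
                    interrupted = true;
                }
            }
        } finally {
            if (interrupted) {
                // Restore the interrupt status so callers further up can still see it
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        pool.submit(() -> {
            Thread.sleep(100); // a job still in flight when closing starts
            return null;
        });
        pool.shutdown();
        Thread.currentThread().interrupt(); // simulate a stray wake-up interrupt
        boolean terminated = awaitTerminationUninterruptibly(pool, Duration.ofSeconds(5));
        // Thread.interrupted() reports (and clears) the restored interrupt status
        System.out.println("terminated=" + terminated + ", interruptRestored=" + Thread.interrupted());
    }
}
```

Re-asserting the interrupt in the finally block follows the usual convention of not swallowing interrupts; whether the real doClose() should do the same is a separate decision.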

I will draft a PR when I have time.

@rkolesnev
Member

rkolesnev commented Jun 4, 2024

Hi @sangreal - can you please elaborate a bit more - what gets interrupted by which wake-up?

Are you observing that workerThreadPool.get().awaitTermination(toSeconds(timeout), SECONDS) is interrupted by consumerManager.wakeup() calls in BrokerPoller?

@sangreal
Contributor Author

sangreal commented Jun 4, 2024

Hi @rkolesnev, since we were experiencing a rebalancing storm, I think this comes from partition assignment.
But since we don't have debug-level logging enabled, I am not sure.
https://github.com/confluentinc/parallel-consumer/blob/master/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java#L446

@sangreal
Contributor Author

sangreal commented Jun 4, 2024

But it is workerThreadPool.get().awaitTermination(toSeconds(timeout), SECONDS) that gets interrupted.

@rkolesnev
Member

Ok, I see, thanks.

The interrupt / blocking thread handling always makes my head hurt a bit :)
So I guess we can either add a check to the notifySomethingToDo() method: if in the Closing state (or not polling the mailbox right now), don't interrupt the control thread. It actually used to have that check; the AtomicBoolean currentlyPollingWorkCompleteMailBox is still there, but it is no longer checked before interrupting the control thread.

Alternatively, wrap the awaitTermination() call in the loop with a try/catch for InterruptedException to prevent the wait from finishing early.

https://github.com/confluentinc/parallel-consumer/blob/cdaf7ccabc05bacbe0f25a81bbde021ac1459827/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java#L1402C17-L1402C36
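A sketch of the first option (never interrupt while closing, and only interrupt when the control thread is actually polling the mailbox), assuming a simplified model of the processor. The State enum, the ControlThreadNotifier class, and its setter methods are hypothetical; only the names notifySomethingToDo and currentlyPollingWorkCompleteMailBox come from the discussion above.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

public class ControlThreadNotifier {

    // Hypothetical, simplified lifecycle states
    public enum State { RUNNING, CLOSING, CLOSED }

    private final AtomicReference<State> state = new AtomicReference<>(State.RUNNING);
    // True only while the control thread is blocked polling the work-complete mailbox
    private final AtomicBoolean currentlyPollingWorkCompleteMailBox = new AtomicBoolean(false);
    private volatile Thread controlThread;

    public void setControlThread(Thread thread) { this.controlThread = thread; }

    public void startClosing() { state.set(State.CLOSING); }

    public void setPollingMailbox(boolean polling) { currentlyPollingWorkCompleteMailBox.set(polling); }

    /**
     * Wakes the control thread only when that is safe and useful: never while closing
     * (so doClose() can finish awaitTermination), and only while it is polling the mailbox.
     *
     * @return true if an interrupt was sent
     */
    public boolean notifySomethingToDo() {
        State current = state.get();
        if (current == State.CLOSING || current == State.CLOSED) {
            return false;
        }
        Thread target = controlThread;
        if (target != null && currentlyPollingWorkCompleteMailBox.get()) {
            target.interrupt();
            return true;
        }
        return false;
    }
}
```

The state check narrows the window but does not close it: the control thread can move into Closing between the check and the interrupt() call, so a defensive loop around awaitTermination() (the alternative above) can still serve as a backstop.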

@sangreal
Contributor Author

sangreal commented Jun 4, 2024

@rkolesnev I think checking the Closing state in notifySomethingToDo is the neatest way to handle this.
If you don't mind, I will draft a PR for this one.

sangreal added a commit to sangreal/parallel-consumer that referenced this issue Jun 5, 2024
rkolesnev pushed a commit that referenced this issue Jun 6, 2024
…orm (#789)

* fix issue #787 for cannot close and exit properly when rebalancing storm

* update changelog

* address comments
@rkolesnev
Member

Closing as #789 is merged.

Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants