I once got my application stuck because the parallel-consumer closed itself without throwing an exception, so the other threads were still running while the parallel-consumer was stopped, which is something I want to avoid ;) My current solution is to launch a thread that regularly checks whether the ParallelEoSStreamProcessor isClosedOrFailed.

=> How do I correctly close a StreamsApp with a parallel-consumer?

For example, replacing the ParallelStreamProcessor used in StreamsApp with this failing implementation:

    public class MyParallelEoSStreamProcessor<K, V> extends ParallelEoSStreamProcessor<K, V> {
        public MyParallelEoSStreamProcessor(ParallelConsumerOptions<K, V> newOptions) {
            super(newOptions);
        }

        // Simulate a control loop that dies immediately on the background thread.
        @Override
        protected void controlLoop(Function userFunction, Consumer callback) throws TimeoutException, ExecutionException, InterruptedException {
            throw new ExecutionException(new TimeoutException("example error"));
        }
    }

leads to these logs and an app that never closes.
PS launching an |
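For reference, the periodic check described above could look roughly like the sketch below. `ConsumerWatchdog` and its parameters are hypothetical names, not part of parallel-consumer. The check is passed in as a plain `BooleanSupplier` so the sketch compiles without the library; in the real app it would presumably be `processor::isClosedOrFailed`.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

/** Hypothetical helper (not a parallel-consumer class): polls a status check on a timer. */
final class ConsumerWatchdog implements AutoCloseable {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "pc-watchdog");
                t.setDaemon(true); // the watchdog alone must not keep the JVM alive
                return t;
            });
    private final AtomicBoolean fired = new AtomicBoolean(false);

    /**
     * @param closedOrFailed assumed to wrap something like processor::isClosedOrFailed
     * @param onStopped      what to do once the consumer is found stopped, e.g. close the app
     * @param periodMillis   polling interval in milliseconds
     */
    ConsumerWatchdog(BooleanSupplier closedOrFailed, Runnable onStopped, long periodMillis) {
        scheduler.scheduleAtFixedRate(() -> {
            // Run the callback at most once, then stop polling.
            if (closedOrFailed.getAsBoolean() && fired.compareAndSet(false, true)) {
                try {
                    onStopped.run();
                } finally {
                    scheduler.shutdown();
                }
            }
        }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    /** Stops polling without running the callback, e.g. on a normal shutdown. */
    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

Usage would be something like `new ConsumerWatchdog(processor::isClosedOrFailed, this::shutdownEverything, 1000)`, where `shutdownEverything` is a placeholder for whatever stops the other threads. Note that if the status check itself throws, the scheduled task is silently cancelled, so the check should be cheap and exception-free.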
Replies: 1 comment
Hi,
Sorry for the delay in looking into this.
I have tried to reproduce this and yes, I noticed that if controlLoop dies, the exception is not thrown to the main thread until close() is called, because the state of controlTaskFutureResult is not checked until then. This is because PC runs asynchronously: it is not possible to throw an exception from a background thread to the main thread without blocking (or periodic checking, etc.).
Given that execution is asynchronous, I am not sure off the top of my head what the best approach to monitor it would be: either, as you mentioned, create a thread that checks the status of the parallel consumer periodically, or add and expose a listener hook that is triggered on close / exception.
AFAIK the behaviour is actually similar to Kafka Streams, which also runs in an async thread and so cannot really throw an exception to the main thread on error.
Feel free to raise an issue if you have a suggestion on how it could be improved.
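To make the listener-hook idea concrete, here is a rough sketch in plain Java. It is not the library's API: `ControlLoopRunner`, `start` and `onFailure` are hypothetical names, and the control loop is just a `Runnable` standing in for PC's background loop. It only shows how a failure on a background thread can be pushed to a callback instead of sitting unobserved in a future until someone checks it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

/** Hypothetical illustration of a failure listener; not a parallel-consumer class. */
final class ControlLoopRunner {

    /**
     * Runs controlLoop on a background thread and calls onFailure if it ends with an exception.
     * The returned future can still be inspected later, as a blocking close() would do.
     */
    static CompletableFuture<Void> start(Runnable controlLoop, Consumer<Throwable> onFailure) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CompletableFuture<Void> result = CompletableFuture.runAsync(controlLoop, executor);
        result.whenComplete((ignored, error) -> {
            executor.shutdown(); // the loop has ended either way, release the thread
            if (error != null) {
                // runAsync reports failures wrapped in CompletionException; pass on the cause.
                onFailure.accept(error.getCause() != null ? error.getCause() : error);
            }
        });
        return result;
    }
}
```

The callback runs on the background thread (or on the caller's thread if the loop has already finished), so it should only trigger a shutdown or set a flag rather than do heavy work.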