
Low sendTimeout when producing to kafka can cause reprocessing and duplicates #120

Closed
JorgenRingen opened this issue Jun 22, 2021 · 2 comments

JorgenRingen commented Jun 22, 2021

Noticed this timeout from time to time:

io.confluent.parallelconsumer.InternalRuntimeError: java.util.concurrent.TimeoutException: Timeout after waiting for 2000 ms.
	at io.confluent.parallelconsumer.ProducerManager.produceMessage(ProducerManager.java:145) ~[parallel-consumer-core-0.3.0.2.jar:?]
	at io.confluent.parallelconsumer.ParallelEoSStreamProcessor.lambda$pollAndProduceMany$6(ParallelEoSStreamProcessor.java:403) ~[parallel-consumer-core-0.3.0.2.jar:?]
	at io.confluent.parallelconsumer.ParallelEoSStreamProcessor.userFunctionRunner(ParallelEoSStreamProcessor.java:997) ~[parallel-consumer-core-0.3.0.2.jar:?]
	at io.confluent.parallelconsumer.ParallelEoSStreamProcessor.lambda$submitWorkToPool$14(ParallelEoSStreamProcessor.java:968) ~[parallel-consumer-core-0.3.0.2.jar:?]
	at java.util.concurrent.FutureTask.run(Unknown Source) [?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
	at java.lang.Thread.run(Unknown Source) [?:?]
	Caused by: java.util.concurrent.TimeoutException: Timeout after waiting for 2000 ms.

This happens especially often during kafka-broker restarts and network issues, as Producer#send can take some time to return.

The timeout in ProducerManager#sendTimeoutSeconds is hardcoded to 2 seconds (https://github.com/confluentinc/parallel-consumer/blob/master/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ProducerManager.java#L45), which causes a timeout whenever Producer#send takes longer than 2 seconds (https://github.com/confluentinc/parallel-consumer/blob/master/parallel-consumer-co[…]rc/main/java/io/confluent/parallelconsumer/ProducerManager.java).

This has some side-effects:

  • records will be re-processed inside the consumer-function if the timeout occurs
  • the output-records from the consumer-function can be sent to kafka multiple times, because the 2-second timeout isn't aligned with the timeout(s) in KafkaProducer (delivery.timeout.ms defaults to 2 minutes, for example). KafkaProducer#send will likely succeed even after the timeout occurs.
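The duplicate mechanism can be sketched without a broker: waiting on a Future with a short timeout throws TimeoutException, but the underlying task is not cancelled and still completes. This is a hypothetical stand-in (plain executor, not parallel-consumer or KafkaProducer code), purely to illustrate why a send that outlives the 2-second wait still delivers the record:

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch: a "send" that takes longer than the caller's wait
// timeout still runs to completion, so retrying after the TimeoutException
// produces a duplicate.
public class SendTimeoutSketch {
    public static boolean sendCompletesDespiteWaitTimeout() throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        AtomicBoolean delivered = new AtomicBoolean(false);
        // Stand-in for KafkaProducer#send: the broker round-trip takes 200 ms.
        Future<?> send = pool.submit(() -> {
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            delivered.set(true); // the record is still delivered
        });
        boolean timedOut = false;
        try {
            send.get(50, TimeUnit.MILLISECONDS); // the short hardcoded wait
        } catch (TimeoutException e) {
            timedOut = true; // caller gives up and re-processes -> duplicate
        }
        send.get(); // the original send finishes regardless
        pool.shutdown();
        return timedOut && delivered.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(sendCompletesDespiteWaitTimeout()); // prints "true"
    }
}
```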

Some suggestions:

  • increase the timeout or make it configurable
  • rely on the timeout settings of the kafka-producer (probably delivery.timeout.ms?) instead of a separate timeout.
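A minimal sketch of the second suggestion, assuming a hypothetical helper (not existing parallel-consumer API): derive the send-wait from the producer's own delivery.timeout.ms (2 minutes by default) plus an illustrative margin, instead of a separate hardcoded 2-second value.

```java
import java.util.Properties;

// Hypothetical sketch: align the send-wait timeout with the producer's
// configured delivery.timeout.ms rather than a separate hardcoded value.
public class SendTimeoutFromProducerConfig {
    static final long DEFAULT_DELIVERY_TIMEOUT_MS = 120_000L; // Kafka default
    static final long MARGIN_MS = 5_000L; // illustrative slack, not from the source

    public static long sendWaitMs(Properties producerProps) {
        String configured = producerProps.getProperty("delivery.timeout.ms");
        long deliveryTimeout = configured != null
                ? Long.parseLong(configured)
                : DEFAULT_DELIVERY_TIMEOUT_MS;
        // Wait at least as long as the producer itself will keep trying.
        return deliveryTimeout + MARGIN_MS;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("delivery.timeout.ms", "120000");
        System.out.println(sendWaitMs(props)); // prints "125000"
    }
}
```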

Related epic: #65

@JorgenRingen JorgenRingen changed the title Low sendTimeout causes retries/duplicates when producing messages to kafka Low sendTimeout when producing to kafka can cause reprocessing and duplicates Jun 22, 2021
@JorgenRingen

PR: #121


astubbs commented Oct 7, 2021

@JorgenRingen I think this is addressed now with #121 so will close.

@astubbs astubbs closed this as completed Oct 7, 2021