Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Error in onPartitionsAssigned in parallel consumer #326

Open
milansanjeev opened this issue Jul 4, 2022 · 14 comments
Open

Error in onPartitionsAssigned in parallel consumer #326

milansanjeev opened this issue Jul 4, 2022 · 14 comments

Comments

@milansanjeev
Copy link

milansanjeev commented Jul 4, 2022

Hi Team,

I want to use parallel consumer in one of over spring service to process kafka stream.
I am using core parallel-consumer-core 0.5.1.0 but getting below exception. We are secured kafka clsuter.


2022-07-04 15:14:49.224 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED ERROR 39138 --- [ pc-broker-poll] i.c.p.state.PartitionStateManager        : Error in onPartitionsAssigned

java.lang.RuntimeException: Unexpected magic: 1
	at io.confluent.parallelconsumer.offsets.OffsetEncoding.decode(OffsetEncoding.java:52)
	at io.confluent.parallelconsumer.offsets.EncodedOffsetPair.unwrap(EncodedOffsetPair.java:75)
	at io.confluent.parallelconsumer.offsets.OffsetMapCodecManager.decodeCompressedOffsets(OffsetMapCodecManager.java:229)
	at io.confluent.parallelconsumer.offsets.OffsetMapCodecManager.deserialiseIncompleteOffsetMapFromBase64(OffsetMapCodecManager.java:161)
	at io.confluent.parallelconsumer.offsets.OffsetMapCodecManager.deserialiseIncompleteOffsetMapFromBase64(OffsetMapCodecManager.java:151)
	at io.confluent.parallelconsumer.offsets.OffsetMapCodecManager.decodePartitionState(OffsetMapCodecManager.java:165)
	at io.confluent.parallelconsumer.offsets.OffsetMapCodecManager.lambda$loadPartitionStateForAssignment$0(OffsetMapCodecManager.java:128)
	at java.base/java.util.HashMap.forEach(HashMap.java:1425)
	at io.confluent.parallelconsumer.offsets.OffsetMapCodecManager.loadPartitionStateForAssignment(OffsetMapCodecManager.java:125)
	at io.confluent.parallelconsumer.state.PartitionStateManager.onPartitionsAssigned(PartitionStateManager.java:107)
	at io.confluent.parallelconsumer.state.WorkManager.onPartitionsAssigned(WorkManager.java:98)
	at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.onPartitionsAssigned(AbstractParallelEoSStreamProcessor.java:360)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:293)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:430)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:449)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:365)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:508)
	at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1261)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)
	at io.confluent.parallelconsumer.internal.ConsumerManager.poll(ConsumerManager.java:54)
	at io.confluent.parallelconsumer.internal.BrokerPollSystem.pollBrokerForRecords(BrokerPollSystem.java:183)
	at io.confluent.parallelconsumer.internal.BrokerPollSystem.handlePoll(BrokerPollSystem.java:140)
	at io.confluent.parallelconsumer.internal.BrokerPollSystem.controlLoop(BrokerPollSystem.java:116)
	at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
	at java.base/java.lang.Thread.run(Thread.java:832)

2022-07-04 15:14:49.224 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED ERROR 39138 --- [ pc-broker-poll] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=cre_impulse, groupId=cre_impulse] User provided listener io.confluent.parallelconsumer.ParallelEoSStreamProcessor failed on invocation of onPartitionsAssigned for partitions [commerce_order_eg_domain_event_v3-2, commerce_order_eg_domain_event_v3-1, commerce_order_eg_domain_event_v3-0]

java.lang.RuntimeException: Unexpected magic: 1
	at io.confluent.parallelconsumer.offsets.OffsetEncoding.decode(OffsetEncoding.java:52)
	at io.confluent.parallelconsumer.offsets.EncodedOffsetPair.unwrap(EncodedOffsetPair.java:75)
	at io.confluent.parallelconsumer.offsets.OffsetMapCodecManager.decodeCompressedOffsets(OffsetMapCodecManager.java:229)
	at io.confluent.parallelconsumer.offsets.OffsetMapCodecManager.deserialiseIncompleteOffsetMapFromBase64(OffsetMapCodecManager.java:161)
	at io.confluent.parallelconsumer.offsets.OffsetMapCodecManager.deserialiseIncompleteOffsetMapFromBase64(OffsetMapCodecManager.java:151)
	at io.confluent.parallelconsumer.offsets.OffsetMapCodecManager.decodePartitionState(OffsetMapCodecManager.java:165)
	at io.confluent.parallelconsumer.offsets.OffsetMapCodecManager.lambda$loadPartitionStateForAssignment$0(OffsetMapCodecManager.java:128)
	at java.base/java.util.HashMap.forEach(HashMap.java:1425)
	at io.confluent.parallelconsumer.offsets.OffsetMapCodecManager.loadPartitionStateForAssignment(OffsetMapCodecManager.java:125)
	at io.confluent.parallelconsumer.state.PartitionStateManager.onPartitionsAssigned(PartitionStateManager.java:107)
	at io.confluent.parallelconsumer.state.WorkManager.onPartitionsAssigned(WorkManager.java:98)
	at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.onPartitionsAssigned(AbstractParallelEoSStreamProcessor.java:360)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:293)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:430)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:449)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:365)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:508)
	at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1261)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)
	at io.confluent.parallelconsumer.internal.ConsumerManager.poll(ConsumerManager.java:54)
	at io.confluent.parallelconsumer.internal.BrokerPollSystem.pollBrokerForRecords(BrokerPollSystem.java:183)
	at io.confluent.parallelconsumer.internal.BrokerPollSystem.handlePoll(BrokerPollSystem.java:140)
	at io.confluent.parallelconsumer.internal.BrokerPollSystem.controlLoop(BrokerPollSystem.java:116)
	at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
	at java.base/java.lang.Thread.run(Thread.java:832)

2022-07-04 15:14:49.224 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED ERROR 39138 --- [ pc-broker-poll] i.c.p.internal.BrokerPollSystem          : Unknown error

org.apache.kafka.common.KafkaException: User rebalance callback throws an error
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:436)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:449)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:365)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:508)
	at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1261)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)
	at io.confluent.parallelconsumer.internal.ConsumerManager.poll(ConsumerManager.java:54)
	at io.confluent.parallelconsumer.internal.BrokerPollSystem.pollBrokerForRecords(BrokerPollSystem.java:183)
	at io.confluent.parallelconsumer.internal.BrokerPollSystem.handlePoll(BrokerPollSystem.java:140)
	at io.confluent.parallelconsumer.internal.BrokerPollSystem.controlLoop(BrokerPollSystem.java:116)
	at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
	at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: java.lang.RuntimeException: Unexpected magic: 1
	at io.confluent.parallelconsumer.offsets.OffsetEncoding.decode(OffsetEncoding.java:52)
	at io.confluent.parallelconsumer.offsets.EncodedOffsetPair.unwrap(EncodedOffsetPair.java:75)
	at io.confluent.parallelconsumer.offsets.OffsetMapCodecManager.decodeCompressedOffsets(OffsetMapCodecManager.java:229)
	at io.confluent.parallelconsumer.offsets.OffsetMapCodecManager.deserialiseIncompleteOffsetMapFromBase64(OffsetMapCodecManager.java:161)
	at io.confluent.parallelconsumer.offsets.OffsetMapCodecManager.deserialiseIncompleteOffsetMapFromBase64(OffsetMapCodecManager.java:151)
	at io.confluent.parallelconsumer.offsets.OffsetMapCodecManager.decodePartitionState(OffsetMapCodecManager.java:165)
	at io.confluent.parallelconsumer.offsets.OffsetMapCodecManager.lambda$loadPartitionStateForAssignment$0(OffsetMapCodecManager.java:128)
	at java.base/java.util.HashMap.forEach(HashMap.java:1425)
	at io.confluent.parallelconsumer.offsets.OffsetMapCodecManager.loadPartitionStateForAssignment(OffsetMapCodecManager.java:125)
	at io.confluent.parallelconsumer.state.PartitionStateManager.onPartitionsAssigned(PartitionStateManager.java:107)
	at io.confluent.parallelconsumer.state.WorkManager.onPartitionsAssigned(WorkManager.java:98)
	at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.onPartitionsAssigned(AbstractParallelEoSStreamProcessor.java:360)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:293)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:430)
	... 15 common frames omitted

2022-07-04 15:14:51.489 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 39138 --- [d | cre_impulse] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=cre_impulse, groupId=cre_impulse] Sending Heartbeat request with generation 15 and member id cre_impulse-f929f15d-aab5-4aaa-a7ed-0a821c208b03 to coordinator kafka-1g-us-east-1.egdp-test.aws.away.black:11302 (id: 2147483615 rack: null)
2022-07-04 15:14:51.490 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 39138 --- [d | cre_impulse] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=cre_impulse, groupId=cre_impulse] Sending HEARTBEAT request with header RequestHeader(apiKey=HEARTBEAT, apiVersion=4, clientId=cre_impulse, correlationId=8) and timeout 30000 to node 2147483615: HeartbeatRequestData(groupId='cre_impulse', generationId=15, memberId='cre_impulse-f929f15d-aab5-4aaa-a7ed-0a821c208b03', groupInstanceId=null)
2022-07-04 15:14:51.902 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 39138 --- [d | cre_impulse] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=cre_impulse, groupId=cre_impulse] Received HEARTBEAT response from node 2147483615 for request with header RequestHeader(apiKey=HEARTBEAT, apiVersion=4, clientId=cre_impulse, correlationId=8): HeartbeatResponseData(throttleTimeMs=0, errorCode=0)
2022-07-04 15:14:51.903 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 39138 --- [d | cre_impulse] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=cre_impulse, groupId=cre_impulse] Received successful Heartbeat response
2022-07-04 15:14:51.997 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 39138 --- [     pc-control] c.p.i.AbstractParallelEoSStreamProcessor : Mailbox results returned null, indicating timeToBlockFor (which was set as PT4.998615S)
2022-07-04 15:14:51.997 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 39138 --- [     pc-control] c.p.i.AbstractParallelEoSStreamProcessor : Should commit this cycle? shouldCommitNow? true : shouldDoANormalCommit? true, commitFrequencyOK? true, lingerBeneficial? false, isCommandedToCommit? false
2022-07-04 15:14:51.997 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 39138 --- [     pc-control] c.p.i.AbstractParallelEoSStreamProcessor : Committing offsets that are ready...
2022-07-04 15:14:51.997 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 39138 --- [     pc-control] c.p.i.AbstractParallelEoSStreamProcessor : Committing offsets that are ready...
2022-07-04 15:14:51.997 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 39138 --- [     pc-control] i.c.p.internal.ConsumerOffsetCommitter   : Async commit to be requested
2022-07-04 15:14:51.998 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED ERROR 39138 --- [     pc-control] c.p.i.AbstractParallelEoSStreamProcessor : Error from poll control thread, will attempt controlled shutdown, then rethrow. Error: Error in BrokerPollSystem system.

io.confluent.parallelconsumer.internal.InternalRuntimeError: Error in BrokerPollSystem system.
	at io.confluent.parallelconsumer.internal.BrokerPollSystem.supervise(BrokerPollSystem.java:100)
	at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.controlLoop(AbstractParallelEoSStreamProcessor.java:694)
	at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.lambda$supervisorLoop$5(AbstractParallelEoSStreamProcessor.java:630)
	at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
	at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.KafkaException: User rebalance callback throws an error
	at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
	at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191)
	at io.confluent.parallelconsumer.internal.BrokerPollSystem.supervise(BrokerPollSystem.java:98)
	... 7 common frames omitted
Caused by: org.apache.kafka.common.KafkaException: User rebalance callback throws an error
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:436)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:449)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:365)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:508)
	at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1261)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)
	at io.confluent.parallelconsumer.internal.ConsumerManager.poll(ConsumerManager.java:54)
	at io.confluent.parallelconsumer.internal.BrokerPollSystem.pollBrokerForRecords(BrokerPollSystem.java:183)
	at io.confluent.parallelconsumer.internal.BrokerPollSystem.handlePoll(BrokerPollSystem.java:140)
	at io.confluent.parallelconsumer.internal.BrokerPollSystem.controlLoop(BrokerPollSystem.java:116)
	... 5 common frames omitted
Caused by: java.lang.RuntimeException: Unexpected magic: 1
	at io.confluent.parallelconsumer.offsets.OffsetEncoding.decode(OffsetEncoding.java:52)
	at io.confluent.parallelconsumer.offsets.EncodedOffsetPair.unwrap(EncodedOffsetPair.java:75)
	at io.confluent.parallelconsumer.offsets.OffsetMapCodecManager.decodeCompressedOffsets(OffsetMapCodecManager.java:229)
	at io.confluent.parallelconsumer.offsets.OffsetMapCodecManager.deserialiseIncompleteOffsetMapFromBase64(OffsetMapCodecManager.java:161)
	at io.confluent.parallelconsumer.offsets.OffsetMapCodecManager.deserialiseIncompleteOffsetMapFromBase64(OffsetMapCodecManager.java:151)
	at io.confluent.parallelconsumer.offsets.OffsetMapCodecManager.decodePartitionState(OffsetMapCodecManager.java:165)
	at io.confluent.parallelconsumer.offsets.OffsetMapCodecManager.lambda$loadPartitionStateForAssignment$0(OffsetMapCodecManager.java:128)
	at java.base/java.util.HashMap.forEach(HashMap.java:1425)
	at io.confluent.parallelconsumer.offsets.OffsetMapCodecManager.loadPartitionStateForAssignment(OffsetMapCodecManager.java:125)
	at io.confluent.parallelconsumer.state.PartitionStateManager.onPartitionsAssigned(PartitionStateManager.java:107)
	at io.confluent.parallelconsumer.state.WorkManager.onPartitionsAssigned(WorkManager.java:98)
	at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.onPartitionsAssigned(AbstractParallelEoSStreamProcessor.java:360)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:293)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:430)
	... 15 common frames omitted

2022-07-04 15:14:51.998 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 39138 --- [     pc-control] c.p.i.AbstractParallelEoSStreamProcessor : Starting close process (state: running)...
2022-07-04 15:14:51.998 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 39138 --- [     pc-control] c.p.i.AbstractParallelEoSStreamProcessor : Shutting down execution pool...
2022-07-04 15:14:51.998 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 39138 --- [     pc-control] c.p.i.AbstractParallelEoSStreamProcessor : Awaiting worker pool termination...
2022-07-04 15:14:51.998 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 39138 --- [     pc-control] c.p.i.AbstractParallelEoSStreamProcessor : Still interrupted
2022-07-04 15:14:51.998 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 39138 --- [     pc-control] c.p.i.AbstractParallelEoSStreamProcessor : Worker pool terminated.
2022-07-04 15:14:51.998 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 39138 --- [     pc-control] c.p.i.AbstractParallelEoSStreamProcessor : Blocking normally until next commit time of PT4.998802S
2022-07-04 15:14:51.998 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 39138 --- [     pc-control] c.p.i.AbstractParallelEoSStreamProcessor : Blocking poll on work until next scheduled offset commit attempt for PT4.998802S. active threads: 0, queue: 0
2022-07-04 15:14:54.573 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 39138 --- [d | cre_impulse] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=cre_impulse, groupId=cre_impulse] Sending Heartbeat request with generation 15 and member id cre_impulse-f929f15d-aab5-4aaa-a7ed-0a821c208b03 to coordinator kafka-1g-us-east-1.egdp-test.aws.away.black:11302 (id: 2147483615 rack: null)
2022-07-04 15:14:54.574 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 39138 --- [d | cre_impulse] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=cre_impulse, groupId=cre_impulse] Sending HEARTBEAT request with header RequestHeader(apiKey=HEARTBEAT, apiVersion=4, clientId=cre_impulse, correlationId=9) and timeout 30000 to node 2147483615: HeartbeatRequestData(groupId='cre_impulse', generationId=15, memberId='cre_impulse-f929f15d-aab5-4aaa-a7ed-0a821c208b03', groupInstanceId=null)
2022-07-04 15:14:54.984 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 39138 --- [d | cre_impulse] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=cre_impulse, groupId=cre_impulse] Received HEARTBEAT response from node 2147483615 for request with header RequestHeader(apiKey=HEARTBEAT, apiVersion=4, clientId=cre_impulse, correlationId=9): HeartbeatResponseData(throttleTimeMs=0, errorCode=0)
2022-07-04 15:14:54.985 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 39138 --- [d | cre_impulse] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=cre_impulse, groupId=cre_impulse] Received successful Heartbeat response
2022-07-04 15:14:57.001 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 39138 --- [     pc-control] c.p.i.AbstractParallelEoSStreamProcessor : Mailbox results returned null, indicating timeToBlockFor (which was set as PT4.998802S)
2022-07-04 15:14:57.001 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 39138 --- [     pc-control] c.p.i.AbstractParallelEoSStreamProcessor : Committing offsets that are ready...
2022-07-04 15:14:57.002 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 39138 --- [     pc-control] c.p.i.AbstractParallelEoSStreamProcessor : Committing offsets that are ready...
2022-07-04 15:14:57.002 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 39138 --- [     pc-control] i.c.p.internal.ConsumerOffsetCommitter   : Async commit to be requested
2022-07-04 15:14:57.002 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 39138 --- [     pc-control] c.p.i.AbstractParallelEoSStreamProcessor : Closing and waiting for broker poll system...
2022-07-04 15:14:57.002 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 39138 --- [     pc-control] i.c.p.internal.BrokerPollSystem          : Requesting broker polling system to close...
2022-07-04 15:14:57.002 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 39138 --- [     pc-control] i.c.p.internal.BrokerPollSystem          : Poller transitioning to closing, waking up consumer
2022-07-04 15:14:57.002 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 39138 --- [     pc-control] i.c.p.internal.BrokerPollSystem          : Wait for loop to finish ending...
2022-07-04 15:14:57.002 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED ERROR 39138 --- [     pc-control] i.c.p.internal.BrokerPollSystem          : Execution or timeout exception waiting for broker poller thread to finish

java.util.concurrent.ExecutionException: org.apache.kafka.common.KafkaException: User rebalance callback throws an error
	at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
	at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:205)
	at io.confluent.parallelconsumer.internal.BrokerPollSystem.closeAndWait(BrokerPollSystem.java:244)
	at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.doClose(AbstractParallelEoSStreamProcessor.java:506)
	at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.lambda$supervisorLoop$5(AbstractParallelEoSStreamProcessor.java:633)
	at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
	at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: org.apache.kafka.common.KafkaException: User rebalance callback throws an error
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:436)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:449)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:365)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:508)
	at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1261)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)
	at io.confluent.parallelconsumer.internal.ConsumerManager.poll(ConsumerManager.java:54)
	at io.confluent.parallelconsumer.internal.BrokerPollSystem.pollBrokerForRecords(BrokerPollSystem.java:183)
	at io.confluent.parallelconsumer.internal.BrokerPollSystem.handlePoll(BrokerPollSystem.java:140)
	at io.confluent.parallelconsumer.internal.BrokerPollSystem.controlLoop(BrokerPollSystem.java:116)
	... 5 common frames omitted
Caused by: java.lang.RuntimeException: Unexpected magic: 1
	at io.confluent.parallelconsumer.offsets.OffsetEncoding.decode(OffsetEncoding.java:52)
	at io.confluent.parallelconsumer.offsets.EncodedOffsetPair.unwrap(EncodedOffsetPair.java:75)
	at io.confluent.parallelconsumer.offsets.OffsetMapCodecManager.decodeCompressedOffsets(OffsetMapCodecManager.java:229)
	at io.confluent.parallelconsumer.offsets.OffsetMapCodecManager.deserialiseIncompleteOffsetMapFromBase64(OffsetMapCodecManager.java:161)
	at io.confluent.parallelconsumer.offsets.OffsetMapCodecManager.deserialiseIncompleteOffsetMapFromBase64(OffsetMapCodecManager.java:151)
	at io.confluent.parallelconsumer.offsets.OffsetMapCodecManager.decodePartitionState(OffsetMapCodecManager.java:165)
	at io.confluent.parallelconsumer.offsets.OffsetMapCodecManager.lambda$loadPartitionStateForAssignment$0(OffsetMapCodecManager.java:128)
	at java.base/java.util.HashMap.forEach(HashMap.java:1425)
	at io.confluent.parallelconsumer.offsets.OffsetMapCodecManager.loadPartitionStateForAssignment(OffsetMapCodecManager.java:125)
	at io.confluent.parallelconsumer.state.PartitionStateManager.onPartitionsAssigned(PartitionStateManager.java:107)
	at io.confluent.parallelconsumer.state.WorkManager.onPartitionsAssigned(WorkManager.java:98)
	at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.onPartitionsAssigned(AbstractParallelEoSStreamProcessor.java:360)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:293)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:430)
	... 15 common frames omitted

2022-07-04 15:14:57.641 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 39138 --- [d | cre_impulse] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=cre_impulse, groupId=cre_impulse] Sending Heartbeat request with generation 15 and member id cre_impulse-f929f15d-aab5-4aaa-a7ed-0a821c208b03 to coordinator kafka-1g-us-east-1.egdp-test.aws.away.black:11302 (id: 2147483615 rack: null)
2022-07-04 15:14:57.642 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 39138 --- [d | cre_impulse] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=cre_impulse, groupId=cre_impulse] Sending HEARTBEAT request with header RequestHeader(apiKey=HEARTBEAT, apiVersion=4, clientId=cre_impulse, correlationId=10) and timeout 30000 to node 2147483615: HeartbeatRequestData(groupId='cre_impulse', generationId=15, memberId='cre_impulse-f929f15d-aab5-4aaa-a7ed-0a821c208b03', groupInstanceId=null)
@milansanjeev milansanjeev changed the title Error in onPartitionsAssigned kakfa consumer Error in onPartitionsAssigned in parallel consumer Jul 4, 2022
@astubbs
Copy link
Contributor

astubbs commented Jul 4, 2022

Hi there! :) Welcome to the project…
Have you used this consumer group Id before? If so, with what?
Can you try a fresh unique consumer group id?

@milansanjeev
Copy link
Author

milansanjeev commented Jul 4, 2022

Thank you so much @astubbs.

Yes the consumer group id was used earlier with kstream.
I have changed it to new unique and error get resolved.

But now I am getting below error and after this error my application get closed.

2022-07-04 18:10:41.816 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 57504 --- [ pc-broker-poll] i.c.p.internal.BrokerPollSystem          : Long polling broker with timeout PT2S, might appear to sleep here if subs are paused, or no data available on broker. Run state: running
2022-07-04 18:10:41.816 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 57504 --- [ pc-broker-poll] i.c.p.internal.ConsumerManager           : Poll starting with timeout: PT2S
2022-07-04 18:10:41.817 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED ERROR 57504 --- [     pc-control] c.p.i.AbstractParallelEoSStreamProcessor : Error from poll control thread, will attempt controlled shutdown, then rethrow. Error: null

java.lang.NullPointerException: null
	at java.base/java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1693)
	at io.confluent.parallelconsumer.state.ShardManager.addWorkContainer(ShardManager.java:139)
	at io.confluent.parallelconsumer.state.PartitionStateManager.maybeRegisterNewRecordAsWork(PartitionStateManager.java:361)
	at io.confluent.parallelconsumer.state.PartitionStateManager.maybeRegisterNewRecordAsWork(PartitionStateManager.java:339)
	at io.confluent.parallelconsumer.state.WorkManager.registerWork(WorkManager.java:128)
	at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.processWorkCompleteMailBox(AbstractParallelEoSStreamProcessor.java:953)
	at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.controlLoop(AbstractParallelEoSStreamProcessor.java:671)
	at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.lambda$supervisorLoop$5(AbstractParallelEoSStreamProcessor.java:630)
	at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
	at java.base/java.lang.Thread.run(Thread.java:832)

2022-07-04 18:10:41.817 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 57504 --- [     pc-control] c.p.i.AbstractParallelEoSStreamProcessor : Starting close process (state: running)...
2022-07-04 18:10:41.817 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 57504 --- [     pc-control] c.p.i.AbstractParallelEoSStreamProcessor : Shutting down execution pool...
2022-07-04 18:10:41.817 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 57504 --- [     pc-control] c.p.i.AbstractParallelEoSStreamProcessor : Awaiting worker pool termination...
2022-07-04 18:10:41.817 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 57504 --- [     pc-control] c.p.i.AbstractParallelEoSStreamProcessor : Still interrupted
2022-07-04 18:10:41.817 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 57504 --- [     pc-control] c.p.i.AbstractParallelEoSStreamProcessor : Worker pool terminated.
2022-07-04 18:10:41.817 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 57504 --- [     pc-control] c.p.i.AbstractParallelEoSStreamProcessor : Blocking normally until next commit time of PT1.237956S
2022-07-04 18:10:41.817 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 57504 --- [     pc-control] c.p.i.AbstractParallelEoSStreamProcessor : Blocking poll on work until next scheduled offset commit attempt for PT1.237956S. active threads: 0, queue: 0
2022-07-04 18:10:42.618 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 57504 --- [ pc-broker-poll] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=cre_impulse, groupId=CRE_IMPULSE_USE] Received FETCH response from node 12 for request with header RequestHeader(apiKey=FETCH, apiVersion=12, clientId=cre_impulse, correlationId=78): FetchResponseData(throttleTimeMs=0, errorCode=0, sessionId=0, responses=[FetchableTopicResponse(topic='commerce_order_eg_domain_event_v3', partitionResponses=[FetchablePartitionResponse(partition=1, errorCode=0, highWatermark=514020, lastStableOffset=514020, logStartOffset=489163, divergingEpoch=EpochEndOffset(epoch=-1, endOffset=-1), currentLeader=LeaderIdAndEpoch(leaderId=-1, leaderEpoch=-1), snapshotId=SnapshotId(endOffset=-1, epoch=-1), abortedTransactions=null, preferredReadReplica=-1, recordSet=MemoryRecords(size=0, buffer=java.nio.HeapByteBuffer[pos=0 lim=0 cap=3]))])])
2022-07-04 18:10:42.618 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 57504 --- [ pc-broker-poll] o.a.kafka.clients.FetchSessionHandler    : [Consumer clientId=cre_impulse, groupId=CRE_IMPULSE_USE] Node 12 sent a full fetch response with 1 response partition(s)
2022-07-04 18:10:42.618 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 57504 --- [ pc-broker-poll] o.a.k.c.consumer.internals.Fetcher       : [Consumer clientId=cre_impulse, groupId=CRE_IMPULSE_USE] Fetch READ_UNCOMMITTED at offset 514020 for partition commerce_order_eg_domain_event_v3-1 returned fetch data (error=NONE, highWaterMark=514020, lastStableOffset = 514020, logStartOffset = 489163, preferredReadReplica = absent, abortedTransactions = null, divergingEpoch =Optional.empty, recordsSizeInBytes=0)
2022-07-04 18:10:42.618 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 57504 --- [ pc-broker-poll] o.a.k.c.consumer.internals.Fetcher       : [Consumer clientId=cre_impulse, groupId=CRE_IMPULSE_USE] Added READ_UNCOMMITTED fetch request for partition commerce_order_eg_domain_event_v3-1 at position F

@astubbs
Copy link
Contributor

astubbs commented Jul 4, 2022

Are you using KEY ordering and have records with NULL keys? If so:

#318 is also fixed in:

Which is at the top of the merge queue (I just got back from PTO and some other stuff, so there's a bit of domino queue of new stuff to get merged :)

@astubbs
Copy link
Contributor

astubbs commented Jul 5, 2022

Progress?

@milansanjeev
Copy link
Author

Yes @astubbs

The issue has been resolved after making an UNORDERED ordering.

@astubbs
Copy link
Contributor

astubbs commented Jul 5, 2022

Ok, good to hear! Do you know if you're using null keys? Just want to confirm it is indeed the same issue.

@milansanjeev
Copy link
Author

@astubbs yes you were right. Key was null in my case.

@astubbs astubbs closed this as not planned Won't fix, can't repro, duplicate, stale Jul 5, 2022
@colinkuo
Copy link

colinkuo commented Jan 12, 2023

If we enable PC and reuse the existing consumer group, is that possible to avoid the error, such as Unexpected magic: 2.
To create a new consumer group will require copying the offsets from previous consumer group. Otherwise, we will lose or duplicate the messages during the migration

Any ideas or suggestions? Thanks!

@astubbs
Copy link
Contributor

astubbs commented Jan 12, 2023

What's the use case for reusing the group? Is it to try to use the same offsets?

Interesting.

So we have two situations:

  1. mistaken use of same group
  2. required use of same group

How should we distinguish between the two?

Since 2 is probably the exception, how about an option to explicitly "ignore" any existing data in the metadata payload? Which is off my default?

cc @nachomdo

@astubbs astubbs reopened this Jan 12, 2023
@colinkuo
Copy link

Thanks for the quick response @astubbs

Yes, we wanted to explore whether we can reuse the same offsets. I'd consider that It's similar to a new completely different Kafka client application using an existing consumer group. Even it's a different Kafka client application, it doesn't prevent the new application from reusing the offsets. PC uses Kafka client under the hood, which could be acting the same behavior like a native Kafka client.

My two cents

@astubbs
Copy link
Contributor

astubbs commented Jan 12, 2023

It's absolutely no problem to use the same offsets as far as PC is concerned. I just had assumed it would be a mistake :)

We'll probably add an option that you have to turn on, for it to be ignored (although it'd only be encountered once).

@colinkuo
Copy link

When you say to add an option for it to be ignored, does it mean the we don't see the error once we turn the option on. Also, PC will resume consuming the messages from the existing offsets of the consumer group? Thanks!

@astubbs
Copy link
Contributor

astubbs commented Jan 13, 2023

we don't see the error once we turn the option o

yes, just a warning. and it'll only show the first time, going forward it wouldn't show after PC installs it's own metadata.

Also, PC will resume consuming the messages from the existing offsets of the consumer group?

yup!

@colinkuo
Copy link

we don't see the error once we turn the option o

yes, just a warning. and it'll only show the first time, going forward it wouldn't show after PC installs it's own metadata.

Also, PC will resume consuming the messages from the existing offsets of the consumer group?

yup!

That would be great! Thanks

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

3 participants