
Question: Does Parallel-consumer have state that we can read from? #484

Open
Ehud-Lev-Forter opened this issue Nov 27, 2022 · 10 comments
Labels
question Further information is requested

Comments

@Ehud-Lev-Forter
Contributor

Hi, we have noticed that during high load one of the consumers can get stuck: it keeps reporting the same lag but does not process anything. We were not able to reproduce this locally, but we are afraid it will happen again.

In our Kafka Streams process we supervise state to monitor the process; if we see an error or a problem, we restart the process. Does ParallelConsumer have something similar?

@astubbs
Contributor

astubbs commented Nov 27, 2022

Hi, no, not at the moment. But you can check on a per-record basis: the record context passed to your user function exposes the count of failed processing attempts. We're developing the metrics and monitoring systems now:

What were the logs saying when this happened?
Are you checking the record context for failure attempts in your user function?
How did you resolve the situation? Did you just kill the PC instance that you thought was stuck?

Lag is a bit more complicated with PC. For example, if there's a single poison pill that can't be processed and your user function doesn't give up retrying it, lag could show 1,000 even though only a single record is actually "stuck".

The new monitoring and metrics functions will allow you to see all this info.
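A minimal sketch of the per-record give-up decision described above, assuming the failure count is read from the record context passed to the user function (the RetryPolicy class here is illustrative, not a PC API):

```java
// Illustrative give-up policy for the per-record failure count mentioned above.
// In parallel-consumer the attempt count would come from the record context
// handed to your user function; here it is simply a method parameter.
class RetryPolicy {
    private final int maxAttempts;

    RetryPolicy(int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    /** True once a record has failed enough times to be parked in a DLQ instead of retried. */
    boolean shouldGiveUp(long failedAttempts) {
        return failedAttempts >= maxAttempts;
    }
}
```

With a check like this in the user function, a poison pill stops inflating the retry queue: after the threshold it is routed to a DLQ and the apparent lag drains.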

Cc @nachomdo

@astubbs astubbs added the question Further information is requested label Nov 27, 2022
@Ehud-Lev-Forter
Contributor Author

Ehud-Lev-Forter commented Nov 28, 2022

Hi, thank you for your response,
We did get this issue again, when it happened we saw many warning logs that looked like the following:
Warning: 1 records in the queue have been waiting longer than 10s for following topics [my-topic-2]. 
It is important to mention that other processes also logged these warnings, but far less often.
We are using version 0.5.2.4 with the "reactor consumer", and until now I hadn't noticed the record context; we used context.streamConsumerRecords() instead. I will check it out and see if I can catch errors through that.

For now, the only way for us to resolve the issue is to track the lag (using external metrics) and restart the process.

@astubbs
Contributor

astubbs commented Nov 29, 2022

Are you logging any processing failures in your function?
Do you see any ERROR level events in the log?
If there really is a "stuck" record, it should still get given to the user function to attempt.

@Ehud-Lev-Forter
Contributor Author

Yes, I am logging everything I can (at error and info level). I am also trying to catch all exceptions and send records that have failed to different DLQs. I am logging record.numberOfFailedAttempt, but I have not seen any failed attempts yet. I had the issue again, restarted the process again, and got the following warning after the restart:
WARN io.confluent.parallelconsumer.state.PartitionState [pc-control]: Truncating state - removing records lower than 54845363. Offsets have been removed from the partition by the broker or committed offset has been raised. Bootstrap polled 54845363 but expected 1 from loaded commit data. Could be caused by record retention or compaction and offset reset policy LATEST.
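For illustration only (this is not PC's actual code), the warning above boils down to a consistency check between the first offset returned by the bootstrap poll and the offset expected from the loaded commit data:

```java
// Illustration of the truncation condition behind the warning: if the
// bootstrap poll starts above the offset the loaded commit data expected,
// tracked state below the polled offset can no longer apply and is dropped.
class PartitionStateCheck {
    static boolean needsTruncation(long bootstrapPolledOffset, long expectedNextOffset) {
        return bootstrapPolledOffset > expectedNextOffset;
    }
}
```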

I am now trying to tackle the issue by adding some kind of supervisor that detects that this consumer is not really doing anything and kills it.
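A minimal sketch of such a supervisor, assuming the committed offset and lag per partition are polled from external metrics (all class and method names here are hypothetical): a partition is flagged as stuck when it reports lag but its committed offset has not advanced within a grace period.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

// Hypothetical "stuck consumer" supervisor: observations come from external
// metrics (e.g. consumer group offsets polled via the AdminClient). A partition
// is stuck if it has lag but its committed offset stalls for longer than grace.
class StuckConsumerSupervisor {
    private final Duration grace;
    private final Map<String, Long> lastOffset = new HashMap<>();
    private final Map<String, Instant> lastProgress = new HashMap<>();

    StuckConsumerSupervisor(Duration grace) {
        this.grace = grace;
    }

    /** Record one observation; returns true when the partition looks stuck. */
    boolean observe(String partition, long committedOffset, long lag, Instant now) {
        Long previous = lastOffset.get(partition);
        if (previous == null || committedOffset > previous) {
            // Forward progress (or first sighting): reset the timer.
            lastOffset.put(partition, committedOffset);
            lastProgress.put(partition, now);
            return false;
        }
        // No forward progress: only suspicious if there is work outstanding.
        return lag > 0 && Duration.between(lastProgress.get(partition), now).compareTo(grace) > 0;
    }
}
```

The caller would run this on a timer and restart the process when observe returns true, which matches the restart-on-stalled-lag workaround described above.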

@astubbs
Contributor

astubbs commented Nov 29, 2022

What is the head offset of that partition?
What is your auto offset reset policy set to?

@Ehud-Lev-Forter
Contributor Author

Ehud-Lev-Forter commented Nov 29, 2022

I don't have the historical information. What I can say is that now (1.5 hours after the restart) it is:
CURRENT-OFFSET LOG-END-OFFSET LAG
54203593 54203720 127
I must say this does not make sense to me, since it seems the warning complained about a future offset.

Update:
It might be that this log line was referring to another partition; this process consumed from another partition as well: 55016222 55016324

As for the auto offset reset policy: it is set to earliest.

@astubbs
Contributor

astubbs commented Dec 2, 2022

Update: It might be that this log was talking on another partition, this process consumed from another partition as well: 55016222 55016324

ah - does the log message not include the partition it's talking about?

@Ehud-Lev-Forter , if you get a chance, take a look at this PR and see if there's any other metrics you'd like?

#485

@astubbs
Contributor

astubbs commented Dec 2, 2022

cc @nachomdo

@Ehud-Lev-Forter
Contributor Author

Looks like we could benefit a lot from this PR as-is.
As for other metrics, I am not sure whether they make sense, but I can think of these:

  1. Throttling or rate limiting - I saw that PC pushes back some work when things get slow.
  2. Maybe a slowWork.size gauge.
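The slowWork.size gauge suggested above could be sketched even outside PC: a gauge is just a supplier sampled on read, the same shape a Micrometer Gauge takes when bound to a collection's size. The Gauges class and metric name below are hypothetical, not part of any PC API:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

// Hypothetical gauge holder: each gauge is a LongSupplier sampled on read,
// so the backend always sees the current value at scrape time.
class Gauges {
    private final Map<String, LongSupplier> gauges = new HashMap<>();

    void register(String name, LongSupplier supplier) {
        gauges.put(name, supplier);
    }

    long sample(String name) {
        return gauges.get(name).getAsLong();
    }
}
```

A real integration would register the supplier against the consumer's internal slow-work collection, so no copying or periodic pushing is needed.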

As for the warn log: yes, it is missing the partition.

@astubbs
Contributor

astubbs commented Dec 5, 2022

1 - yup, that status will be in there
2 - oh - that's a good idea!
