Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose manual stop method in KafkaReceiver #380

Open
wants to merge 4 commits into
base: main
Choose a base branch
from

Conversation

cjlee38
Copy link

@cjlee38 cjlee38 commented Jan 17, 2024

KafkaReceiver currently is not possible to close safely. An example is below:

val disposable = kafkaReceiver.receive()
    .flatMapSequential { record -> process(record).then(record) }
    .delayUntil { 
        it.receiverOffset().acknowledge() 
        it.receiverOffset().commit()
    }
    .subscribe()

dispose() method would be a way, but it might lead to unexpected result because of cancellation leading processed records not to be committed. I think This PR is discussed at #247, and also helps to address #378 indirectly.(People can customize KafkaReceiver's lifecycle after stopping followed by disposable.isDisposed()

I've considered several situations related to thread-safety or something, but might miss I didn't expect. Any further suggestions would be appreciated.

@cjlee38
Copy link
Author

cjlee38 commented Jun 5, 2024

Hello,

It has been quite some time since I submitted this PR, and I haven't received any response yet. I was wondering if support for reactor-kafka has ended, or if there might be another reason for the delay? I don't mean to rush, but I'm leaving this message because it has been a considerable amount of time without any reply.

Thank you.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

1 participant