feature: #242 Explicit terminal and retry exceptions for cleaner logging and poison pills #291

Closed
wants to merge 23 commits into from

Contributor

@astubbs astubbs commented May 6, 2022

  • convert output of some tests to use the new exception and check logging output is as expected
  • docs, including batch partial retry
  • tests
  • partial batch support - separate PR
  • max retries handling
  • rewrite docs section: 11.2. Skipping Records
  • retry DLQ PR

Related:

Supersedes:

Related branches:

  • retry-dlq

@astubbs astubbs linked an issue May 6, 2022 that may be closed by this pull request
2 tasks
@astubbs astubbs requested a review from rkolesnev May 12, 2022 14:57
Contributor Author

astubbs commented May 12, 2022

#268 ?

Member

@ifeeneyconfluent ifeeneyconfluent left a comment

FWIW, I'm happy to approve.

@astubbs astubbs changed the title feature: #242 Explicit retry exception for cleaner logging feature: #242 Explicit terminal and retriable exceptions for cleaner logging and poison pills May 13, 2022
log.error("Exception caught in user function running stage, registering WC as failed, returning to mailbox", e);
String msg = "Exception caught in user function running stage, registering WC as failed, returning to mailbox";
if (e instanceof RetriableException) {
log.debug("Explicit " + RetriableException.class.getSimpleName() + " caught, logging at DEBUG only. " + msg, e);
Member

Suggested change
log.debug("Explicit " + RetriableException.class.getSimpleName() + " caught, logging at DEBUG only. " + msg, e);
log.debug("Explicit {} caught - {}", RetriableException.class.getSimpleName(), msg, e);

ParallelConsumerOptions.class.getSimpleName(),
context);

closeDontDrainFirst();
Member

Would it shut down gracefully in this case, i.e. commit everything that has succeeded up to this point? Maybe it should also give in-flight processing some time to finish (if it doesn't already), to reduce possible duplicates on restart / rebalance.

Contributor Author

Yeah, it is graceful: closeDontDrainFirst will commit everything that's done first. "Drain" doesn't mean graceful here; it means processing everything left in the buffers.
In-flight: good point. I'm not 100% sure, I will need to double check.

Comment on lines 1163 to 1170
} catch (Exception e) {
log.error("Unknown internal error handling user function dispatch, terminating");

closeDontDrainFirst();

// throw again to make the future failed
throw e;
}
Member

What is the current behaviour for user function exceptions? Is this a breaking change in behaviour?
I think I am missing the flow here: wouldn't this just kill PC and not retry the message?

Contributor Author

Current behaviour = retry :)

"wouldn't this just kill PC and not retry the message?"

Yeah, there was a bug. Could you check the latest version?

@astubbs astubbs marked this pull request as draft May 14, 2022 08:01
@astubbs astubbs requested a review from rkolesnev May 14, 2022 08:01
@astubbs astubbs changed the title feature: #242 Explicit terminal and retriable exceptions for cleaner logging and poison pills feature: #242 Explicit terminal and retry exceptions for cleaner logging and poison pills May 16, 2022
Contributor Author

astubbs commented May 17, 2022

Consider splitting the partial batch failure support into a separate task.

…ption

# Conflicts:
#	parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java
#	parallel-consumer-core/src/test/resources/logback-test.xml
Contributor Author

@astubbs astubbs left a comment

...

*/

/**
* todo
Contributor Author

docs

/**
* @see UserFunctionRunner
*/
class UserFunctionRunnerTest {
Contributor Author

finish test

@@ -27,8 +27,8 @@
</root>

<!-- primary -->
<logger name="io.confluent.parallelconsumer" level="info"/>
<!-- <logger name="io.confluent.parallelconsumer" level="debug"/>-->
<logger name="io.confluent.parallelconsumer" level="debug"/>
Contributor Author

reset

Comment on lines +7 to +16
/**
* A user's processing function can throw this exception, which signals to PC that processing of the message has failed,
* and that it should be retried at a later time.
* <p>
* The advantage of throwing this exception explicitly, is that PC will not log an ERROR. If any other type of exception
* is thrown by the user's function, that will be logged as an error (but will still be retried later).
* <p>
* So in short, if this exception is thrown, nothing will be logged (except at DEBUG level), any other exception will be
* logged as an error.
*/
Member

update java doc
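
To make the behaviour described in that Javadoc concrete, here is a minimal, self-contained sketch. SketchRetriableException, RetryLoggingSketch and runOnce are hypothetical names invented for illustration, not the classes from this PR, and the in-memory LOG list is only a stand-in for a real logger.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for the explicit retry exception discussed in this PR.
class SketchRetriableException extends RuntimeException {
    SketchRetriableException(String message) {
        super(message);
    }
}

class RetryLoggingSketch {
    // Captured "log lines", so the behaviour can be inspected without a logging framework.
    static final List<String> LOG = new ArrayList<>();

    // Runs the user function once. Returns true on success, false if the record should be retried later.
    static boolean runOnce(Consumer<String> userFunction, String record) {
        try {
            userFunction.accept(record);
            return true;
        } catch (SketchRetriableException e) {
            // Explicit retry request: recorded at DEBUG only.
            LOG.add("DEBUG explicit retry requested: " + e.getMessage());
            return false;
        } catch (RuntimeException e) {
            // Any other exception: recorded at ERROR, but still retried later.
            LOG.add("ERROR user function failed: " + e.getMessage());
            return false;
        }
    }

    public static void main(String[] args) {
        runOnce(r -> { throw new SketchRetriableException("downstream busy"); }, "a");
        runOnce(r -> { throw new IllegalStateException("unexpected"); }, "b");
        LOG.forEach(System.out::println);
    }
}
```

In both failure cases the record is handed back for a later retry; only the log level differs.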


@AllArgsConstructor
@Slf4j
public class UserFunctionRunner<K, V> {
Member

The logic seems OK.
I'm not really sure about the new class and the abstraction / separation of concerns: most of the handling calls back into PC, so the class reads like a utility / mixin type of class.
I'm not sure how it could be refactored better, but right now it seems to add more spaghetti-ness rather than making things cleaner. The flow became PC -> UserFunctionRunner.runUserFunction -> PC with a backward reference, which feels like a smell...
Would messaging / handling of the execution result be part of the message-based eventing / shared-nothing refactor?
Alternatively, could the success / failure notifications be moved back to PC and passed in as callbacks?
Basically, I think it would benefit from a bit of mapping of the flows and rethinking towards a cleaner architecture / class split.
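
As a rough illustration of the callback alternative raised in this comment, the sketch below has the runner receive success and failure handlers instead of holding a reference back to its owner. CallbackRunnerSketch and its methods are hypothetical and do not reflect the real UserFunctionRunner or PC classes.

```java
import java.util.function.BiConsumer;
import java.util.function.Function;

// Hypothetical runner that reports outcomes through callbacks, with no back-reference to its owner.
class CallbackRunnerSketch<T, R> {
    private final BiConsumer<T, R> onSuccess;
    private final BiConsumer<T, RuntimeException> onFailure;

    CallbackRunnerSketch(BiConsumer<T, R> onSuccess, BiConsumer<T, RuntimeException> onFailure) {
        this.onSuccess = onSuccess;
        this.onFailure = onFailure;
    }

    // Runs the user function and notifies exactly one of the two callbacks.
    void run(Function<T, R> userFunction, T work) {
        R result;
        try {
            result = userFunction.apply(work);
        } catch (RuntimeException e) {
            onFailure.accept(work, e);
            return;
        }
        // Called outside the try block so that a failing success callback is not reported as a user failure.
        onSuccess.accept(work, result);
    }

    public static void main(String[] args) {
        CallbackRunnerSketch<String, Integer> runner = new CallbackRunnerSketch<String, Integer>(
                (work, result) -> System.out.println("succeeded: " + work + " -> " + result),
                (work, error) -> System.out.println("failed: " + work + " (" + error.getMessage() + ")"));
        runner.run(String::length, "record-1");
        runner.run(w -> { throw new IllegalStateException("boom"); }, "record-2");
    }
}
```

With this shape the owner decides what success and failure mean, and the runner only needs the two handlers it was given.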

Comment on lines +57 to +67
} catch (PCUserException e) {
// throw again to make the future failed
throw e;
} catch (Exception e) {
log.error("Unknown internal error handling user function dispatch, terminating");

pc.closeDontDrainFirst();

// throw again to make the future failed
throw e;
}
Member

I'm not too keen on this: you are still killing PC on a general Exception thrown from userFunction, effectively making the default behaviour for non-specified exceptions the worst-case one (terminal, with shutdown).
Did you mean to re-throw Exception as PCUserException on line 88?
Do you need a general Exception catch here at all? The only other place that could throw is line 47:
boolean workIsStale = pc.getWm().checkIfWorkIsStale(workContainerBatch);
Could that call handle the exception internally instead, leaving only the handling of the userFunction call here?

Contributor Author

This needs to be optional behaviour. I thought I had implemented that in this PR, but it might be somewhere else. Either way, it should do what KS does and make this configurable.
I'll also probably copy their default behaviour.

/**
* Run the supplied function.
*/
protected <R> List<Tuple<ConsumerRecord<K, V>, R>> runUserFunction(Function<PollContextInternal<K, V>, List<R>> usersFunction,
Member

Would it be better to restructure this to be a bit more readable? I.e. it would become something along the lines of:

execute() {
    try {
        doDebugLogging(...);
        skipOnStaleWork(...);

        PollContextInternal<K, V> context = new PollContextInternal<>(workContainerBatch);
        runWithUserExceptions(...);
    } catch (...) {
        ...
    }
}

If the exception from the stale work check is specific, or is handled in that call itself, we could even get rid of the inner runWithUserExceptions method and pull the exception handling up here, especially given it's the main purpose of this class.
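
A runnable sketch of that flattened shape follows, under the assumption that the stale-work check does not throw, so the only exceptions handled are those from the user's function. ExecuteSketch, SketchUserException and the isStale predicate are hypothetical stand-ins, not the PR's PCUserException or checkIfWorkIsStale.

```java
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical wrapper marking a failure as coming from the user's function.
class SketchUserException extends RuntimeException {
    SketchUserException(Throwable cause) {
        super(cause);
    }
}

class ExecuteSketch {
    // Skips stale work, otherwise runs the user function and wraps any exception it throws.
    static <T, R> Optional<R> execute(T work, Predicate<T> isStale, Function<T, R> userFunction) {
        if (isStale.test(work)) {
            return Optional.empty(); // nothing to run for stale work
        }
        try {
            return Optional.ofNullable(userFunction.apply(work));
        } catch (RuntimeException e) {
            // The only catch block: every exception reaching here came from the user function.
            throw new SketchUserException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(execute("fresh", w -> false, String::length));
        System.out.println(execute("stale", w -> true, String::length));
    }
}
```

Because the stale check sits outside the try block, there is no need for a general catch that cannot tell internal errors from user errors.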

Member

eddyv commented Jun 15, 2023

Closing - Stale.

@eddyv eddyv closed this Jun 15, 2023
@nizarsalhaji94

Hi @astubbs

Is a release date planned?
I’m interested in using the SHUTDOWN Terminal Failure Reaction.
In fact, we need to shut down (or stop or kill) the PC after max retries.

Thanks

Contributor Author

astubbs commented Dec 21, 2023

@nizarsalhaji94 Hi, sorry, I have not been involved with this project since February 2023. Try asking @eddyv.

@nizarsalhaji94

Hi @eddyv

Is a release date planned?
I’m interested in using the SHUTDOWN Terminal Failure Reaction.
In fact, we need to shut down (or stop or kill) the PC after max retries.

Thanks

@rkolesnev
Member

@nizarsalhaji94
We are planning to implement this feature at some point, but there are no concrete dates / roadmap for it yet.
As a workaround, I have suggested the stop/kill-from-user-function approach in this issue: #718 (comment)

Just in general: I hadn't seen your comment here until now. In future, it would be more visible if you raised it as a question / issue with a link to the PR, if it's relevant.
