loicgreffier commented on code in PR #16093:
URL: https://github.com/apache/kafka/pull/16093#discussion_r1619416340


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java:
##########
@@ -288,7 +301,31 @@ private <K, V> void forwardInternal(final ProcessorNode<K, 
V, ?, ?> child,
                                         final Record<K, V> record) {
         setCurrentNode(child);
 
-        child.process(record);
+        try {

Review Comment:
   @cadonna Catching exceptions right here: 
https://github.com/apache/kafka/blob/0f0c9ecbf330923ad653cc2ff4fca6c4dced1cf7/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L847
 was our very first intention before we decided to move it to 
`ProcessorContextImpl#forwardInternal`.
   
   Unless we're wrong, we cannot get the precise node name where the exception 
occurred at `StreamTask#doProcess` level.
   
   Could we:
   - catch exceptions at `StreamTask#doProcess` level
   - catch exceptions at `ProcessorContextImpl#forwardInternal` level, but 
rather than rethrowing a `StreamsException` 
(https://github.com/loicgreffier/kafka/blob/960c2a3153b30e48963387b5756b0310275bf48b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L320),
 it can throw a `new ChildNodeProcessingException(child.name, e)` caught and 
handled at StreamTask#doProcess level. 
   
   `ChildNodeProcessingException` would be a new exception acting as a wrapper 
of the root cause, that additionally provides the name of the child node in 
which an exception occurs... letting the first node know about something 
happened in one of its children.
   
   Hope it is clear, if it sounds like a plan to you, we can give a try to this 
impl
   
   _EDIT_:
   
   We might have to call `processingExceptionHandler#handle` in 
`ProcessorContextImpl#forwardInternal` to know if processing should FAIL or 
CONTINUE at this level. Otherwise, a `ChildNodeProcessingException` will be 
thrown in all cases, FAIL or CONTINUE will be computed in 
`StreamTask#doProcess`, but in case of CONTINUE, how to resume the processing 
(maybe possible if ChildNodeProcessingException provides the failed child node 
name, and the current state of the record)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to