loicgreffier commented on code in PR #16093:
URL: https://github.com/apache/kafka/pull/16093#discussion_r1619416340


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java:
##########
@@ -288,7 +301,31 @@ private <K, V> void forwardInternal(final ProcessorNode<K, V, ?, ?> child,
                                         final Record<K, V> record) {
         setCurrentNode(child);
 
-        child.process(record);
+        try {

Review Comment:
   @cadonna Catching exceptions right here: 
https://github.com/apache/kafka/blob/0f0c9ecbf330923ad653cc2ff4fca6c4dced1cf7/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L847
 was our very first intention before we decided to move it to 
`ProcessorContextImpl#forwardInternal`.
   
   Unless we're mistaken, at the `StreamTask#doProcess` level we cannot get the 
precise name of the node in which the exception occurred.
   
   Could we combine both:
   - catch and handle exceptions at the `StreamTask#doProcess` level
   - also catch exceptions at the `ProcessorContextImpl#forwardInternal` level, but 
rather than rethrowing a `StreamsException` 
(https://github.com/loicgreffier/kafka/blob/960c2a3153b30e48963387b5756b0310275bf48b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L320),
 throw a `new ChildNodeProcessingException(child.name, e)` that is caught and 
handled at the `StreamTask#doProcess` level.
   
   `ChildNodeProcessingException` would be a new exception that wraps 
the root cause and additionally provides the name of the child node in 
which the exception occurred, letting the first node know that something 
went wrong in one of its children.
   
   Hope this is clear. If it sounds like a plan to you, we can give this 
implementation a try.
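   
   To make the idea concrete, here is a rough, self-contained sketch of the 
wrapping approach. It is not actual Kafka Streams code: `Node`, 
`forwardInternal` and `doProcess` are simplified stand-ins for 
`ProcessorNode`, `ProcessorContextImpl#forwardInternal` and 
`StreamTask#doProcess`, and records are plain strings. Rethrowing an 
already-wrapped exception unchanged, so that the innermost failing node name 
is kept, is an assumption on our side.

```java
import java.util.function.Consumer;

public class ChildNodeSketch {

    /** Hypothetical wrapper: carries the root cause plus the failing child node's name. */
    public static class ChildNodeProcessingException extends RuntimeException {
        private final String childNodeName;

        public ChildNodeProcessingException(final String childNodeName, final Throwable cause) {
            super("Exception caught while processing in node " + childNodeName, cause);
            this.childNodeName = childNodeName;
        }

        public String childNodeName() {
            return childNodeName;
        }
    }

    /** Simplified stand-in for ProcessorNode: a name plus some processing logic. */
    public static class Node {
        final String name;
        final Consumer<String> logic;

        public Node(final String name, final Consumer<String> logic) {
            this.name = name;
            this.logic = logic;
        }

        void process(final String record) {
            logic.accept(record);
        }
    }

    /** Stand-in for ProcessorContextImpl#forwardInternal. */
    public static void forwardInternal(final Node child, final String record) {
        try {
            child.process(record);
        } catch (final ChildNodeProcessingException e) {
            // Assumption: a deeper child already tagged the failure, keep its name.
            throw e;
        } catch (final RuntimeException e) {
            // Wrap the root cause together with the name of the failing child.
            throw new ChildNodeProcessingException(child.name, e);
        }
    }

    /** Stand-in for StreamTask#doProcess: returns the failing node name, or null on success. */
    public static String doProcess(final Node source, final String record) {
        try {
            source.process(record);
            return null;
        } catch (final ChildNodeProcessingException e) {
            // This is where the failure would be handled with the node name at hand.
            return e.childNodeName();
        }
    }

    public static void main(final String[] args) {
        // Hypothetical topology: source -> map -> filter, where filter fails.
        final Node filter = new Node("filter", r -> {
            throw new IllegalStateException("boom");
        });
        final Node map = new Node("map", r -> forwardInternal(filter, r));
        final Node source = new Node("source", r -> forwardInternal(map, r));

        System.out.println("Failing node: " + doProcess(source, "some-record"));
    }
}
```

   With this shape, `doProcess` stays the single place where the failure is 
handled, while `forwardInternal` only attaches the node name to the root cause.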



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
