loicgreffier commented on code in PR #16093: URL: https://github.com/apache/kafka/pull/16093#discussion_r1619416340
########## streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java: ########## @@ -288,7 +301,31 @@ private <K, V> void forwardInternal(final ProcessorNode<K, V, ?, ?> child, final Record<K, V> record) { setCurrentNode(child); - child.process(record); + try { Review Comment: @cadonna Catching exceptions right here: https://github.com/apache/kafka/blob/0f0c9ecbf330923ad653cc2ff4fca6c4dced1cf7/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L847 was our very first intention before we decided to move it to `ProcessorContextImpl#forwardInternal`. Unless we're wrong, we cannot get the precise node name where the exception occured at `StreamTask#doProcess` level. Could we: - catch exceptions at `StreamTask#doProcess` level - catch exceptions at `ProcessorContextImpl#forwardInternal` level, but rather than rethrowing a `StreamsException` (https://github.com/loicgreffier/kafka/blob/960c2a3153b30e48963387b5756b0310275bf48b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L320), it can throw a `new ChildNodeProcessingException(child.name, e)` caught and handled at StreamTask#doProcess level. `ChildNodeProcessingException` would be a new exception acting as a wrapper of the root cause, that additionally provides the name of the child node in which an exception occurs... letting the first node know about something happened in one of its children. Hope it is clear, if it sounds like a plan to you, we can give a try to this impl -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org