loicgreffier commented on code in PR #16093:
URL: https://github.com/apache/kafka/pull/16093#discussion_r1617692897
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java:
##########
@@ -288,7 +305,38 @@ private <K, V> void forwardInternal(final ProcessorNode<K, V, ?, ?> child,
                                         final Record<K, V> record) {
         setCurrentNode(child);
 
-        child.process(record);
+        try {
+            child.process(record);
+        } catch (final StreamsException e) {
+            // exception received from child processor, just rethrow
+            throw e;
+        } catch (final Exception e) {
+            final byte[] rawKey = streamTask.rawRecord() != null ? streamTask.rawRecord().key() : null;
+            final byte[] rawValue = streamTask.rawRecord() != null ? streamTask.rawRecord().value() : null;
+
+            final ErrorHandlerContext errorHandlerContext = new ErrorHandlerContextImpl(topic(),
+                partition(), offset(), headers(), rawKey, rawValue,
+                child.name(), taskId());
+            final ProcessingExceptionHandler.ProcessingHandlerResponse response = processingExceptionHandler
+                .handle(errorHandlerContext, record, e);
+
+            if (response == ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL) {
+                throw new StreamsException("Processing exception handler is set to fail upon" +
+                    " a processing error. If you would rather have the streaming pipeline" +
+                    " continue after a processing error, please set the " +
+                    PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG + " appropriately.",
+                    e);
+            } else {
+                log.warn(
+                    "Skipping record due to processing error. topic=[{}] partition=[{}] offset=[{}]",
+                    topic(),
+                    partition(),
+                    offset(),
+                    e
+                );

Review Comment:
   True, this log is redundant with the logging already done by the default `ProcessingExceptionHandler` implementations. I removed it.
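
For illustration only, here is a minimal sketch of why the extra `log.warn` is redundant when the handler itself logs. It does not use the Kafka classes: `RedundantLogSketch`, `Response`, `Handler`, `LogAndContinueHandler`, `forward` and the in-memory `LOG` list are hypothetical stand-ins, and the topic, partition and offset values are made up. It also leaves out the `StreamsException` rethrow from the hunk above.

```java
import java.util.ArrayList;
import java.util.List;

public class RedundantLogSketch {

    /** Hypothetical stand-in for ProcessingHandlerResponse. */
    public enum Response { CONTINUE, FAIL }

    /** Hypothetical, simplified stand-in for ProcessingExceptionHandler. */
    public interface Handler {
        Response handle(String topic, int partition, long offset, Exception error);
    }

    /** In-memory "log" so the number of emitted lines can be inspected. */
    public static final List<String> LOG = new ArrayList<>();

    /** Logs the failure itself, then asks the caller to skip the record. */
    public static final class LogAndContinueHandler implements Handler {
        @Override
        public Response handle(String topic, int partition, long offset, Exception error) {
            LOG.add("handler: skipping record, topic=" + topic
                + " partition=" + partition
                + " offset=" + offset
                + " cause=" + error.getMessage());
            return Response.CONTINUE;
        }
    }

    /** Simplified forwarding step with made-up record coordinates. */
    public static void forward(Runnable child, Handler handler) {
        try {
            child.run();
        } catch (RuntimeException e) {
            Response response = handler.handle("orders", 0, 42L, e);
            if (response == Response.FAIL) {
                throw new IllegalStateException("handler requested failure", e);
            }
            // CONTINUE: nothing is logged here, because the handler has
            // already recorded the skipped record. A second warning at
            // this point would repeat the same information.
        }
    }

    public static void main(String[] args) {
        forward(() -> { throw new IllegalArgumentException("boom"); },
                new LogAndContinueHandler());
        System.out.println(LOG.size() + " log line(s): " + LOG);
    }
}
```

In this sketch each skipped record produces exactly one log line, written by the handler. A custom handler that does not log would produce none, which is a trade-off of leaving logging entirely to the handler.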