loicgreffier commented on code in PR #16093:
URL: https://github.com/apache/kafka/pull/16093#discussion_r1617692897


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java:
##########
@@ -288,7 +305,38 @@ private <K, V> void forwardInternal(final ProcessorNode<K, 
V, ?, ?> child,
                                         final Record<K, V> record) {
         setCurrentNode(child);
 
-        child.process(record);
+        try {
+            child.process(record);
+        } catch (final StreamsException e) {
+            // exception received from child processor, just rethrow
+            throw e;
+        } catch (final Exception e) {
+            final byte[] rawKey = streamTask.rawRecord() != null ? 
streamTask.rawRecord().key() : null;
+            final byte[] rawValue = streamTask.rawRecord() != null ? 
streamTask.rawRecord().value() : null;
+
+            final ErrorHandlerContext errorHandlerContext = new 
ErrorHandlerContextImpl(topic(),
+                    partition(), offset(), headers(), rawKey, rawValue,
+                    child.name(), taskId());
+            final ProcessingExceptionHandler.ProcessingHandlerResponse 
response = processingExceptionHandler
+                    .handle(errorHandlerContext, record, e);
+
+            if (response == 
ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL) {
+                throw new StreamsException("Processing exception handler is 
set to fail upon" +
+                        " a processing error. If you would rather have the 
streaming pipeline" +
+                        " continue after a processing error, please set the " +
+                        PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG + " 
appropriately.",
+                        e);
+            } else {
+                log.warn(
+                    "Skipping record due to processing error. topic=[{}] 
partition=[{}] offset=[{}]",
+                    topic(),
+                    partition(),
+                    offset(),
+                    e
+                );

Review Comment:
   True, this log is redundant with any default `ProcessingExceptionHandler` 
implementations. I removed it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to