cadonna commented on code in PR #16093: URL: https://github.com/apache/kafka/pull/16093#discussion_r1618417037
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java:
##########
@@ -232,12 +236,14 @@ private void updateHead() {
             }
             headRecord = new StampedRecord(deserialized, timestamp);
             headRecordSizeInBytes = consumerRecordSizeInBytes(raw);
+            rawHeadRecord = raw;

Review Comment:
   Did you consider putting the raw record into `StampedRecord`? If you did, you would not need to add a method for getting the raw record to `PartitionGroup`, `AbstractPartitionGroup`, and `SynchronizedPartitionGroup`.

##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java:
##########
@@ -764,6 +766,7 @@ public boolean process(final long wallClockTime) {
             // get the next record to process
             record = partitionGroup.nextRecord(recordInfo, wallClockTime);
+            rawRecord = partitionGroup.rawHeadRecord();

Review Comment:
   Also here, have you considered putting the raw record into `ProcessorRecordContext`?

##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java:
##########
@@ -288,7 +301,31 @@ private <K, V> void forwardInternal(final ProcessorNode<K, V, ?, ?> child,
                                      final Record<K, V> record) {
         setCurrentNode(child);
-        child.process(record);
+        try {

Review Comment:
   I am not sure this is the correct location to catch the exception and call the exception handler. What if the exception is thrown in the first processor of the topology? The first processor gets its input records from the stream task, not from the processor context, so a check in `forwardInternal()` would never see that exception. I think a good place would be `StreamTask#doProcess()`. Did you consider that?
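To make the first and third comments concrete, here is a minimal, self-contained sketch. It does not use the real Kafka Streams classes: `StampedRecordSketch`, `HandlerSketch`, and `TaskSketch` are hypothetical stand-ins invented for illustration, and their fields and signatures are assumptions. The sketch shows the raw bytes travelling inside the stamped record, so no separate raw-record accessor is needed, and the exception being caught at the task level, so a failure in the first processor is also covered.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class TaskLevelHandlingSketch {

    // Hypothetical stand-in for StampedRecord: the raw bytes are carried
    // alongside the deserialized value instead of in a separate field.
    static final class StampedRecordSketch {
        final String value;     // deserialized value
        final long timestamp;
        final byte[] rawValue;  // raw bytes as read from the consumer

        StampedRecordSketch(final String value, final long timestamp, final byte[] rawValue) {
            this.value = value;
            this.timestamp = timestamp;
            this.rawValue = rawValue;
        }
    }

    // Hypothetical handler: returns true to skip the record and continue.
    interface HandlerSketch {
        boolean shouldContinue(StampedRecordSketch record, RuntimeException error);
    }

    // Hypothetical stand-in for the stream task. The task itself hands the
    // record to the first processor, so the try/catch lives here.
    static final class TaskSketch {
        private final Consumer<StampedRecordSketch> firstProcessor;
        private final HandlerSketch handler;

        TaskSketch(final Consumer<StampedRecordSketch> firstProcessor, final HandlerSketch handler) {
            this.firstProcessor = firstProcessor;
            this.handler = handler;
        }

        // Returns true if the record was processed, false if it was skipped.
        boolean process(final StampedRecordSketch record) {
            try {
                firstProcessor.accept(record);
                return true;
            } catch (final RuntimeException error) {
                if (handler.shouldContinue(record, error)) {
                    return false;
                }
                throw error;
            }
        }
    }

    public static void main(final String[] args) {
        final List<String> handledRaw = new ArrayList<>();
        final TaskSketch task = new TaskSketch(
            record -> {
                // The first processor of the topology fails on empty values.
                if (record.value.isEmpty()) {
                    throw new IllegalArgumentException("empty value");
                }
            },
            (record, error) -> {
                // The handler can reach the raw bytes through the record.
                handledRaw.add(new String(record.rawValue, StandardCharsets.UTF_8));
                return true;
            });

        final byte[] raw = "raw-bytes".getBytes(StandardCharsets.UTF_8);
        System.out.println(task.process(new StampedRecordSketch("ok", 1L, raw)));
        System.out.println(task.process(new StampedRecordSketch("", 2L, raw)));
        System.out.println(handledRaw);
    }
}
```

Running `main` prints `true`, then `false`, then `[raw-bytes]`: the second record fails in the first processor, and the handler still sees both the exception and the raw bytes. Whether the real change should live in `StreamTask` and `StampedRecord` or in `ProcessorRecordContext` remains the open question of this review.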