cadonna commented on code in PR #16093:
URL: https://github.com/apache/kafka/pull/16093#discussion_r1618417037


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java:
##########
@@ -232,12 +236,14 @@ private void updateHead() {
             }
             headRecord = new StampedRecord(deserialized, timestamp);
             headRecordSizeInBytes = consumerRecordSizeInBytes(raw);
+            rawHeadRecord = raw;

Review Comment:
   Did you consider putting the raw record into `StampedRecord`? If you do, 
you do not need to add a method for getting the raw record to `PartitionGroup`, 
`AbstractPartitionGroup`, and `SynchronizedPartitionGroup`.
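
   To illustrate the idea, here is a minimal sketch using simplified stand-in classes. `RawRecordSketch`, `StampedRecordSketch`, and `RecordQueueSketch` are hypothetical names and not the real Kafka Streams classes; keys and values are assumed to be UTF-8 strings only to keep the example short.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;

// Simplified stand-in for a raw (serialized) consumer record; not the real Kafka class.
final class RawRecordSketch {
    final byte[] key;
    final byte[] value;
    final long timestamp;

    RawRecordSketch(final byte[] key, final byte[] value, final long timestamp) {
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
    }
}

// Hypothetical variant of a stamped record that keeps a reference to its raw form.
final class StampedRecordSketch {
    final String key;
    final String value;
    final long timestamp;
    final RawRecordSketch raw;

    StampedRecordSketch(final String key, final String value,
                        final long timestamp, final RawRecordSketch raw) {
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
        this.raw = raw;
    }
}

// Hypothetical queue: the raw record travels inside the deserialized record,
// so no separate raw-head accessor is needed on the queue or its callers.
final class RecordQueueSketch {
    private final ArrayDeque<RawRecordSketch> fifo = new ArrayDeque<>();

    void add(final RawRecordSketch raw) {
        fifo.addLast(raw);
    }

    // Deserializes the next raw record, or returns null if the queue is empty.
    StampedRecordSketch poll() {
        final RawRecordSketch raw = fifo.pollFirst();
        if (raw == null) {
            return null;
        }
        return new StampedRecordSketch(
            new String(raw.key, StandardCharsets.UTF_8),
            new String(raw.value, StandardCharsets.UTF_8),
            raw.timestamp,
            raw);
    }
}
```

   With this shape, whoever receives the deserialized record can reach the raw bytes through it, and the partition group classes need no additional getter.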



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java:
##########
@@ -764,6 +766,7 @@ public boolean process(final long wallClockTime) {
 
             // get the next record to process
             record = partitionGroup.nextRecord(recordInfo, wallClockTime);
+            rawRecord = partitionGroup.rawHeadRecord();

Review Comment:
   Also here, have you considered putting the raw record into 
`ProcessorRecordContext`?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java:
##########
@@ -288,7 +301,31 @@ private <K, V> void forwardInternal(final ProcessorNode<K, V, ?, ?> child,
                                         final Record<K, V> record) {
         setCurrentNode(child);
 
-        child.process(record);
+        try {

Review Comment:
   I am not sure this is the correct location to catch the exception and 
call the exception handler. What if the exception is thrown in the first 
processor of the topology? The first processor gets its input records from the 
stream task, not from the processor context, so a try-catch in 
`forwardInternal()` would not cover it.
   I think a good place would be `StreamTask#doProcess()`. Did you consider 
that?
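
   A rough sketch of what catching at the task level could look like, using simplified stand-ins. `StreamTaskSketch` and `ProcessingExceptionHandlerSketch` are hypothetical names, the source node is modeled as a plain `Consumer<String>`, and the real handler API in this PR may look different.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical handler interface; the actual one in the PR may differ.
interface ProcessingExceptionHandlerSketch {
    enum Response { CONTINUE, FAIL }

    Response handle(String record, RuntimeException error);
}

// Simplified stand-in for a stream task that feeds records into the topology.
final class StreamTaskSketch {
    private final Consumer<String> sourceNode; // first processor of the topology
    private final ProcessingExceptionHandlerSketch handler;
    final List<String> dropped = new ArrayList<>();

    StreamTaskSketch(final Consumer<String> sourceNode,
                     final ProcessingExceptionHandlerSketch handler) {
        this.sourceNode = sourceNode;
        this.handler = handler;
    }

    // Returns true if the record was processed, false if it was dropped.
    boolean process(final String record) {
        try {
            // The task hands the record to the first processor directly, so a
            // try-catch here also covers exceptions thrown by that processor,
            // as well as exceptions propagating up from downstream nodes.
            sourceNode.accept(record);
            return true;
        } catch (final RuntimeException e) {
            if (handler.handle(record, e) == ProcessingExceptionHandlerSketch.Response.FAIL) {
                throw e;
            }
            dropped.add(record);
            return false;
        }
    }
}
```

   Because every downstream `forward()` call runs inside the source node's `process()` call, a single try-catch at this level sees failures from any processor in the sub-topology.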
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
