cadonna commented on code in PR #18739:
URL: https://github.com/apache/kafka/pull/18739#discussion_r1935239878


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java:
##########
@@ -259,6 +259,10 @@ public <K, V> void send(final String topic,
 
         final ProducerRecord<byte[], byte[]> serializedRecord = new ProducerRecord<>(topic, partition, timestamp, keyBytes, valBytes, headers);
 
+        // As many records could be in-flight,
+        // freeing raw records in the context to reduce memory pressure
+        freeContext(context);

Review Comment:
   The name of this method is a bit misleading. It frees only the raw record 
within the context, not the whole context. What about calling it 
`freeRawInputRecordFromContext()`?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StampedRecord.java:
##########
@@ -55,8 +69,26 @@ public Headers headers() {
         return value.headers();
     }
 
+    public byte[] rawKey() {
+        return rawKey;
+    }
+
+    public byte[] rawValue() {
+        return rawValue;
+    }
+
     @Override
     public String toString() {
         return value.toString() + ", timestamp = " + timestamp;
     }
+
+    @Override
+    public boolean equals(final Object other) {
+        return super.equals(other);
+    }
+
+    @Override
+    public int hashCode() {
+        return super.hashCode();
+    }

Review Comment:
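   Why are these `equals()` and `hashCode()` overrides needed? Both only 
delegate to `super`, so they do not change behavior.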



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java:
##########
@@ -48,6 +50,24 @@ public ProcessorRecordContext(final long timestamp,
         this.topic = topic;
         this.partition = partition;
         this.headers = Objects.requireNonNull(headers);
+        this.sourceRawKey = null;
+        this.sourceRawValue = null;

Review Comment:
   You also need to add this information to `serialize()` and `deserialize()` 
so that the buffer values also get the source record. This gets a bit tricky, 
because you need to handle the case where a serialized record context does 
not contain the source record because it was written by a version of Streams 
that did not yet include the source record in the context.
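
To illustrate the compatibility concern, here is a minimal sketch of one possible approach: append the raw key and value as optional, length-prefixed trailing fields, and treat their absence as `null` when reading. Everything in it is an assumption for illustration: `SimpleRecordContext`, `serializeLegacy()`, and the byte layout are hypothetical and are not the actual `ProcessorRecordContext` format. It also assumes the serialized context fills the whole buffer; if the context is embedded in a larger buffer, the end of the buffer is not a usable signal and something like a version marker would probably be needed instead.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical, simplified stand-in for a record context (illustration only).
final class SimpleRecordContext {
    final long timestamp;
    final String topic;
    final byte[] sourceRawKey;   // null when the source record is not available
    final byte[] sourceRawValue; // null when the source record is not available

    SimpleRecordContext(final long timestamp, final String topic,
                        final byte[] sourceRawKey, final byte[] sourceRawValue) {
        this.timestamp = timestamp;
        this.topic = topic;
        this.sourceRawKey = sourceRawKey;
        this.sourceRawValue = sourceRawValue;
    }

    // Assumed "old" layout: timestamp, topic length, topic bytes.
    byte[] serializeLegacy() {
        final byte[] topicBytes = topic.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(Long.BYTES + Integer.BYTES + topicBytes.length)
            .putLong(timestamp)
            .putInt(topicBytes.length)
            .put(topicBytes)
            .array();
    }

    // Assumed "new" layout: the old fields followed by two length-prefixed
    // byte arrays, where a length of -1 encodes null.
    byte[] serialize() {
        final byte[] legacy = serializeLegacy();
        final int keyLen = sourceRawKey == null ? 0 : sourceRawKey.length;
        final int valueLen = sourceRawValue == null ? 0 : sourceRawValue.length;
        final ByteBuffer buffer =
            ByteBuffer.allocate(legacy.length + 2 * Integer.BYTES + keyLen + valueLen);
        buffer.put(legacy);
        putNullable(buffer, sourceRawKey);
        putNullable(buffer, sourceRawValue);
        return buffer.array();
    }

    // Reads both layouts: if no bytes remain after the old fields, the data
    // was written without the source record, so both raw fields stay null.
    static SimpleRecordContext deserialize(final ByteBuffer buffer) {
        final long timestamp = buffer.getLong();
        final byte[] topicBytes = new byte[buffer.getInt()];
        buffer.get(topicBytes);
        final String topic = new String(topicBytes, StandardCharsets.UTF_8);

        byte[] rawKey = null;
        byte[] rawValue = null;
        if (buffer.hasRemaining()) {
            rawKey = getNullable(buffer);
            rawValue = getNullable(buffer);
        }
        return new SimpleRecordContext(timestamp, topic, rawKey, rawValue);
    }

    private static void putNullable(final ByteBuffer buffer, final byte[] bytes) {
        if (bytes == null) {
            buffer.putInt(-1);
        } else {
            buffer.putInt(bytes.length).put(bytes);
        }
    }

    private static byte[] getNullable(final ByteBuffer buffer) {
        final int length = buffer.getInt();
        if (length < 0) {
            return null;
        }
        final byte[] bytes = new byte[length];
        buffer.get(bytes);
        return bytes;
    }
}
```

With this layout, bytes produced by `serializeLegacy()` still deserialize, just with `null` raw key and value, while bytes produced by `serialize()` round-trip the source record.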


