mjsax commented on code in PR #21580:
URL: https://github.com/apache/kafka/pull/21580#discussion_r2887445584


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java:
##########
@@ -109,16 +110,18 @@ private VOut computeValue(final KIn key, final VIn value) 
{
         return newValue;
     }
 
-    private ValueAndTimestamp<VOut> computeValueAndTimestamp(final KIn key, 
final ValueAndTimestamp<VIn> valueAndTimestamp) {
+    private ValueTimestampHeaders<VOut> computeValueAndTimestamp(final KIn 
key, final ValueTimestampHeaders<VIn> valueTimestampHeaders) {
         VOut newValue = null;
         long timestamp = 0;
+        Headers headers = null;
 
-        if (valueAndTimestamp != null) {
-            newValue = mapper.apply(key, valueAndTimestamp.value());
-            timestamp = valueAndTimestamp.timestamp();
+        if (valueTimestampHeaders != null) {
+            newValue = mapper.apply(key, valueTimestampHeaders.value());
+            timestamp = valueTimestampHeaders.timestamp();
+            headers = valueTimestampHeaders.headers();
         }
 
-        return ValueAndTimestamp.make(newValue, timestamp);
+        return ValueTimestampHeaders.make(newValue, timestamp, headers);

Review Comment:
   I think passing `headers` here is fine, as it's stateless map() operation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to