aliehsaeedii commented on code in PR #21639:
URL: https://github.com/apache/kafka/pull/21639#discussion_r2889406412


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStoreWithHeaders.java:
##########
@@ -44,6 +44,21 @@ public class ChangeLoggingTimestampedKeyValueBytesStoreWithHeaders
         super(inner);
     }
 
+    @Override
+    public byte[] delete(final Bytes key) {
+        final byte[] oldValue = wrapped().delete(key);
+        log(key,
+            null,
+            oldValue == null
+                ? internalContext.recordContext().timestamp()
+                : timestamp(oldValue),
+            oldValue == null
+                ? internalContext.recordContext().headers()
+                : headers(oldValue)
+        );
+        return oldValue;
+    }
+

Review Comment:
   What about `put(key, null)`? We would need to adjust the 
`internalProcessorContext` to make the `put(key, null)` case work. I think 
custom headers are not present by default in 
`internalContext.recordContext().headers()`.
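
   A rough sketch of what I mean, using hypothetical stand-in types rather than the real Kafka classes (`TombstoneSketch`, `Stored`, `LogEntry`, `contextTimestamp`, and `contextHeaders` are all made up for illustration): `put(key, null)` is routed through the same tombstone path as `delete`, so both preserve the old value's timestamp and headers and fall back to the record context only when there is no old value.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a change-logging store; NOT a Kafka class.
final class TombstoneSketch {

    // Simplified value carrying its own timestamp and headers.
    static final class Stored {
        final String value;
        final long timestamp;
        final Map<String, String> headers;

        Stored(final String value, final long timestamp, final Map<String, String> headers) {
            this.value = value;
            this.timestamp = timestamp;
            this.headers = headers;
        }
    }

    // One entry written to the (simulated) changelog; value == null is a tombstone.
    static final class LogEntry {
        final String key;
        final String value;
        final long timestamp;
        final Map<String, String> headers;

        LogEntry(final String key, final String value, final long timestamp, final Map<String, String> headers) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
            this.headers = headers;
        }
    }

    private final Map<String, Stored> inner = new HashMap<>();
    final List<LogEntry> changelog = new ArrayList<>();

    // Assumed stand-ins for internalContext.recordContext().timestamp() / headers().
    long contextTimestamp = 0L;
    Map<String, String> contextHeaders = Collections.emptyMap();

    Stored delete(final String key) {
        final Stored old = inner.remove(key);
        logTombstone(key, old);
        return old;
    }

    void put(final String key, final Stored value) {
        if (value == null) {
            // Treat put(key, null) exactly like delete(key).
            delete(key);
            return;
        }
        inner.put(key, value);
        changelog.add(new LogEntry(key, value.value, value.timestamp, value.headers));
    }

    // Prefer the old value's metadata; fall back to the record context.
    private void logTombstone(final String key, final Stored old) {
        final long timestamp = old == null ? contextTimestamp : old.timestamp;
        final Map<String, String> headers = old == null ? contextHeaders : old.headers;
        changelog.add(new LogEntry(key, null, timestamp, headers));
    }
}
```

   Whether the real store can share one code path like this depends on how the wrapped store handles a null `put`, which I have not checked.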



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreWithHeaders.java:
##########
@@ -41,6 +42,40 @@ class ChangeLoggingTimestampedWindowBytesStoreWithHeaders extends ChangeLoggingW
         super(bytesStore, retainDuplicates, WindowKeySchema::toStoreKeyBinary);
     }
 
+    @Override
+    public void put(final Bytes key,
+                    final byte[] valueTimestampHeaders,
+                    final long windowStartTimestamp) {
+        byte[] oldValueTimestampHeaders = null;
+        if (valueTimestampHeaders == null) {
+            // Deletion: fetch old value to preserve its headers in the tombstone
+            try (final WindowStoreIterator<byte[]> iter = wrapped().fetch(key, windowStartTimestamp, windowStartTimestamp)) {

Review Comment:
   Doesn't this add cost to `put`? Every deletion now issues a `fetch` 
before the write.
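
   To make the concern concrete, here is a minimal sketch with a hypothetical in-memory stand-in (`ReadCountingStore` is made up and is not the Kafka window store API) that counts lookups. It mirrors the shape of the diff: a non-null `put` issues no read, while a null `put` (a deletion) pays for one extra lookup.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a window store; NOT a Kafka class.
final class ReadCountingStore {
    private final Map<String, byte[]> inner = new HashMap<>();
    int reads = 0; // number of lookups issued against the inner store

    private static String storeKey(final String key, final long windowStart) {
        return key + "@" + windowStart;
    }

    private byte[] fetch(final String key, final long windowStart) {
        reads++;
        return inner.get(storeKey(key, windowStart));
    }

    // Only a null value (a deletion) triggers the extra read.
    // The real method returns void; the old value is returned here only for inspection.
    byte[] put(final String key, final byte[] value, final long windowStart) {
        byte[] old = null;
        if (value == null) {
            old = fetch(key, windowStart);
            inner.remove(storeKey(key, windowStart));
        } else {
            inner.put(storeKey(key, windowStart), value);
        }
        return old;
    }
}
```

   So the overhead is limited to tombstones, but for delete-heavy workloads it is still one additional read per `put`.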



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreWithHeaders.java:
##########
@@ -43,13 +43,20 @@ public class ChangeLoggingSessionBytesStoreWithHeaders
 
     @Override
     public void remove(final Windowed<Bytes> sessionKey) {
+        final byte[] oldAggregationWithHeaders = wrapped().fetchSession(
+            sessionKey.key(),
+            sessionKey.window().start(),
+            sessionKey.window().end()
+        );
         wrapped().remove(sessionKey);
         internalContext.logChange(
             name(),
             SessionKeySchema.toBinary(sessionKey),
             null,
             internalContext.recordContext().timestamp(),
-            internalContext.recordContext().headers(),
+            oldAggregationWithHeaders == null
+                ? internalContext.recordContext().headers()

Review Comment:
   Does `internalContext.recordContext().headers()` contain the custom 
headers?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
