aliehsaeedii commented on code in PR #21639:
URL: https://github.com/apache/kafka/pull/21639#discussion_r2889406412
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStoreWithHeaders.java:
##########
@@ -44,6 +44,21 @@ public class
ChangeLoggingTimestampedKeyValueBytesStoreWithHeaders
super(inner);
}
+ @Override
+ public byte[] delete(final Bytes key) {
+ final byte[] oldValue = wrapped().delete(key);
+ log(key,
+ null,
+ oldValue == null
+ ? internalContext.recordContext().timestamp()
+ : timestamp(oldValue),
+ oldValue == null
+ ? internalContext.recordContext().headers()
+ : headers(oldValue)
+ );
+ return oldValue;
+ }
+
Review Comment:
What about `put(key, null`)? We need to mess with internalProcessorContext
to make the put(key, null) case work. I think custom headers is not existent by
default in `internalContext.recordContext().headers()`
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreWithHeaders.java:
##########
@@ -41,6 +42,40 @@ class ChangeLoggingTimestampedWindowBytesStoreWithHeaders
extends ChangeLoggingW
super(bytesStore, retainDuplicates, WindowKeySchema::toStoreKeyBinary);
}
+ @Override
+ public void put(final Bytes key,
+ final byte[] valueTimestampHeaders,
+ final long windowStartTimestamp) {
+ byte[] oldValueTimestampHeaders = null;
+ if (valueTimestampHeaders == null) {
+ // Deletion: fetch old value to preserve its headers in the
tombstone
+ try (final WindowStoreIterator<byte[]> iter = wrapped().fetch(key,
windowStartTimestamp, windowStartTimestamp)) {
Review Comment:
Is nt it adding cost to the `put`?
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreWithHeaders.java:
##########
@@ -43,13 +43,20 @@ public class ChangeLoggingSessionBytesStoreWithHeaders
@Override
public void remove(final Windowed<Bytes> sessionKey) {
+ final byte[] oldAggregationWithHeaders = wrapped().fetchSession(
+ sessionKey.key(),
+ sessionKey.window().start(),
+ sessionKey.window().end()
+ );
wrapped().remove(sessionKey);
internalContext.logChange(
name(),
SessionKeySchema.toBinary(sessionKey),
null,
internalContext.recordContext().timestamp(),
- internalContext.recordContext().headers(),
+ oldAggregationWithHeaders == null
+ ? internalContext.recordContext().headers()
Review Comment:
Does `internalContext.recordContext().headers()` contain custom headers
content?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]