[
https://issues.apache.org/jira/browse/KAFKA-7110?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Matthias J. Sax updated KAFKA-7110:
-----------------------------------
Labels: kip (was: )
> Windowed changelog keys not deserialized properly by TimeWindowedSerde
> ----------------------------------------------------------------------
>
> Key: KAFKA-7110
> URL: https://issues.apache.org/jira/browse/KAFKA-7110
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Reporter: Shawn Nguyen
> Assignee: Shawn Nguyen
> Priority: Major
> Labels: kip
>
> Currently the TimeWindowedSerde does not deserialize the windowed keys from a
> changelog topic properly. There are a few assumptions made in the
> TimeWindowedDeserializer that prevent the changelog windowed keys from being
> correctly deserialized.
> 1) In the from method of WindowKeySchema (called in deserialize in
> TimeWindowedDeserializer), we extract the window from the binary key, but we
> call getLong(binaryKey.length - TIMESTAMP_SIZE). However, the changelog for
> ChangeLoggingWindowBytesStore will log the windowed key as:
>
> {noformat}
> changeLogger.logChange(WindowKeySchema.toStoreKeyBinary(key, timestamp,
> maybeUpdateSeqnumForDups()), value);
> {noformat}
>
> In toStoreKeyBinary, we store the key in
> {noformat}
> final ByteBuffer buf = ByteBuffer.allocate(serializedKey.length +
> TIMESTAMP_SIZE + SEQNUM_SIZE);
> {noformat}
> with the seqnum (used for de-duping) appended after the timestamp. Because
> the deserializer does not assume the windowed changelog key has a seqnum, it
> reads the timestamp from the wrong offset, and the extracted window contains
> gibberish values since the bytes read are not aligned.
> The fix here is to introduce a new Serde in WindowSerdes that explicitly
> handles windowed changelog input topics.
>
> 2) In the constructor of TimeWindowedDeserializer, the windowSize is fixed to
> Long.MAX_VALUE:
>
> {noformat}
> // TODO: fix this part as last bits of KAFKA-4468
> public TimeWindowedDeserializer(final Deserializer<T> inner) {
>     this(inner, Long.MAX_VALUE);
> }
>
> public TimeWindowedDeserializer(final Deserializer<T> inner, final long windowSize) {
>     this.inner = inner;
>     this.windowSize = windowSize;
> }
> {noformat}
> This will cause the end times to be gibberish when we extract the window,
> since the windowSize is added to the start time in:
>
> {noformat}
> public static <K> Windowed<K> from(final byte[] binaryKey,
>                                    final long windowSize,
>                                    final Deserializer<K> deserializer,
>                                    final String topic) {
>     final byte[] bytes = new byte[binaryKey.length - TIMESTAMP_SIZE];
>     System.arraycopy(binaryKey, 0, bytes, 0, bytes.length);
>     final K key = deserializer.deserialize(topic, bytes);
>     final Window window = extractWindow(binaryKey, windowSize);
>     return new Windowed<>(key, window);
> }
>
> private static Window extractWindow(final byte[] binaryKey, final long windowSize) {
>     final ByteBuffer buffer = ByteBuffer.wrap(binaryKey);
>     final long start = buffer.getLong(binaryKey.length - TIMESTAMP_SIZE);
>     return timeWindowForSize(start, windowSize);
> }
> {noformat}
> So in the new serde, we will make windowSize a constructor param that can be
> supplied.
> I've started a patch, and will prepare a PR that fixes 1) and 2) above.
> Let me know if this sounds reasonable.
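
The two problems described above can be reproduced with a small standalone sketch. It is not Kafka code: the class and method names are hypothetical, the sizes (8-byte timestamp, 4-byte seqnum) are assumptions, and the end-time rule (start plus size, clamped to Long.MAX_VALUE on overflow) is an assumed stand-in for timeWindowForSize. It only mirrors the key | timestamp | seqnum layout from the description.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class ChangelogKeySketch {
    // Assumed sizes: 8-byte timestamp followed by a 4-byte sequence number.
    static final int TIMESTAMP_SIZE = 8;
    static final int SEQNUM_SIZE = 4;

    // Hypothetical encoder mirroring the described layout: key | timestamp | seqnum.
    static byte[] toChangelogKey(byte[] serializedKey, long timestamp, int seqnum) {
        ByteBuffer buf = ByteBuffer.allocate(serializedKey.length + TIMESTAMP_SIZE + SEQNUM_SIZE);
        buf.put(serializedKey);
        buf.putLong(timestamp);
        buf.putInt(seqnum);
        return buf.array();
    }

    // Reads the timestamp as if no seqnum were appended (the misaligned read).
    static long startAssumingNoSeqnum(byte[] binaryKey) {
        return ByteBuffer.wrap(binaryKey).getLong(binaryKey.length - TIMESTAMP_SIZE);
    }

    // Reads the timestamp after skipping the trailing seqnum.
    static long startSkippingSeqnum(byte[] binaryKey) {
        return ByteBuffer.wrap(binaryKey)
                .getLong(binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE);
    }

    // Assumed end-time rule: start + size, clamped when the sum overflows.
    static long windowEnd(long start, long windowSize) {
        long end = start + windowSize;
        return end < 0 ? Long.MAX_VALUE : end;
    }

    public static void main(String[] args) {
        byte[] key = "user-1".getBytes(StandardCharsets.UTF_8);
        long start = 1_530_000_000_000L;
        byte[] changelogKey = toChangelogKey(key, start, 0);

        System.out.println("misaligned start:   " + startAssumingNoSeqnum(changelogKey));
        System.out.println("seqnum-aware start: " + startSkippingSeqnum(changelogKey));
        System.out.println("end, default size:  " + windowEnd(start, Long.MAX_VALUE));
        System.out.println("end, 60s window:    " + windowEnd(start, 60_000L));
    }
}
```

With a supplied window size the end time is start plus size; with the Long.MAX_VALUE default it carries no information about the actual window, which is why the description proposes making windowSize a constructor parameter.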