[ 
https://issues.apache.org/jira/browse/KAFKA-7110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16526643#comment-16526643
 ] 

Shawn Nguyen commented on KAFKA-7110:
-------------------------------------

Thanks for the quick response [~guozhang].

1) I agree, serialization should actually be toBinary since we're serializing 
into a topic. Since no such method signature exists, I can extend the 
WindowKeySchema to have a toBinary call with the additional seqnum parameter, 
along with a fromBinary method for the deserialization w/ seqnum. That way we 
can reuse the existing TimeWindowedSerde for writing and reading windowed 
topics (and windowed changelog topics).
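
To make the proposal in 1) concrete, here is a minimal, self-contained sketch of a key layout that carries the seqnum, plus the matching read path. The class and method names (WindowedChangelogKeySketch, toBinary, extractWindowStart, and so on) are hypothetical and are not the actual WindowKeySchema API. The 8-byte timestamp and 4-byte seqnum sizes are assumptions based on the TIMESTAMP_SIZE and SEQNUM_SIZE constants mentioned in the ticket.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch, not the real WindowKeySchema.
public class WindowedChangelogKeySketch {
    static final int TIMESTAMP_SIZE = 8; // assumed: window start stored as a long
    static final int SEQNUM_SIZE = 4;    // assumed: seqnum stored as an int

    // Assumed layout: [serialized key][8-byte window start][4-byte seqnum]
    static byte[] toBinary(final byte[] serializedKey, final long windowStart, final int seqnum) {
        final ByteBuffer buf =
            ByteBuffer.allocate(serializedKey.length + TIMESTAMP_SIZE + SEQNUM_SIZE);
        buf.put(serializedKey);
        buf.putLong(windowStart);
        buf.putInt(seqnum);
        return buf.array();
    }

    // Strips the trailing timestamp and seqnum to recover the raw key bytes.
    static byte[] extractKeyBytes(final byte[] binaryKey) {
        final byte[] key = new byte[binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE];
        System.arraycopy(binaryKey, 0, key, 0, key.length);
        return key;
    }

    // Reads the window start at the offset that accounts for the seqnum.
    static long extractWindowStart(final byte[] binaryKey) {
        return ByteBuffer.wrap(binaryKey)
            .getLong(binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE);
    }

    static int extractSeqnum(final byte[] binaryKey) {
        return ByteBuffer.wrap(binaryKey).getInt(binaryKey.length - SEQNUM_SIZE);
    }

    // Mimics the read described in the ticket: the offset ignores the seqnum,
    // so the long straddles the timestamp and the seqnum bytes.
    static long extractWindowStartIgnoringSeqnum(final byte[] binaryKey) {
        return ByteBuffer.wrap(binaryKey).getLong(binaryKey.length - TIMESTAMP_SIZE);
    }

    public static void main(final String[] args) {
        final byte[] key = "user-1".getBytes(StandardCharsets.UTF_8);
        final long start = 1_500_000_000_000L;
        final byte[] binary = toBinary(key, start, 0);

        System.out.println("aligned start:    " + extractWindowStart(binary));
        System.out.println("misaligned start: " + extractWindowStartIgnoringSeqnum(binary));
        System.out.println("key: " + new String(extractKeyBytes(binary), StandardCharsets.UTF_8));
    }
}
```

The aligned read returns the start that was written, while the read that ignores the seqnum does not, which is the misalignment described in the ticket.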

2) In the PR attached to this ticket, I had an overloaded constructor with the 
window size. I don't have a clean solution to passing in the window size, but 
for the use case of ingesting the time windowed ktable (whose input is a 
changelog topic) described here - [https://github.com/apache/kafka/pull/5044], 
we could just set the window size in the serde when the windowConsumed() method 
is called in the timeWindowedKTable api.

Something like:
{noformat}
public synchronized <K, V> KTable<Windowed<K>, V> timeWindowedKTable(
        final String topic,
        final Consumed<K, V> consumed,
        final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized,
        final TimeWindows timeWindow) {
    Consumed<Windowed<K>, V> windowedConsumed =
            consumed.windowedChangelogConsumed(timeWindow.size());
    ConsumedInternal<Windowed<K>, V> consumedInternal =
            new ConsumedInternal<>(windowedConsumed);
    ...
}
{noformat}
I considered this approach since this is the only use case for a client to 
ingest a changelog topic for now. However, this way is very specific to the 
window ktable api. 
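
To illustrate why the window size has to reach the deserializer at all, here is a small sketch of how a window's end time could be derived from its start. It assumes the end is computed as start plus size, clamped to Long.MAX_VALUE on overflow. That is my reading of timeWindowForSize and is not spelled out in this ticket. The class and method names are hypothetical.

```java
// Hypothetical sketch of deriving a window end from its start and size.
public class WindowSizeSketch {

    // Assumed behaviour: end = start + size, clamped to Long.MAX_VALUE on overflow.
    static long windowEnd(final long startMs, final long windowSizeMs) {
        final long endMs = startMs + windowSizeMs;
        return endMs < 0 ? Long.MAX_VALUE : endMs;
    }

    public static void main(final String[] args) {
        final long start = 1_500_000_000_000L;

        // With the default size of Long.MAX_VALUE, the sum overflows for any
        // positive start, so the end carries no information about the real window.
        System.out.println("default size end:  " + windowEnd(start, Long.MAX_VALUE));

        // With the real window size (one minute here), the end is meaningful.
        System.out.println("one-minute end:    " + windowEnd(start, 60_000L));
    }
}
```

Under that assumption, any serde constructed without the real window size yields the same end for every window, so the size has to be supplied either through a constructor or through something like the windowedChangelogConsumed call above.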

> Windowed changelog keys not deserialized properly by TimeWindowedSerde
> ----------------------------------------------------------------------
>
>                 Key: KAFKA-7110
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7110
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Shawn Nguyen
>            Priority: Major
>
> Currently the TimeWindowedSerde does not deserialize the windowed keys from a 
> changelog topic properly. A few assumptions made in the 
> TimeWindowedDeserializer prevent the changelog windowed keys from being 
> correctly deserialized.
> 1) In the from method of WindowKeySchema (called in deserialize in 
> TimeWindowedDeserializer), we extract the window from the binary key, but we 
> call getLong(binaryKey.length - TIMESTAMP_SIZE). However, the changelog for 
> ChangeLoggingWindowBytesStore will log the windowed key as:
>  
> {noformat}
> changeLogger.logChange(
>     WindowKeySchema.toStoreKeyBinary(key, timestamp, maybeUpdateSeqnumForDups()),
>     value);
> {noformat}
>  
> In toStoreKeyBinary, we store the key in 
> {noformat}
> final ByteBuffer buf = ByteBuffer.allocate(serializedKey.length + 
> TIMESTAMP_SIZE + SEQNUM_SIZE);
> {noformat}
> with the seqnum (used for de-duping) appended. So when we deserialize, we do 
> not account for the seqnum in the windowed changelog key, and the window 
> extracted will be gibberish since the bytes read won't be aligned.
> The fix here is to introduce a new Serde in WindowSerdes that explicitly 
> handles windowed changelog input topics. 
>  
> 2) In the constructor of TimeWindowedDeserializer, the windowSize is fixed to 
> Long.MAX_VALUE:
>  
> {noformat}
> // TODO: fix this part as last bits of KAFKA-4468
> public TimeWindowedDeserializer(final Deserializer<T> inner) {
>     this(inner, Long.MAX_VALUE);
> }
>
> public TimeWindowedDeserializer(final Deserializer<T> inner, final long windowSize) {
>     this.inner = inner;
>     this.windowSize = windowSize;
> }
> {noformat}
> This will cause the end times to be gibberish when we extract the window, 
> since the windowSize is added to the start time to compute the end time in:
>  
> {noformat}
> public static <K> Windowed<K> from(final byte[] binaryKey,
>                                    final long windowSize,
>                                    final Deserializer<K> deserializer,
>                                    final String topic) {
>     final byte[] bytes = new byte[binaryKey.length - TIMESTAMP_SIZE];
>     System.arraycopy(binaryKey, 0, bytes, 0, bytes.length);
>     final K key = deserializer.deserialize(topic, bytes);
>     final Window window = extractWindow(binaryKey, windowSize);
>     return new Windowed<>(key, window);
> }
>
> private static Window extractWindow(final byte[] binaryKey, final long windowSize) {
>     final ByteBuffer buffer = ByteBuffer.wrap(binaryKey);
>     final long start = buffer.getLong(binaryKey.length - TIMESTAMP_SIZE);
>     return timeWindowForSize(start, windowSize);
> }
> {noformat}
> So in the new serde, we will make windowSize a constructor param that can be 
> supplied.
> I've started a patch, and will prepare a PR for the fix for 1) and 2) above. 
> Let me know if this sounds reasonable. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
