[ https://issues.apache.org/jira/browse/KAFKA-7110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16525782#comment-16525782 ]

Guozhang Wang edited comment on KAFKA-7110 at 6/28/18 12:59 AM:
----------------------------------------------------------------

Hi [~shnguyen], thanks for reporting this issue.

For 1), when serializing into the Kafka topic, we should call 
{{WindowKeySchema#toBinary}}, not {{WindowKeySchema#toStoreKeyBinary}}. As the 
comment indicates, the former is for serializing into the Kafka topic, while 
the latter is for serializing into the underlying persistent store. I think 
you've spotted a bug: we should call

{code}
changeLogger.logChange(WindowKeySchema.toBinary(key, timestamp, 
maybeUpdateSeqnumForDups()), value);
{code}

instead. With this we would not need to have a new serde.
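
Just to make the difference concrete, here is a rough sketch of the two key layouts (illustration only, not the actual {{WindowKeySchema}} code; I'm assuming the usual 8-byte timestamp and 4-byte seqnum):

{code}
import java.nio.ByteBuffer;

public class WindowKeyLayouts {
    static final int TIMESTAMP_SIZE = 8; // assumed size of the window start timestamp
    static final int SEQNUM_SIZE = 4;    // assumed size of the de-duplication seqnum

    // Topic format (toBinary-style): [serialized key][8-byte timestamp]
    static byte[] topicFormat(final byte[] serializedKey, final long timestamp) {
        return ByteBuffer.allocate(serializedKey.length + TIMESTAMP_SIZE)
                .put(serializedKey)
                .putLong(timestamp)
                .array();
    }

    // Store format (toStoreKeyBinary-style): [serialized key][8-byte timestamp][4-byte seqnum]
    static byte[] storeFormat(final byte[] serializedKey, final long timestamp, final int seqnum) {
        return ByteBuffer.allocate(serializedKey.length + TIMESTAMP_SIZE + SEQNUM_SIZE)
                .put(serializedKey)
                .putLong(timestamp)
                .putInt(seqnum)
                .array();
    }
}
{code}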

2) This is a known issue described in KAFKA-4468. I'd like to hear your 
proposed solution for it: are you trying to add an overloaded constructor to 
{{TimeWindowedSerde}}?

Note that we already have a constructor in {{TimeWindowedDeserializer}} that 
takes a window size. The tricky part, though, is that for a user to read from 
the changelog topic directly, she still needs to pass that window size to the 
constructor via the serde. Maybe we could pass it in as a property key-value 
pair and let the {{configure}} call set it, but then the window size parameter 
could no longer be a final variable.
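
A rough sketch of that configure-based idea (the property name "window.size.ms" is hypothetical, not an existing Streams config):

{code}
import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;

// Sketch only: the window size is set in configure() instead of being
// a final constructor argument.
public class ConfigurableWindowSizeDeserializer<T> {
    private final Deserializer<T> inner;
    private long windowSize = Long.MAX_VALUE; // no longer final

    public ConfigurableWindowSizeDeserializer(final Deserializer<T> inner) {
        this.inner = inner;
    }

    public void configure(final Map<String, ?> configs, final boolean isKey) {
        final Object size = configs.get("window.size.ms"); // hypothetical config key
        if (size != null) {
            this.windowSize = Long.parseLong(size.toString());
        }
        inner.configure(configs, isKey);
    }
}
{code}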



> Windowed changelog keys not deserialized properly by TimeWindowedSerde
> ----------------------------------------------------------------------
>
>                 Key: KAFKA-7110
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7110
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Shawn Nguyen
>            Priority: Major
>
> Currently the TimeWindowedSerde does not deserialize the windowed keys from a 
> changelog topic properly. There are a few assumptions made in the 
> TimeWindowedDeserializer that prevent the changelog windowed keys from being 
> correctly deserialized.
> 1) In the from method of WindowKeySchema (called in deserialize in 
> TimeWindowedDeserializer), we extract the window from the binary key by 
> calling getLong(binaryKey.length - TIMESTAMP_SIZE). However, the changelog for 
> ChangeLoggingWindowBytesStore will log the windowed key as:
>  
> {noformat}
> changeLogger.logChange(WindowKeySchema.toStoreKeyBinary(key, timestamp, 
> maybeUpdateSeqnumForDups()), value);
> {noformat}
>  
> In toStoreKeyBinary, we store the key in 
> {noformat}
> final ByteBuffer buf = ByteBuffer.allocate(serializedKey.length + 
> TIMESTAMP_SIZE + SEQNUM_SIZE);
> {noformat}
> with the seqnum (used for de-duping). So the eventual result is that when we 
> deserialize, we do not assume the windowed changelog key has a seq_num, and 
> the extracted window will contain gibberish values since the bytes extracted 
> won't be aligned.
> The fix here is to introduce a new Serde in WindowSerdes that will explicitly 
> handle the windowed changelog input topic. 
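> 
> For illustration only (not the actual Streams code, and assuming an 8-byte 
> timestamp and a 4-byte seqnum), reading the long that ends at the end of a 
> store-format key picks up the last 4 bytes of the timestamp plus the 4-byte 
> seqnum:
> {noformat}
> import java.nio.ByteBuffer;
> import java.nio.charset.StandardCharsets;
> 
> public class MisalignmentSketch {
>     public static void main(final String[] args) {
>         final byte[] key = "some-key".getBytes(StandardCharsets.UTF_8);
>         final long timestamp = 1_530_000_000_000L;
>         final int seqnum = 0;
> 
>         // Store/changelog format: [key bytes][8-byte timestamp][4-byte seqnum]
>         final ByteBuffer storeKey = ByteBuffer.allocate(key.length + 8 + 4)
>                 .put(key).putLong(timestamp).putInt(seqnum);
> 
>         // What the deserializer assumes today: the timestamp is the last 8 bytes.
>         final long extracted = storeKey.getLong(storeKey.capacity() - 8);
> 
>         // Prints false: the read is shifted by the 4 seqnum bytes.
>         System.out.println(extracted == timestamp);
>     }
> }
> {noformat}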
>  
> 2) In the constructor of TimeWindowedDeserializer, the windowSize is fixed to 
> Long.MAX_VALUE:
>  
> {noformat}
> // TODO: fix this part as last bits of KAFKA-4468
> public TimeWindowedDeserializer(final Deserializer<T> inner) {
>     this(inner, Long.MAX_VALUE);
> }
>
> public TimeWindowedDeserializer(final Deserializer<T> inner, final long windowSize) {
>     this.inner = inner;
>     this.windowSize = windowSize;
> }
> {noformat}
> This will cause the end times to be gibberish when we extract the window, 
> since the windowSize is used to compute the window end time from the start 
> time in:
>  
> {noformat}
> public static <K> Windowed<K> from(final byte[] binaryKey, final long windowSize, 
>                                    final Deserializer<K> deserializer, final String topic) {
>     final byte[] bytes = new byte[binaryKey.length - TIMESTAMP_SIZE];
>     System.arraycopy(binaryKey, 0, bytes, 0, bytes.length);
>     final K key = deserializer.deserialize(topic, bytes);
>     final Window window = extractWindow(binaryKey, windowSize);
>     return new Windowed<>(key, window);
> }
>
> private static Window extractWindow(final byte[] binaryKey, final long windowSize) {
>     final ByteBuffer buffer = ByteBuffer.wrap(binaryKey);
>     final long start = buffer.getLong(binaryKey.length - TIMESTAMP_SIZE);
>     return timeWindowForSize(start, windowSize);
> }
> {noformat}
> So in the new serde, we will make windowSize a constructor param that can be 
> supplied.
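> 
> For illustration, a rough sketch of how such a serde could accept the window 
> size up front (the class and method names here are hypothetical; it reuses the 
> existing two-argument TimeWindowedDeserializer constructor quoted above):
> {noformat}
> import org.apache.kafka.common.serialization.Serde;
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
> import org.apache.kafka.streams.kstream.TimeWindowedSerializer;
> import org.apache.kafka.streams.kstream.Windowed;
> 
> public class WindowedChangelogSerdes {
>     // Sketch only: build a windowed key serde with an explicit window size.
>     public static Serde<Windowed<String>> timeWindowedStringSerde(final long windowSizeMs) {
>         return Serdes.serdeFrom(
>                 new TimeWindowedSerializer<>(Serdes.String().serializer()),
>                 new TimeWindowedDeserializer<>(Serdes.String().deserializer(), windowSizeMs));
>     }
> }
> {noformat}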
> I've started a patch, and will prepare a PR for the fix for 1) and 2) above. 
> Let me know if this sounds reasonable. 


