[ 
https://issues.apache.org/jira/browse/KAFKA-4120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15524168#comment-15524168
 ] 

Guozhang Wang commented on KAFKA-4120:
--------------------------------------

[~elevy] {{o.a.k.common.utils.Bytes}} is added in trunk after the 0.10.0.1 
release and hence the corresponding Javadoc will only be available after the 
0.10.1.0 release.

[~gfodor] As part of the KIP-63, we have actually pursued the second option to 
make all the LRU caches storing bytes, this arguably will be less efficient due 
to ser / deser, but we believe it is beneficial for memory management in Kafka 
Streams in the long run. More discussions can be found here:

https://cwiki.apache.org/confluence/display/KAFKA/Discussion%3A+Memory+Management+in+Kafka+Streams

> byte[] keys in RocksDB state stores do not work as expected
> -----------------------------------------------------------
>
>                 Key: KAFKA-4120
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4120
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.0.1
>            Reporter: Greg Fodor
>            Assignee: Guozhang Wang
>
> We ran into an issue using a byte[] key in a RocksDB state store (with the 
> byte array serde.) Internally, the RocksDB store keeps a LRUCache that is 
> backed by a LinkedHashMap that sits between the callers and the actual db. 
> The problem is that while the underlying rocks db will persist byte arrays 
> with equal data as equivalent keys, the LinkedHashMap uses byte[] reference 
> equality from Object.equals/hashcode. So, this can result in multiple entries 
> in the cache for two different byte arrays that have the same contents and 
> are backed by the same key in the db, resulting in unexpected behavior. 
> One such behavior that manifests from this is if you store a value in the 
> state store with a specific key, if you re-read that key with the same byte 
> array you will get the new value, but if you re-read that key with a 
> different byte array with the same bytes, you will get a stale value until 
> the db is flushed. (This made it particularly tricky to track down what was 
> happening :))
> The workaround for us is to convert the keys from raw byte arrays to a 
> deserialized avro structure that provides proper hashcode/equals semantics 
> for the intermediate cache. In general this seems like good practice, so one 
> of the proposed solutions is to simply emit a warning or exception if a key 
> type with breaking semantics like this is provided.
> A few proposed solutions:
> - When the state store is defined on array keys, ensure that the cache map 
> does proper comparisons on array values not array references. This would fix 
> this problem, but seems a bit strange to special case. However, I have a hard 
> time of thinking of other examples where this behavior would burn users.
> - Change the LRU cache to deserialize and serialize all keys to bytes and use 
> a value based comparison for the map. This would be the most correct, as it 
> would ensure that both the rocks db and the cache have identical key spaces 
> and equality/hashing semantics. However, this is probably slow, and since the 
> general case of using avro record types as keys works fine, it will largely 
> be unnecessary overhead.
> - Don't change anything about the behavior, but trigger a warning in the log 
> or fail to start if a state store is defined on array keys (or possibly any 
> key type that fails to properly override Object.equals/hashcode.)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to