[ https://issues.apache.org/jira/browse/KAFKA-18326?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

A. Sophie Blee-Goldman resolved KAFKA-18326.
--------------------------------------------
    Fix Version/s: 4.0.0
                   3.9.1
                   3.8.2
       Resolution: Fixed

> Cached stores may return deleted values
> ---------------------------------------
>
>                 Key: KAFKA-18326
>                 URL: https://issues.apache.org/jira/browse/KAFKA-18326
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Almog Gavra
>            Assignee: Almog Gavra
>            Priority: Critical
>             Fix For: 4.0.0, 3.9.1, 3.8.2
>
>         Attachments: range-scan-fix.patch
>
>
> Reported in community Slack by Stanislav Savulchik. I've attached a patch
> with a fix and am waiting to see whether the reporter wants to submit the PR,
> since they were the first to identify it!
> It affects essentially every version of Kafka Streams out there.
> -----
> Hi everyone,
> I’m investigating unexpected behavior of the KeyValueStore.prefixScan
> method: it sometimes returns previously deleted keys when caching is enabled.
> Example pseudocode:
> {code:java}
> import org.apache.kafka.common.serialization.Serializer
> import org.apache.kafka.streams.KeyValue
> import org.apache.kafka.streams.state.KeyValueStore
> import scala.jdk.CollectionConverters._
>
> val keyPrefixSerializer: Serializer[Int] = ??? // 4 bytes, big endian
> val store: KeyValueStore[(Int, String), String] = ??? // caching enabled
> // store contents:
> // (1, "A") -> "A"
> // (1, "B") -> "B"
> // using put instead of delete to avoid reading the previous value
> store.put((1, "B"), null)
> // read all key-value pairs that share the key prefix 1
> val iter = store.prefixScan(1, keyPrefixSerializer)
> val result: List[KeyValue[(Int, String), String]] =
>   try iter.asScala.toList finally iter.close()
> // expected result:
> // (1, "A") -> "A"
> // actual result:
> // (1, "A") -> "A"
> // (1, "B") -> "B" (previously deleted, but still returned by the iterator)
> {code}
> I tried to write a unit test for MergedSortedCacheKeyValueBytesStoreIterator
> (which is returned by KeyValueStore.prefixScan and by other methods such as
> range and all) to reproduce the behavior. It showed that the iterator returns
> more items than expected if I delete the larger key:
> {code:java}
> @Test
> public void shouldSkipAllDeletedFromCache1() {
>     final byte[][] bytes = {{0}, {1}};
>     for (final byte[] aByte : bytes) {
>         store.put(Bytes.wrap(aByte), aByte);
>     }
>     // simulate deletion of a key from the store that is cached
>     cache.put(namespace, Bytes.wrap(bytes[1]), new LRUCacheEntry(null));
>     try (final MergedSortedCacheKeyValueBytesStoreIterator iterator = createIterator()) {
>         assertArrayEquals(bytes[0], iterator.next().key.get());
>         // fails: org.opentest4j.AssertionFailedError: expected: <false> but was: <true>
>         assertFalse(iterator.hasNext());
>     }
> }
> {code}
>
> But if I delete the smaller key, the test passes:
> {code:java}
> @Test
> public void shouldSkipAllDeletedFromCache0() {
>     final byte[][] bytes = {{0}, {1}};
>     for (final byte[] aByte : bytes) {
>         store.put(Bytes.wrap(aByte), aByte);
>     }
>     cache.put(namespace, Bytes.wrap(bytes[0]), new LRUCacheEntry(null));
>     try (final MergedSortedCacheKeyValueBytesStoreIterator iterator = createIterator()) {
>         assertArrayEquals(bytes[1], iterator.next().key.get());
>         assertFalse(iterator.hasNext());
>     }
> }
> {code}
> Could someone help me verify whether this is a bug or whether I'm missing something?
> Thank you.
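
The asymmetry between the two tests can be reproduced with a small, self-contained model of a merged cache-over-store scan. This is a hypothetical sketch, not the Kafka Streams implementation. The class MergedScanSketch, its Peeking helper, and the scan method are invented for illustration, integer keys stand in for Bytes, and a null cache value is assumed to model the tombstone written by new LRUCacheEntry(null). With eagerTombstoneSkip = true, cache tombstones are consumed before each merge step, and the store is advanced only if its current key happens to equal the tombstone's key. As a result, a deleted key that sorts after the store's current key leaks through, as in the failing test. With eagerTombstoneSkip = false, tombstones are handled in key order and suppress the matching store entry.

{code:java}
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical, simplified model of a sorted write cache merged over a sorted store.
// A null value in the cache models a tombstone (pending delete). Not Kafka Streams code.
public class MergedScanSketch {

    // Minimal peeking iterator over map entries; next == null means exhausted.
    static final class Peeking<K, V> {
        private final Iterator<Map.Entry<K, V>> it;
        private Map.Entry<K, V> next;

        Peeking(final Iterator<Map.Entry<K, V>> it) {
            this.it = it;
            this.next = it.hasNext() ? it.next() : null;
        }

        boolean hasNext() { return next != null; }

        Map.Entry<K, V> peek() { return next; }

        Map.Entry<K, V> next() {
            final Map.Entry<K, V> current = next;
            next = it.hasNext() ? it.next() : null;
            return current;
        }
    }

    // Returns the keys a merged scan emits, in order.
    public static List<Integer> scan(final NavigableMap<Integer, String> store,
                                     final NavigableMap<Integer, String> cache,
                                     final boolean eagerTombstoneSkip) {
        final Peeking<Integer, String> s = new Peeking<>(store.entrySet().iterator());
        final Peeking<Integer, String> c = new Peeking<>(cache.entrySet().iterator());
        final List<Integer> keys = new ArrayList<>();
        while (true) {
            if (eagerTombstoneSkip) {
                // Leaky variant: consume cache tombstones up front, but advance the store
                // only when its current key happens to equal the tombstone's key.
                while (c.hasNext() && c.peek().getValue() == null) {
                    if (s.hasNext() && s.peek().getKey().equals(c.peek().getKey())) {
                        s.next();
                    }
                    c.next();
                }
            }
            if (!c.hasNext() && !s.hasNext()) {
                return keys;
            }
            // Negative: cache key is smaller (or store exhausted);
            // positive: store key is smaller (or cache exhausted); zero: same key.
            final int cmp = !c.hasNext() ? 1
                    : !s.hasNext() ? -1
                    : c.peek().getKey().compareTo(s.peek().getKey());
            if (cmp > 0) {
                keys.add(s.next().getKey());        // key present only in the store
            } else {
                final Map.Entry<Integer, String> cached = c.next();
                if (cmp == 0) {
                    s.next();                       // cache entry shadows the store entry
                }
                if (cached.getValue() != null) {
                    keys.add(cached.getKey());      // tombstones emit nothing
                }
            }
        }
    }

    public static void main(final String[] args) {
        final NavigableMap<Integer, String> store = new TreeMap<>(Map.of(0, "a", 1, "b"));
        final NavigableMap<Integer, String> deleteLarger = new TreeMap<>();
        deleteLarger.put(1, null);
        final NavigableMap<Integer, String> deleteSmaller = new TreeMap<>();
        deleteSmaller.put(0, null);
        System.out.println(scan(store, deleteLarger, true));   // [0, 1]: deleted key 1 leaks
        System.out.println(scan(store, deleteSmaller, true));  // [1]
        System.out.println(scan(store, deleteLarger, false));  // [0]
        System.out.println(scan(store, deleteSmaller, false)); // [1]
    }
}
{code}

Only the eager-skip variant with the larger key deleted returns the deleted key, which mirrors the two tests above. The model reproduces the symptom only. It makes no claim about what range-scan-fix.patch actually changes.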



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
