Almog Gavra created KAFKA-18326:
-----------------------------------

             Summary: Cached stores may return deleted values
                 Key: KAFKA-18326
                 URL: https://issues.apache.org/jira/browse/KAFKA-18326
             Project: Kafka
          Issue Type: Bug
          Components: streams
            Reporter: Almog Gavra
         Attachments: range-scan-fix.patch

Reported in community Slack by Stanislav Savulchik. I've attached a patch with a fix 
and am waiting to see whether the reporter wants to submit a PR, since they were 
the first to identify it!

It affects essentially every version of Kafka Streams.

-----

Hi everyone,
I’m investigating unexpected behavior of the KeyValueStore.prefixScan method, 
which sometimes returns previously deleted keys when caching is enabled. Example 
pseudocode:
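{code:scala}
import org.apache.kafka.common.serialization.Serializer
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.state.KeyValueStore
import scala.jdk.CollectionConverters._
import scala.util.Using

val keyPrefixSerializer: Serializer[Int] = ??? // 4 bytes big endian
val store: KeyValueStore[(Int, String), String] = ??? // caching enabled
// store contents
// (1, "A") -> "A"
// (1, "B") -> "B"
// put(key, null) deletes the key without reading the previous value (delete() returns it)
store.put((1, "B"), null)
// read all key-value pairs with key prefix 1, closing the iterator afterwards
val result: List[KeyValue[(Int, String), String]] =
  Using.resource(store.prefixScan(1, keyPrefixSerializer))(_.asScala.toList)
// expected result
// (1, "A") -> "A"
// actual result
// (1, "A") -> "A"
// (1, "B") -> "B" (was previously deleted, but returned by the iterator)
{code}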
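For context on why the underlying store still produces (1, "B") here: with caching enabled, put((1, "B"), null) is first recorded as a tombstone in the cache, and the underlying store keeps the old entry until the cache is flushed, so a prefix scan over the store still yields it as a candidate that the merged iterator has to filter out. The minimal Java sketch below shows how a 4-byte big-endian prefix selects that candidate in unsigned byte order. The key encoding (int followed by UTF-8 string bytes) and all names (PrefixRangeSketch, encodeKey, encodePrefix, upperBound, inPrefixRange) are hypothetical illustrations, not Kafka Streams APIs.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class PrefixRangeSketch {
    // Hypothetical composite-key encoding: 4-byte big-endian int, then UTF-8 string bytes.
    static byte[] encodeKey(int id, String name) {
        byte[] suffix = name.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + suffix.length).putInt(id).put(suffix).array();
    }

    // The prefix alone, matching the "4 bytes big endian" prefix serializer in the report.
    static byte[] encodePrefix(int id) {
        return ByteBuffer.allocate(4).putInt(id).array();
    }

    // Smallest byte array greater than every array that starts with `prefix`:
    // increment the last byte that is not 0xFF and drop everything after it.
    static byte[] upperBound(byte[] prefix) {
        for (int i = prefix.length - 1; i >= 0; i--) {
            if (prefix[i] != (byte) 0xFF) {
                byte[] bound = Arrays.copyOf(prefix, i + 1);
                bound[i]++;
                return bound;
            }
        }
        return null; // prefix is all 0xFF bytes: there is no finite upper bound
    }

    // A key is in the prefix scan iff prefix <= key < upperBound in unsigned lexicographic order.
    static boolean inPrefixRange(byte[] key, byte[] prefix) {
        byte[] upper = upperBound(prefix);
        return Arrays.compareUnsigned(key, prefix) >= 0
            && (upper == null || Arrays.compareUnsigned(key, upper) < 0);
    }

    public static void main(String[] args) {
        byte[] prefix = encodePrefix(1);
        System.out.println(inPrefixRange(encodeKey(1, "A"), prefix)); // true
        System.out.println(inPrefixRange(encodeKey(1, "B"), prefix)); // true
        System.out.println(inPrefixRange(encodeKey(2, "A"), prefix)); // false
    }
}
```

Both (1, "A") and (1, "B") fall inside the range, so only the cache tombstone can keep the deleted key out of the result.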

I wrote a unit test for MergedSortedCacheKeyValueBytesStoreIterator (the iterator 
returned by KeyValueStore.prefixScan and by other methods such as range and all) 
to reproduce the behavior. It confirmed that the iterator returns more items than 
expected when I delete the larger key:
{code:java}
@Test
public void shouldSkipAllDeletedFromCache1() {
    // uses the store, cache, namespace and createIterator() fixtures of the surrounding test class
    final byte[][] bytes = {{0}, {1}};
    for (final byte[] aByte : bytes) {
        store.put(Bytes.wrap(aByte), aByte);
    }
    // simulate deleting a cached key that still exists in the underlying store
    cache.put(namespace, Bytes.wrap(bytes[1]), new LRUCacheEntry(null));
    try (final MergedSortedCacheKeyValueBytesStoreIterator iterator = createIterator()) {
        assertArrayEquals(bytes[0], iterator.next().key.get());
        // fails: org.opentest4j.AssertionFailedError: expected: <false> but was: <true>
        assertFalse(iterator.hasNext());
    }
}{code}
  
But if I delete a smaller key the test is successful:
{code:java}
@Test
public void shouldSkipAllDeletedFromCache0() {
    final byte[][] bytes = {{0}, {1}};
    for (final byte[] aByte : bytes) {
        store.put(Bytes.wrap(aByte), aByte);
    }
    cache.put(namespace, Bytes.wrap(bytes[0]), new LRUCacheEntry(null));
    try (final MergedSortedCacheKeyValueBytesStoreIterator iterator = createIterator()) {
        assertArrayEquals(bytes[1], iterator.next().key.get());
        assertFalse(iterator.hasNext());
    }
}{code}

Could someone help me verify whether this is a bug, or am I missing something?
Thank you.
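To make the expected semantics of both tests concrete, here is a minimal sketch of a merged scan over a sorted cache layered on a sorted store. It is a simplified, hypothetical model (Integer keys, TreeMaps, and the names MergedScanSketch and mergedKeys are illustrative), not the actual MergedSortedCacheKeyValueBytesStoreIterator code or the attached patch. The invariant it enforces is that a cache tombstone may only be consumed once the store side has reached or passed its key; discarding it while a smaller store key is still pending would let the stale store entry leak through later, which would be consistent with the failing shouldSkipAllDeletedFromCache1 case.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

class MergedScanSketch {
    // Hypothetical model: both layers are sorted by key, and a null cache value
    // is a tombstone for a delete that has not been flushed to the store yet.
    static List<Integer> mergedKeys(NavigableMap<Integer, String> store,
                                    NavigableMap<Integer, String> cache) {
        List<Map.Entry<Integer, String>> s = new ArrayList<>(store.entrySet());
        List<Map.Entry<Integer, String>> c = new ArrayList<>(cache.entrySet());
        List<Integer> result = new ArrayList<>();
        int i = 0;
        int j = 0;
        while (i < s.size() || j < c.size()) {
            boolean storeIsSmaller = j == c.size()
                || (i < s.size() && s.get(i).getKey() < c.get(j).getKey());
            if (storeIsSmaller) {
                // No cache entry can shadow this store key, so emit it and keep the
                // pending cache entry (possibly a tombstone) for a later, larger key.
                result.add(s.get(i++).getKey());
            } else {
                Map.Entry<Integer, String> cached = c.get(j++);
                // Same key in both layers: the cache is newer, so skip the store entry.
                if (i < s.size() && s.get(i).getKey().equals(cached.getKey())) {
                    i++;
                }
                // A tombstone hides the key entirely; a live cache value is emitted.
                if (cached.getValue() != null) {
                    result.add(cached.getKey());
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        NavigableMap<Integer, String> store = new TreeMap<>(Map.of(0, "a", 1, "b"));

        NavigableMap<Integer, String> deleteLarger = new TreeMap<>();
        deleteLarger.put(1, null); // TreeMap allows null values
        System.out.println(mergedKeys(store, deleteLarger)); // [0]

        NavigableMap<Integer, String> deleteSmaller = new TreeMap<>();
        deleteSmaller.put(0, null);
        System.out.println(mergedKeys(store, deleteSmaller)); // [1]
    }
}
```

Emitting the smaller store key first, and only then consuming the cache entry, is what keeps the tombstone alive until the store side reaches the deleted key.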



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
