[ https://issues.apache.org/jira/browse/KAFKA-18326?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
A. Sophie Blee-Goldman resolved KAFKA-18326.
--------------------------------------------
    Fix Version/s: 4.0.0
                   3.9.1
                   3.8.2
       Resolution: Fixed

> Cached stores may return deleted values
> ---------------------------------------
>
>                 Key: KAFKA-18326
>                 URL: https://issues.apache.org/jira/browse/KAFKA-18326
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Almog Gavra
>            Assignee: Almog Gavra
>            Priority: Critical
>             Fix For: 4.0.0, 3.9.1, 3.8.2
>
>         Attachments: range-scan-fix.patch
>
>
> Reported in community Slack by Stanislav Savulchik. I've attached a patch
> fix, and am waiting for the reporter to submit a PR if they want to since
> they were the first to identify it!
> It affects basically every version of Kafka Streams out there...
> -----
> Hi everyone,
> I’m investigating an unexpected behavior of a KeyValueStore.prefixScan
> method that sometimes returns previously deleted keys if caching is enabled.
> Example pseudocode:
> {code:java}
> val keyPrefixSerializer: Serializer[Int] = ??? // 4 bytes big endian
> val store: KeyValueStore[(Int, String), String] = ???
> // store contents
> // (1, "A") -> "A"
> // (1, "B") -> "B"
> // using put instead of delete to avoid reading previous value
> store.put((1, "B"), null)
> // reading all key value pairs using key prefix
> val result: List[KeyValue[(Int, String), String]] =
>   store.prefixScan(1, keyPrefixSerializer).asScala.toList
> // expected result
> // (1, "A") -> "A"
> // actual result
> // (1, "A") -> "A"
> // (1, "B") -> "B" (was previously deleted, but returned by the iterator)
> {code}
> I tried to come up with a unit test for
> MergedSortedCacheKeyValueBytesStoreIterator (returned by
> KeyValueStore.prefixScan and other methods like range, all) in order to
> reproduce the behavior.
> And it also showed that the iterator returns more
> items than expected if I delete a larger key:
> {code:java}
> @Test
> public void shouldSkipAllDeletedFromCache1() {
>     final byte[][] bytes = {{0}, {1}};
>     for (final byte[] aByte : bytes) {
>         store.put(Bytes.wrap(aByte), aByte);
>     }
>     // simulate key deletion from store that is cached
>     cache.put(namespace, Bytes.wrap(bytes[1]), new LRUCacheEntry(null));
>     try (final MergedSortedCacheKeyValueBytesStoreIterator iterator = createIterator()) {
>         assertArrayEquals(bytes[0], iterator.next().key.get());
>         assertFalse(iterator.hasNext()); // org.opentest4j.AssertionFailedError: expected: <false> but was: <true>
>     }
> }
> {code}
>
> But if I delete a smaller key the test is successful:
> {code:java}
> @Test
> public void shouldSkipAllDeletedFromCache0() {
>     final byte[][] bytes = {{0}, {1}};
>     for (final byte[] aByte : bytes) {
>         store.put(Bytes.wrap(aByte), aByte);
>     }
>     cache.put(namespace, Bytes.wrap(bytes[0]), new LRUCacheEntry(null));
>     try (final MergedSortedCacheKeyValueBytesStoreIterator iterator = createIterator()) {
>         assertArrayEquals(bytes[1], iterator.next().key.get());
>         assertFalse(iterator.hasNext());
>     }
> }
> {code}
> Could someone help me verify if it is a bug or am I missing something?
> Thank you.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)