Chuckame commented on code in PR #16218:
URL: https://github.com/apache/kafka/pull/16218#discussion_r1823572135
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplier.java
##########

```diff
@@ -111,34 +110,26 @@ public void process(final Record<KO, Change<VO>> record) {
             return;
         }
-        final Bytes prefixBytes = keySchema.prefixBytes(record.key());
+        final Bytes foreignKeyBytes = keySchema.prefixBytes(record.key());
         //Perform the prefixScan and propagate the results
         try (final KeyValueIterator<Bytes, ValueAndTimestamp<SubscriptionWrapper<K>>> prefixScanResults =
-                 subscriptionStore.range(prefixBytes, Bytes.increment(prefixBytes))) {
+                 subscriptionStore.prefixScan(foreignKeyBytes, new BytesSerializer())) {
```

Review Comment:
   Personally, I prefer using the native `prefixScan` over the range "hack", to benefit from RocksDB's native implementation: it uses the index and filters differently, and may load different parts of the index into memory.

   I'm also not sure a range built with your increment would work, because the range end is inclusive, so we cannot predict the maximum value that falls just below the incremented bound. Taking the keys `AAA -> ZZZ` (where we only have the letters A-Z), if we prefixScan for `A`, the inclusive end should be the last value before `BAA`, which is `AZZ`. Your increment is supposed to compute a range from `A` to `AZ`, but it will only take values up to `AZ`, while `AZZ` is greater than `AZ`.
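   The ordering argument above can be checked with a small sketch. Note that `compareUnsigned` and `prefixEquals` below are simplified stand-ins I wrote for illustration, not the actual `org.apache.kafka.common.utils.Bytes` implementation; they assume unsigned lexicographic byte comparison, which is the order RocksDB and `Bytes` use:

   ```java
   import java.nio.charset.StandardCharsets;
   import java.util.Arrays;

   public class PrefixScanSketch {
       // Unsigned lexicographic comparison, like Bytes#compareTo / RocksDB key order.
       static int compareUnsigned(byte[] a, byte[] b) {
           final int n = Math.min(a.length, b.length);
           for (int i = 0; i < n; i++) {
               final int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
               if (cmp != 0) {
                   return cmp;
               }
           }
           // A strict prefix sorts before any longer key that extends it.
           return Integer.compare(a.length, b.length);
       }

       // Same kind of check as the processor's prefixEquals: does key start with prefix?
       static boolean prefixEquals(byte[] key, byte[] prefix) {
           return key.length >= prefix.length
               && Arrays.equals(Arrays.copyOfRange(key, 0, prefix.length), prefix);
       }

       public static void main(String[] args) {
           // The ordering claimed in the comment: A < AB < AXZ < AZ < AZA < AZZ
           final String[] keys = {"A", "AB", "AXZ", "AZ", "AZA", "AZZ"};
           for (int i = 0; i + 1 < keys.length; i++) {
               final byte[] left = keys[i].getBytes(StandardCharsets.UTF_8);
               final byte[] right = keys[i + 1].getBytes(StandardCharsets.UTF_8);
               System.out.println(keys[i] + " < " + keys[i + 1] + " : "
                   + (compareUnsigned(left, right) < 0));
           }
           // AZZ shares the prefix A, so an inclusive end of AZ would wrongly drop it.
           System.out.println("AZZ has prefix A: "
               + prefixEquals("AZZ".getBytes(StandardCharsets.UTF_8),
                              "A".getBytes(StandardCharsets.UTF_8)));
       }
   }
   ```

   Every comparison in the chain prints `true`, confirming that `AZZ` (a key matching the prefix) sorts after `AZ`, so any inclusive upper bound below `BAA` risks cutting off matching keys.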
   Comparatively: `A < AB < AXZ < AZ < AZA < AZZ`

   I think the easiest solution is to just exclude the entry whose key equals the incremented bytes, but checking only the last key instead of every entry, since the range results are ordered:

   ```diff
            //Perform the prefixScan and propagate the results
            try (final KeyValueIterator<Bytes, ValueAndTimestamp<SubscriptionWrapper<K>>> prefixScanResults =
                     subscriptionStore.range(foreignKeyBytes, Bytes.increment(foreignKeyBytes))) {
                while (prefixScanResults.hasNext()) {
                    final KeyValue<Bytes, ValueAndTimestamp<SubscriptionWrapper<K>>> next = prefixScanResults.next();
                    // have to check the prefix on the last key because the range end is inclusive :(
   -                if (prefixEquals(next.key.get(), foreignKeyBytes.get())) {
   +                if (prefixScanResults.hasNext() || prefixEquals(next.key.get(), foreignKeyBytes.get())) {
                        final CombinedKey<KO, K> combinedKey = keySchema.fromBytes(next.key);
                        context().forward(
                            record.withKey(combinedKey.primaryKey())
                                .withValue(new SubscriptionResponseWrapper<>(
                                    next.value.value().hash(),
                                    record.value().newValue,
                                    next.value.value().primaryPartition()))
                        );
                    }
                }
            }
   ```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org