Chuckame commented on code in PR #16218:
URL: https://github.com/apache/kafka/pull/16218#discussion_r1823572135


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplier.java:
##########
@@ -111,34 +110,26 @@ public void process(final Record<KO, Change<VO>> record) {
                 return;
             }
 
-            final Bytes prefixBytes = keySchema.prefixBytes(record.key());
+            final Bytes foreignKeyBytes = keySchema.prefixBytes(record.key());
 
             //Perform the prefixScan and propagate the results
             try (final KeyValueIterator<Bytes, ValueAndTimestamp<SubscriptionWrapper<K>>> prefixScanResults =
-                     subscriptionStore.range(prefixBytes, Bytes.increment(prefixBytes))) {
+                     subscriptionStore.prefixScan(foreignKeyBytes, new BytesSerializer())) {

Review Comment:
   Personally, I prefer the native prefixScan over a range "hack", so that we
benefit from the native RocksDB implementation: it uses the index and filters
differently, and may load different parts of the index into memory.
   
   I'm not sure a range using your increment would work: the range end is
inclusive, so we cannot predict the maximum value that comes just before the
incremented key.
   
   Take keys from `AAA` to `ZZZ` (using only the letters A-Z). If we prefixScan
for `A`, the inclusive end should be the last value before `BAA`, which is
`AZZ`. Your increment is supposed to compute a range from `A` to `AZ`, but that
range only takes values up to `AZ`, while `AZZ` is greater than `AZ`.
For comparison: `A < AB < AXZ < AZ < AZA < AZZ`
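   
   To make the ordering concrete, here is a small sketch that uses a
`java.util.TreeMap` of strings as a stand-in for the subscription store. That
stand-in is an assumption for illustration only: the real store holds `Bytes`
keys, and the class and method names below are hypothetical. It compares an
inclusive range ending at `AZ` with one ending at `B`, where `B` is assumed to
be the result of incrementing the prefix `A`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Illustration only: a sorted map of strings stands in for the key-value store.
public class InclusiveRangeSketch {

    // Builds a small sorted store; the values are irrelevant for this sketch.
    static TreeMap<String, Integer> sampleStore() {
        final TreeMap<String, Integer> store = new TreeMap<>();
        for (final String key : new String[] {"A", "AB", "AXZ", "AZ", "AZA", "AZZ", "B", "BAA"}) {
            store.put(key, key.length());
        }
        return store;
    }

    // Returns every key in [from, to], both ends inclusive, in sorted order.
    static List<String> inclusiveRange(final TreeMap<String, Integer> store,
                                       final String from,
                                       final String to) {
        return new ArrayList<>(store.subMap(from, true, to, true).keySet());
    }

    public static void main(final String[] args) {
        final TreeMap<String, Integer> store = sampleStore();
        // Ending at "AZ" stops before "AZA" and "AZZ", which also start with "A".
        System.out.println(inclusiveRange(store, "A", "AZ"));
        // Ending at "B" covers every "A"-prefixed key, plus the key "B" itself.
        System.out.println(inclusiveRange(store, "A", "B"));
    }
}
```

   In this stand-in, the range ending at `B` returns one entry that does not
share the prefix, and that entry can only be the last one.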
   
   
   I think the easiest solution is to exclude the entry whose key equals the
incremented bytes, checking only the last key rather than every entry, since a
range is ordered:
   ```diff
               //Perform the prefixScan and propagate the results
               try (final KeyValueIterator<Bytes, ValueAndTimestamp<SubscriptionWrapper<K>>> prefixScanResults =
                            subscriptionStore.range(foreignKeyBytes, Bytes.increment(foreignKeyBytes))) {
   
                   while (prefixScanResults.hasNext()) {
                       final KeyValue<Bytes, ValueAndTimestamp<SubscriptionWrapper<K>>> next = prefixScanResults.next();
                       // have to check the prefix on the last key because the range end is inclusive :(
   -                    if (prefixEquals(next.key.get(), foreignKeyBytes.get())) {
   +                    if (prefixScanResults.hasNext() || prefixEquals(next.key.get(), foreignKeyBytes.get())) {
                           final CombinedKey<KO, K> combinedKey = keySchema.fromBytes(next.key);
                           context().forward(
                                   record.withKey(combinedKey.primaryKey())
                                           .withValue(new SubscriptionResponseWrapper<>(
                                                   next.value.value().hash(),
                                                   record.value().newValue,
                                                   next.value.value().primaryPartition()))
                           );
                       }
                   }
               }
   ```
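   
   The same last-key check can be tried in isolation. The sketch below is not
the Kafka Streams code: it assumes a `TreeMap` of strings in place of the
store, and `storeOf` and `scanPrefix` are hypothetical helper names. The
incremented prefix is passed in explicitly rather than computed.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.TreeMap;

// Illustration only: checks the prefix on the final entry of an inclusive range.
public class LastKeyCheckSketch {

    // Hypothetical helper: builds a sorted store whose values mirror the keys.
    static TreeMap<String, String> storeOf(final String... keys) {
        final TreeMap<String, String> store = new TreeMap<>();
        for (final String key : keys) {
            store.put(key, key);
        }
        return store;
    }

    // Scans [prefix, incrementedPrefix] inclusively and keeps the matching keys.
    static List<String> scanPrefix(final TreeMap<String, String> store,
                                   final String prefix,
                                   final String incrementedPrefix) {
        final List<String> matches = new ArrayList<>();
        final Iterator<String> it =
                store.subMap(prefix, true, incrementedPrefix, true).keySet().iterator();
        while (it.hasNext()) {
            final String key = it.next();
            // Assumption: only the last entry can equal the inclusive range end,
            // so earlier entries are accepted without comparing the prefix.
            if (it.hasNext() || key.startsWith(prefix)) {
                matches.add(key);
            }
        }
        return matches;
    }

    public static void main(final String[] args) {
        System.out.println(scanPrefix(storeOf("A", "AZ", "AZZ", "B", "BAA"), "A", "B"));
        System.out.println(scanPrefix(storeOf("A", "AZZ", "BAA"), "A", "B"));
    }
}
```

   The shortcut is only safe while the range end is the sole key that can fall
outside the prefix. If that assumption does not hold for the real key schema,
the per-entry check is still needed.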


