vvcephei commented on a change in pull request #11598:
URL: https://github.com/apache/kafka/pull/11598#discussion_r768122088



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
##########
@@ -175,6 +194,7 @@ public synchronized void putAll(final List<KeyValue<Bytes, 
byte[]>> entries) {
         if (from == null && to == null) {
             return getKeyValueIterator(map.keySet(), forward);
         } else if (from == null) {
+            System.out.println("-----------> range upper bound");

Review comment:
       Looks like this debug print statement was left over.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -186,6 +203,85 @@ public boolean setFlushListener(final 
CacheFlushListener<K, V> listener,
         return false;
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+                                    final PositionBound positionBound,
+                                    final boolean collectExecutionInfo) {
+
+        final long start = System.nanoTime();
+        final QueryResult<R> result;
+
+        final QueryHandler handler = queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, 
collectExecutionInfo);
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                        "Handled in " + getClass() + " in " + 
(System.nanoTime() - start) + "ns");
+            }
+        } else {
+            result = (QueryResult<R>) handler.apply(
+                    query,
+                    positionBound,
+                    collectExecutionInfo,
+                    this
+            );
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                        "Handled in " + getClass() + " with serdes "
+                                + serdes + " in " + (System.nanoTime() - 
start) + "ns");
+            }
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    private <R> QueryResult<R> runRangeQuery(
+            final Query query, final PositionBound positionBound, final 
boolean collectExecutionInfo) {
+
+        final QueryResult<R> result;
+        final RangeQuery<K, V> typedQuery = (RangeQuery<K, V>) query;
+        final RangeQuery rawRangeQuery;
+        if (typedQuery.getLowerBound().isPresent() && 
typedQuery.getUpperBound().isPresent()) {
+            rawRangeQuery = 
RangeQuery.withRange(keyBytes(typedQuery.getLowerBound().get()),
+                    keyBytes(typedQuery.getUpperBound().get()));
+        } else if (typedQuery.getLowerBound().isPresent()) {
+            rawRangeQuery = 
RangeQuery.withLowerBound(keyBytes(typedQuery.getLowerBound().get()));
+        } else if (typedQuery.getUpperBound().isPresent()) {
+            rawRangeQuery = 
RangeQuery.withUpperBound(keyBytes(typedQuery.getUpperBound().get()));
+        } else {
+            rawRangeQuery = RangeQuery.withNoBounds();
+        }

Review comment:
       At the risk of being too fancy, what do you think about this instead?
   ```suggestion
           rawRangeQuery = 
RangeQuery.withRange(typedQuery.getLowerBound().map(this::keyBytes),
               typedQuery.getUpperBound().map(this::keyBytes));
   ```

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
##########
@@ -279,34 +344,34 @@ public static void before()
                 final RecordMetadata recordMetadata = future.get(1, 
TimeUnit.MINUTES);
                 assertThat(recordMetadata.hasOffset(), is(true));
                 INPUT_POSITION.withComponent(
-                    recordMetadata.topic(),
-                    recordMetadata.partition(),
-                    recordMetadata.offset()
+                        recordMetadata.topic(),
+                        recordMetadata.partition(),
+                        recordMetadata.offset()
                 );
             }
         }
 
         assertThat(INPUT_POSITION, equalTo(
-            Position
-                .emptyPosition()
-                .withComponent(INPUT_TOPIC_NAME, 0, 1L)
-                .withComponent(INPUT_TOPIC_NAME, 1, 0L)
+                Position
+                        .emptyPosition()
+                        .withComponent(INPUT_TOPIC_NAME, 0, 1L)
+                        .withComponent(INPUT_TOPIC_NAME, 1, 1L)
         ));
     }
 
     @Before
     public void beforeTest() {
         final StoreSupplier<?> supplier = storeToTest.supplier();
         final Properties streamsConfig = streamsConfiguration(
-            cache,
-            log,
-            storeToTest.name()
+                cache,
+                log,
+                storeToTest.name()

Review comment:
       I'm sorry to sound picky, but do you mind backing out these formatting 
changes? I'm only concerned because there are a lot of them. Otherwise, we'll 
just have duelling autoformat results between commits.
   
   For reference, I have indents and continuation indents both set to `4` for 
this project.

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
##########
@@ -426,6 +481,68 @@ public void verifyStore() {
             shouldHandlePingQuery();
             shouldCollectExecutionInfo();
             shouldCollectExecutionInfoUnderFailure();
+
+            if (storeToTest.keyValue()) {
+                if (storeToTest.timestamped()) {
+                    shouldHandleRangeQuery(
+                            Optional.of(1),
+                            Optional.of(3),
+                            (Function<ValueAndTimestamp<Integer>, Integer>) 
ValueAndTimestamp::value,
+                            mkSet(1, 2, 3)
+
+                    );
+                    shouldHandleRangeQuery(
+                            Optional.of(1),
+                            Optional.empty(),
+                            (Function<ValueAndTimestamp<Integer>, Integer>) 
ValueAndTimestamp::value,
+                            mkSet(1, 2, 3)
+
+                    );
+                    shouldHandleRangeQuery(
+                            Optional.empty(),
+                            Optional.of(1),
+                            (Function<ValueAndTimestamp<Integer>, Integer>) 
ValueAndTimestamp::value,
+                            mkSet(0, 1)
+
+                    );
+                    shouldHandleRangeQuery(
+                            Optional.empty(),
+                            Optional.empty(),
+                            (Function<ValueAndTimestamp<Integer>, Integer>) 
ValueAndTimestamp::value,
+                            mkSet(0, 1, 2, 3)
+
+                    );
+
+                } else {
+                    shouldHandleRangeQuery(
+                            Optional.of(1),
+                            Optional.of(3),
+                            Function.identity(),
+                            mkSet(1, 2, 3)
+                    );
+                    shouldHandleRangeQuery(
+                            Optional.of(1),
+                            Optional.empty(),
+                            Function.identity(),
+                            mkSet(1, 2, 3)
+
+                    );
+                    shouldHandleRangeQuery(
+                            Optional.empty(),
+                            Optional.of(1),
+                            Function.identity(),
+                            mkSet(0, 1)
+
+                    );
+                    shouldHandleRangeQuery(
+                            Optional.empty(),
+                            Optional.empty(),
+                            Function.identity(),
+                            mkSet(0, 1, 2, 3)
+
+                    );
+                }

Review comment:
       Sorry, not trying to golf here, but since everything between these 
branches is the same except the extractor function, I'm wondering if we should 
factor the checks out into a separate method:
   ```suggestion
                   if (storeToTest.timestamped()) {
                       shouldHandleRangeQueries(
                           (Function<ValueAndTimestamp<Integer>, Integer>) 
ValueAndTimestamp::value
                       );
                   } else {
                       shouldHandleRangeQueries(Function.identity());
                   }
                   
   ...
       private <T> void shouldHandleRangeQueries(final Function<T, Integer> 
extractor) {
                       shouldHandleRangeQuery(
                               Optional.of(1),
                               Optional.of(3),
                               extractor,
                               mkSet(1, 2, 3)
   
                       );
                       shouldHandleRangeQuery(
                               Optional.of(1),
                               Optional.empty(),
                               extractor,
                               mkSet(1, 2, 3)
   
                       );
                       shouldHandleRangeQuery(
                               Optional.empty(),
                               Optional.of(1),
                               extractor,
                               mkSet(0, 1)
   
                       );
                       shouldHandleRangeQuery(
                               Optional.empty(),
                               Optional.empty(),
                               extractor,
                               mkSet(0, 1, 2, 3)
   
                       );
       }
   ```

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
##########
@@ -92,11 +93,29 @@ Position getPosition() {
     }
 
     @Override
+    @SuppressWarnings("unchecked")
     public <R> QueryResult<R> query(
         final Query<R> query,
         final PositionBound positionBound,
         final boolean collectExecutionInfo) {
 
+        if (query instanceof RangeQuery) {
+            final RangeQuery<Bytes, byte[]> typedQuery = (RangeQuery<Bytes, 
byte[]>) query;
+            final KeyValueIterator<Bytes, byte[]> keyValueIterator;
+            if (typedQuery.getLowerBound().isPresent() && 
typedQuery.getUpperBound().isPresent()) {
+                keyValueIterator = 
this.range(typedQuery.getLowerBound().get(), typedQuery.getUpperBound().get());
+            } else if (typedQuery.getLowerBound().isPresent()) {
+                keyValueIterator = 
this.range(typedQuery.getLowerBound().get(), null);
+            } else if (typedQuery.getUpperBound().isPresent()) {
+                keyValueIterator = this.range(null, 
typedQuery.getUpperBound().get());
+            } else {
+                keyValueIterator = this.range(null, null);
+            }

Review comment:
       It looks like this could be simplified. WDYT?
   
   ```suggestion
               keyValueIterator = 
this.range(typedQuery.getLowerBound().orElse(null), 
typedQuery.getUpperBound().orElse(null));
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to