vvcephei commented on a change in pull request #11567:
URL: https://github.com/apache/kafka/pull/11567#discussion_r775709569



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
##########
@@ -317,13 +318,25 @@ public boolean isOpen() {
     @Override
     public <R> QueryResult<R> query(final Query<R> query, final PositionBound positionBound,
         final boolean collectExecutionInfo) {
+
+        if (query instanceof WindowRangeQuery) {

Review comment:
       Hey @patrickstuedi, GitHub won't let me comment on the prior 
conversation, but thanks for pointing out that oversight in the in-memory 
key-value store! I've submitted https://github.com/apache/kafka/pull/11630 to 
fix it.
   
   That was one code block that we overlooked when we moved all the query 
handling logic to StoreQueryUtils.

##########
File path: streams/src/main/java/org/apache/kafka/streams/query/WindowKeyQuery.java
##########
@@ -50,4 +55,13 @@ public K getKey() {
     public Optional<Instant> getTimeTo() {
         return timeTo;
     }
+
+    @Override
+    public String toString() {
+        return "WindowKeyQuery{" +
+            "key=" + key +
+            ", timeFrom=" + timeFrom +
+            ", timeTo=" + timeTo +
+            '}';
+    }

Review comment:
       Added the `toString` so that the queries printed in exceptions will 
contain useful information.
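
   For illustration only, here is a minimal sketch of the effect. 
`ExampleKeyQuery` and `failureMessage` are hypothetical names, not classes in 
this PR; the point is just that string concatenation calls `toString()`, so the 
query's fields show up in the failure message instead of an opaque object 
identity.

```java
import java.time.Instant;
import java.util.Optional;

// Hypothetical query type, loosely shaped like the one in the diff above.
final class ExampleKeyQuery {
    private final String key;
    private final Optional<Instant> timeFrom;
    private final Optional<Instant> timeTo;

    ExampleKeyQuery(final String key,
                    final Optional<Instant> timeFrom,
                    final Optional<Instant> timeTo) {
        this.key = key;
        this.timeFrom = timeFrom;
        this.timeTo = timeTo;
    }

    @Override
    public String toString() {
        return "ExampleKeyQuery{" +
            "key=" + key +
            ", timeFrom=" + timeFrom +
            ", timeTo=" + timeTo +
            '}';
    }

    // Hypothetical helper: embeds the query in a message via its toString().
    static String failureMessage(final Object query) {
        return "Don't know how to execute the given query (" + query + ")";
    }
}
```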

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
##########
@@ -540,32 +545,19 @@ public void verifyStore() {
                 if (storeToTest.timestamped()) {
                     final Function<ValueAndTimestamp<Integer>, Integer> valueExtractor =
                             ValueAndTimestamp::value;
-                    final Instant timeTo = Instant.now();
-                    final Instant timeFrom = timeTo.minusSeconds(60);
-                    shouldHandleWindowKeyQueries(2, timeFrom, timeTo, valueExtractor);
-                    shouldHandleWindowRangeQueries(timeFrom, timeTo, valueExtractor);
+                    shouldHandleWindowKeyQueries(valueExtractor);
+                    shouldHandleWindowRangeQueries(valueExtractor);
                 } else {
                     final Function<Integer, Integer> valueExtractor = Function.identity();
-                    final Instant timeTo = Instant.now();
-                    final Instant timeFrom = timeTo.minusSeconds(60);
-                    shouldHandleWindowKeyQueries(2, timeFrom, timeTo, valueExtractor);
-                    shouldHandleWindowRangeQueries(timeFrom, timeTo, valueExtractor);
+                    shouldHandleWindowKeyQueries(valueExtractor);
+                    shouldHandleWindowRangeQueries(valueExtractor);
                 }
             }
 
             if (storeToTest.isSession()) {
-                if (storeToTest.timestamped()) {
-                    final Function<ValueAndTimestamp<Integer>, Integer> valueExtractor =
-                            ValueAndTimestamp::value;
-                    final Instant timeTo = Instant.now();
-                    final Instant timeFrom = timeTo.minusSeconds(60);
-                    shouldHandleSessionKeyQueries(2, valueExtractor);
-                } else {
-                    final Function<Integer, Integer> valueExtractor = Function.identity();
-                    final Instant timeTo = Instant.now();
-                    final Instant timeFrom = timeTo.minusSeconds(60);
-                    shouldHandleSessionKeyQueries(2, valueExtractor);
-                }
+                // Note there's no "timestamped" differentiation here.
+                // Idiosyncratically, SessionStores are _never_ timestamped.
+                shouldHandleSessionKeyQueries();

Review comment:
       This isn't our fault. When we added the timestamped stores, we chose not 
to make SessionStores timestamped because the session bounds already have the 
end timestamp available, which is identical to the timestamp we would have 
stored in the value.
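
   As a rough sketch of that reasoning (the `SessionBounds` class is 
hypothetical, not Streams code): if a session's end is always the largest record 
timestamp merged into it, then a separate "latest timestamp" stored next to the 
value would just repeat `endMs`.

```java
// Hypothetical model of a session's bounds; not a Kafka Streams class.
final class SessionBounds {
    final long startMs;
    final long endMs;

    SessionBounds(final long startMs, final long endMs) {
        this.startMs = startMs;
        this.endMs = endMs;
    }

    // Extends the session so that it covers one more record timestamp.
    SessionBounds include(final long recordTimeMs) {
        return new SessionBounds(
            Math.min(startMs, recordTimeMs),
            Math.max(endMs, recordTimeMs)
        );
    }
}
```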

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
##########
@@ -906,10 +948,10 @@ public void shouldRejectUnknownQuery() {
                         queryResult.get(partition)::getFailureMessage
                 );
 
-                final KeyValueIterator<Windowed<Integer>, V> iterator = queryResult.get(partition)
-                        .getResult();
-                while (iterator.hasNext()) {
-                    actualValue.add(valueExtactor.apply(iterator.next().value));
+                try (final KeyValueIterator<Windowed<Integer>, V> iterator = queryResult.get(partition).getResult()) {
+                    while (iterator.hasNext()) {
+                        actualValue.add((Integer) iterator.next().value);
+                    }

Review comment:
       Huh, looks like @vpapavas and I missed the need to close the iterator 
before.
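
   For anyone following along, a minimal sketch of the pattern. 
`CloseableIterator` and `TrackingIterator` are hypothetical stand-ins for 
`KeyValueIterator` (which is closeable); the sketch only shows that 
try-with-resources calls `close()` once the loop is done, including when 
`next()` throws.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for KeyValueIterator: an Iterator that must be closed.
interface CloseableIterator<T> extends Iterator<T>, AutoCloseable {
    @Override
    void close(); // narrowed so callers need not handle a checked exception
}

// Wraps a list iterator and records whether close() was called.
final class TrackingIterator<T> implements CloseableIterator<T> {
    private final Iterator<T> delegate;
    private boolean closed = false;

    TrackingIterator(final List<T> values) {
        this.delegate = values.iterator();
    }

    @Override
    public boolean hasNext() {
        return delegate.hasNext();
    }

    @Override
    public T next() {
        return delegate.next();
    }

    @Override
    public void close() {
        closed = true;
    }

    boolean isClosed() {
        return closed;
    }
}

final class IteratorDraining {
    // Collects all elements; the iterator is closed when the try block exits.
    static <T> List<T> drain(final CloseableIterator<T> iterator) {
        final List<T> out = new ArrayList<>();
        try (CloseableIterator<T> it = iterator) {
            while (it.hasNext()) {
                out.add(it.next());
            }
        }
        return out;
    }
}
```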

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
##########
@@ -93,20 +92,10 @@ Position getPosition() {
     }
 
     @Override
-    @SuppressWarnings("unchecked")
-    public <R> QueryResult<R> query(
-        final Query<R> query,
-        final PositionBound positionBound,
-        final boolean collectExecutionInfo) {
-
-        if (query instanceof RangeQuery) {
-            final RangeQuery<Bytes, byte[]> typedQuery = (RangeQuery<Bytes, byte[]>) query;
-            final KeyValueIterator<Bytes, byte[]> keyValueIterator =  this.range(
-                    typedQuery.getLowerBound().orElse(null), typedQuery.getUpperBound().orElse(null));
-            final R result = (R) keyValueIterator;
-            final QueryResult<R> queryResult = QueryResult.forResult(result);
-            return queryResult;
-        }

Review comment:
       Dropped this unnecessary duplicate code, as discussed.

##########
File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
##########
@@ -58,7 +58,7 @@
         final Query<R> query,
         final StateStore store) {
 
-        return new FailedQueryResult<>(
+        return forFailure(

Review comment:
       This just made it easier to inline the "unknown query" message.

##########
File path: streams/src/main/java/org/apache/kafka/streams/query/WindowKeyQuery.java
##########
@@ -25,17 +25,22 @@
 
 @Evolving
 public class WindowKeyQuery<K, V> implements Query<WindowStoreIterator<V>> {
+
     private final K key;
     private final Optional<Instant> timeFrom;
     private final Optional<Instant> timeTo;
 
-    private WindowKeyQuery(final K key, final Optional<Instant> timeTo, final Optional<Instant> timeFrom) {
+    private WindowKeyQuery(final K key,
+                           final Optional<Instant> timeTo,
+                           final Optional<Instant> timeFrom) {

Review comment:
       Since I was fixing stuff anyway, I went ahead and fixed a bunch of 
formatting issues that I didn't bother mentioning before.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
##########
@@ -411,23 +422,40 @@ public void close() {
                                              final boolean collectExecutionInfo) {
         final QueryResult<R> result;
         final WindowRangeQuery<K, V> typedQuery = (WindowRangeQuery<K, V>) query;
-        if (typedQuery.getTimeFrom().isPresent() && typedQuery.getTimeTo().isPresent()) {
-            final WindowRangeQuery<Bytes, byte[]> rawKeyQuery = WindowRangeQuery.withWindowStartRange(typedQuery.getTimeFrom().get(), typedQuery.getTimeTo().get());
-            final QueryResult<KeyValueIterator<Windowed<Bytes>, byte[]>> rawResult = wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo);
+        if (typedQuery.getKey().isPresent()) {

Review comment:
       Looks like we were not handling the right query variant before, but it 
didn't come up yet because other tests were failing before we got to this point.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -296,8 +297,8 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener,
         final QueryResult<byte[]> rawResult =
             wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo);
         if (rawResult.isSuccess()) {
-            final Deserializer<V> deserializer = getValueDeserializer();
-            final V value = deserializer.deserialize(serdes.topic(), rawResult.getResult());
+            final Function<byte[], V> deserializer = getDeserializeValue(serdes, wrapped());
+            final V value = deserializer.apply(rawResult.getResult());

Review comment:
       Note, the new version in StoreQueryUtils returns a Function, so that the 
iterators can just invoke the function on the value without having to know the 
right topic to pass in to the deserializer.
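
   To illustrate the idea (this is a sketch, not the actual StoreQueryUtils 
code): `TopicDeserializer` below is a hypothetical interface that mirrors the 
two-argument `deserialize(topic, data)` shape, and `bind` is a hypothetical 
helper. The topic is captured once, so the caller only ever supplies bytes.

```java
import java.util.function.Function;

// Hypothetical stand-in with the same shape as a (topic, bytes) deserializer.
interface TopicDeserializer<V> {
    V deserialize(String topic, byte[] data);
}

final class DeserializerBinding {
    // Captures the topic up front and returns a bytes-only function.
    static <V> Function<byte[], V> bind(final TopicDeserializer<V> deserializer,
                                        final String topic) {
        return bytes -> deserializer.deserialize(topic, bytes);
    }
}
```

   An iterator that holds the resulting `Function<byte[], V>` can deserialize 
each value with `apply` and never needs a reference to the serdes or the topic 
name.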

##########
File path: streams/src/main/java/org/apache/kafka/streams/query/WindowRangeQuery.java
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.query;
+
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.state.KeyValueIterator;
+
+import java.time.Instant;
+import java.util.Optional;
+
+public class WindowRangeQuery<K, V> implements Query<KeyValueIterator<Windowed<K>, V>> {

Review comment:
       ```suggestion
   @Evolving
   public class WindowRangeQuery<K, V> implements Query<KeyValueIterator<Windowed<K>, V>> {
   ```

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
##########
@@ -411,23 +422,40 @@ public void close() {
                                              final boolean collectExecutionInfo) {
         final QueryResult<R> result;
         final WindowRangeQuery<K, V> typedQuery = (WindowRangeQuery<K, V>) query;
-        if (typedQuery.getTimeFrom().isPresent() && typedQuery.getTimeTo().isPresent()) {
-            final WindowRangeQuery<Bytes, byte[]> rawKeyQuery = WindowRangeQuery.withWindowStartRange(typedQuery.getTimeFrom().get(), typedQuery.getTimeTo().get());
-            final QueryResult<KeyValueIterator<Windowed<Bytes>, byte[]>> rawResult = wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo);
+        if (typedQuery.getKey().isPresent()) {
+            final WindowRangeQuery<Bytes, byte[]> rawKeyQuery =
+                WindowRangeQuery.withKey(
+                    Bytes.wrap(serdes.rawKey(typedQuery.getKey().get()))
+                );
+            final QueryResult<KeyValueIterator<Windowed<Bytes>, byte[]>> rawResult =
+                wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo);
             if (rawResult.isSuccess()) {
-                final MeteredWindowedKeyValueIterator typedResult = new MeteredWindowedKeyValueIterator(rawResult.getResult(),
+                final MeteredWindowedKeyValueIterator<K, V> typedResult =
+                    new MeteredWindowedKeyValueIterator<>(
+                        rawResult.getResult(),
                         fetchSensor,
                         streamsMetrics,
-                        serdes,
-                        time);
-                final QueryResult<MeteredWindowedKeyValueIterator<K, V>> typedQueryResult = QueryResult.forResult(typedResult);
+                        serdes::keyFrom,
+                        StoreQueryUtils.getDeserializeValue(serdes, wrapped()),

Review comment:
       This is the reason I exploded the `serdes` reference in favor of 
functions for deserializing the key and value. When we're handling queries for 
non-timestamped stores, we need to be able to adapt the value deserializer.
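
   One way to picture the adaptation, as a sketch under stated assumptions 
rather than the PR's code: assume a timestamped value is laid out as an 8-byte 
timestamp followed by the value bytes, and a plain value is just the value 
bytes. All names here are hypothetical. The function is chosen once per store, 
and the iterator applies it without knowing which kind of store it wraps.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

final class ValueDeserializerSelection {
    // Assumed layout for this sketch: 8-byte timestamp, then UTF-8 value bytes.
    static String fromTimestamped(final byte[] raw) {
        return new String(raw, Long.BYTES, raw.length - Long.BYTES, StandardCharsets.UTF_8);
    }

    // Plain layout: the whole array is the UTF-8 value.
    static String fromPlain(final byte[] raw) {
        return new String(raw, StandardCharsets.UTF_8);
    }

    // Picks the matching function up front, based on the kind of store.
    static Function<byte[], String> select(final boolean storeIsTimestamped) {
        return storeIsTimestamped
            ? ValueDeserializerSelection::fromTimestamped
            : ValueDeserializerSelection::fromPlain;
    }
}
```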

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -308,21 +309,6 @@ public boolean setFlushListener(final 
CacheFlushListener<K, V> listener,
         return result;
     }
 
-    @SuppressWarnings({"unchecked", "rawtypes"})
-    private Deserializer<V> getValueDeserializer() {

Review comment:
       I moved this to StoreQueryUtils because we need it in WindowStore as 
well.

##########
File path: streams/src/main/java/org/apache/kafka/streams/query/internals/FailedQueryResult.java
##########
@@ -86,4 +86,14 @@ public R getResult() {
             "Cannot get result for failed query. Failure is " + 
failureReason.name() + ": "
                 + failure);
     }
+
+    @Override
+    public String toString() {
+        return "FailedQueryResult{" +
+            "failureReason=" + failureReason +
+            ", failure='" + failure + '\'' +
+            ", executionInfo=" + getExecutionInfo() +
+            ", position=" + getPosition() +
+            '}';
+    }

Review comment:
       It's pretty hard to debug test failures without these, so I added them.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
##########
@@ -52,27 +51,17 @@ Position getPosition() {
     }
 
     @Override
-    public <R> QueryResult<R> query(final Query<R> query, final PositionBound 
positionBound,
-        final boolean collectExecutionInfo) {
-
-        if (query instanceof WindowRangeQuery) {
-            @SuppressWarnings("unchecked") final WindowRangeQuery<Bytes, 
byte[]> windowRangeQuery = (WindowRangeQuery<Bytes, byte[]>) query;
-            if (windowRangeQuery.getKey().isPresent()) {
-                final Bytes key = windowRangeQuery.getKey().get();
-                final KeyValueIterator<Windowed<Bytes>, byte[]> 
keyValueIterator = this.fetch(key);
-                @SuppressWarnings("unchecked") final R result = (R) 
keyValueIterator;
-                final QueryResult<R> queryResult = 
QueryResult.forResult(result);
-                return queryResult;
-            }
-        }

Review comment:
       Dropped duplicate code, as discussed.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
##########
@@ -316,27 +315,17 @@ public boolean isOpen() {
     }
 
     @Override
-    public <R> QueryResult<R> query(final Query<R> query, final PositionBound 
positionBound,
-        final boolean collectExecutionInfo) {
-
-        if (query instanceof WindowRangeQuery) {
-            @SuppressWarnings("unchecked") final WindowRangeQuery<Bytes, 
byte[]> windowRangeQuery = (WindowRangeQuery<Bytes, byte[]>) query;
-            if (windowRangeQuery.getKey().isPresent()) {
-                final Bytes key = windowRangeQuery.getKey().get();
-                final KeyValueIterator<Windowed<Bytes>, byte[]> 
keyValueIterator = this.fetch(key);
-                @SuppressWarnings("unchecked") final R result = (R) 
keyValueIterator;
-                final QueryResult<R> queryResult = 
QueryResult.forResult(result);
-                return queryResult;
-            }
-        }

Review comment:
       Dropped this unnecessary duplicate code, as discussed.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
##########
@@ -247,7 +248,8 @@ public V fetchSession(final K key, final long 
earliestSessionEndTime, final long
             wrapped().fetch(keyBytes(key)),
             fetchSensor,
             streamsMetrics,
-            serdes,
+            serdes::keyFrom,
+            serdes::valueFrom,

Review comment:
       Explained below, in my comment on the 
`StoreQueryUtils.getDeserializeValue` call further down in this file.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
##########
@@ -112,13 +112,13 @@
     private static final Position INPUT_POSITION = Position.emptyPosition();
     private static final String STORE_NAME = "kv-store";
 
+    private static final long RECORD_TIME = System.currentTimeMillis();

Review comment:
       So that we can deterministically compute the ranges to search, I went 
ahead and gave the records a well-known timestamp.
   
   A future improvement to the tests could be to add records that will fall 
into different windows, but this is good enough for now.
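
   As a sketch of what "deterministically compute the ranges" can mean, 
assuming epoch-aligned tumbling windows and non-negative timestamps: the 
`WindowBounds` helper and the window size are hypothetical, not taken from the 
test. With a fixed record time, the window start (and so a closed search range 
around it) follows from arithmetic instead of from `Instant.now()`.

```java
import java.time.Duration;
import java.time.Instant;

final class WindowBounds {
    // Start of the epoch-aligned tumbling window that contains recordTimeMs.
    static long windowStartMs(final long recordTimeMs, final Duration windowSize) {
        final long sizeMs = windowSize.toMillis();
        return recordTimeMs - (recordTimeMs % sizeMs);
    }

    // Closed [from, to] range of window start times that includes the record's window.
    static Instant[] searchRange(final long recordTimeMs, final Duration windowSize) {
        final long start = windowStartMs(recordTimeMs, windowSize);
        return new Instant[] {
            Instant.ofEpochMilli(start),
            Instant.ofEpochMilli(start + windowSize.toMillis())
        };
    }
}
```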

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##########
@@ -214,73 +217,120 @@ public static boolean isPermitted(
 
     @SuppressWarnings("unchecked")
     private static <R> QueryResult<R> runWindowKeyQuery(final Query<R> query,
-                                                  final PositionBound 
positionBound,
-                                                  final boolean 
collectExecutionInfo,
-                                                  final StateStore store) {
+                                                        final PositionBound 
positionBound,
+                                                        final boolean 
collectExecutionInfo,
+                                                        final StateStore 
store) {
         if (store instanceof WindowStore) {
-            final WindowKeyQuery<Bytes, byte[]> windowKeyQuery = 
(WindowKeyQuery<Bytes, byte[]>) query;
+            final WindowKeyQuery<Bytes, byte[]> windowKeyQuery =
+                (WindowKeyQuery<Bytes, byte[]>) query;
             final WindowStore<Bytes, byte[]> windowStore = (WindowStore<Bytes, 
byte[]>) store;
             try {
                 if (windowKeyQuery.getTimeFrom().isPresent() && 
windowKeyQuery.getTimeTo().isPresent()) {
-                    final WindowStoreIterator<byte[]> iterator = 
windowStore.fetch(windowKeyQuery.getKey(), windowKeyQuery.getTimeFrom().get(), 
windowKeyQuery.getTimeTo().get());
+                    final WindowStoreIterator<byte[]> iterator = 
windowStore.fetch(
+                        windowKeyQuery.getKey(),
+                        windowKeyQuery.getTimeFrom().get(),
+                        windowKeyQuery.getTimeTo().get()
+                    );
                     return (QueryResult<R>) QueryResult.forResult(iterator);
                 } else {
-                    return 
QueryResult.forFailure(FailureReason.UNKNOWN_QUERY_TYPE, "WindowKeyQuery 
requires key and window bounds to be present");
+                    return QueryResult.forFailure(
+                        FailureReason.UNKNOWN_QUERY_TYPE,
+                        "This store (" + store.getClass() + ") doesn't know 
how to"
+                            + " execute the given query (" + query + ") 
because it only supports"
+                            + " closed-range queries."
+                            + " Contact the store maintainer if you need 
support"
+                            + " for a new query type."
+                    );
                 }
-            } catch(final Exception e){
+            } catch (final Exception e) {
                 final String message = parseStoreException(e, store, query);
-                return QueryResult.forFailure(
-                        FailureReason.STORE_EXCEPTION,
-                        message
-                );
+                return QueryResult.forFailure(FailureReason.STORE_EXCEPTION, 
message);
             }
         } else {
-            return QueryResult.forFailure(FailureReason.UNKNOWN_QUERY_TYPE, 
"Support for WindowKeyQuery's is currently restricted to stores of type 
WindowStore");
+            return QueryResult.forUnknownQueryType(query, store);
         }
     }
 
     @SuppressWarnings("unchecked")
     private static <R> QueryResult<R> runWindowRangeQuery(final Query<R> query,
-                                                        final PositionBound 
positionBound,
-                                                        final boolean 
collectExecutionInfo,
-                                                        final StateStore 
store) {
+                                                          final PositionBound 
positionBound,
+                                                          final boolean 
collectExecutionInfo,
+                                                          final StateStore 
store) {
         if (store instanceof WindowStore) {
-            final WindowRangeQuery<Bytes, byte[]> windowRangeQuery = 
(WindowRangeQuery<Bytes, byte[]>) query;
+            final WindowRangeQuery<Bytes, byte[]> windowRangeQuery =
+                (WindowRangeQuery<Bytes, byte[]>) query;
             final WindowStore<Bytes, byte[]> windowStore = (WindowStore<Bytes, 
byte[]>) store;
             try {
+                // There's no store API for open time ranges
                 if (windowRangeQuery.getTimeFrom().isPresent() && 
windowRangeQuery.getTimeTo().isPresent()) {
-                    final KeyValueIterator<Windowed<Bytes>, byte[]> iterator = 
windowStore.fetchAll(windowRangeQuery.getTimeFrom().get(), 
windowRangeQuery.getTimeTo().get());
+                    final KeyValueIterator<Windowed<Bytes>, byte[]> iterator =
+                        windowStore.fetchAll(
+                            windowRangeQuery.getTimeFrom().get(),
+                            windowRangeQuery.getTimeTo().get()
+                        );
                     return (QueryResult<R>) QueryResult.forResult(iterator);
                 } else {
-                    return 
QueryResult.forFailure(FailureReason.UNKNOWN_QUERY_TYPE, "WindowRangeQuery 
requires window bounds to be present when run against a WindowStore");
+                    return QueryResult.forFailure(
+                        FailureReason.UNKNOWN_QUERY_TYPE,
+                        "This store (" + store.getClass() + ") doesn't know 
how to"
+                            + " execute the given query (" + query + ") 
because"
+                            + " WindowStores only supports 
WindowRangeQuery.withWindowStartRange."
+                            + " Contact the store maintainer if you need 
support"
+                            + " for a new query type."
+                    );
                 }
-            } catch(final Exception e){
+            } catch (final Exception e) {
                 final String message = parseStoreException(e, store, query);
                 return QueryResult.forFailure(
-                        FailureReason.STORE_EXCEPTION,
-                        message
+                    FailureReason.STORE_EXCEPTION,
+                    message
                 );
             }
         } else if (store instanceof SessionStore) {
-            final WindowRangeQuery<Bytes, byte[]> windowRangeQuery = 
(WindowRangeQuery<Bytes, byte[]>) query;
+            final WindowRangeQuery<Bytes, byte[]> windowRangeQuery =
+                (WindowRangeQuery<Bytes, byte[]>) query;
             final SessionStore<Bytes, byte[]> sessionStore = 
(SessionStore<Bytes, byte[]>) store;
             try {
                 if (windowRangeQuery.getKey().isPresent()) {
-                    final KeyValueIterator<Windowed<Bytes>, byte[]> iterator = 
sessionStore.fetch(windowRangeQuery.getKey().get());
+                    final KeyValueIterator<Windowed<Bytes>, byte[]> iterator = 
sessionStore.fetch(
+                        windowRangeQuery.getKey().get());
                     return (QueryResult<R>) QueryResult.forResult(iterator);
                 } else {
-                    return 
QueryResult.forFailure(FailureReason.UNKNOWN_QUERY_TYPE, "WindowRangeQuery 
requires key to be present when run against a SessionStore");
+                    return QueryResult.forFailure(
+                        FailureReason.UNKNOWN_QUERY_TYPE,
+                        "This store (" + store.getClass() + ") doesn't know 
how to"
+                            + " execute the given query (" + query + ") 
because"
+                            + " SessionStores only support 
WindowRangeQuery.withKey."
+                            + " Contact the store maintainer if you need 
support"
+                            + " for a new query type."
+                    );
                 }
-            } catch(final Exception e){
+            } catch (final Exception e) {
                 final String message = parseStoreException(e, store, query);
                 return QueryResult.forFailure(
-                        FailureReason.STORE_EXCEPTION,
-                        message
+                    FailureReason.STORE_EXCEPTION,
+                    message
                 );
             }
         } else {
-            return QueryResult.forFailure(FailureReason.UNKNOWN_QUERY_TYPE, 
"Support for WindowRangeQuery's is currently restricted to Window and Session 
stores");
+            return QueryResult.forUnknownQueryType(query, store);
+        }
+    }
+
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    public static <V> Function<byte[], V> getDeserializeValue(final 
StateSerdes<?, V> serdes,
+                                                              final StateStore 
wrapped) {

Review comment:
       Moved from the MeteredKeyValueStore. I still hope we can refactor the 
store hierarchy later to get rid of this entirely.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
##########
@@ -411,23 +422,40 @@ public void close() {
                                              final boolean 
collectExecutionInfo) {
         final QueryResult<R> result;
         final WindowRangeQuery<K, V> typedQuery = (WindowRangeQuery<K, V>) 
query;
-        if (typedQuery.getTimeFrom().isPresent() && 
typedQuery.getTimeTo().isPresent()) {
-            final WindowRangeQuery<Bytes, byte[]> rawKeyQuery = 
WindowRangeQuery.withWindowStartRange(typedQuery.getTimeFrom().get(), 
typedQuery.getTimeTo().get());
-            final QueryResult<KeyValueIterator<Windowed<Bytes>, byte[]>> 
rawResult = wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo);
+        if (typedQuery.getKey().isPresent()) {
+            final WindowRangeQuery<Bytes, byte[]> rawKeyQuery =
+                WindowRangeQuery.withKey(
+                    Bytes.wrap(serdes.rawKey(typedQuery.getKey().get()))
+                );
+            final QueryResult<KeyValueIterator<Windowed<Bytes>, byte[]>> 
rawResult =
+                wrapped().query(rawKeyQuery, positionBound, 
collectExecutionInfo);
             if (rawResult.isSuccess()) {
-                final MeteredWindowedKeyValueIterator typedResult = new 
MeteredWindowedKeyValueIterator(rawResult.getResult(),
+                final MeteredWindowedKeyValueIterator<K, V> typedResult =
+                    new MeteredWindowedKeyValueIterator<>(
+                        rawResult.getResult(),
                         fetchSensor,
                         streamsMetrics,
-                        serdes,
-                        time);
-                final QueryResult<MeteredWindowedKeyValueIterator<K, V>> 
typedQueryResult = QueryResult.forResult(typedResult);
+                        serdes::keyFrom,
+                        StoreQueryUtils.getDeserializeValue(serdes, wrapped()),
+                        time
+                    );
+                final QueryResult<MeteredWindowedKeyValueIterator<K, V>> 
typedQueryResult =
+                    QueryResult.forResult(typedResult);
                 result = (QueryResult<R>) typedQueryResult;
             } else {
                 // the generic type doesn't matter, since failed queries have 
no result set.
                 result = (QueryResult<R>) rawResult;
             }
         } else {
-            result = QueryResult.forUnknownQueryType(query, this);
+
+            result = QueryResult.forFailure(
+                FailureReason.UNKNOWN_QUERY_TYPE,
+                "This store (" + getClass() + ") doesn't know how to"
+                    + " execute the given query (" + query + ") because"
+                    + " SessionStores only support WindowRangeQuery.withKey."
+                    + " Contact the store maintainer if you need support"
+                    + " for a new query type."
+            );

Review comment:
       More explanatory error, as discussed.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##########
@@ -214,73 +217,120 @@ public static boolean isPermitted(
 
     @SuppressWarnings("unchecked")
     private static <R> QueryResult<R> runWindowKeyQuery(final Query<R> query,
-                                                  final PositionBound 
positionBound,
-                                                  final boolean 
collectExecutionInfo,
-                                                  final StateStore store) {
+                                                        final PositionBound 
positionBound,
+                                                        final boolean 
collectExecutionInfo,
+                                                        final StateStore 
store) {
         if (store instanceof WindowStore) {
-            final WindowKeyQuery<Bytes, byte[]> windowKeyQuery = 
(WindowKeyQuery<Bytes, byte[]>) query;
+            final WindowKeyQuery<Bytes, byte[]> windowKeyQuery =
+                (WindowKeyQuery<Bytes, byte[]>) query;
             final WindowStore<Bytes, byte[]> windowStore = (WindowStore<Bytes, 
byte[]>) store;
             try {
                 if (windowKeyQuery.getTimeFrom().isPresent() && 
windowKeyQuery.getTimeTo().isPresent()) {
-                    final WindowStoreIterator<byte[]> iterator = 
windowStore.fetch(windowKeyQuery.getKey(), windowKeyQuery.getTimeFrom().get(), 
windowKeyQuery.getTimeTo().get());
+                    final WindowStoreIterator<byte[]> iterator = 
windowStore.fetch(
+                        windowKeyQuery.getKey(),
+                        windowKeyQuery.getTimeFrom().get(),
+                        windowKeyQuery.getTimeTo().get()
+                    );
                     return (QueryResult<R>) QueryResult.forResult(iterator);
                 } else {
-                    return 
QueryResult.forFailure(FailureReason.UNKNOWN_QUERY_TYPE, "WindowKeyQuery 
requires key and window bounds to be present");
+                    return QueryResult.forFailure(
+                        FailureReason.UNKNOWN_QUERY_TYPE,
+                        "This store (" + store.getClass() + ") doesn't know 
how to"
+                            + " execute the given query (" + query + ") 
because it only supports"
+                            + " closed-range queries."
+                            + " Contact the store maintainer if you need 
support"
+                            + " for a new query type."
+                    );
                 }
-            } catch(final Exception e){
+            } catch (final Exception e) {
                 final String message = parseStoreException(e, store, query);
-                return QueryResult.forFailure(
-                        FailureReason.STORE_EXCEPTION,
-                        message
-                );
+                return QueryResult.forFailure(FailureReason.STORE_EXCEPTION, 
message);
             }
         } else {
-            return QueryResult.forFailure(FailureReason.UNKNOWN_QUERY_TYPE, 
"Support for WindowKeyQuery's is currently restricted to stores of type 
WindowStore");
+            return QueryResult.forUnknownQueryType(query, store);

Review comment:
       This message didn't need to be specialized.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##########
@@ -214,73 +217,120 @@ public static boolean isPermitted(
 
     @SuppressWarnings("unchecked")
     private static <R> QueryResult<R> runWindowKeyQuery(final Query<R> query,
-                                                  final PositionBound 
positionBound,
-                                                  final boolean 
collectExecutionInfo,
-                                                  final StateStore store) {
+                                                        final PositionBound 
positionBound,
+                                                        final boolean 
collectExecutionInfo,
+                                                        final StateStore 
store) {
         if (store instanceof WindowStore) {
-            final WindowKeyQuery<Bytes, byte[]> windowKeyQuery = 
(WindowKeyQuery<Bytes, byte[]>) query;
+            final WindowKeyQuery<Bytes, byte[]> windowKeyQuery =
+                (WindowKeyQuery<Bytes, byte[]>) query;
             final WindowStore<Bytes, byte[]> windowStore = (WindowStore<Bytes, 
byte[]>) store;
             try {
                 if (windowKeyQuery.getTimeFrom().isPresent() && 
windowKeyQuery.getTimeTo().isPresent()) {
-                    final WindowStoreIterator<byte[]> iterator = 
windowStore.fetch(windowKeyQuery.getKey(), windowKeyQuery.getTimeFrom().get(), 
windowKeyQuery.getTimeTo().get());
+                    final WindowStoreIterator<byte[]> iterator = 
windowStore.fetch(
+                        windowKeyQuery.getKey(),
+                        windowKeyQuery.getTimeFrom().get(),
+                        windowKeyQuery.getTimeTo().get()
+                    );
                     return (QueryResult<R>) QueryResult.forResult(iterator);
                 } else {
-                    return 
QueryResult.forFailure(FailureReason.UNKNOWN_QUERY_TYPE, "WindowKeyQuery 
requires key and window bounds to be present");
+                    return QueryResult.forFailure(
+                        FailureReason.UNKNOWN_QUERY_TYPE,
+                        "This store (" + store.getClass() + ") doesn't know 
how to"
+                            + " execute the given query (" + query + ") 
because it only supports"
+                            + " closed-range queries."
+                            + " Contact the store maintainer if you need 
support"
+                            + " for a new query type."
+                    );
                 }
-            } catch(final Exception e){
+            } catch (final Exception e) {

Review comment:
       This was the checkstyle error that was failing your build.

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
##########
@@ -266,6 +266,11 @@ public boolean isWindowed() {
             public boolean isWindowed() {
                 return true;
             }
+
+            @Override
+            public boolean timestamped() {
+                return false;
+            }

Review comment:
       I think this was my bad from before. This store is not a timestamped 
store.

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
##########
@@ -602,33 +594,78 @@ public void verifyStore() {
         );
     }
 
-    private <T> void shouldHandleWindowKeyQueries(final Integer key, final 
Instant timeFrom, final Instant timeTo, final Function<T, Integer> extractor) {
+    private <T> void shouldHandleWindowKeyQueries(final Function<T, Integer> 
extractor) {
+
+        final long windowSize = WINDOW_SIZE.toMillis();
+        final long windowStart = (RECORD_TIME / windowSize) * windowSize;

Review comment:
       This is why the test was failing for you. The query is for a range of 
window start times, not record times. Since the window size is five minutes, 
the range `[now - 1 minute, now]` usually won't contain the actual window 
start time, which can be up to five minutes before `now`. In other words, just 
a simple oversight :/ Sorry for the trouble.
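
   For illustration, here is a minimal sketch of the alignment arithmetic, assuming tumbling windows aligned to the epoch as in the test's `windowStart` computation. The class name, helper names, and sample timestamp are hypothetical and not part of the PR.

```java
import java.time.Duration;
import java.time.Instant;

class WindowStartDemo {
    // Start of the tumbling window containing the record: round the
    // timestamp down to a multiple of the window size.
    static long windowStart(final long recordTimeMs, final long windowSizeMs) {
        return (recordTimeMs / windowSizeMs) * windowSizeMs;
    }

    // True if the closed range [fromMs, toMs] contains the window start.
    static boolean rangeContains(final long fromMs, final long toMs, final long startMs) {
        return fromMs <= startMs && startMs <= toMs;
    }

    public static void main(final String[] args) {
        final long windowSize = Duration.ofMinutes(5).toMillis();
        // Hypothetical record time, four minutes into its window.
        final long recordTime = Instant.parse("2021-12-23T10:04:00Z").toEpochMilli();
        final long start = windowStart(recordTime, windowSize);

        // The window starts at 10:00:00Z, outside [recordTime - 1 minute, recordTime].
        System.out.println(Instant.ofEpochMilli(start));
        System.out.println(rangeContains(recordTime - 60_000L, recordTime, start)); // false
        // Querying the exact start (the "tightest possible" range) does match.
        System.out.println(rangeContains(start, start, start)); // true
    }
}
```

   Pinning the record timestamp to a constant keeps `windowStart` deterministic, so a test can assert on exact bounds instead of relying on wall-clock time.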

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
##########
@@ -602,33 +594,78 @@ public void verifyStore() {
         );
     }
 
-    private <T> void shouldHandleWindowKeyQueries(final Integer key, final 
Instant timeFrom, final Instant timeTo, final Function<T, Integer> extractor) {
+    private <T> void shouldHandleWindowKeyQueries(final Function<T, Integer> 
extractor) {
+
+        final long windowSize = WINDOW_SIZE.toMillis();
+        final long windowStart = (RECORD_TIME / windowSize) * windowSize;
+
+        // tightest possible start range
         shouldHandleWindowKeyQuery(
-                key,
-                timeFrom,
-                timeTo,
-                extractor,
-                mkSet(1, 2, 3)
+            2,
+            Instant.ofEpochMilli(windowStart),
+            Instant.ofEpochMilli(windowStart),
+            extractor,
+            mkSet(2)

Review comment:
       Since we're specifying the key, we expect only to get back windows with 
the value for that key. The aggregation we specified is to sum all values for 
the key, and it comes out to `2` because we only write one value for each key; 
namely, the value is the same number as the key.
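
   As a rough sketch of why the expected set is just `2`, assume one record per key in 0..3 whose value equals its key, summed per key. The class and method names are hypothetical; the real test uses a Kafka Streams aggregation rather than a plain map.

```java
import java.util.HashMap;
import java.util.Map;

class SumPerKeyDemo {
    // Sums values per key; each record is a {key, value} pair.
    static Map<Integer, Integer> sumByKey(final int[][] records) {
        final Map<Integer, Integer> sums = new HashMap<>();
        for (final int[] record : records) {
            sums.merge(record[0], record[1], Integer::sum);
        }
        return sums;
    }

    public static void main(final String[] args) {
        // Assumed input: one record per key, value equal to the key.
        final int[][] records = {{0, 0}, {1, 1}, {2, 2}, {3, 3}};
        // Looking up key 2 yields only that key's aggregate.
        System.out.println(sumByKey(records).get(2)); // 2
    }
}
```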

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
##########
@@ -367,7 +372,7 @@ public static void before()
                     new ProducerRecord<>(
                         INPUT_TOPIC_NAME,
                         i % partitions,
-                        Time.SYSTEM.milliseconds(),
+                        RECORD_TIME,

Review comment:
       Setting the records' timestamps.

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
##########
@@ -540,32 +545,19 @@ public void verifyStore() {
                 if (storeToTest.timestamped()) {
                     final Function<ValueAndTimestamp<Integer>, Integer> 
valueExtractor =
                             ValueAndTimestamp::value;
-                    final Instant timeTo = Instant.now();
-                    final Instant timeFrom = timeTo.minusSeconds(60);
-                    shouldHandleWindowKeyQueries(2, timeFrom, timeTo, 
valueExtractor);
-                    shouldHandleWindowRangeQueries(timeFrom, timeTo, 
valueExtractor);
+                    shouldHandleWindowKeyQueries(valueExtractor);
+                    shouldHandleWindowRangeQueries(valueExtractor);

Review comment:
       I moved the bounds into the plural check method, so we can check correct 
behavior for multiple bounds.

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
##########
@@ -602,33 +594,78 @@ public void verifyStore() {
         );
     }
 
-    private <T> void shouldHandleWindowKeyQueries(final Integer key, final 
Instant timeFrom, final Instant timeTo, final Function<T, Integer> extractor) {
+    private <T> void shouldHandleWindowKeyQueries(final Function<T, Integer> 
extractor) {
+
+        final long windowSize = WINDOW_SIZE.toMillis();
+        final long windowStart = (RECORD_TIME / windowSize) * windowSize;
+
+        // tightest possible start range
         shouldHandleWindowKeyQuery(
-                key,
-                timeFrom,
-                timeTo,
-                extractor,
-                mkSet(1, 2, 3)
+            2,
+            Instant.ofEpochMilli(windowStart),
+            Instant.ofEpochMilli(windowStart),
+            extractor,
+            mkSet(2)
+        );
 
+        // miss the window start range
+        shouldHandleWindowKeyQuery(
+            2,
+            Instant.ofEpochMilli(windowStart - 1),
+            Instant.ofEpochMilli(windowStart - 1),
+            extractor,
+            mkSet()
+        );
+
+        // miss the key
+        shouldHandleWindowKeyQuery(
+            999,
+            Instant.ofEpochMilli(windowStart),
+            Instant.ofEpochMilli(windowStart),
+            extractor,
+            mkSet()
+        );
+
+        // miss both
+        shouldHandleWindowKeyQuery(
+            999,
+            Instant.ofEpochMilli(windowStart - 1),
+            Instant.ofEpochMilli(windowStart - 1),
+            extractor,
+            mkSet()
         );

Review comment:
       It seemed like a good idea to check a few other query configurations, 
but none of them showed any problems.

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
##########
@@ -602,34 +594,155 @@ public void verifyStore() {
         );
     }
 
-    private <T> void shouldHandleWindowKeyQueries(final Integer key, final 
Instant timeFrom, final Instant timeTo, final Function<T, Integer> extractor) {
+    private <T> void shouldHandleWindowKeyQueries(final Function<T, Integer> 
extractor) {
+
+        final long windowSize = WINDOW_SIZE.toMillis();
+        final long windowStart = (RECORD_TIME / windowSize) * windowSize;
+
+        // tightest possible start range
         shouldHandleWindowKeyQuery(
-                key,
-                timeFrom,
-                timeTo,
-                extractor,
-                mkSet(1, 2, 3)
+            2,
+            Instant.ofEpochMilli(windowStart),
+            Instant.ofEpochMilli(windowStart),
+            extractor,
+            mkSet(2)
+        );
 
+        // miss the window start range
+        shouldHandleWindowKeyQuery(
+            2,
+            Instant.ofEpochMilli(windowStart - 1),
+            Instant.ofEpochMilli(windowStart - 1),
+            extractor,
+            mkSet()
+        );
+
+        // miss the key
+        shouldHandleWindowKeyQuery(
+            999,
+            Instant.ofEpochMilli(windowStart),
+            Instant.ofEpochMilli(windowStart),
+            extractor,
+            mkSet()
+        );
+
+        // miss both
+        shouldHandleWindowKeyQuery(
+            999,
+            Instant.ofEpochMilli(windowStart - 1),
+            Instant.ofEpochMilli(windowStart - 1),
+            extractor,
+            mkSet()
         );
     }
 
-    private <T> void shouldHandleWindowRangeQueries(final Instant timeFrom, 
final Instant timeTo, final Function<T, Integer> extractor) {
+    private <T> void shouldHandleWindowRangeQueries(final Function<T, Integer> 
extractor) {
+        final long windowSize = WINDOW_SIZE.toMillis();
+        final long windowStart = (RECORD_TIME / windowSize) * windowSize;
+
         shouldHandleWindowRangeQuery(
-                timeFrom,
-                timeTo,
-                extractor,
-                mkSet(1, 2, 3)
+            Instant.ofEpochMilli(windowStart),
+            Instant.ofEpochMilli(windowStart),
+            extractor,
+            mkSet(0, 1, 2, 3)
+        );
 
+        // miss the window start
+        shouldHandleWindowRangeQuery(
+            Instant.ofEpochMilli(windowStart - 1),
+            Instant.ofEpochMilli(windowStart - 1),
+            extractor,
+            mkSet()
         );
+
+        // Should fail to execute this query on a WindowStore.

Review comment:
       I also added some cases to test the specialized failure messages.

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
##########
@@ -803,18 +845,18 @@ public void shouldRejectUnknownQuery() {
                 assertThat(queryResult.get(partition).isSuccess(), is(true));
 
                 assertThrows(
-                        IllegalArgumentException.class,
-                        queryResult.get(partition)::getFailureReason
+                    IllegalArgumentException.class,
+                    queryResult.get(partition)::getFailureReason
                 );
                 assertThrows(
-                        IllegalArgumentException.class,
-                        queryResult.get(partition)::getFailureMessage
+                    IllegalArgumentException.class,
+                    queryResult.get(partition)::getFailureMessage
                 );
 
-                final WindowStoreIterator<V> iterator = 
queryResult.get(partition)
-                        .getResult();
-                while (iterator.hasNext()) {
-                    
actualValue.add(valueExtactor.apply(iterator.next().value));
+                try (final WindowStoreIterator<V> iterator = 
queryResult.get(partition).getResult()) {
+                    while (iterator.hasNext()) {
+                        
actualValue.add(valueExtactor.apply(iterator.next().value));
+                    }

Review comment:
       These iterators need to be closed or they'll leak resources (it's the 
same for IQv1 as well).
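
   A minimal sketch of the try-with-resources pattern, using a hypothetical `TrackingIterator` as a stand-in for a store iterator (it is not a Kafka class). It only records whether `close()` was called.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class IteratorCloseDemo {
    // Hypothetical stand-in for an iterator that holds a resource until closed.
    static final class TrackingIterator implements Iterator<Integer>, AutoCloseable {
        private final Iterator<Integer> delegate;
        private boolean closed = false;

        TrackingIterator(final List<Integer> values) {
            this.delegate = values.iterator();
        }

        @Override
        public boolean hasNext() {
            return delegate.hasNext();
        }

        @Override
        public Integer next() {
            return delegate.next();
        }

        @Override
        public void close() {
            closed = true;
        }

        boolean isClosed() {
            return closed;
        }
    }

    // Drains the iterator; try-with-resources closes it even if the loop throws.
    static List<Integer> drain(final TrackingIterator source) {
        final List<Integer> out = new ArrayList<>();
        try (final TrackingIterator iterator = source) {
            while (iterator.hasNext()) {
                out.add(iterator.next());
            }
        }
        return out;
    }
}
```

   The same shape applies to any iterator type that implements `AutoCloseable`, since the close happens on both the normal and the exceptional path.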

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
##########
@@ -350,9 +353,35 @@ public boolean isOpen() {
         return open;
     }
 
+    @SuppressWarnings("unchecked")
     @Override
     public <R> QueryResult<R> query(final Query<R> query, final PositionBound 
positionBound,
         final boolean collectExecutionInfo) {
+
+        if (query instanceof WindowKeyQuery) {
+            final WindowKeyQuery<Bytes, byte[]> windowKeyQuery = 
(WindowKeyQuery<Bytes, byte[]>) query;
+            if (windowKeyQuery.getTimeFrom().isPresent() && 
windowKeyQuery.getTimeTo().isPresent()) {
+                final Bytes key = windowKeyQuery.getKey();
+                final Instant lower = windowKeyQuery.getTimeFrom().get();
+                final Instant upper = windowKeyQuery.getTimeTo().get();
+                final WindowStoreIterator<byte[]> iterator = this.fetch(key, 
lower, upper);
+                final R result = (R) iterator;
+                final QueryResult<R> queryResult = 
QueryResult.forResult(result);
+                return queryResult;
+            }
+        } else if (query instanceof WindowRangeQuery) {
+            final WindowRangeQuery<Bytes, byte[]> windowRangeQuery = 
(WindowRangeQuery<Bytes, byte[]>) query;
+            if (windowRangeQuery.getTimeFrom().isPresent() &&
+                    windowRangeQuery.getTimeTo().isPresent()) {

Review comment:
       Ah, I was previously mistaken about this. My prior comment was made with 
your KIP-763 
(https://cwiki.apache.org/confluence/display/KAFKA/KIP-763%3A+Range+queries+with+open+endpoints)
 in mind.
   
   Upon looking at the WindowStore code and the KIP, I now see that we don't 
support open ranges on window stores, and that the KIP was only about KeyValue 
range queries.
   
   I'm also just now catching on that you're using `Optional.of` there, not 
`Optional.ofNullable`, so it's always the case that we either have both bounds 
and no key or a key with no bounds.
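
   To illustrate the difference, here is a small sketch using only `java.util.Optional`. The class and method names are hypothetical. `Optional.of` rejects `null`, so a factory that wraps its bounds with it cannot produce a query with only one bound present.

```java
import java.util.Optional;

class OptionalDemo {
    // Optional.of throws NullPointerException when given null.
    static boolean ofRejectsNull() {
        try {
            Optional.of((Object) null);
            return false;
        } catch (final NullPointerException e) {
            return true;
        }
    }

    // Optional.ofNullable maps null to an empty Optional instead.
    static boolean ofNullableAcceptsNull() {
        return !Optional.ofNullable(null).isPresent();
    }

    public static void main(final String[] args) {
        System.out.println(ofRejectsNull());         // true
        System.out.println(ofNullableAcceptsNull()); // true
    }
}
```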
   
   I had a second concern that the window store is not handling the case of a 
range query with a key and no bounds, but upon examination of the WindowStore, 
I can see that there is no method to handle that case, so it makes sense to 
return "unknown query" for it (since the objective at this moment is parity). 
It might be nice if we additionally explain that the error is because the store 
can't handle the parameterization rather than the query itself, though.
   
   And finally, just to clarify, I do think we should consolidate on handling 
the queries in `StoreQueryUtils` rather than in the stores themselves, so these 
comments are meant to apply to the handler in that class, and we should just 
delete the code in this class.





