mjsax commented on code in PR #14570:
URL: https://github.com/apache/kafka/pull/14570#discussion_r1373953314


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java:
##########
@@ -126,15 +128,22 @@ public <R> QueryResult<R> query(
         final PositionBound positionBound,
         final QueryConfig config) {
 
-
         final long start = config.isCollectExecutionInfo() ? System.nanoTime() 
: -1L;
-        final QueryResult<R> result = store.query(query, positionBound, 
config);
+        QueryResult<R> result = store.query(query, positionBound, config);
+        Position position = result.getPosition();
+        if (result.isSuccess()) {
+            byte[] res = (byte[]) result.getResult();
+            byte[] res1 = convertToTimestampedFormat(res);
+            result = (QueryResult<R>) QueryResult.forResult(res1);
+        }
+
         if (config.isCollectExecutionInfo()) {
             final long end = System.nanoTime();
             result.addExecutionInfo(
                 "Handled in " + getClass() + " in " + (end - start) + "ns"
             );
         }
+        result.setPosition(position);

Review Comment:
   Why this change?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java:
##########
@@ -126,15 +128,22 @@ public <R> QueryResult<R> query(
         final PositionBound positionBound,
         final QueryConfig config) {
 
-
         final long start = config.isCollectExecutionInfo() ? System.nanoTime() 
: -1L;
-        final QueryResult<R> result = store.query(query, positionBound, 
config);
+        QueryResult<R> result = store.query(query, positionBound, config);
+        Position position = result.getPosition();
+        if (result.isSuccess()) {
+            byte[] res = (byte[]) result.getResult();
+            byte[] res1 = convertToTimestampedFormat(res);
+            result = (QueryResult<R>) QueryResult.forResult(res1);

Review Comment:
   We need to set the position on the newly created result. We should also copy 
the "execution information" (I am also wondering if we should add a new entry 
to it). cc @aliehsaeedii WDYT?
   
   Maybe we can actually re-use 
`InternalQueryResultUtil.copyAndSubstituteDeserializedResult`?
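   
   A minimal sketch of what "copy the metadata onto the new result" could look 
like. `SketchResult` and `SketchResultUtil` are hypothetical stand-ins with a 
string position, not the Kafka Streams classes; the real helper may differ in 
its details.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;

   // Hypothetical stand-in for a query result (not org.apache.kafka.streams.query.QueryResult).
   final class SketchResult<R> {
       private final R value;
       private final List<String> executionInfo = new ArrayList<>();
       private String position; // stand-in for a Position object

       SketchResult(final R value) {
           this.value = value;
       }

       R value() {
           return value;
       }

       String position() {
           return position;
       }

       void setPosition(final String position) {
           this.position = position;
       }

       void addExecutionInfo(final String info) {
           executionInfo.add(info);
       }

       List<String> executionInfo() {
           return executionInfo;
       }
   }

   final class SketchResultUtil {
       // Wraps the converted value in a new result and carries over
       // the execution info and the position of the raw result.
       static <R> SketchResult<R> copyAndSubstitute(final SketchResult<?> raw, final R newValue) {
           final SketchResult<R> copy = new SketchResult<>(newValue);
           for (final String info : raw.executionInfo()) {
               copy.addExecutionInfo(info);
           }
           copy.setPosition(raw.position());
           return copy;
       }
   }
   ```
   
   With a helper of this shape, the adapter would not need to remember the 
position in a local variable and set it again at the end of `query()`.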



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java:
##########
@@ -126,15 +128,22 @@ public <R> QueryResult<R> query(
         final PositionBound positionBound,
         final QueryConfig config) {
 
-
         final long start = config.isCollectExecutionInfo() ? System.nanoTime() 
: -1L;
-        final QueryResult<R> result = store.query(query, positionBound, 
config);
+        QueryResult<R> result = store.query(query, positionBound, config);
+        Position position = result.getPosition();
+        if (result.isSuccess()) {
+            byte[] res = (byte[]) result.getResult();
+            byte[] res1 = convertToTimestampedFormat(res);

Review Comment:
   Use better variable names, e.g., `res` -> `plainValue` and `res1` -> 
`valueWithTimestamp`.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java:
##########
@@ -102,4 +140,230 @@ static class RawAndDeserializedValue<ValueType> {
             this.value = value;
         }
     }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+                                    final PositionBound positionBound,
+                                    final QueryConfig config) {
+
+        final long start = time.nanoseconds();
+        final QueryResult<R> result;
+
+        final StoreQueryUtils.QueryHandler handler = 
queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, config);
+            if (config.isCollectExecutionInfo()) {
+                result.addExecutionInfo(
+                        "Handled in " + getClass() + " in " + 
(time.nanoseconds() - start) + "ns");
+            }
+        } else {
+            result = (QueryResult<R>) handler.apply(
+                    query,
+                    positionBound,
+                    config,
+                    this
+            );
+            if (config.isCollectExecutionInfo()) {
+                result.addExecutionInfo(
+                        "Handled in " + getClass() + " with serdes "
+                                + serdes + " in " + (time.nanoseconds() - 
start) + "ns");
+            }
+        }
+        return result;
+    }
+
+
+
+    @SuppressWarnings("unchecked")
+    protected <R> QueryResult<R> runTimestampKeyQuery(final Query<R> query,
+                                                      final PositionBound 
positionBound,
+                                                      final QueryConfig 
config) {
+        final QueryResult<R> result;
+        final TimestampedKeyQuery<K, V> typedKeyQuery = 
(TimestampedKeyQuery<K, V>) query;
+        final KeyQuery<Bytes, byte[]> rawKeyQuery =
+                KeyQuery.withKey(keyBytes(typedKeyQuery.getKey()));
+        final QueryResult<byte[]> rawResult =
+                wrapped().query(rawKeyQuery, positionBound, config);
+        if (rawResult.isSuccess()) {
+            final Function<byte[], ValueAndTimestamp<V>> deserializer = 
getDeserializeValue(serdes, wrapped());
+            final ValueAndTimestamp<V> value = 
deserializer.apply(rawResult.getResult());
+            final QueryResult<ValueAndTimestamp<V>> typedQueryResult =
+                    
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, value);
+            result = (QueryResult<R>) typedQueryResult;
+        } else {
+            // the generic type doesn't matter, since failed queries have no 
result set.
+            result = (QueryResult<R>) rawResult;
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    protected <R> QueryResult<R> runTimestampRangeQuery(final Query<R> query,
+                                                        final PositionBound 
positionBound,
+                                                        final QueryConfig 
config) {
+
+        final QueryResult<R> result;
+        final TimestampedRangeQuery<K, V> typedQuery = 
(TimestampedRangeQuery<K, V>) query;
+
+        final RangeQuery<Bytes, byte[]> rawRangeQuery;
+        if (typedQuery.getLowerBound().isPresent() && 
typedQuery.getUpperBound().isPresent()) {
+            rawRangeQuery = RangeQuery.withRange(
+                    keyBytes(typedQuery.getLowerBound().get()),
+                    keyBytes(typedQuery.getUpperBound().get())
+            );
+        } else if (typedQuery.getLowerBound().isPresent()) {
+            rawRangeQuery = 
RangeQuery.withLowerBound(keyBytes(typedQuery.getLowerBound().get()));
+        } else if (typedQuery.getUpperBound().isPresent()) {
+            rawRangeQuery = 
RangeQuery.withUpperBound(keyBytes(typedQuery.getUpperBound().get()));
+        } else {
+            rawRangeQuery = RangeQuery.withNoBounds();
+        }
+        final QueryResult<KeyValueIterator<Bytes, byte[]>> rawResult =
+                wrapped().query(rawRangeQuery, positionBound, config);
+        if (rawResult.isSuccess()) {
+            final KeyValueIterator<Bytes, byte[]> iterator = 
rawResult.getResult();
+            final KeyValueIterator<K, V> resultIterator = new 
MeteredKeyValueTimestampedIterator(
+                    iterator,
+                    getSensor,
+                    getDeserializeValue(serdes, wrapped()),
+                    false
+            );
+            final QueryResult<KeyValueIterator<K, V>> typedQueryResult =
+                    
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(
+                            rawResult,
+                            resultIterator
+                    );
+            result = (QueryResult<R>) typedQueryResult;
+        } else {
+            // the generic type doesn't matter, since failed queries have no 
result set.
+            result = (QueryResult<R>) rawResult;
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    protected <R> QueryResult<R> runKeyQuery2(final Query<R> query,

Review Comment:
   Why the `2` at the end of the name?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java:
##########
@@ -102,4 +140,230 @@ static class RawAndDeserializedValue<ValueType> {
             this.value = value;
         }
     }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+                                    final PositionBound positionBound,
+                                    final QueryConfig config) {
+
+        final long start = time.nanoseconds();
+        final QueryResult<R> result;
+
+        final StoreQueryUtils.QueryHandler handler = 
queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, config);
+            if (config.isCollectExecutionInfo()) {
+                result.addExecutionInfo(
+                        "Handled in " + getClass() + " in " + 
(time.nanoseconds() - start) + "ns");
+            }
+        } else {
+            result = (QueryResult<R>) handler.apply(
+                    query,
+                    positionBound,
+                    config,
+                    this
+            );
+            if (config.isCollectExecutionInfo()) {
+                result.addExecutionInfo(
+                        "Handled in " + getClass() + " with serdes "
+                                + serdes + " in " + (time.nanoseconds() - 
start) + "ns");
+            }
+        }
+        return result;
+    }
+
+
+
+    @SuppressWarnings("unchecked")
+    protected <R> QueryResult<R> runTimestampKeyQuery(final Query<R> query,
+                                                      final PositionBound 
positionBound,
+                                                      final QueryConfig 
config) {
+        final QueryResult<R> result;
+        final TimestampedKeyQuery<K, V> typedKeyQuery = 
(TimestampedKeyQuery<K, V>) query;
+        final KeyQuery<Bytes, byte[]> rawKeyQuery =
+                KeyQuery.withKey(keyBytes(typedKeyQuery.getKey()));
+        final QueryResult<byte[]> rawResult =
+                wrapped().query(rawKeyQuery, positionBound, config);
+        if (rawResult.isSuccess()) {
+            final Function<byte[], ValueAndTimestamp<V>> deserializer = 
getDeserializeValue(serdes, wrapped());
+            final ValueAndTimestamp<V> value = 
deserializer.apply(rawResult.getResult());
+            final QueryResult<ValueAndTimestamp<V>> typedQueryResult =
+                    
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, value);
+            result = (QueryResult<R>) typedQueryResult;
+        } else {
+            // the generic type doesn't matter, since failed queries have no 
result set.
+            result = (QueryResult<R>) rawResult;
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    protected <R> QueryResult<R> runTimestampRangeQuery(final Query<R> query,
+                                                        final PositionBound 
positionBound,
+                                                        final QueryConfig 
config) {
+
+        final QueryResult<R> result;
+        final TimestampedRangeQuery<K, V> typedQuery = 
(TimestampedRangeQuery<K, V>) query;
+
+        final RangeQuery<Bytes, byte[]> rawRangeQuery;
+        if (typedQuery.getLowerBound().isPresent() && 
typedQuery.getUpperBound().isPresent()) {
+            rawRangeQuery = RangeQuery.withRange(
+                    keyBytes(typedQuery.getLowerBound().get()),
+                    keyBytes(typedQuery.getUpperBound().get())
+            );
+        } else if (typedQuery.getLowerBound().isPresent()) {
+            rawRangeQuery = 
RangeQuery.withLowerBound(keyBytes(typedQuery.getLowerBound().get()));
+        } else if (typedQuery.getUpperBound().isPresent()) {
+            rawRangeQuery = 
RangeQuery.withUpperBound(keyBytes(typedQuery.getUpperBound().get()));
+        } else {
+            rawRangeQuery = RangeQuery.withNoBounds();
+        }
+        final QueryResult<KeyValueIterator<Bytes, byte[]>> rawResult =
+                wrapped().query(rawRangeQuery, positionBound, config);
+        if (rawResult.isSuccess()) {
+            final KeyValueIterator<Bytes, byte[]> iterator = 
rawResult.getResult();
+            final KeyValueIterator<K, V> resultIterator = new 
MeteredKeyValueTimestampedIterator(
+                    iterator,
+                    getSensor,
+                    getDeserializeValue(serdes, wrapped()),
+                    false
+            );
+            final QueryResult<KeyValueIterator<K, V>> typedQueryResult =
+                    
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(
+                            rawResult,
+                            resultIterator
+                    );
+            result = (QueryResult<R>) typedQueryResult;
+        } else {
+            // the generic type doesn't matter, since failed queries have no 
result set.
+            result = (QueryResult<R>) rawResult;
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    protected <R> QueryResult<R> runKeyQuery2(final Query<R> query,
+                                             final PositionBound positionBound,
+                                             final QueryConfig config) {
+        final QueryResult<R> result;
+        final KeyQuery<K, V> typedKeyQuery = (KeyQuery<K, V>) query;
+        final KeyQuery<Bytes, byte[]> rawKeyQuery =
+                KeyQuery.withKey(keyBytes(typedKeyQuery.getKey()));
+        final QueryResult<byte[]> rawResult =
+                wrapped().query(rawKeyQuery, positionBound, config);
+        if (rawResult.isSuccess()) {
+            final Function<byte[], ValueAndTimestamp<V>> deserializer = 
getDeserializeValue2(serdes, wrapped(), true);
+            ValueAndTimestamp<V> value = 
deserializer.apply(rawResult.getResult());
+            V res = null;
+            if (value != null) {
+                res = value.value();
+            }

Review Comment:
   Maybe make this a one-liner, allowing you to use `final` (also, use better 
variable names; `res` is not very descriptive):
   ```
   final V plainValue = value == null ? null : value.value();
   ```



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java:
##########
@@ -102,4 +140,230 @@ static class RawAndDeserializedValue<ValueType> {
             this.value = value;
         }
     }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+                                    final PositionBound positionBound,
+                                    final QueryConfig config) {
+
+        final long start = time.nanoseconds();
+        final QueryResult<R> result;
+
+        final StoreQueryUtils.QueryHandler handler = 
queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, config);
+            if (config.isCollectExecutionInfo()) {
+                result.addExecutionInfo(
+                        "Handled in " + getClass() + " in " + 
(time.nanoseconds() - start) + "ns");
+            }
+        } else {
+            result = (QueryResult<R>) handler.apply(
+                    query,
+                    positionBound,
+                    config,
+                    this
+            );
+            if (config.isCollectExecutionInfo()) {
+                result.addExecutionInfo(
+                        "Handled in " + getClass() + " with serdes "
+                                + serdes + " in " + (time.nanoseconds() - 
start) + "ns");
+            }
+        }
+        return result;
+    }
+
+
+
+    @SuppressWarnings("unchecked")
+    protected <R> QueryResult<R> runTimestampKeyQuery(final Query<R> query,
+                                                      final PositionBound 
positionBound,
+                                                      final QueryConfig 
config) {
+        final QueryResult<R> result;
+        final TimestampedKeyQuery<K, V> typedKeyQuery = 
(TimestampedKeyQuery<K, V>) query;
+        final KeyQuery<Bytes, byte[]> rawKeyQuery =
+                KeyQuery.withKey(keyBytes(typedKeyQuery.getKey()));
+        final QueryResult<byte[]> rawResult =
+                wrapped().query(rawKeyQuery, positionBound, config);
+        if (rawResult.isSuccess()) {
+            final Function<byte[], ValueAndTimestamp<V>> deserializer = 
getDeserializeValue(serdes, wrapped());
+            final ValueAndTimestamp<V> value = 
deserializer.apply(rawResult.getResult());
+            final QueryResult<ValueAndTimestamp<V>> typedQueryResult =
+                    
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, value);
+            result = (QueryResult<R>) typedQueryResult;
+        } else {
+            // the generic type doesn't matter, since failed queries have no 
result set.
+            result = (QueryResult<R>) rawResult;
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    protected <R> QueryResult<R> runTimestampRangeQuery(final Query<R> query,
+                                                        final PositionBound 
positionBound,
+                                                        final QueryConfig 
config) {
+
+        final QueryResult<R> result;
+        final TimestampedRangeQuery<K, V> typedQuery = 
(TimestampedRangeQuery<K, V>) query;
+
+        final RangeQuery<Bytes, byte[]> rawRangeQuery;
+        if (typedQuery.getLowerBound().isPresent() && 
typedQuery.getUpperBound().isPresent()) {
+            rawRangeQuery = RangeQuery.withRange(
+                    keyBytes(typedQuery.getLowerBound().get()),
+                    keyBytes(typedQuery.getUpperBound().get())
+            );
+        } else if (typedQuery.getLowerBound().isPresent()) {
+            rawRangeQuery = 
RangeQuery.withLowerBound(keyBytes(typedQuery.getLowerBound().get()));
+        } else if (typedQuery.getUpperBound().isPresent()) {
+            rawRangeQuery = 
RangeQuery.withUpperBound(keyBytes(typedQuery.getUpperBound().get()));
+        } else {
+            rawRangeQuery = RangeQuery.withNoBounds();
+        }
+        final QueryResult<KeyValueIterator<Bytes, byte[]>> rawResult =
+                wrapped().query(rawRangeQuery, positionBound, config);
+        if (rawResult.isSuccess()) {
+            final KeyValueIterator<Bytes, byte[]> iterator = 
rawResult.getResult();
+            final KeyValueIterator<K, V> resultIterator = new 
MeteredKeyValueTimestampedIterator(
+                    iterator,
+                    getSensor,
+                    getDeserializeValue(serdes, wrapped()),
+                    false
+            );
+            final QueryResult<KeyValueIterator<K, V>> typedQueryResult =
+                    
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(
+                            rawResult,
+                            resultIterator
+                    );
+            result = (QueryResult<R>) typedQueryResult;
+        } else {
+            // the generic type doesn't matter, since failed queries have no 
result set.
+            result = (QueryResult<R>) rawResult;
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    protected <R> QueryResult<R> runKeyQuery2(final Query<R> query,
+                                             final PositionBound positionBound,
+                                             final QueryConfig config) {
+        final QueryResult<R> result;
+        final KeyQuery<K, V> typedKeyQuery = (KeyQuery<K, V>) query;
+        final KeyQuery<Bytes, byte[]> rawKeyQuery =
+                KeyQuery.withKey(keyBytes(typedKeyQuery.getKey()));
+        final QueryResult<byte[]> rawResult =
+                wrapped().query(rawKeyQuery, positionBound, config);
+        if (rawResult.isSuccess()) {
+            final Function<byte[], ValueAndTimestamp<V>> deserializer = 
getDeserializeValue2(serdes, wrapped(), true);
+            ValueAndTimestamp<V> value = 
deserializer.apply(rawResult.getResult());
+            V res = null;
+            if (value != null) {
+                res = value.value();
+            }
+            final QueryResult<V> typedQueryResult =
+                    
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, res);
+            result = (QueryResult<R>) typedQueryResult;
+        } else {
+            // the generic type doesn't matter, since failed queries have no 
result set.
+            result = (QueryResult<R>) rawResult;
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    protected <R> QueryResult<R> runRangeQuery2(final Query<R> query,
+                                               final PositionBound 
positionBound,
+                                               final QueryConfig config) {
+
+        final QueryResult<R> result;
+        final RangeQuery<K, V> typedQuery = (RangeQuery<K, V>) query;
+        final RangeQuery<Bytes, byte[]> rawRangeQuery;
+        if (typedQuery.getLowerBound().isPresent() && 
typedQuery.getUpperBound().isPresent()) {
+            rawRangeQuery = RangeQuery.withRange(
+                    keyBytes(typedQuery.getLowerBound().get()),
+                    keyBytes(typedQuery.getUpperBound().get())
+            );
+        } else if (typedQuery.getLowerBound().isPresent()) {
+            rawRangeQuery = 
RangeQuery.withLowerBound(keyBytes(typedQuery.getLowerBound().get()));
+        } else if (typedQuery.getUpperBound().isPresent()) {
+            rawRangeQuery = 
RangeQuery.withUpperBound(keyBytes(typedQuery.getUpperBound().get()));
+        } else {
+            rawRangeQuery = RangeQuery.withNoBounds();
+        }
+        final QueryResult<KeyValueIterator<Bytes, byte[]>> rawResult =
+                wrapped().query(rawRangeQuery, positionBound, config);
+        if (rawResult.isSuccess()) {
+            final KeyValueIterator<Bytes, byte[]> iterator = 
rawResult.getResult();
+            final KeyValueIterator<K,V> resultIterator = new 
MeteredKeyValueTimestampedIterator(
+                    iterator,
+                    getSensor,
+                    getDeserializeValue(serdes, wrapped()),
+                    true
+            );
+            final QueryResult<KeyValueIterator<K, V>> typedQueryResult =
+                    
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(
+                            rawResult,
+                            resultIterator
+                    );
+            result = (QueryResult<R>) typedQueryResult;
+        } else {
+            // the generic type doesn't matter, since failed queries have no 
result set.
+            result = (QueryResult<R>) rawResult;
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    private class MeteredKeyValueTimestampedIterator implements 
KeyValueIterator<K, V> {

Review Comment:
   Seems this is a copy of the existing `MeteredKeyValueTimestampedIterator` 
from `MeteredKeyValueStore`.
   
   I think we should not just duplicate the code, but implement it in a way 
that re-uses code (in the end, the only difference is that we need an 
additional "unwrapping" step, which I think we can get done, e.g., by making it 
part of the `Function` that we pass in).
   
   Side note: as discussed in person, the original class should be renamed -- 
not sure why it has `Timestamped` in its name -- does not make any sense.
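   
   A rough sketch of the idea, using only JDK types. `StampedValue`, 
`DeserializingIterator`, and `UnwrapSketch` are hypothetical names, and the toy 
byte layout is an assumption made for illustration only. The point is that one 
iterator class can serve both query flavours when the unwrapping is composed 
into the `Function`:
   
   ```java
   import java.nio.charset.StandardCharsets;
   import java.util.Iterator;
   import java.util.function.Function;

   // Hypothetical stand-in for ValueAndTimestamp<V>.
   final class StampedValue<V> {
       final V value;
       final long timestamp;

       StampedValue(final V value, final long timestamp) {
           this.value = value;
           this.timestamp = timestamp;
       }
   }

   // Single iterator implementation; the shape of the returned value
   // is decided entirely by the function passed in.
   final class DeserializingIterator<V> implements Iterator<V> {
       private final Iterator<byte[]> raw;
       private final Function<byte[], V> deserializer;

       DeserializingIterator(final Iterator<byte[]> raw, final Function<byte[], V> deserializer) {
           this.raw = raw;
           this.deserializer = deserializer;
       }

       @Override
       public boolean hasNext() {
           return raw.hasNext();
       }

       @Override
       public V next() {
           return deserializer.apply(raw.next());
       }
   }

   final class UnwrapSketch {
       // Toy format (assumption): first byte is the timestamp, the rest is a UTF-8 value.
       static StampedValue<String> deserialize(final byte[] bytes) {
           if (bytes == null) {
               return null;
           }
           final String value = new String(bytes, 1, bytes.length - 1, StandardCharsets.UTF_8);
           return new StampedValue<>(value, bytes[0]);
       }

       // Used by the timestamped queries: keep value and timestamp together.
       static Function<byte[], StampedValue<String>> timestamped() {
           return UnwrapSketch::deserialize;
       }

       // Used by the plain queries: same deserializer plus a null-safe unwrapping step.
       static Function<byte[], String> plain() {
           final Function<byte[], StampedValue<String>> stamped = timestamped();
           return stamped.andThen(v -> v == null ? null : v.value);
       }
   }
   ```
   
   The boolean flag in the iterator constructor would then no longer be needed, 
because the caller picks the behaviour by choosing which function to pass.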



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java:
##########
@@ -102,4 +140,230 @@ static class RawAndDeserializedValue<ValueType> {
             this.value = value;
         }
     }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+                                    final PositionBound positionBound,
+                                    final QueryConfig config) {
+
+        final long start = time.nanoseconds();
+        final QueryResult<R> result;
+
+        final StoreQueryUtils.QueryHandler handler = 
queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, config);
+            if (config.isCollectExecutionInfo()) {
+                result.addExecutionInfo(
+                        "Handled in " + getClass() + " in " + 
(time.nanoseconds() - start) + "ns");
+            }
+        } else {
+            result = (QueryResult<R>) handler.apply(
+                    query,
+                    positionBound,
+                    config,
+                    this
+            );
+            if (config.isCollectExecutionInfo()) {
+                result.addExecutionInfo(
+                        "Handled in " + getClass() + " with serdes "
+                                + serdes + " in " + (time.nanoseconds() - 
start) + "ns");
+            }
+        }
+        return result;
+    }
+
+
+
+    @SuppressWarnings("unchecked")
+    protected <R> QueryResult<R> runTimestampKeyQuery(final Query<R> query,
+                                                      final PositionBound 
positionBound,
+                                                      final QueryConfig 
config) {
+        final QueryResult<R> result;
+        final TimestampedKeyQuery<K, V> typedKeyQuery = 
(TimestampedKeyQuery<K, V>) query;
+        final KeyQuery<Bytes, byte[]> rawKeyQuery =
+                KeyQuery.withKey(keyBytes(typedKeyQuery.getKey()));
+        final QueryResult<byte[]> rawResult =
+                wrapped().query(rawKeyQuery, positionBound, config);
+        if (rawResult.isSuccess()) {
+            final Function<byte[], ValueAndTimestamp<V>> deserializer = 
getDeserializeValue(serdes, wrapped());
+            final ValueAndTimestamp<V> value = 
deserializer.apply(rawResult.getResult());
+            final QueryResult<ValueAndTimestamp<V>> typedQueryResult =
+                    
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, value);
+            result = (QueryResult<R>) typedQueryResult;
+        } else {
+            // the generic type doesn't matter, since failed queries have no 
result set.
+            result = (QueryResult<R>) rawResult;
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    protected <R> QueryResult<R> runTimestampRangeQuery(final Query<R> query,
+                                                        final PositionBound 
positionBound,
+                                                        final QueryConfig 
config) {
+
+        final QueryResult<R> result;
+        final TimestampedRangeQuery<K, V> typedQuery = 
(TimestampedRangeQuery<K, V>) query;
+
+        final RangeQuery<Bytes, byte[]> rawRangeQuery;
+        if (typedQuery.getLowerBound().isPresent() && 
typedQuery.getUpperBound().isPresent()) {
+            rawRangeQuery = RangeQuery.withRange(
+                    keyBytes(typedQuery.getLowerBound().get()),
+                    keyBytes(typedQuery.getUpperBound().get())
+            );
+        } else if (typedQuery.getLowerBound().isPresent()) {
+            rawRangeQuery = 
RangeQuery.withLowerBound(keyBytes(typedQuery.getLowerBound().get()));
+        } else if (typedQuery.getUpperBound().isPresent()) {
+            rawRangeQuery = 
RangeQuery.withUpperBound(keyBytes(typedQuery.getUpperBound().get()));
+        } else {
+            rawRangeQuery = RangeQuery.withNoBounds();
+        }
+        final QueryResult<KeyValueIterator<Bytes, byte[]>> rawResult =
+                wrapped().query(rawRangeQuery, positionBound, config);
+        if (rawResult.isSuccess()) {
+            final KeyValueIterator<Bytes, byte[]> iterator = 
rawResult.getResult();
+            final KeyValueIterator<K, V> resultIterator = new 
MeteredKeyValueTimestampedIterator(
+                    iterator,
+                    getSensor,
+                    getDeserializeValue(serdes, wrapped()),
+                    false
+            );
+            final QueryResult<KeyValueIterator<K, V>> typedQueryResult =
+                    
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(
+                            rawResult,
+                            resultIterator
+                    );
+            result = (QueryResult<R>) typedQueryResult;
+        } else {
+            // the generic type doesn't matter, since failed queries have no 
result set.
+            result = (QueryResult<R>) rawResult;
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    protected <R> QueryResult<R> runKeyQuery2(final Query<R> query,
+                                             final PositionBound positionBound,
+                                             final QueryConfig config) {
+        final QueryResult<R> result;
+        final KeyQuery<K, V> typedKeyQuery = (KeyQuery<K, V>) query;
+        final KeyQuery<Bytes, byte[]> rawKeyQuery =
+                KeyQuery.withKey(keyBytes(typedKeyQuery.getKey()));
+        final QueryResult<byte[]> rawResult =
+                wrapped().query(rawKeyQuery, positionBound, config);
+        if (rawResult.isSuccess()) {
+            final Function<byte[], ValueAndTimestamp<V>> deserializer = 
getDeserializeValue2(serdes, wrapped(), true);
+            ValueAndTimestamp<V> value = 
deserializer.apply(rawResult.getResult());

Review Comment:
   nit: `value` -> `valueAndTimestamp`.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java:
##########
@@ -346,6 +346,23 @@ public static <V> Function<byte[], V> 
getDeserializeValue(final StateSerdes<?, V
         return byteArray -> deserializer.deserialize(serdes.topic(), 
byteArray);
     }
 
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    public static <V> Function<byte[], V> getDeserializeValue2(final 
StateSerdes<?, V> serdes,
+                                                               final 
StateStore wrapped,
+                                                               final boolean 
isDSLStore ) {
+        final Serde<V> valueSerde = serdes.valueSerde();
+        final boolean timestamped = WrappedStateStore.isTimestamped(wrapped) 
|| isDSLStore;
+        final Deserializer<V> deserializer;
+        if (!timestamped && valueSerde instanceof ValueAndTimestampSerde) {

Review Comment:
   We did discuss this part of the code in person, and we were both confused 
why it's needed -- I think it's needed because of the underlying "bug" that the 
`KeyValueToTimestampedKeyValueByteStoreAdapter` did not add the missing 
timestamp on the query result and is not recognized as a 
`TimestampedBytesStore` -- after this is fixed, I believe we might not need 
this code any longer (I did not try it out, but we should explore it as a side 
cleanup).
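   
   For reference, a small sketch of what "adding the missing timestamp" means 
at the byte level. It assumes the timestamped layout is an 8-byte timestamp 
followed by the plain value, and that `-1` marks an unknown timestamp; 
`TimestampedFormatSketch` is a hypothetical name and the real conversion should 
be checked against the code base.
   
   ```java
   import java.nio.ByteBuffer;

   final class TimestampedFormatSketch {
       // Assumption: -1 stands for "no timestamp known".
       static final long NO_TIMESTAMP = -1L;

       // Prepends an 8-byte placeholder timestamp to a plain value; null stays null.
       static byte[] toTimestampedFormat(final byte[] plainValue) {
           if (plainValue == null) {
               return null;
           }
           return ByteBuffer.allocate(Long.BYTES + plainValue.length)
               .putLong(NO_TIMESTAMP)
               .put(plainValue)
               .array();
       }
   }
   ```
   
   If the adapter returns values in this shape, the regular timestamped 
deserializer can read them, which is why the special case above may become 
unnecessary.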



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java:
##########
@@ -102,4 +140,230 @@ static class RawAndDeserializedValue<ValueType> {
             this.value = value;
         }
     }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+                                    final PositionBound positionBound,
+                                    final QueryConfig config) {
+
+        final long start = time.nanoseconds();
+        final QueryResult<R> result;
+
+        final StoreQueryUtils.QueryHandler handler = queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, config);
+            if (config.isCollectExecutionInfo()) {
+                result.addExecutionInfo(
+                        "Handled in " + getClass() + " in " + (time.nanoseconds() - start) + "ns");
+            }
+        } else {
+            result = (QueryResult<R>) handler.apply(
+                    query,
+                    positionBound,
+                    config,
+                    this
+            );
+            if (config.isCollectExecutionInfo()) {
+                result.addExecutionInfo(
+                        "Handled in " + getClass() + " with serdes "
+                                + serdes + " in " + (time.nanoseconds() - start) + "ns");
+            }
+        }
+        return result;
+    }
+
+
+
+    @SuppressWarnings("unchecked")
+    protected <R> QueryResult<R> runTimestampKeyQuery(final Query<R> query,
+                                                      final PositionBound positionBound,
+                                                      final QueryConfig config) {
+        final QueryResult<R> result;
+        final TimestampedKeyQuery<K, V> typedKeyQuery = (TimestampedKeyQuery<K, V>) query;
+        final KeyQuery<Bytes, byte[]> rawKeyQuery =
+                KeyQuery.withKey(keyBytes(typedKeyQuery.getKey()));
+        final QueryResult<byte[]> rawResult =
+                wrapped().query(rawKeyQuery, positionBound, config);
+        if (rawResult.isSuccess()) {
+            final Function<byte[], ValueAndTimestamp<V>> deserializer = getDeserializeValue(serdes, wrapped());
+            final ValueAndTimestamp<V> value = deserializer.apply(rawResult.getResult());
+            final QueryResult<ValueAndTimestamp<V>> typedQueryResult =
+                    InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, value);
+            result = (QueryResult<R>) typedQueryResult;
+        } else {
+            // the generic type doesn't matter, since failed queries have no result set.
+            result = (QueryResult<R>) rawResult;
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    protected <R> QueryResult<R> runTimestampRangeQuery(final Query<R> query,
+                                                        final PositionBound positionBound,
+                                                        final QueryConfig config) {
+
+        final QueryResult<R> result;
+        final TimestampedRangeQuery<K, V> typedQuery = (TimestampedRangeQuery<K, V>) query;
+
+        final RangeQuery<Bytes, byte[]> rawRangeQuery;
+        if (typedQuery.getLowerBound().isPresent() && typedQuery.getUpperBound().isPresent()) {
+            rawRangeQuery = RangeQuery.withRange(
+                    keyBytes(typedQuery.getLowerBound().get()),
+                    keyBytes(typedQuery.getUpperBound().get())
+            );
+        } else if (typedQuery.getLowerBound().isPresent()) {
+            rawRangeQuery = RangeQuery.withLowerBound(keyBytes(typedQuery.getLowerBound().get()));
+        } else if (typedQuery.getUpperBound().isPresent()) {
+            rawRangeQuery = RangeQuery.withUpperBound(keyBytes(typedQuery.getUpperBound().get()));
+        } else {
+            rawRangeQuery = RangeQuery.withNoBounds();
+        }
+        final QueryResult<KeyValueIterator<Bytes, byte[]>> rawResult =
+                wrapped().query(rawRangeQuery, positionBound, config);
+        if (rawResult.isSuccess()) {
+            final KeyValueIterator<Bytes, byte[]> iterator = rawResult.getResult();
+            final KeyValueIterator<K, V> resultIterator = new MeteredKeyValueTimestampedIterator(
+                    iterator,
+                    getSensor,
+                    getDeserializeValue(serdes, wrapped()),
+                    false
+            );
+            final QueryResult<KeyValueIterator<K, V>> typedQueryResult =
+                    InternalQueryResultUtil.copyAndSubstituteDeserializedResult(
+                            rawResult,
+                            resultIterator
+                    );
+            result = (QueryResult<R>) typedQueryResult;
+        } else {
+            // the generic type doesn't matter, since failed queries have no result set.
+            result = (QueryResult<R>) rawResult;
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    protected <R> QueryResult<R> runKeyQuery2(final Query<R> query,
+                                             final PositionBound positionBound,
+                                             final QueryConfig config) {
+        final QueryResult<R> result;
+        final KeyQuery<K, V> typedKeyQuery = (KeyQuery<K, V>) query;
+        final KeyQuery<Bytes, byte[]> rawKeyQuery =
+                KeyQuery.withKey(keyBytes(typedKeyQuery.getKey()));
+        final QueryResult<byte[]> rawResult =
+                wrapped().query(rawKeyQuery, positionBound, config);
+        if (rawResult.isSuccess()) {
+            final Function<byte[], ValueAndTimestamp<V>> deserializer = getDeserializeValue2(serdes, wrapped(), true);
+            ValueAndTimestamp<V> value = deserializer.apply(rawResult.getResult());
+            V res = null;
+            if (value != null) {
+                res = value.value();
+            }
+            final QueryResult<V> typedQueryResult =
+                    InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, res);
+            result = (QueryResult<R>) typedQueryResult;
+        } else {
+            // the generic type doesn't matter, since failed queries have no result set.
+            result = (QueryResult<R>) rawResult;
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    protected <R> QueryResult<R> runRangeQuery2(final Query<R> query,

Review Comment:
   Why `2` at the end?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java:
##########
@@ -346,6 +346,23 @@ public static <V> Function<byte[], V> getDeserializeValue(final StateSerdes<?, V
         return byteArray -> deserializer.deserialize(serdes.topic(), byteArray);
     }
 
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    public static <V> Function<byte[], V> getDeserializeValue2(final StateSerdes<?, V> serdes,
+                                                               final StateStore wrapped,
+                                                               final boolean isDSLStore ) {
+        final Serde<V> valueSerde = serdes.valueSerde();
+        final boolean timestamped = WrappedStateStore.isTimestamped(wrapped) || isDSLStore;

Review Comment:
   The issue is actually inside `isTimestamped` -- for the 
`KeyValueToTimestampedKeyValueByteStoreAdapter` it incorrectly returns `false` 
while it should return `true`. The root cause is a bug in 
`KeyValueToTimestampedKeyValueByteStoreAdapter` itself: it does not implement 
the marker interface `TimestampedBytesStore` but it should.
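
As a rough illustration of why a missing marker interface makes such a check return `false`, here is a simplified sketch. It assumes the check walks the chain of wrapped stores and looks for the marker via `instanceof`. All types below (`Store`, `TimestampedMarker`, `Wrapper`, the two adapters, `MeteredLayer`) are hypothetical stand-ins, not the real Kafka Streams classes.

```java
// Illustrative sketch only: every type here is a hypothetical, simplified stand-in.
final class MarkerCheckSketch {
    interface Store { }

    // Marker interface: no methods, it only tags a store as using the timestamped byte format.
    interface TimestampedMarker { }

    // A store layer that delegates to an inner store.
    interface Wrapper extends Store {
        Store wrapped();
    }

    // Adapter that serves the timestamped format but does not declare the marker.
    static final class AdapterWithoutMarker implements Store { }

    // The same adapter with the marker declared.
    static final class AdapterWithMarker implements Store, TimestampedMarker { }

    static final class MeteredLayer implements Wrapper {
        private final Store inner;

        MeteredLayer(final Store inner) {
            this.inner = inner;
        }

        @Override
        public Store wrapped() {
            return inner;
        }
    }

    // Walk down the wrapper chain until a marked store is found or the chain ends.
    static boolean isTimestamped(final Store store) {
        if (store instanceof TimestampedMarker) {
            return true;
        }
        if (store instanceof Wrapper) {
            return isTimestamped(((Wrapper) store).wrapped());
        }
        return false;
    }

    public static void main(final String[] args) {
        System.out.println(isTimestamped(new MeteredLayer(new AdapterWithoutMarker()))); // prints false
        System.out.println(isTimestamped(new MeteredLayer(new AdapterWithMarker())));    // prints true
    }
}
```

Under these assumptions, declaring the marker on the adapter fixes the check at its source, so callers do not need an extra flag to override the result.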



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java:
##########
@@ -346,6 +346,23 @@ public static <V> Function<byte[], V> getDeserializeValue(final StateSerdes<?, V
         return byteArray -> deserializer.deserialize(serdes.topic(), byteArray);
     }
 
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    public static <V> Function<byte[], V> getDeserializeValue2(final StateSerdes<?, V> serdes,

Review Comment:
   Adding this function is not the right fix IMHO.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java:
##########
@@ -102,4 +140,230 @@ static class RawAndDeserializedValue<ValueType> {
             this.value = value;
         }
     }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+                                    final PositionBound positionBound,
+                                    final QueryConfig config) {
+
+        final long start = time.nanoseconds();
+        final QueryResult<R> result;
+
+        final StoreQueryUtils.QueryHandler handler = queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, config);
+            if (config.isCollectExecutionInfo()) {
+                result.addExecutionInfo(
+                        "Handled in " + getClass() + " in " + (time.nanoseconds() - start) + "ns");
+            }
+        } else {
+            result = (QueryResult<R>) handler.apply(
+                    query,
+                    positionBound,
+                    config,
+                    this
+            );
+            if (config.isCollectExecutionInfo()) {
+                result.addExecutionInfo(
+                        "Handled in " + getClass() + " with serdes "
+                                + serdes + " in " + (time.nanoseconds() - start) + "ns");
+            }
+        }
+        return result;
+    }
+
+
+
+    @SuppressWarnings("unchecked")
+    protected <R> QueryResult<R> runTimestampKeyQuery(final Query<R> query,
+                                                      final PositionBound positionBound,
+                                                      final QueryConfig config) {
+        final QueryResult<R> result;
+        final TimestampedKeyQuery<K, V> typedKeyQuery = (TimestampedKeyQuery<K, V>) query;
+        final KeyQuery<Bytes, byte[]> rawKeyQuery =
+                KeyQuery.withKey(keyBytes(typedKeyQuery.getKey()));
+        final QueryResult<byte[]> rawResult =
+                wrapped().query(rawKeyQuery, positionBound, config);
+        if (rawResult.isSuccess()) {
+            final Function<byte[], ValueAndTimestamp<V>> deserializer = getDeserializeValue(serdes, wrapped());
+            final ValueAndTimestamp<V> value = deserializer.apply(rawResult.getResult());
+            final QueryResult<ValueAndTimestamp<V>> typedQueryResult =
+                    InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, value);
+            result = (QueryResult<R>) typedQueryResult;
+        } else {
+            // the generic type doesn't matter, since failed queries have no result set.
+            result = (QueryResult<R>) rawResult;
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    protected <R> QueryResult<R> runTimestampRangeQuery(final Query<R> query,
+                                                        final PositionBound positionBound,
+                                                        final QueryConfig config) {
+
+        final QueryResult<R> result;
+        final TimestampedRangeQuery<K, V> typedQuery = (TimestampedRangeQuery<K, V>) query;
+
+        final RangeQuery<Bytes, byte[]> rawRangeQuery;
+        if (typedQuery.getLowerBound().isPresent() && typedQuery.getUpperBound().isPresent()) {
+            rawRangeQuery = RangeQuery.withRange(
+                    keyBytes(typedQuery.getLowerBound().get()),
+                    keyBytes(typedQuery.getUpperBound().get())
+            );
+        } else if (typedQuery.getLowerBound().isPresent()) {
+            rawRangeQuery = RangeQuery.withLowerBound(keyBytes(typedQuery.getLowerBound().get()));
+        } else if (typedQuery.getUpperBound().isPresent()) {
+            rawRangeQuery = RangeQuery.withUpperBound(keyBytes(typedQuery.getUpperBound().get()));
+        } else {
+            rawRangeQuery = RangeQuery.withNoBounds();
+        }
+        final QueryResult<KeyValueIterator<Bytes, byte[]>> rawResult =
+                wrapped().query(rawRangeQuery, positionBound, config);
+        if (rawResult.isSuccess()) {
+            final KeyValueIterator<Bytes, byte[]> iterator = rawResult.getResult();
+            final KeyValueIterator<K, V> resultIterator = new MeteredKeyValueTimestampedIterator(
+                    iterator,
+                    getSensor,
+                    getDeserializeValue(serdes, wrapped()),
+                    false
+            );
+            final QueryResult<KeyValueIterator<K, V>> typedQueryResult =
+                    InternalQueryResultUtil.copyAndSubstituteDeserializedResult(
+                            rawResult,
+                            resultIterator
+                    );
+            result = (QueryResult<R>) typedQueryResult;
+        } else {
+            // the generic type doesn't matter, since failed queries have no result set.
+            result = (QueryResult<R>) rawResult;
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    protected <R> QueryResult<R> runKeyQuery2(final Query<R> query,
+                                             final PositionBound positionBound,
+                                             final QueryConfig config) {
+        final QueryResult<R> result;
+        final KeyQuery<K, V> typedKeyQuery = (KeyQuery<K, V>) query;
+        final KeyQuery<Bytes, byte[]> rawKeyQuery =
+                KeyQuery.withKey(keyBytes(typedKeyQuery.getKey()));
+        final QueryResult<byte[]> rawResult =
+                wrapped().query(rawKeyQuery, positionBound, config);
+        if (rawResult.isSuccess()) {
+            final Function<byte[], ValueAndTimestamp<V>> deserializer = getDeserializeValue2(serdes, wrapped(), true);

Review Comment:
   It's not safe to pass in `true` as the third parameter -- this code is also 
   used for PAPI stores...
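
The general hazard of hard-coding such a flag can be sketched as follows. This assumes the timestamped format is an 8-byte prefix in front of the plain value, and that the same code path can see bytes that are not in that format. `HardCodedFlagSketch` and `decode` are hypothetical names used only for illustration.

```java
import java.nio.charset.StandardCharsets;

// Illustrative sketch only: hypothetical names, not the code under review.
final class HardCodedFlagSketch {
    // Decode a UTF-8 string value, optionally skipping an assumed 8-byte timestamp prefix.
    // Note: with timestamped == true and fewer than 8 bytes of input, this throws.
    static String decode(final byte[] raw, final boolean timestamped) {
        final int offset = timestamped ? Long.BYTES : 0;
        return new String(raw, offset, raw.length - offset, StandardCharsets.UTF_8);
    }

    public static void main(final String[] args) {
        // Bytes stored in plain format, with no timestamp prefix.
        final byte[] plain = "plainvalue-123".getBytes(StandardCharsets.UTF_8);

        System.out.println(decode(plain, false)); // prints plainvalue-123
        // Forcing the flag to true silently drops the first 8 bytes of the real value.
        System.out.println(decode(plain, true));  // prints ue-123
    }
}
```

Deriving the flag from the store itself, rather than passing a constant, avoids this kind of silent truncation.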


