mjsax commented on code in PR #14570: URL: https://github.com/apache/kafka/pull/14570#discussion_r1373953314
########## streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java: ########## @@ -126,15 +128,22 @@ public <R> QueryResult<R> query( final PositionBound positionBound, final QueryConfig config) { - final long start = config.isCollectExecutionInfo() ? System.nanoTime() : -1L; - final QueryResult<R> result = store.query(query, positionBound, config); + QueryResult<R> result = store.query(query, positionBound, config); + Position position = result.getPosition(); + if (result.isSuccess()) { + byte[] res = (byte[]) result.getResult(); + byte[] res1 = convertToTimestampedFormat(res); + result = (QueryResult<R>) QueryResult.forResult(res1); + } + if (config.isCollectExecutionInfo()) { final long end = System.nanoTime(); result.addExecutionInfo( "Handled in " + getClass() + " in " + (end - start) + "ns" ); } + result.setPosition(position); Review Comment: Why this change? ########## streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java: ########## @@ -126,15 +128,22 @@ public <R> QueryResult<R> query( final PositionBound positionBound, final QueryConfig config) { - final long start = config.isCollectExecutionInfo() ? System.nanoTime() : -1L; - final QueryResult<R> result = store.query(query, positionBound, config); + QueryResult<R> result = store.query(query, positionBound, config); + Position position = result.getPosition(); + if (result.isSuccess()) { + byte[] res = (byte[]) result.getResult(); + byte[] res1 = convertToTimestampedFormat(res); + result = (QueryResult<R>) QueryResult.forResult(res1); Review Comment: We need to set the position on the newly created result. We should also copy the "execution information" (also wondering if we should add a new entry to it?) \cc @aliehsaeedii WDYT? Maybe we can actually re-use `InternalQueryResultUtil.copyAndSubstituteDeserializedResult` ? ########## streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java: ########## @@ -126,15 +128,22 @@ public <R> QueryResult<R> query( final PositionBound positionBound, final QueryConfig config) { - final long start = config.isCollectExecutionInfo() ? System.nanoTime() : -1L; - final QueryResult<R> result = store.query(query, positionBound, config); + QueryResult<R> result = store.query(query, positionBound, config); + Position position = result.getPosition(); + if (result.isSuccess()) { + byte[] res = (byte[]) result.getResult(); + byte[] res1 = convertToTimestampedFormat(res); Review Comment: Use better variable names. Eg `res` -> `plainValue` and `res2` -> `valueWithTimestamp` ########## streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java: ########## @@ -102,4 +140,230 @@ static class RawAndDeserializedValue<ValueType> { this.value = value; } } + + @SuppressWarnings("unchecked") + @Override + public <R> QueryResult<R> query(final Query<R> query, + final PositionBound positionBound, + final QueryConfig config) { + + final long start = time.nanoseconds(); + final QueryResult<R> result; + + final StoreQueryUtils.QueryHandler handler = queryHandlers.get(query.getClass()); + if (handler == null) { + result = wrapped().query(query, positionBound, config); + if (config.isCollectExecutionInfo()) { + result.addExecutionInfo( + "Handled in " + getClass() + " in " + (time.nanoseconds() - start) + "ns"); + } + } else { + result = (QueryResult<R>) handler.apply( + query, + positionBound, + config, + this + ); + if (config.isCollectExecutionInfo()) { + result.addExecutionInfo( + "Handled in " + getClass() + " with serdes " + + serdes + " in " + (time.nanoseconds() - start) + "ns"); + } + } + return result; + } + + + + @SuppressWarnings("unchecked") + protected <R> QueryResult<R> runTimestampKeyQuery(final Query<R> query, + final PositionBound positionBound, + final QueryConfig config) { + final QueryResult<R> result; + final TimestampedKeyQuery<K, V> typedKeyQuery = (TimestampedKeyQuery<K, V>) query; + final KeyQuery<Bytes, byte[]> rawKeyQuery = + KeyQuery.withKey(keyBytes(typedKeyQuery.getKey())); + final QueryResult<byte[]> rawResult = + wrapped().query(rawKeyQuery, positionBound, config); + if (rawResult.isSuccess()) { + final Function<byte[], ValueAndTimestamp<V>> deserializer = getDeserializeValue(serdes, wrapped()); + final ValueAndTimestamp<V> value = deserializer.apply(rawResult.getResult()); + final QueryResult<ValueAndTimestamp<V>> typedQueryResult = + InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, value); + result = (QueryResult<R>) typedQueryResult; + } else { + // the generic type doesn't matter, since failed queries have no result set. + result = (QueryResult<R>) rawResult; + } + return result; + } + + @SuppressWarnings("unchecked") + protected <R> QueryResult<R> runTimestampRangeQuery(final Query<R> query, + final PositionBound positionBound, + final QueryConfig config) { + + final QueryResult<R> result; + final TimestampedRangeQuery<K, V> typedQuery = (TimestampedRangeQuery<K, V>) query; + + final RangeQuery<Bytes, byte[]> rawRangeQuery; + if (typedQuery.getLowerBound().isPresent() && typedQuery.getUpperBound().isPresent()) { + rawRangeQuery = RangeQuery.withRange( + keyBytes(typedQuery.getLowerBound().get()), + keyBytes(typedQuery.getUpperBound().get()) + ); + } else if (typedQuery.getLowerBound().isPresent()) { + rawRangeQuery = RangeQuery.withLowerBound(keyBytes(typedQuery.getLowerBound().get())); + } else if (typedQuery.getUpperBound().isPresent()) { + rawRangeQuery = RangeQuery.withUpperBound(keyBytes(typedQuery.getUpperBound().get())); + } else { + rawRangeQuery = RangeQuery.withNoBounds(); + } + final QueryResult<KeyValueIterator<Bytes, byte[]>> rawResult = + wrapped().query(rawRangeQuery, positionBound, config); + if (rawResult.isSuccess()) { + final KeyValueIterator<Bytes, byte[]> iterator = rawResult.getResult(); + final KeyValueIterator<K, V> resultIterator = new MeteredKeyValueTimestampedIterator( + iterator, + getSensor, + getDeserializeValue(serdes, wrapped()), + false + ); + final QueryResult<KeyValueIterator<K, V>> typedQueryResult = + InternalQueryResultUtil.copyAndSubstituteDeserializedResult( + rawResult, + resultIterator + ); + result = (QueryResult<R>) typedQueryResult; + } else { + // the generic type doesn't matter, since failed queries have no result set. + result = (QueryResult<R>) rawResult; + } + return result; + } + + @SuppressWarnings("unchecked") + protected <R> QueryResult<R> runKeyQuery2(final Query<R> query, Review Comment: Why `2` in the end of the name? ########## streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java: ########## @@ -102,4 +140,230 @@ static class RawAndDeserializedValue<ValueType> { this.value = value; } } + + @SuppressWarnings("unchecked") + @Override + public <R> QueryResult<R> query(final Query<R> query, + final PositionBound positionBound, + final QueryConfig config) { + + final long start = time.nanoseconds(); + final QueryResult<R> result; + + final StoreQueryUtils.QueryHandler handler = queryHandlers.get(query.getClass()); + if (handler == null) { + result = wrapped().query(query, positionBound, config); + if (config.isCollectExecutionInfo()) { + result.addExecutionInfo( + "Handled in " + getClass() + " in " + (time.nanoseconds() - start) + "ns"); + } + } else { + result = (QueryResult<R>) handler.apply( + query, + positionBound, + config, + this + ); + if (config.isCollectExecutionInfo()) { + result.addExecutionInfo( + "Handled in " + getClass() + " with serdes " + + serdes + " in " + (time.nanoseconds() - start) + "ns"); + } + } + return result; + } + + + + @SuppressWarnings("unchecked") + protected <R> QueryResult<R> runTimestampKeyQuery(final Query<R> query, + final PositionBound positionBound, + final QueryConfig config) { + final QueryResult<R> result; + final TimestampedKeyQuery<K, V> typedKeyQuery = (TimestampedKeyQuery<K, V>) query; + final KeyQuery<Bytes, byte[]> rawKeyQuery = + KeyQuery.withKey(keyBytes(typedKeyQuery.getKey())); + final QueryResult<byte[]> rawResult = + wrapped().query(rawKeyQuery, positionBound, config); + if (rawResult.isSuccess()) { + final Function<byte[], ValueAndTimestamp<V>> deserializer = getDeserializeValue(serdes, wrapped()); + final ValueAndTimestamp<V> value = deserializer.apply(rawResult.getResult()); + final QueryResult<ValueAndTimestamp<V>> typedQueryResult = + InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, value); + result = (QueryResult<R>) typedQueryResult; + } else { + // the generic type doesn't matter, since failed queries have no result set. + result = (QueryResult<R>) rawResult; + } + return result; + } + + @SuppressWarnings("unchecked") + protected <R> QueryResult<R> runTimestampRangeQuery(final Query<R> query, + final PositionBound positionBound, + final QueryConfig config) { + + final QueryResult<R> result; + final TimestampedRangeQuery<K, V> typedQuery = (TimestampedRangeQuery<K, V>) query; + + final RangeQuery<Bytes, byte[]> rawRangeQuery; + if (typedQuery.getLowerBound().isPresent() && typedQuery.getUpperBound().isPresent()) { + rawRangeQuery = RangeQuery.withRange( + keyBytes(typedQuery.getLowerBound().get()), + keyBytes(typedQuery.getUpperBound().get()) + ); + } else if (typedQuery.getLowerBound().isPresent()) { + rawRangeQuery = RangeQuery.withLowerBound(keyBytes(typedQuery.getLowerBound().get())); + } else if (typedQuery.getUpperBound().isPresent()) { + rawRangeQuery = RangeQuery.withUpperBound(keyBytes(typedQuery.getUpperBound().get())); + } else { + rawRangeQuery = RangeQuery.withNoBounds(); + } + final QueryResult<KeyValueIterator<Bytes, byte[]>> rawResult = + wrapped().query(rawRangeQuery, positionBound, config); + if (rawResult.isSuccess()) { + final KeyValueIterator<Bytes, byte[]> iterator = rawResult.getResult(); + final KeyValueIterator<K, V> resultIterator = new MeteredKeyValueTimestampedIterator( + iterator, + getSensor, + getDeserializeValue(serdes, wrapped()), + false + ); + final QueryResult<KeyValueIterator<K, V>> typedQueryResult = + InternalQueryResultUtil.copyAndSubstituteDeserializedResult( + rawResult, + resultIterator + ); + result = (QueryResult<R>) typedQueryResult; + } else { + // the generic type doesn't matter, since failed queries have no result set. + result = (QueryResult<R>) rawResult; + } + return result; + } + + @SuppressWarnings("unchecked") + protected <R> QueryResult<R> runKeyQuery2(final Query<R> query, + final PositionBound positionBound, + final QueryConfig config) { + final QueryResult<R> result; + final KeyQuery<K, V> typedKeyQuery = (KeyQuery<K, V>) query; + final KeyQuery<Bytes, byte[]> rawKeyQuery = + KeyQuery.withKey(keyBytes(typedKeyQuery.getKey())); + final QueryResult<byte[]> rawResult = + wrapped().query(rawKeyQuery, positionBound, config); + if (rawResult.isSuccess()) { + final Function<byte[], ValueAndTimestamp<V>> deserializer = getDeserializeValue2(serdes, wrapped(), true); + ValueAndTimestamp<V> value = deserializer.apply(rawResult.getResult()); + V res = null; + if (value != null) { + res = value.value(); + } Review Comment: Maybe make this a one-linger, allowing you to use `final` (also, use better variable names `res` is not very descriptive): ``` final V plainValue = value == null ? null : value.value(); ``` ########## streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java: ########## @@ -102,4 +140,230 @@ static class RawAndDeserializedValue<ValueType> { this.value = value; } } + + @SuppressWarnings("unchecked") + @Override + public <R> QueryResult<R> query(final Query<R> query, + final PositionBound positionBound, + final QueryConfig config) { + + final long start = time.nanoseconds(); + final QueryResult<R> result; + + final StoreQueryUtils.QueryHandler handler = queryHandlers.get(query.getClass()); + if (handler == null) { + result = wrapped().query(query, positionBound, config); + if (config.isCollectExecutionInfo()) { + result.addExecutionInfo( + "Handled in " + getClass() + " in " + (time.nanoseconds() - start) + "ns"); + } + } else { + result = (QueryResult<R>) handler.apply( + query, + positionBound, + config, + this + ); + if (config.isCollectExecutionInfo()) { + result.addExecutionInfo( + "Handled in " + getClass() + " with serdes " + + serdes + " in " + (time.nanoseconds() - start) + "ns"); + } + } + return result; + } + + + + @SuppressWarnings("unchecked") + protected <R> QueryResult<R> runTimestampKeyQuery(final Query<R> query, + final PositionBound positionBound, + final QueryConfig config) { + final QueryResult<R> result; + final TimestampedKeyQuery<K, V> typedKeyQuery = (TimestampedKeyQuery<K, V>) query; + final KeyQuery<Bytes, byte[]> rawKeyQuery = + KeyQuery.withKey(keyBytes(typedKeyQuery.getKey())); + final QueryResult<byte[]> rawResult = + wrapped().query(rawKeyQuery, positionBound, config); + if (rawResult.isSuccess()) { + final Function<byte[], ValueAndTimestamp<V>> deserializer = getDeserializeValue(serdes, wrapped()); + final ValueAndTimestamp<V> value = deserializer.apply(rawResult.getResult()); + final QueryResult<ValueAndTimestamp<V>> typedQueryResult = + InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, value); + result = (QueryResult<R>) typedQueryResult; + } else { + // the generic type doesn't matter, since failed queries have no result set. + result = (QueryResult<R>) rawResult; + } + return result; + } + + @SuppressWarnings("unchecked") + protected <R> QueryResult<R> runTimestampRangeQuery(final Query<R> query, + final PositionBound positionBound, + final QueryConfig config) { + + final QueryResult<R> result; + final TimestampedRangeQuery<K, V> typedQuery = (TimestampedRangeQuery<K, V>) query; + + final RangeQuery<Bytes, byte[]> rawRangeQuery; + if (typedQuery.getLowerBound().isPresent() && typedQuery.getUpperBound().isPresent()) { + rawRangeQuery = RangeQuery.withRange( + keyBytes(typedQuery.getLowerBound().get()), + keyBytes(typedQuery.getUpperBound().get()) + ); + } else if (typedQuery.getLowerBound().isPresent()) { + rawRangeQuery = RangeQuery.withLowerBound(keyBytes(typedQuery.getLowerBound().get())); + } else if (typedQuery.getUpperBound().isPresent()) { + rawRangeQuery = RangeQuery.withUpperBound(keyBytes(typedQuery.getUpperBound().get())); + } else { + rawRangeQuery = RangeQuery.withNoBounds(); + } + final QueryResult<KeyValueIterator<Bytes, byte[]>> rawResult = + wrapped().query(rawRangeQuery, positionBound, config); + if (rawResult.isSuccess()) { + final KeyValueIterator<Bytes, byte[]> iterator = rawResult.getResult(); + final KeyValueIterator<K, V> resultIterator = new MeteredKeyValueTimestampedIterator( + iterator, + getSensor, + getDeserializeValue(serdes, wrapped()), + false + ); + final QueryResult<KeyValueIterator<K, V>> typedQueryResult = + InternalQueryResultUtil.copyAndSubstituteDeserializedResult( + rawResult, + resultIterator + ); + result = (QueryResult<R>) typedQueryResult; + } else { + // the generic type doesn't matter, since failed queries have no result set. + result = (QueryResult<R>) rawResult; + } + return result; + } + + @SuppressWarnings("unchecked") + protected <R> QueryResult<R> runKeyQuery2(final Query<R> query, + final PositionBound positionBound, + final QueryConfig config) { + final QueryResult<R> result; + final KeyQuery<K, V> typedKeyQuery = (KeyQuery<K, V>) query; + final KeyQuery<Bytes, byte[]> rawKeyQuery = + KeyQuery.withKey(keyBytes(typedKeyQuery.getKey())); + final QueryResult<byte[]> rawResult = + wrapped().query(rawKeyQuery, positionBound, config); + if (rawResult.isSuccess()) { + final Function<byte[], ValueAndTimestamp<V>> deserializer = getDeserializeValue2(serdes, wrapped(), true); + ValueAndTimestamp<V> value = deserializer.apply(rawResult.getResult()); + V res = null; + if (value != null) { + res = value.value(); + } + final QueryResult<V> typedQueryResult = + InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, res); + result = (QueryResult<R>) typedQueryResult; + } else { + // the generic type doesn't matter, since failed queries have no result set. + result = (QueryResult<R>) rawResult; + } + return result; + } + + @SuppressWarnings("unchecked") + protected <R> QueryResult<R> runRangeQuery2(final Query<R> query, + final PositionBound positionBound, + final QueryConfig config) { + + final QueryResult<R> result; + final RangeQuery<K, V> typedQuery = (RangeQuery<K, V>) query; + final RangeQuery<Bytes, byte[]> rawRangeQuery; + if (typedQuery.getLowerBound().isPresent() && typedQuery.getUpperBound().isPresent()) { + rawRangeQuery = RangeQuery.withRange( + keyBytes(typedQuery.getLowerBound().get()), + keyBytes(typedQuery.getUpperBound().get()) + ); + } else if (typedQuery.getLowerBound().isPresent()) { + rawRangeQuery = RangeQuery.withLowerBound(keyBytes(typedQuery.getLowerBound().get())); + } else if (typedQuery.getUpperBound().isPresent()) { + rawRangeQuery = RangeQuery.withUpperBound(keyBytes(typedQuery.getUpperBound().get())); + } else { + rawRangeQuery = RangeQuery.withNoBounds(); + } + final QueryResult<KeyValueIterator<Bytes, byte[]>> rawResult = + wrapped().query(rawRangeQuery, positionBound, config); + if (rawResult.isSuccess()) { + final KeyValueIterator<Bytes, byte[]> iterator = rawResult.getResult(); + final KeyValueIterator<K,V> resultIterator = new MeteredKeyValueTimestampedIterator( + iterator, + getSensor, + getDeserializeValue(serdes, wrapped()), + true + ); + final QueryResult<KeyValueIterator<K, V>> typedQueryResult = + InternalQueryResultUtil.copyAndSubstituteDeserializedResult( + rawResult, + resultIterator + ); + result = (QueryResult<R>) typedQueryResult; + } else { + // the generic type doesn't matter, since failed queries have no result set. + result = (QueryResult<R>) rawResult; + } + return result; + } + + @SuppressWarnings("unchecked") + private class MeteredKeyValueTimestampedIterator implements KeyValueIterator<K, V> { Review Comment: Seems this is copy of existing `MeteredKeyValueTimestampedIterator` from `MeteredKeyValueStore`. I think we should not just make a duplication of the code, but implement is in a way to re-use code (in the end, the difference is only that we need an additional "unwrapping", that I think we can get done eg by making it part of the `Function` that we pass in. Side note: as discussed in person, the original class should be renamed -- not sure why it has `Timestamped` in it's name -- does not make any sense. ########## streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java: ########## @@ -102,4 +140,230 @@ static class RawAndDeserializedValue<ValueType> { this.value = value; } } + + @SuppressWarnings("unchecked") + @Override + public <R> QueryResult<R> query(final Query<R> query, + final PositionBound positionBound, + final QueryConfig config) { + + final long start = time.nanoseconds(); + final QueryResult<R> result; + + final StoreQueryUtils.QueryHandler handler = queryHandlers.get(query.getClass()); + if (handler == null) { + result = wrapped().query(query, positionBound, config); + if (config.isCollectExecutionInfo()) { + result.addExecutionInfo( + "Handled in " + getClass() + " in " + (time.nanoseconds() - start) + "ns"); + } + } else { + result = (QueryResult<R>) handler.apply( + query, + positionBound, + config, + this + ); + if (config.isCollectExecutionInfo()) { + result.addExecutionInfo( + "Handled in " + getClass() + " with serdes " + + serdes + " in " + (time.nanoseconds() - start) + "ns"); + } + } + return result; + } + + + + @SuppressWarnings("unchecked") + protected <R> QueryResult<R> runTimestampKeyQuery(final Query<R> query, + final PositionBound positionBound, + final QueryConfig config) { + final QueryResult<R> result; + final TimestampedKeyQuery<K, V> typedKeyQuery = (TimestampedKeyQuery<K, V>) query; + final KeyQuery<Bytes, byte[]> rawKeyQuery = + KeyQuery.withKey(keyBytes(typedKeyQuery.getKey())); + final QueryResult<byte[]> rawResult = + wrapped().query(rawKeyQuery, positionBound, config); + if (rawResult.isSuccess()) { + final Function<byte[], ValueAndTimestamp<V>> deserializer = getDeserializeValue(serdes, wrapped()); + final ValueAndTimestamp<V> value = deserializer.apply(rawResult.getResult()); + final QueryResult<ValueAndTimestamp<V>> typedQueryResult = + InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, value); + result = (QueryResult<R>) typedQueryResult; + } else { + // the generic type doesn't matter, since failed queries have no result set. + result = (QueryResult<R>) rawResult; + } + return result; + } + + @SuppressWarnings("unchecked") + protected <R> QueryResult<R> runTimestampRangeQuery(final Query<R> query, + final PositionBound positionBound, + final QueryConfig config) { + + final QueryResult<R> result; + final TimestampedRangeQuery<K, V> typedQuery = (TimestampedRangeQuery<K, V>) query; + + final RangeQuery<Bytes, byte[]> rawRangeQuery; + if (typedQuery.getLowerBound().isPresent() && typedQuery.getUpperBound().isPresent()) { + rawRangeQuery = RangeQuery.withRange( + keyBytes(typedQuery.getLowerBound().get()), + keyBytes(typedQuery.getUpperBound().get()) + ); + } else if (typedQuery.getLowerBound().isPresent()) { + rawRangeQuery = RangeQuery.withLowerBound(keyBytes(typedQuery.getLowerBound().get())); + } else if (typedQuery.getUpperBound().isPresent()) { + rawRangeQuery = RangeQuery.withUpperBound(keyBytes(typedQuery.getUpperBound().get())); + } else { + rawRangeQuery = RangeQuery.withNoBounds(); + } + final QueryResult<KeyValueIterator<Bytes, byte[]>> rawResult = + wrapped().query(rawRangeQuery, positionBound, config); + if (rawResult.isSuccess()) { + final KeyValueIterator<Bytes, byte[]> iterator = rawResult.getResult(); + final KeyValueIterator<K, V> resultIterator = new MeteredKeyValueTimestampedIterator( + iterator, + getSensor, + getDeserializeValue(serdes, wrapped()), + false + ); + final QueryResult<KeyValueIterator<K, V>> typedQueryResult = + InternalQueryResultUtil.copyAndSubstituteDeserializedResult( + rawResult, + resultIterator + ); + result = (QueryResult<R>) typedQueryResult; + } else { + // the generic type doesn't matter, since failed queries have no result set. + result = (QueryResult<R>) rawResult; + } + return result; + } + + @SuppressWarnings("unchecked") + protected <R> QueryResult<R> runKeyQuery2(final Query<R> query, + final PositionBound positionBound, + final QueryConfig config) { + final QueryResult<R> result; + final KeyQuery<K, V> typedKeyQuery = (KeyQuery<K, V>) query; + final KeyQuery<Bytes, byte[]> rawKeyQuery = + KeyQuery.withKey(keyBytes(typedKeyQuery.getKey())); + final QueryResult<byte[]> rawResult = + wrapped().query(rawKeyQuery, positionBound, config); + if (rawResult.isSuccess()) { + final Function<byte[], ValueAndTimestamp<V>> deserializer = getDeserializeValue2(serdes, wrapped(), true); + ValueAndTimestamp<V> value = deserializer.apply(rawResult.getResult()); Review Comment: nit: `value` -> `valueAndTimestamp`. ########## streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java: ########## @@ -346,6 +346,23 @@ public static <V> Function<byte[], V> getDeserializeValue(final StateSerdes<?, V return byteArray -> deserializer.deserialize(serdes.topic(), byteArray); } + @SuppressWarnings({"unchecked", "rawtypes"}) + public static <V> Function<byte[], V> getDeserializeValue2(final StateSerdes<?, V> serdes, + final StateStore wrapped, + final boolean isDSLStore ) { + final Serde<V> valueSerde = serdes.valueSerde(); + final boolean timestamped = WrappedStateStore.isTimestamped(wrapped) || isDSLStore; + final Deserializer<V> deserializer; + if (!timestamped && valueSerde instanceof ValueAndTimestampSerde) { Review Comment: We did discuss this part of the code in person, and we where both confused why it's needed -- I think it's needed because of the underlying "bug" that the `KeyValueToTimestampedKeyValueByteStoreAdapter` did not add the missing timestamp on the query result and is not recognized as a `TimestampedBytestStore` -- after this is fixed, I believe we might not need this code any longer (did not try it out, but we should explore it as side cleanup). ########## streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java: ########## @@ -102,4 +140,230 @@ static class RawAndDeserializedValue<ValueType> { this.value = value; } } + + @SuppressWarnings("unchecked") + @Override + public <R> QueryResult<R> query(final Query<R> query, + final PositionBound positionBound, + final QueryConfig config) { + + final long start = time.nanoseconds(); + final QueryResult<R> result; + + final StoreQueryUtils.QueryHandler handler = queryHandlers.get(query.getClass()); + if (handler == null) { + result = wrapped().query(query, positionBound, config); + if (config.isCollectExecutionInfo()) { + result.addExecutionInfo( + "Handled in " + getClass() + " in " + (time.nanoseconds() - start) + "ns"); + } + } else { + result = (QueryResult<R>) handler.apply( + query, + positionBound, + config, + this + ); + if (config.isCollectExecutionInfo()) { + result.addExecutionInfo( + "Handled in " + getClass() + " with serdes " + + serdes + " in " + (time.nanoseconds() - start) + "ns"); + } + } + return result; + } + + + + @SuppressWarnings("unchecked") + protected <R> QueryResult<R> runTimestampKeyQuery(final Query<R> query, + final PositionBound positionBound, + final QueryConfig config) { + final QueryResult<R> result; + final TimestampedKeyQuery<K, V> typedKeyQuery = (TimestampedKeyQuery<K, V>) query; + final KeyQuery<Bytes, byte[]> rawKeyQuery = + KeyQuery.withKey(keyBytes(typedKeyQuery.getKey())); + final QueryResult<byte[]> rawResult = + wrapped().query(rawKeyQuery, positionBound, config); + if (rawResult.isSuccess()) { + final Function<byte[], ValueAndTimestamp<V>> deserializer = getDeserializeValue(serdes, wrapped()); + final ValueAndTimestamp<V> value = deserializer.apply(rawResult.getResult()); + final QueryResult<ValueAndTimestamp<V>> typedQueryResult = + InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, value); + result = (QueryResult<R>) typedQueryResult; + } else { + // the generic type doesn't matter, since failed queries have no result set. + result = (QueryResult<R>) rawResult; + } + return result; + } + + @SuppressWarnings("unchecked") + protected <R> QueryResult<R> runTimestampRangeQuery(final Query<R> query, + final PositionBound positionBound, + final QueryConfig config) { + + final QueryResult<R> result; + final TimestampedRangeQuery<K, V> typedQuery = (TimestampedRangeQuery<K, V>) query; + + final RangeQuery<Bytes, byte[]> rawRangeQuery; + if (typedQuery.getLowerBound().isPresent() && typedQuery.getUpperBound().isPresent()) { + rawRangeQuery = RangeQuery.withRange( + keyBytes(typedQuery.getLowerBound().get()), + keyBytes(typedQuery.getUpperBound().get()) + ); + } else if (typedQuery.getLowerBound().isPresent()) { + rawRangeQuery = RangeQuery.withLowerBound(keyBytes(typedQuery.getLowerBound().get())); + } else if (typedQuery.getUpperBound().isPresent()) { + rawRangeQuery = RangeQuery.withUpperBound(keyBytes(typedQuery.getUpperBound().get())); + } else { + rawRangeQuery = RangeQuery.withNoBounds(); + } + final QueryResult<KeyValueIterator<Bytes, byte[]>> rawResult = + wrapped().query(rawRangeQuery, positionBound, config); + if (rawResult.isSuccess()) { + final KeyValueIterator<Bytes, byte[]> iterator = rawResult.getResult(); + final KeyValueIterator<K, V> resultIterator = new MeteredKeyValueTimestampedIterator( + iterator, + getSensor, + getDeserializeValue(serdes, wrapped()), + false + ); + final QueryResult<KeyValueIterator<K, V>> typedQueryResult = + InternalQueryResultUtil.copyAndSubstituteDeserializedResult( + rawResult, + resultIterator + ); + result = (QueryResult<R>) typedQueryResult; + } else { + // the generic type doesn't matter, since failed queries have no result set. + result = (QueryResult<R>) rawResult; + } + return result; + } + + @SuppressWarnings("unchecked") + protected <R> QueryResult<R> runKeyQuery2(final Query<R> query, + final PositionBound positionBound, + final QueryConfig config) { + final QueryResult<R> result; + final KeyQuery<K, V> typedKeyQuery = (KeyQuery<K, V>) query; + final KeyQuery<Bytes, byte[]> rawKeyQuery = + KeyQuery.withKey(keyBytes(typedKeyQuery.getKey())); + final QueryResult<byte[]> rawResult = + wrapped().query(rawKeyQuery, positionBound, config); + if (rawResult.isSuccess()) { + final Function<byte[], ValueAndTimestamp<V>> deserializer = getDeserializeValue2(serdes, wrapped(), true); + ValueAndTimestamp<V> value = deserializer.apply(rawResult.getResult()); + V res = null; + if (value != null) { + res = value.value(); + } + final QueryResult<V> typedQueryResult = + InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, res); + result = (QueryResult<R>) typedQueryResult; + } else { + // the generic type doesn't matter, since failed queries have no result set. + result = (QueryResult<R>) rawResult; + } + return result; + } + + @SuppressWarnings("unchecked") + protected <R> QueryResult<R> runRangeQuery2(final Query<R> query, Review Comment: Why `2` at the end? ########## streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java: ########## @@ -346,6 +346,23 @@ public static <V> Function<byte[], V> getDeserializeValue(final StateSerdes<?, V return byteArray -> deserializer.deserialize(serdes.topic(), byteArray); } + @SuppressWarnings({"unchecked", "rawtypes"}) + public static <V> Function<byte[], V> getDeserializeValue2(final StateSerdes<?, V> serdes, + final StateStore wrapped, + final boolean isDSLStore ) { + final Serde<V> valueSerde = serdes.valueSerde(); + final boolean timestamped = WrappedStateStore.isTimestamped(wrapped) || isDSLStore; Review Comment: The issue is actually inside `isTimestamped` -- for the `KeyValueToTimestampedKeyValueByteStoreAdapter` it incorrectly returns `false` while it should return `true`. The root cause is a bug in `KeyValueToTimestampedKeyValueByteStoreAdapter` itself: it does not implement the marker interface `TimestampedBytesStore` but it should. ########## streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java: ########## @@ -346,6 +346,23 @@ public static <V> Function<byte[], V> getDeserializeValue(final StateSerdes<?, V return byteArray -> deserializer.deserialize(serdes.topic(), byteArray); } + @SuppressWarnings({"unchecked", "rawtypes"}) + public static <V> Function<byte[], V> getDeserializeValue2(final StateSerdes<?, V> serdes, Review Comment: Adding this function is not the right fix IMHO. ########## streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java: ########## @@ -102,4 +140,230 @@ static class RawAndDeserializedValue<ValueType> { this.value = value; } } + + @SuppressWarnings("unchecked") + @Override + public <R> QueryResult<R> query(final Query<R> query, + final PositionBound positionBound, + final QueryConfig config) { + + final long start = time.nanoseconds(); + final QueryResult<R> result; + + final StoreQueryUtils.QueryHandler handler = queryHandlers.get(query.getClass()); + if (handler == null) { + result = wrapped().query(query, positionBound, config); + if (config.isCollectExecutionInfo()) { + result.addExecutionInfo( + "Handled in " + getClass() + " in " + (time.nanoseconds() - start) + "ns"); + } + } else { + result = (QueryResult<R>) handler.apply( + query, + positionBound, + config, + this + ); + if (config.isCollectExecutionInfo()) { + result.addExecutionInfo( + "Handled in " + getClass() + " with serdes " + + serdes + " in " + (time.nanoseconds() - start) + "ns"); + } + } + return result; + } + + + + @SuppressWarnings("unchecked") + protected <R> QueryResult<R> runTimestampKeyQuery(final Query<R> query, + final PositionBound positionBound, + final QueryConfig config) { + final QueryResult<R> result; + final TimestampedKeyQuery<K, V> typedKeyQuery = (TimestampedKeyQuery<K, V>) query; + final KeyQuery<Bytes, byte[]> rawKeyQuery = + KeyQuery.withKey(keyBytes(typedKeyQuery.getKey())); + final QueryResult<byte[]> rawResult = + wrapped().query(rawKeyQuery, positionBound, config); + if (rawResult.isSuccess()) { + final Function<byte[], ValueAndTimestamp<V>> deserializer = getDeserializeValue(serdes, wrapped()); + final ValueAndTimestamp<V> value = deserializer.apply(rawResult.getResult()); + final QueryResult<ValueAndTimestamp<V>> typedQueryResult = + InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, value); + result = (QueryResult<R>) typedQueryResult; + } else { + // the generic type doesn't matter, since failed queries have no result set. + result = (QueryResult<R>) rawResult; + } + return result; + } + + @SuppressWarnings("unchecked") + protected <R> QueryResult<R> runTimestampRangeQuery(final Query<R> query, + final PositionBound positionBound, + final QueryConfig config) { + + final QueryResult<R> result; + final TimestampedRangeQuery<K, V> typedQuery = (TimestampedRangeQuery<K, V>) query; + + final RangeQuery<Bytes, byte[]> rawRangeQuery; + if (typedQuery.getLowerBound().isPresent() && typedQuery.getUpperBound().isPresent()) { + rawRangeQuery = RangeQuery.withRange( + keyBytes(typedQuery.getLowerBound().get()), + keyBytes(typedQuery.getUpperBound().get()) + ); + } else if (typedQuery.getLowerBound().isPresent()) { + rawRangeQuery = RangeQuery.withLowerBound(keyBytes(typedQuery.getLowerBound().get())); + } else if (typedQuery.getUpperBound().isPresent()) { + rawRangeQuery = RangeQuery.withUpperBound(keyBytes(typedQuery.getUpperBound().get())); + } else { + rawRangeQuery = RangeQuery.withNoBounds(); + } + final QueryResult<KeyValueIterator<Bytes, byte[]>> rawResult = + wrapped().query(rawRangeQuery, positionBound, config); + if (rawResult.isSuccess()) { + final KeyValueIterator<Bytes, byte[]> iterator = rawResult.getResult(); + final KeyValueIterator<K, V> resultIterator = new MeteredKeyValueTimestampedIterator( + iterator, + getSensor, + getDeserializeValue(serdes, wrapped()), + false + ); + final QueryResult<KeyValueIterator<K, V>> typedQueryResult = + InternalQueryResultUtil.copyAndSubstituteDeserializedResult( + rawResult, + resultIterator + ); + result = (QueryResult<R>) typedQueryResult; + } else { + // the generic type doesn't matter, since failed queries have no result set. + result = (QueryResult<R>) rawResult; + } + return result; + } + + @SuppressWarnings("unchecked") + protected <R> QueryResult<R> runKeyQuery2(final Query<R> query, + final PositionBound positionBound, + final QueryConfig config) { + final QueryResult<R> result; + final KeyQuery<K, V> typedKeyQuery = (KeyQuery<K, V>) query; + final KeyQuery<Bytes, byte[]> rawKeyQuery = + KeyQuery.withKey(keyBytes(typedKeyQuery.getKey())); + final QueryResult<byte[]> rawResult = + wrapped().query(rawKeyQuery, positionBound, config); + if (rawResult.isSuccess()) { + final Function<byte[], ValueAndTimestamp<V>> deserializer = getDeserializeValue2(serdes, wrapped(), true); Review Comment: It's not save to pass in `true` as third parameter -- this code is also use for PAPI stores... -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org