mjsax commented on code in PR #21650:
URL: https://github.com/apache/kafka/pull/21650#discussion_r2898395327


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersWindowStoreAdapter.java:
##########
@@ -211,7 +214,26 @@ public <R> QueryResult<R> query(final Query<R> query,
                                     final PositionBound positionBound,
                                     final QueryConfig config) {
 
-        throw new UnsupportedOperationException("Queries (IQv2) are not 
supported for timestamped window stores with headers yet.");
+        final QueryResult<R> result = store.query(query, positionBound, 
config);
+
+        if (!result.isSuccess()) {
+            return result;
+        }
+
+        // Wrap iterators to convert from timestamped format to header format
+        if (query instanceof WindowKeyQuery) {
+            final QueryResult<WindowStoreIterator<byte[]>> rawResult = 
(QueryResult<WindowStoreIterator<byte[]>>) result;
+            final WindowStoreIterator<byte[]> wrappedIterator = new 
TimestampedWindowToHeadersWindowStoreIteratorAdapter(rawResult.getResult());
+            return (QueryResult<R>) 
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, 
wrappedIterator);
+        } else if (query instanceof WindowRangeQuery) {
+            final QueryResult<KeyValueIterator<Windowed<Bytes>, byte[]>> 
rawResult =
+                (QueryResult<KeyValueIterator<Windowed<Bytes>, byte[]>>) 
result;
+            final KeyValueIterator<Windowed<Bytes>, byte[]> wrappedIterator =
+                new 
TimestampedToHeadersIteratorAdapter<>(rawResult.getResult());
+            return (QueryResult<R>) 
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, 
wrappedIterator);
+        }
+
+        return result;

Review Comment:
   Not sure if this is correct? -- For regular layers in the hierarchy, if a 
query-type is not supported, it's fine to forward to lower layers, but for an 
adaptor, if we cannot translate the bytes for unknown query type, even if the 
lower layer supports the query, we would crash in the upper layer trying to 
deserialize as the byte format does not match what the metered layer expects, 
w/o the adaptor fixing it up...
   
   So it seems better to return FailedQueryResult for this case?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to