mjsax commented on code in PR #21643:
URL: https://github.com/apache/kafka/pull/21643#discussion_r2898274271


##########
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeadersTest.java:
##########
@@ -638,20 +635,6 @@ private void prepareTimestampedStore() {
         }
     }
 
-    @Test
-    public void shouldThrowUnsupportedOperationExceptionOnQuery() {

Review Comment:
   I think we should keep this test, but instead of expecting an exception,
expect a `FailedQueryResult`?



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeadersTest.java:
##########
@@ -191,70 +187,6 @@ public void shouldThrowNullPointerIfMetricsScopeIsNull() {
         assertTrue(e.getMessage().contains("storeSupplier's metricsScope can't be null"));
     }
 
-    @Test
-    public void shouldThrowUsingIQv2ForInMemoryStores() {

Review Comment:
   So do we need to add a test showing what we now support? This can also be a
follow-up PR (or just a Jira ticket for now).



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersStoreAdapter.java:
##########
@@ -152,7 +155,40 @@ public boolean isOpen() {
     public <R> QueryResult<R> query(final Query<R> query,
                                     final PositionBound positionBound,
                                     final QueryConfig config) {
-        throw new UnsupportedOperationException("Queries (IQv2) are not supported for timestamped key-value stores with headers yet.");
+        // Handle KeyQuery: convert byte[] result from timestamped to headers format

Review Comment:
   We should also add the "tracing" code, i.e., call
`result.addExecutionInfo(...)`.
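
For illustration, a minimal sketch of that tracing pattern, assuming simplified stand-in types: `ExecutionInfoSketch`, its `Result` class, and `queryWithTracing` are hypothetical names, not the Kafka Streams classes. It only shows the shape of the idea: take a start timestamp when execution info was requested, and append a timing message to the result afterwards.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Stand-in types for illustration only; these are NOT the Kafka Streams classes.
final class ExecutionInfoSketch {

    // Hypothetical result holder that just collects execution-info messages.
    static final class Result {
        final List<String> executionInfo = new ArrayList<>();

        void addExecutionInfo(final String message) {
            executionInfo.add(message);
        }
    }

    // Runs the delegate and records timing only when the caller asked for it.
    static Result queryWithTracing(final boolean collectExecutionInfo,
                                   final Supplier<Result> delegate) {
        final long start = collectExecutionInfo ? System.nanoTime() : -1L;
        final Result result = delegate.get();
        if (collectExecutionInfo) {
            result.addExecutionInfo(
                "Handled in " + ExecutionInfoSketch.class + " in " + (System.nanoTime() - start) + "ns");
        }
        return result;
    }
}
```

In the adapter, the same step would presumably wrap both the `KeyQuery` and the `RangeQuery` branch, so that every returned result carries the adapter's entry when execution info is enabled.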



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersStoreAdapter.java:
##########
@@ -152,6 +155,39 @@ public boolean isOpen() {
     public <R> QueryResult<R> query(final Query<R> query,
                                     final PositionBound positionBound,
                                     final QueryConfig config) {
+        // Handle KeyQuery: convert byte[] result from timestamped to headers format
+        if (query instanceof KeyQuery) {
+            final KeyQuery<Bytes, byte[]> keyQuery = (KeyQuery<Bytes, byte[]>) query;
+            final QueryResult<byte[]> rawResult = store.query(keyQuery, positionBound, config);
+
+            if (rawResult.isSuccess()) {
+                final byte[] convertedValue = convertToHeaderFormat(rawResult.getResult());
+                final QueryResult<byte[]> convertedResult =
+                        InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, convertedValue);
+                return (QueryResult<R>) convertedResult;
+            } else {
+                return (QueryResult<R>) rawResult;
+            }
+        }
+
+        // Handle RangeQuery: wrap iterator to convert values
+        if (query instanceof RangeQuery) {
+            final RangeQuery<Bytes, byte[]> rangeQuery = (RangeQuery<Bytes, byte[]>) query;
+            final QueryResult<KeyValueIterator<Bytes, byte[]>> rawResult =
+                    store.query(rangeQuery, positionBound, config);
+
+            if (rawResult.isSuccess()) {
+                final KeyValueIterator<Bytes, byte[]> convertedIterator =
+                        new TimestampedToHeadersIteratorAdapter<>(rawResult.getResult());
+                final QueryResult<KeyValueIterator<Bytes, byte[]>> convertedResult =
+                        InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, convertedIterator);
+                return (QueryResult<R>) convertedResult;
+            } else {
+                return (QueryResult<R>) rawResult;
+            }
+        }
+
+        // For other query types, delegate to the underlying store

Review Comment:
   Not sure if this is correct? For regular layers in the hierarchy, if a
query type is not supported, it's fine to forward to lower layers. But for an
adapter, if we cannot translate the bytes for an unknown query type, then even
if the lower layer supports the query, we would crash in the upper layer while
trying to deserialize, because the byte format does not match what the metered
layer expects without the adapter fixing it up.
   
   So it seems better to return `FailedQueryResult` for this case?
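
To make the concern concrete, here is a rough sketch, assuming simplified stand-in types rather than the actual IQv2 classes. `AdapterQuerySketch`, `InnerStore`, `UnknownQuery`, and this `convertToHeaderFormat` (which just prepends a marker byte) are all hypothetical. The adapter translates the query type it understands and returns a failed result for anything else, even though the inner store could answer it.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Simplified stand-ins for the IQv2 types; none of these are the real Kafka Streams classes.
final class AdapterQuerySketch {

    interface Query { }

    static final class KeyQuery implements Query {
        final String key;

        KeyQuery(final String key) {
            this.key = key;
        }
    }

    // A query type the adapter has no byte translation for.
    static final class UnknownQuery implements Query { }

    static final class Result {
        final byte[] value;          // null when the query failed (or the key is absent)
        final String failureReason;  // null when the query succeeded

        private Result(final byte[] value, final String failureReason) {
            this.value = value;
            this.failureReason = failureReason;
        }

        static Result success(final byte[] value) {
            return new Result(value, null);
        }

        static Result failure(final String reason) {
            return new Result(null, reason);
        }

        boolean isSuccess() {
            return failureReason == null;
        }
    }

    // Inner store: answers every query type, but always in the old (header-less) byte format.
    static final class InnerStore {
        final Map<String, byte[]> data = new HashMap<>();

        Result query(final Query query) {
            if (query instanceof KeyQuery) {
                return Result.success(data.get(((KeyQuery) query).key));
            }
            return Result.success("raw-old-format".getBytes(StandardCharsets.UTF_8));
        }
    }

    // Hypothetical conversion: prepend one marker byte standing in for an empty headers section.
    static byte[] convertToHeaderFormat(final byte[] oldFormat) {
        if (oldFormat == null) {
            return null;
        }
        final byte[] converted = new byte[oldFormat.length + 1];
        converted[0] = 0;
        System.arraycopy(oldFormat, 0, converted, 1, oldFormat.length);
        return converted;
    }

    // Adapter: translate what it understands, fail everything else instead of forwarding.
    static Result query(final InnerStore store, final Query query) {
        if (query instanceof KeyQuery) {
            final Result raw = store.query(query);
            return raw.isSuccess() ? Result.success(convertToHeaderFormat(raw.value)) : raw;
        }
        return Result.failure("UNKNOWN_QUERY_TYPE: " + query.getClass().getSimpleName());
    }
}
```

With this shape, an untranslated query never hands old-format bytes to the layer above. The caller sees an explicit failure instead of a deserialization crash.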



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java:
##########
@@ -152,11 +148,4 @@ private void verifyAndCloseEmptyDefaultColumnFamily(final ColumnFamilyHandle col
         }
     }
 
-    @Override
-    public <R> QueryResult<R> query(final Query<R> query,
-                                    final PositionBound positionBound,
-                                    final QueryConfig config) {
-        throw new UnsupportedOperationException("Queries (IQv2) are not supported for timestamped key-value stores with headers yet.");
-    }

Review Comment:
   I don't think so. If we don't override `query()` and instead re-use
`RocksDBStore#query()`, we would start to "support" `KeyQuery`, which we don't
want to do at this point?
   
   I think we need:
   ```
   @Override
   public <R> QueryResult<R> query(final Query<R> query,
                                   final PositionBound positionBound,
                                   final QueryConfig config) {
       final long start = config.isCollectExecutionInfo() ? System.nanoTime() : -1L;
       final QueryResult<R> result;
   
       synchronized (position) {
           result = QueryResult.forUnknownQueryType(query, store);
   
           if (config.isCollectExecutionInfo()) {
               result.addExecutionInfo(
                   "Handled in " + store.getClass() + " in " + (System.nanoTime() - start) + "ns"
               );
           }
           result.setPosition(position.copy());
       }
       return result;
   }
   ```



##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java:
##########
@@ -440,6 +465,11 @@ public void setup(final boolean cache, final boolean log, final StoresToTest sto
 
         final StreamsBuilder builder = new StreamsBuilder();
         if (Objects.equals(kind, "DSL") && supplier instanceof KeyValueBytesStoreSupplier) {
+            if (storeToTest.isHeaders()) {
+                // DSL doesn't support headers stores - skip this test combination

Review Comment:
   Let's write down a TODO (Jira ticket?) to enable this later, once DSL
support is completed.


