guozhangwang commented on a change in pull request #11581:
URL: https://github.com/apache/kafka/pull/11581#discussion_r766163659



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##########
@@ -62,4 +71,32 @@ public static void updatePosition(
             position.withComponent(meta.topic(), meta.partition(), meta.offset());
         }
     }
+
+    public static boolean isPermitted(
+        final Position position,
+        final PositionBound positionBound,
+        final int partition
+    ) {
+        if (positionBound.isUnbounded()) {
+            return true;
+        } else {
+            final Position bound = positionBound.position();
+            for (final String topic : bound.getTopics()) {
+                final Map<Integer, Long> partitionBounds = bound.getBound(topic);
+                final Map<Integer, Long> seenPartitionBounds = position.getBound(topic);

Review comment:
       qq: why is this function on `Position` named `getBound` when it seems to just
retrieve the current positions for the topic, not really a bound?

   Similarly, why name this local variable `seenPartitionBounds` if the returned
values are not really "bounds"? Should it just be `currentOffsets` or something?
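
   To make the naming suggestion concrete, here is a minimal, self-contained sketch of the check with the local variable renamed to `currentOffsets`. It is only an illustration: the nested maps are hypothetical stand-ins for `Position` and `PositionBound`, the class name is made up, and the comparison itself is an assumption, since the quoted hunk is cut off before that part of the method.

```java
import java.util.Map;

public class PositionNamingSketch {

    // Hypothetical stand-in types: topic -> (partition -> offset).
    // These plain maps are NOT the real Position / PositionBound classes.
    public static boolean isPermitted(
        final Map<String, Map<Integer, Long>> currentPosition,
        final Map<String, Map<Integer, Long>> bound,
        final int partition
    ) {
        for (final Map.Entry<String, Map<Integer, Long>> entry : bound.entrySet()) {
            // What the caller requires for this topic: genuinely "bounds".
            final Map<Integer, Long> partitionBounds = entry.getValue();
            // What the store has seen so far: offsets, not bounds.
            final Map<Integer, Long> currentOffsets =
                currentPosition.getOrDefault(entry.getKey(), Map.of());

            final Long requiredOffset = partitionBounds.get(partition);
            if (requiredOffset == null) {
                continue; // no bound on this topic for our partition
            }
            final Long currentOffset = currentOffsets.get(partition);
            // Assumed rule: the store must have reached at least the bound.
            if (currentOffset == null || currentOffset < requiredOffset) {
                return false;
            }
        }
        // An empty bound map plays the role of the unbounded case here.
        return true;
    }

    public static void main(final String[] args) {
        final Map<String, Map<Integer, Long>> current = Map.of("input", Map.of(0, 5L));
        System.out.println(isPermitted(current, Map.of("input", Map.of(0, 3L)), 0));
        System.out.println(isPermitted(current, Map.of("input", Map.of(0, 9L)), 0));
    }
}
```

   With the rename, each variable says which side of the comparison it holds: the requested bound or the offsets the store has actually reached.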

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
##########
@@ -107,10 +107,10 @@ private void putAndMaybeForward(final ThreadCache.DirtyEntry entry,
             // we can skip flushing to downstream as well as writing to underlying store
             if (rawNewValue != null || rawOldValue != null) {
                 // we need to get the old values if needed, and then put to store, and then flush
-                wrapped().put(entry.key(), entry.newValue());
-
                 final ProcessorRecordContext current = context.recordContext();
                 context.setRecordContext(entry.entry().context());
+                wrapped().put(entry.key(), entry.newValue());

Review comment:
       Why reorder the steps here?

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1764,20 +1764,18 @@ protected void processStreamThread(final Consumer<StreamThread> consumer) {
             );
         }
         final StateQueryResult<R> result = new StateQueryResult<>();
+        final Set<Integer> handledPartitions = new HashSet<>();

Review comment:
       Why move this out of the `else` block's scope?

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
##########
@@ -125,22 +128,81 @@
         final long deadline = start + DEFAULT_TIMEOUT;
 
         do {
+            if (Thread.currentThread().isInterrupted()) {
+                fail("Test was interrupted.");
+            }
             final StateQueryResult<R> result = kafkaStreams.query(request);
-            if (result.getPartitionResults().keySet().containsAll(partitions)
-                || result.getGlobalResult() != null) {
+            if (result.getPartitionResults().keySet().containsAll(partitions)) {
                 return result;
             } else {
-                try {
-                    Thread.sleep(100L);
-                } catch (final InterruptedException e) {
-                    throw new RuntimeException(e);
-                }
+                sleep(100L);
             }
         } while (System.currentTimeMillis() < deadline);
 
         throw new TimeoutException("The query never returned the desired partitions");
     }
 
+    /**
+     * Repeatedly runs the query until the response is valid and then return the response.
+     * <p>
+     * Validity in this case means that the response position is up to the specified bound.
+     * <p>
+     * Once position bounding is generally supported, we should migrate tests to wait on the
+     * expected response position.
+     */
+    public static <R> StateQueryResult<R> iqv2WaitForResult(
+        final KafkaStreams kafkaStreams,
+        final StateQueryRequest<R> request) {
+
+        final long start = System.currentTimeMillis();
+        final long deadline = start + DEFAULT_TIMEOUT;
+
+        StateQueryResult<R> result;
+        do {
+            if (Thread.currentThread().isInterrupted()) {
+                fail("Test was interrupted.");
+            }
+
+            result = kafkaStreams.query(request);
+            final LinkedList<QueryResult<R>> allResults = getAllResults(result);
+
+            if (allResults.isEmpty()) {

Review comment:
       Just curious: when would the result actually be empty? I thought that even if
the tasks were not initialized, we would still return the `NOT_PRESENT` error
rather than an empty result.

##########
File path: streams/src/main/java/org/apache/kafka/streams/query/StateQueryResult.java
##########
@@ -97,11 +97,15 @@ public void addResult(final int partition, final QueryResult<R> r) {
      * prior observations.
      */
     public Position getPosition() {
-        Position position = Position.emptyPosition();
-        for (final QueryResult<R> r : partitionResults.values()) {
-            position = position.merge(r.getPosition());
+        if (globalResult != null) {

Review comment:
       Not sure I understand this: in this PR we've effectively turned off the
only caller of `result.setGlobalResult(r);`, so this branch should never be hit
any more, right?





