cadonna commented on code in PR #12289:
URL: https://github.com/apache/kafka/pull/12289#discussion_r911825201


##########
streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java:
##########
@@ -180,7 +183,7 @@ public void shouldQuerySpecificActivePartitionStores() throws Exception {
 
         // Assert that all messages in the first batch were processed in a timely manner
         assertThat(semaphore.tryAcquire(batch1NumMessages, 60, TimeUnit.SECONDS), is(equalTo(true)));
-        until(() -> {
+        retryUntil(() -> {

Review Comment:
   In the PR description you state:
   
   > The first reason is that the test uses the until method which just fails 
whenever a retryable error occurs.
   
   I doubt this statement, because the code passed to `until()` checks for retriable exceptions and returns `false` when it encounters one, which triggers another iteration of the code passed to `until()`.
   You can verify this behavior by running the following test:
   ```
   @Test
   public void testUntil() throws Exception {
       until(() -> {
           try {
               // Always throw a retriable exception so that the condition is never met.
               throw new InvalidStateStorePartitionException(
                   "The specified partition 1 for store source-table does not exist.");
           } catch (final InvalidStateStoreException exception) {
               verifyRetrievableException(exception);
               LOG.info("Either streams wasn't running or a re-balancing took place. Will try again.");
               return false;
           }
       });
   }
   ``` 
   This code keeps throwing the exception, and `until()` keeps retrying, until the timeout is exceeded.
   You will notice that the test passes once the timeout is exceeded. This is definitely a bug!
   The following rewrite of `until()` fixes this bug:
   ```
     private static void until(final TestCondition condition) {
         boolean success = false;
         final long deadline = System.currentTimeMillis() + IntegrationTestUtils.DEFAULT_TIMEOUT;
         boolean deadlineExceeded = System.currentTimeMillis() >= deadline;
         while (!success && !deadlineExceeded) {
             try {
                 success = condition.conditionMet();
                 if (!success) {
                     // Back off before the next attempt.
                     Thread.sleep(500L);
                 }
             } catch (final RuntimeException e) {
                 throw e;
             } catch (final Exception e) {
                 throw new RuntimeException(e);
             } finally {
                 deadlineExceeded = System.currentTimeMillis() >= deadline;
             }
         }
         // Fail only if the condition was never met; a success on the last
         // iteration before the deadline still counts as a success.
         if (!success) {
             fail("Test execution timed out");
         }
     }
   ``` 
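   
   To make the difference concrete, here is a minimal, self-contained sketch of the two behaviors. It is not the actual `until()` from `StoreQueryIntegrationTest`; the class `UntilSketch`, the methods `untilLenient` and `untilStrict`, and the timeout and poll parameters are hypothetical names chosen for illustration, and `BooleanSupplier` stands in for `TestCondition`.
   
   ```java
   import java.util.function.BooleanSupplier;

   final class UntilSketch {

       // Hypothetical lenient variant: stops polling at the deadline,
       // but never reports that the deadline was hit.
       public static void untilLenient(final BooleanSupplier condition, final long timeoutMs, final long pollMs) {
           final long deadline = System.currentTimeMillis() + timeoutMs;
           boolean success = false;
           while (!success && System.currentTimeMillis() < deadline) {
               success = condition.getAsBoolean();
               if (!success) {
                   sleep(pollMs);
               }
           }
           // Falls through silently even if the condition was never met.
       }

       // Hypothetical strict variant: same loop, but a timeout is an error.
       public static void untilStrict(final BooleanSupplier condition, final long timeoutMs, final long pollMs) {
           final long deadline = System.currentTimeMillis() + timeoutMs;
           boolean success = false;
           while (!success && System.currentTimeMillis() < deadline) {
               success = condition.getAsBoolean();
               if (!success) {
                   sleep(pollMs);
               }
           }
           if (!success) {
               throw new AssertionError("Condition not met within " + timeoutMs + " ms");
           }
       }

       // Sleeps without exposing the checked InterruptedException to callers.
       private static void sleep(final long millis) {
           try {
               Thread.sleep(millis);
           } catch (final InterruptedException e) {
               Thread.currentThread().interrupt();
               throw new RuntimeException(e);
           }
       }

       public static void main(final String[] args) {
           final BooleanSupplier neverMet = () -> false;

           untilLenient(neverMet, 50L, 10L);
           System.out.println("lenient: returned normally although the condition was never met");

           try {
               untilStrict(neverMet, 50L, 10L);
               System.out.println("strict: returned normally");
           } catch (final AssertionError e) {
               System.out.println("strict: " + e.getMessage());
           }
       }
   }
   ```
   
   With a condition that is never met, the lenient variant returns as if everything were fine, which is how a test can pass after a timeout. The strict variant turns the same situation into a failure.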
   
   In summary, I think the method `until()` does not cause any flakiness.
   
   If you agree with my reasoning, could you remove `retryUntil()` again, add the fix to `until()`, and adapt the PR description?



##########
streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java:
##########
@@ -210,39 +213,33 @@ public void shouldQuerySpecificActivePartitionStores() throws Exception {
             }
 
             final StoreQueryParameters<ReadOnlyKeyValueStore<Integer, Integer>> storeQueryParam2 =
-                StoreQueryParameters.<ReadOnlyKeyValueStore<Integer, Integer>>fromNameAndType(TABLE_NAME, keyValueStore())
-                .withPartition(keyDontBelongPartition);
-
-            try {
-                // Assert that key is not served when wrong specific partition is requested
-                // If kafkaStreams1 is active for keyPartition, kafkaStreams2 would be active for keyDontBelongPartition
-                // So, in that case, store3 would be null and the store4 would not return the value for key as wrong partition was requested
-                if (kafkaStreams1IsActive) {
-                    assertThat(store1.get(key), is(notNullValue()));
-                    assertThat(getStore(kafkaStreams2, storeQueryParam2).get(key), is(nullValue()));
-                    final InvalidStateStoreException exception =
+                    StoreQueryParameters.<ReadOnlyKeyValueStore<Integer, Integer>>fromNameAndType(TABLE_NAME, keyValueStore())
+                            .withPartition(keyDontBelongPartition);
+            // Assert that key is not served when wrong specific partition is requested
+            // If kafkaStreams1 is active for keyPartition, kafkaStreams2 would be active for keyDontBelongPartition
+            // So, in that case, store3 would be null and the store4 would not return the value for key as wrong partition was requested
+            if (kafkaStreams1IsActive) {
+                assertThat(store1.get(key), is(notNullValue()));
+                assertThat(getStore(kafkaStreams2, storeQueryParam2).get(key), is(nullValue()));
+                final InvalidStateStoreException exception =
                         assertThrows(InvalidStateStoreException.class, () -> getStore(kafkaStreams1, storeQueryParam2).get(key));
-                    assertThat(
+
+                assertThat(
                         exception.getMessage(),
                         containsString("The specified partition 1 for store source-table does not exist.")
-                    );
-                } else {
-                    assertThat(store2.get(key), is(notNullValue()));
-                    assertThat(getStore(kafkaStreams1, storeQueryParam2).get(key), is(nullValue()));
-                    final InvalidStateStoreException exception =
+                );
+            } else {
+                assertThat(store2.get(key), is(notNullValue()));
+                assertThat(getStore(kafkaStreams1, storeQueryParam2).get(key), is(nullValue()));
+                final InvalidStateStoreException exception =
                         assertThrows(InvalidStateStoreException.class, () -> getStore(kafkaStreams2, storeQueryParam2).get(key));
-                    assertThat(
+                assertThat(
                         exception.getMessage(),
                         containsString("The specified partition 1 for store source-table does not exist.")
-                    );
-                }
-                return true;
-            } catch (final InvalidStateStoreException exception) {
-                verifyRetrievableException(exception);
-                LOG.info("Either streams wasn't running or a re-balancing took place. Will try again.");
-                return false;
+                );
             }
-        });
+            return true;

Review Comment:
   See my comment above about `until()`.



##########
streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java:
##########
@@ -535,17 +532,26 @@ public void shouldQueryStoresAfterAddingAndRemovingStreamThread() throws Excepti
         });
     }
 
+    private Matcher<String> retrievableException() {
+        return is(
+                anyOf(
+                        containsString("Cannot get state store source-table because the stream thread is PARTITIONS_ASSIGNED, not RUNNING"),
+                        containsString("The state store, source-table, may have migrated to another instance"),
+                        containsString("Cannot get state store source-table because the stream thread is STARTING, not RUNNING"),
+                        containsString("The specified partition 1 for store source-table does not exist.")

Review Comment:
   I looked at this code again and discussed it with others, and I think it might indeed be one cause of the test's flakiness. My previous concerns proved wrong. Thanks a lot for the investigation.
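   
   For readers without Hamcrest at hand, the check that the matcher above expresses can be sketched in plain Java. This is only an illustration: the class `RetriableMessages` and the method `isRetriable` are hypothetical names, and the list contains only the four messages visible in the hunk above, which may not be the complete list in the PR.
   
   ```java
   import java.util.List;

   final class RetriableMessages {

       // Message fragments copied from the hunk above; the full list in the PR may be longer.
       static final List<String> RETRIABLE = List.of(
           "Cannot get state store source-table because the stream thread is PARTITIONS_ASSIGNED, not RUNNING",
           "The state store, source-table, may have migrated to another instance",
           "Cannot get state store source-table because the stream thread is STARTING, not RUNNING",
           "The specified partition 1 for store source-table does not exist.");

       // Returns true if the message contains any of the known retriable fragments.
       public static boolean isRetriable(final String message) {
           return message != null && RETRIABLE.stream().anyMatch(message::contains);
       }

       public static void main(final String[] args) {
           System.out.println(isRetriable("The state store, source-table, may have migrated to another instance."));
           System.out.println(isRetriable("Some unrelated failure"));
       }
   }
   ```
   
   Treating "The specified partition 1 for store source-table does not exist." as retriable is the notable entry here, since the test body also asserts on that same message.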



##########
streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java:
##########
@@ -210,39 +213,33 @@ public void shouldQuerySpecificActivePartitionStores() throws Exception {
             }
 
             final StoreQueryParameters<ReadOnlyKeyValueStore<Integer, Integer>> storeQueryParam2 =
-                StoreQueryParameters.<ReadOnlyKeyValueStore<Integer, Integer>>fromNameAndType(TABLE_NAME, keyValueStore())
-                .withPartition(keyDontBelongPartition);
-
-            try {
-                // Assert that key is not served when wrong specific partition is requested
-                // If kafkaStreams1 is active for keyPartition, kafkaStreams2 would be active for keyDontBelongPartition
-                // So, in that case, store3 would be null and the store4 would not return the value for key as wrong partition was requested
-                if (kafkaStreams1IsActive) {
-                    assertThat(store1.get(key), is(notNullValue()));
-                    assertThat(getStore(kafkaStreams2, storeQueryParam2).get(key), is(nullValue()));
-                    final InvalidStateStoreException exception =
+                    StoreQueryParameters.<ReadOnlyKeyValueStore<Integer, Integer>>fromNameAndType(TABLE_NAME, keyValueStore())
+                            .withPartition(keyDontBelongPartition);
+            // Assert that key is not served when wrong specific partition is requested
+            // If kafkaStreams1 is active for keyPartition, kafkaStreams2 would be active for keyDontBelongPartition
+            // So, in that case, store3 would be null and the store4 would not return the value for key as wrong partition was requested
+            if (kafkaStreams1IsActive) {

Review Comment:
   We cannot merge the first two if-statements because `store1` and `store2` are assigned in the first if-statement and both variables are used in each branch of the second if-statement.
   As far as I can see, we could merge the second and the third if-statement. I leave it up to you whether to merge them. I am fine either way, with a slight preference for merging. But I also understand that this is not really needed in this PR.


