mdedetrich commented on code in PR #12289:
URL: https://github.com/apache/kafka/pull/12289#discussion_r906835871


##########
streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java:
##########
@@ -210,39 +213,33 @@ public void shouldQuerySpecificActivePartitionStores() 
throws Exception {
             }
 
             final StoreQueryParameters<ReadOnlyKeyValueStore<Integer, 
Integer>> storeQueryParam2 =
-                StoreQueryParameters.<ReadOnlyKeyValueStore<Integer, 
Integer>>fromNameAndType(TABLE_NAME, keyValueStore())
-                .withPartition(keyDontBelongPartition);
-
-            try {
-                // Assert that key is not served when wrong specific partition 
is requested
-                // If kafkaStreams1 is active for keyPartition, kafkaStreams2 
would be active for keyDontBelongPartition
-                // So, in that case, store3 would be null and the store4 would 
not return the value for key as wrong partition was requested
-                if (kafkaStreams1IsActive) {
-                    assertThat(store1.get(key), is(notNullValue()));
-                    assertThat(getStore(kafkaStreams2, 
storeQueryParam2).get(key), is(nullValue()));
-                    final InvalidStateStoreException exception =
+                    StoreQueryParameters.<ReadOnlyKeyValueStore<Integer, 
Integer>>fromNameAndType(TABLE_NAME, keyValueStore())
+                            .withPartition(keyDontBelongPartition);
+            // Assert that key is not served when wrong specific partition is 
requested
+            // If kafkaStreams1 is active for keyPartition, kafkaStreams2 
would be active for keyDontBelongPartition
+            // So, in that case, store3 would be null and the store4 would not 
return the value for key as wrong partition was requested
+            if (kafkaStreams1IsActive) {
+                assertThat(store1.get(key), is(notNullValue()));
+                assertThat(getStore(kafkaStreams2, storeQueryParam2).get(key), 
is(nullValue()));
+                final InvalidStateStoreException exception =
                         assertThrows(InvalidStateStoreException.class, () -> 
getStore(kafkaStreams1, storeQueryParam2).get(key));
-                    assertThat(
+
+                assertThat(
                         exception.getMessage(),
                         containsString("The specified partition 1 for store 
source-table does not exist.")
-                    );
-                } else {
-                    assertThat(store2.get(key), is(notNullValue()));
-                    assertThat(getStore(kafkaStreams1, 
storeQueryParam2).get(key), is(nullValue()));
-                    final InvalidStateStoreException exception =
+                );
+            } else {
+                assertThat(store2.get(key), is(notNullValue()));
+                assertThat(getStore(kafkaStreams1, storeQueryParam2).get(key), 
is(nullValue()));
+                final InvalidStateStoreException exception =
                         assertThrows(InvalidStateStoreException.class, () -> 
getStore(kafkaStreams2, storeQueryParam2).get(key));
-                    assertThat(
+                assertThat(
                         exception.getMessage(),
                         containsString("The specified partition 1 for store 
source-table does not exist.")
-                    );
-                }
-                return true;
-            } catch (final InvalidStateStoreException exception) {
-                verifyRetrievableException(exception);
-                LOG.info("Either streams wasn't running or a re-balancing took 
place. Will try again.");
-                return false;
+                );
             }
-        });
+            return true;

Review Comment:
   So the original reason for the return value is so that you can easily 
replace it with `until`. Regarding extending the 
`retryOnExceptionWithTimeout(), the reason why `retryUntil` is handcoded is 
because its keeping the same unwrapping `RuntimeException` that the original 
`until` did (see 
https://github.com/apache/kafka/blob/4c9eeef5b2dff9a4f0977fbc5ac7eaaf930d0d0e/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java#L559-L563).
   
   I don't know the original reason why it was done this way but the intention 
was to minimize the amount of changes as to not increase the flakiness (but 
this is a hunch). Are you happy with just removing the `return` value for now 
or do you also want to extend ``retryOnExceptionWithTimeout()`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to