cadonna commented on code in PR #12289:
URL: https://github.com/apache/kafka/pull/12289#discussion_r906002582


##########
streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java:
##########
@@ -210,39 +213,33 @@ public void shouldQuerySpecificActivePartitionStores() 
throws Exception {
             }
 
             final StoreQueryParameters<ReadOnlyKeyValueStore<Integer, 
Integer>> storeQueryParam2 =
-                StoreQueryParameters.<ReadOnlyKeyValueStore<Integer, 
Integer>>fromNameAndType(TABLE_NAME, keyValueStore())
-                .withPartition(keyDontBelongPartition);
-
-            try {
-                // Assert that key is not served when wrong specific partition 
is requested
-                // If kafkaStreams1 is active for keyPartition, kafkaStreams2 
would be active for keyDontBelongPartition
-                // So, in that case, store3 would be null and the store4 would 
not return the value for key as wrong partition was requested
-                if (kafkaStreams1IsActive) {
-                    assertThat(store1.get(key), is(notNullValue()));
-                    assertThat(getStore(kafkaStreams2, 
storeQueryParam2).get(key), is(nullValue()));
-                    final InvalidStateStoreException exception =
+                    StoreQueryParameters.<ReadOnlyKeyValueStore<Integer, 
Integer>>fromNameAndType(TABLE_NAME, keyValueStore())
+                            .withPartition(keyDontBelongPartition);
+            // Assert that key is not served when wrong specific partition is 
requested
+            // If kafkaStreams1 is active for keyPartition, kafkaStreams2 
would be active for keyDontBelongPartition
+            // So, in that case, store3 would be null and the store4 would not 
return the value for key as wrong partition was requested
+            if (kafkaStreams1IsActive) {
+                assertThat(store1.get(key), is(notNullValue()));
+                assertThat(getStore(kafkaStreams2, storeQueryParam2).get(key), 
is(nullValue()));
+                final InvalidStateStoreException exception =
                         assertThrows(InvalidStateStoreException.class, () -> 
getStore(kafkaStreams1, storeQueryParam2).get(key));
-                    assertThat(
+

Review Comment:
   ```suggestion
   ```



##########
streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java:
##########
@@ -210,39 +213,33 @@ public void shouldQuerySpecificActivePartitionStores() 
throws Exception {
             }
 
             final StoreQueryParameters<ReadOnlyKeyValueStore<Integer, 
Integer>> storeQueryParam2 =
-                StoreQueryParameters.<ReadOnlyKeyValueStore<Integer, 
Integer>>fromNameAndType(TABLE_NAME, keyValueStore())
-                .withPartition(keyDontBelongPartition);
-
-            try {
-                // Assert that key is not served when wrong specific partition 
is requested
-                // If kafkaStreams1 is active for keyPartition, kafkaStreams2 
would be active for keyDontBelongPartition
-                // So, in that case, store3 would be null and the store4 would 
not return the value for key as wrong partition was requested
-                if (kafkaStreams1IsActive) {
-                    assertThat(store1.get(key), is(notNullValue()));
-                    assertThat(getStore(kafkaStreams2, 
storeQueryParam2).get(key), is(nullValue()));
-                    final InvalidStateStoreException exception =
+                    StoreQueryParameters.<ReadOnlyKeyValueStore<Integer, 
Integer>>fromNameAndType(TABLE_NAME, keyValueStore())
+                            .withPartition(keyDontBelongPartition);
+            // Assert that key is not served when wrong specific partition is 
requested
+            // If kafkaStreams1 is active for keyPartition, kafkaStreams2 
would be active for keyDontBelongPartition
+            // So, in that case, store3 would be null and the store4 would not 
return the value for key as wrong partition was requested
+            if (kafkaStreams1IsActive) {
+                assertThat(store1.get(key), is(notNullValue()));
+                assertThat(getStore(kafkaStreams2, storeQueryParam2).get(key), 
is(nullValue()));
+                final InvalidStateStoreException exception =
                         assertThrows(InvalidStateStoreException.class, () -> 
getStore(kafkaStreams1, storeQueryParam2).get(key));
-                    assertThat(
+
+                assertThat(
                         exception.getMessage(),
                         containsString("The specified partition 1 for store 
source-table does not exist.")

Review Comment:
   ```suggestion
                   assertThat(
                       exception.getMessage(),
                       containsString("The specified partition 1 for store source-table does not exist.")
   ```



##########
streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java:
##########
@@ -210,39 +213,33 @@ public void shouldQuerySpecificActivePartitionStores() 
throws Exception {
             }
 
             final StoreQueryParameters<ReadOnlyKeyValueStore<Integer, 
Integer>> storeQueryParam2 =
-                StoreQueryParameters.<ReadOnlyKeyValueStore<Integer, 
Integer>>fromNameAndType(TABLE_NAME, keyValueStore())
-                .withPartition(keyDontBelongPartition);
-
-            try {
-                // Assert that key is not served when wrong specific partition 
is requested
-                // If kafkaStreams1 is active for keyPartition, kafkaStreams2 
would be active for keyDontBelongPartition
-                // So, in that case, store3 would be null and the store4 would 
not return the value for key as wrong partition was requested
-                if (kafkaStreams1IsActive) {
-                    assertThat(store1.get(key), is(notNullValue()));
-                    assertThat(getStore(kafkaStreams2, 
storeQueryParam2).get(key), is(nullValue()));
-                    final InvalidStateStoreException exception =
+                    StoreQueryParameters.<ReadOnlyKeyValueStore<Integer, 
Integer>>fromNameAndType(TABLE_NAME, keyValueStore())
+                            .withPartition(keyDontBelongPartition);
+            // Assert that key is not served when wrong specific partition is 
requested
+            // If kafkaStreams1 is active for keyPartition, kafkaStreams2 
would be active for keyDontBelongPartition
+            // So, in that case, store3 would be null and the store4 would not 
return the value for key as wrong partition was requested
+            if (kafkaStreams1IsActive) {

Review Comment:
   I am wondering why we have the same if-statement `if (kafkaStreams1IsActive)` three times.
   Can we not merge the three if-statements into one?



##########
streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java:
##########
@@ -535,17 +532,26 @@ public void 
shouldQueryStoresAfterAddingAndRemovingStreamThread() throws Excepti
         });
     }
 
+    private Matcher<String> retrievableException() {
+        return is(
+                anyOf(
+                        containsString("Cannot get state store source-table 
because the stream thread is PARTITIONS_ASSIGNED, not RUNNING"),
+                        containsString("The state store, source-table, may 
have migrated to another instance"),
+                        containsString("Cannot get state store source-table 
because the stream thread is STARTING, not RUNNING"),
+                        containsString("The specified partition 1 for store 
source-table does not exist.")
+                )
+        );
+    }

Review Comment:
   Fix for indentation and typo
   ```suggestion
       private Matcher<String> retriableException() {
           return is(
               anyOf(
                   containsString("Cannot get state store source-table because the stream thread is PARTITIONS_ASSIGNED, not RUNNING"),
                   containsString("The state store, source-table, may have migrated to another instance"),
                   containsString("Cannot get state store source-table because the stream thread is STARTING, not RUNNING"),
                   containsString("The specified partition 1 for store source-table does not exist.")
               )
           );
       }
   ```
   Could you please fix the typo throughout the class? It should be `retriable`.
   I think the typo mostly existed before this PR.
   Could you please also fix the indentation throughout the class? We use 4
   spaces.



##########
streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java:
##########
@@ -210,39 +213,33 @@ public void shouldQuerySpecificActivePartitionStores() 
throws Exception {
             }
 
             final StoreQueryParameters<ReadOnlyKeyValueStore<Integer, 
Integer>> storeQueryParam2 =
-                StoreQueryParameters.<ReadOnlyKeyValueStore<Integer, 
Integer>>fromNameAndType(TABLE_NAME, keyValueStore())
-                .withPartition(keyDontBelongPartition);
-
-            try {
-                // Assert that key is not served when wrong specific partition 
is requested
-                // If kafkaStreams1 is active for keyPartition, kafkaStreams2 
would be active for keyDontBelongPartition
-                // So, in that case, store3 would be null and the store4 would 
not return the value for key as wrong partition was requested
-                if (kafkaStreams1IsActive) {
-                    assertThat(store1.get(key), is(notNullValue()));
-                    assertThat(getStore(kafkaStreams2, 
storeQueryParam2).get(key), is(nullValue()));
-                    final InvalidStateStoreException exception =
+                    StoreQueryParameters.<ReadOnlyKeyValueStore<Integer, 
Integer>>fromNameAndType(TABLE_NAME, keyValueStore())
+                            .withPartition(keyDontBelongPartition);
+            // Assert that key is not served when wrong specific partition is 
requested
+            // If kafkaStreams1 is active for keyPartition, kafkaStreams2 
would be active for keyDontBelongPartition
+            // So, in that case, store3 would be null and the store4 would not 
return the value for key as wrong partition was requested
+            if (kafkaStreams1IsActive) {
+                assertThat(store1.get(key), is(notNullValue()));
+                assertThat(getStore(kafkaStreams2, storeQueryParam2).get(key), 
is(nullValue()));
+                final InvalidStateStoreException exception =
                         assertThrows(InvalidStateStoreException.class, () -> 
getStore(kafkaStreams1, storeQueryParam2).get(key));
-                    assertThat(
+
+                assertThat(
                         exception.getMessage(),
                         containsString("The specified partition 1 for store 
source-table does not exist.")
-                    );
-                } else {
-                    assertThat(store2.get(key), is(notNullValue()));
-                    assertThat(getStore(kafkaStreams1, 
storeQueryParam2).get(key), is(nullValue()));
-                    final InvalidStateStoreException exception =
+                );
+            } else {
+                assertThat(store2.get(key), is(notNullValue()));
+                assertThat(getStore(kafkaStreams1, storeQueryParam2).get(key), 
is(nullValue()));
+                final InvalidStateStoreException exception =
                         assertThrows(InvalidStateStoreException.class, () -> 
getStore(kafkaStreams2, storeQueryParam2).get(key));
-                    assertThat(
+                assertThat(
                         exception.getMessage(),
                         containsString("The specified partition 1 for store 
source-table does not exist.")
-                    );
-                }
-                return true;
-            } catch (final InvalidStateStoreException exception) {
-                verifyRetrievableException(exception);
-                LOG.info("Either streams wasn't running or a re-balancing took 
place. Will try again.");
-                return false;
+                );
             }
-        });
+            return true;

Review Comment:
   Why do we still need to return something? With your change we would retry
   until no exception is thrown anymore.
   Maybe it would be better to extend `retryOnExceptionWithTimeout()` (see
https://github.com/apache/kafka/blob/e4b3a3cdeb295fdd4c4434ec1a7ee77b66553ae0/clients/src/test/java/org/apache/kafka/test/TestUtils.java#L361)
   so that it takes the exception for which to retry.
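
   To make the idea concrete, here is a rough, self-contained sketch of a retry helper that takes the exception type to retry on. It is not the existing `TestUtils` code: the class `RetrySketch`, the interface `CheckedRunnable`, and the parameter names are all hypothetical, and the real signature would need to fit what `TestUtils` already offers. In this sketch only the given exception type is retried; anything else, including an `AssertionError` from a failed assertion, propagates immediately.

```java
public final class RetrySketch {

    /** Hypothetical functional interface: a test body that returns nothing and may throw. */
    @FunctionalInterface
    public interface CheckedRunnable {
        void run() throws Exception;
    }

    /**
     * Runs {@code body} until it completes without throwing, retrying only when
     * the thrown exception is an instance of {@code retriable}.
     * Any other exception is rethrown at once, and the last retriable exception
     * is rethrown once the timeout has elapsed.
     */
    public static void retryOnExceptionWithTimeout(
            final long timeoutMs,
            final long pollIntervalMs,
            final Class<? extends Exception> retriable,
            final CheckedRunnable body) throws Exception {
        final long deadline = System.currentTimeMillis() + timeoutMs;
        while (true) {
            try {
                body.run();
                return; // success: nothing to return to the caller
            } catch (final Exception e) {
                // Not the expected transient failure, or out of time: give up.
                if (!retriable.isInstance(e) || System.currentTimeMillis() >= deadline) {
                    throw e;
                }
            }
            Thread.sleep(pollIntervalMs);
        }
    }
}
```

   With something like this, the test body would no longer need `return true;`, and the call site could pass `InvalidStateStoreException.class` as the type to retry on.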



##########
streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java:
##########
@@ -535,17 +532,26 @@ public void 
shouldQueryStoresAfterAddingAndRemovingStreamThread() throws Excepti
         });
     }
 
+    private Matcher<String> retrievableException() {
+        return is(
+                anyOf(
+                        containsString("Cannot get state store source-table 
because the stream thread is PARTITIONS_ASSIGNED, not RUNNING"),
+                        containsString("The state store, source-table, may 
have migrated to another instance"),
+                        containsString("Cannot get state store source-table 
because the stream thread is STARTING, not RUNNING"),
+                        containsString("The specified partition 1 for store 
source-table does not exist.")

Review Comment:
   I see that this might happen, but I am wondering if we can solve that
   differently. The reason is that a bug could also throw that exception and we
   would just retry. Once the timeout is exceeded we would probably understand
   that it is a bug, but that seems really hard to investigate.



