cadonna commented on code in PR #12289: URL: https://github.com/apache/kafka/pull/12289#discussion_r911825201
########## streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java: ##########
@@ -180,7 +183,7 @@ public void shouldQuerySpecificActivePartitionStores() throws Exception {
         // Assert that all messages in the first batch were processed in a timely manner
         assertThat(semaphore.tryAcquire(batch1NumMessages, 60, TimeUnit.SECONDS), is(equalTo(true)));
-        until(() -> {
+        retryUntil(() -> {

Review Comment:
In the PR description you state:

> The first reason is that the test uses the until method which just fails whenever a retryable error occurs.

I doubt this statement, because within `until()` the code checks for retriable exceptions and returns `false` when it encounters one, which triggers a further iteration over the code passed into `until()`. You can verify this behavior by running the following test:

```
public void testUntil() throws Exception {
    until(() -> {
        try {
            throw new InvalidStateStorePartitionException(
                "The specified partition 1 for store source-table does not exist.");
        } catch (final InvalidStateStoreException exception) {
            verifyRetrievableException(exception);
            LOG.info("Either streams wasn't running or a re-balancing took place. Will try again.");
            return false;
        }
    });
}
```

This code will keep throwing the exception on every retry until the timeout is exceeded. You will notice that when the timeout is exceeded, the test passes. This is definitely a bug!
The following rewrite of `until()` will fix this bug:

```
private static void until(final TestCondition condition) {
    boolean success = false;
    final long deadline = System.currentTimeMillis() + IntegrationTestUtils.DEFAULT_TIMEOUT;
    while (!success && System.currentTimeMillis() < deadline) {
        try {
            success = condition.conditionMet();
            if (!success) {
                Thread.sleep(500L);
            }
        } catch (final RuntimeException e) {
            throw e;
        } catch (final Exception e) {
            throw new RuntimeException(e);
        }
    }
    if (!success) {
        fail("Test execution timed out");
    }
}
```

Note that the loop fails on `!success` rather than on the deadline alone, so a condition that is met on the final iteration still passes. In summary, I think the method `until()` does not cause any flakiness. If you agree with my argumentation, could you remove `retryUntil()` again, add the fix to `until()`, and adapt the PR description?

########## streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java: ##########
@@ -210,39 +213,33 @@ public void shouldQuerySpecificActivePartitionStores() throws Exception {
         }
         final StoreQueryParameters<ReadOnlyKeyValueStore<Integer, Integer>> storeQueryParam2 =
-            StoreQueryParameters.<ReadOnlyKeyValueStore<Integer, Integer>>fromNameAndType(TABLE_NAME, keyValueStore())
-                .withPartition(keyDontBelongPartition);
-
-        try {
-            // Assert that key is not served when wrong specific partition is requested
-            // If kafkaStreams1 is active for keyPartition, kafkaStreams2 would be active for keyDontBelongPartition
-            // So, in that case, store3 would be null and the store4 would not return the value for key as wrong partition was requested
-            if (kafkaStreams1IsActive) {
-                assertThat(store1.get(key), is(notNullValue()));
-                assertThat(getStore(kafkaStreams2, storeQueryParam2).get(key), is(nullValue()));
-                final InvalidStateStoreException exception =
+        StoreQueryParameters.<ReadOnlyKeyValueStore<Integer, Integer>>fromNameAndType(TABLE_NAME, keyValueStore())
+            .withPartition(keyDontBelongPartition);
+        // Assert that key is not served when wrong specific partition is requested
+        // If kafkaStreams1 is active for keyPartition, kafkaStreams2 would be active for keyDontBelongPartition
+        // So, in that case, store3 would be null and the store4 would not return the value for key as wrong partition was requested
+        if (kafkaStreams1IsActive) {
+            assertThat(store1.get(key), is(notNullValue()));
+            assertThat(getStore(kafkaStreams2, storeQueryParam2).get(key), is(nullValue()));
+            final InvalidStateStoreException exception =
                 assertThrows(InvalidStateStoreException.class, () -> getStore(kafkaStreams1, storeQueryParam2).get(key));
-            assertThat(
+
+            assertThat(
                 exception.getMessage(),
                 containsString("The specified partition 1 for store source-table does not exist.")
-            );
-        } else {
-            assertThat(store2.get(key), is(notNullValue()));
-            assertThat(getStore(kafkaStreams1, storeQueryParam2).get(key), is(nullValue()));
-            final InvalidStateStoreException exception =
+            );
+        } else {
+            assertThat(store2.get(key), is(notNullValue()));
+            assertThat(getStore(kafkaStreams1, storeQueryParam2).get(key), is(nullValue()));
+            final InvalidStateStoreException exception =
                 assertThrows(InvalidStateStoreException.class, () -> getStore(kafkaStreams2, storeQueryParam2).get(key));
-            assertThat(
+            assertThat(
                 exception.getMessage(),
                 containsString("The specified partition 1 for store source-table does not exist.")
-            );
-        }
-            return true;
-        } catch (final InvalidStateStoreException exception) {
-            verifyRetrievableException(exception);
-            LOG.info("Either streams wasn't running or a re-balancing took place. Will try again.");
-            return false;
+            );
         }
-    });
+        return true;

Review Comment:
See my comment above about `until()`.
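To make the failure mode and the fix concrete, here is a self-contained sketch of the retry loop discussed above. It is not the actual Kafka test utility: `Condition`, the timeout value, and the sleep interval are placeholders standing in for `TestCondition` and `IntegrationTestUtils.DEFAULT_TIMEOUT`.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class UntilSketch {
    @FunctionalInterface
    interface Condition {
        boolean conditionMet() throws Exception;
    }

    // Stand-alone version of the fixed retry loop: it retries until the
    // condition holds or the deadline passes, and it fails loudly (instead
    // of silently passing) when the deadline is exceeded without success.
    static void until(final Condition condition, final long timeoutMs) {
        final long deadline = System.currentTimeMillis() + timeoutMs;
        boolean success = false;
        while (!success && System.currentTimeMillis() < deadline) {
            try {
                success = condition.conditionMet();
                if (!success) {
                    Thread.sleep(10L);
                }
            } catch (final RuntimeException e) {
                throw e;
            } catch (final Exception e) {
                throw new RuntimeException(e);
            }
        }
        if (!success) {
            throw new AssertionError("Test execution timed out");
        }
    }

    public static void main(final String[] args) {
        // A condition that succeeds on the third attempt: until() returns normally.
        final AtomicInteger attempts = new AtomicInteger();
        until(() -> attempts.incrementAndGet() >= 3, 5_000L);
        System.out.println("attempts=" + attempts.get());

        // A condition that never succeeds: until() must now fail instead of passing.
        boolean timedOut = false;
        try {
            until(() -> false, 100L);
        } catch (final AssertionError e) {
            timedOut = true;
        }
        System.out.println("timedOut=" + timedOut);
    }
}
```

The second case in `main` is exactly the scenario from the `testUntil()` example: a body that never returns `true` now surfaces a timeout failure rather than letting the test pass.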
########## streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java: ##########
@@ -535,17 +532,26 @@ public void shouldQueryStoresAfterAddingAndRemovingStreamThread() throws Excepti
         });
     }

+    private Matcher<String> retrievableException() {
+        return is(
+            anyOf(
+                containsString("Cannot get state store source-table because the stream thread is PARTITIONS_ASSIGNED, not RUNNING"),
+                containsString("The state store, source-table, may have migrated to another instance"),
+                containsString("Cannot get state store source-table because the stream thread is STARTING, not RUNNING"),
+                containsString("The specified partition 1 for store source-table does not exist.")

Review Comment:
I looked again at this code and discussed it with others, and I think that might indeed be one cause of the flakiness of the test. My previous concerns proved wrong. Thank you a lot for the investigation.

########## streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java: ##########
@@ -210,39 +213,33 @@ public void shouldQuerySpecificActivePartitionStores() throws Exception {
         }
         final StoreQueryParameters<ReadOnlyKeyValueStore<Integer, Integer>> storeQueryParam2 =
-            StoreQueryParameters.<ReadOnlyKeyValueStore<Integer, Integer>>fromNameAndType(TABLE_NAME, keyValueStore())
-                .withPartition(keyDontBelongPartition);
-
-        try {
-            // Assert that key is not served when wrong specific partition is requested
-            // If kafkaStreams1 is active for keyPartition, kafkaStreams2 would be active for keyDontBelongPartition
-            // So, in that case, store3 would be null and the store4 would not return the value for key as wrong partition was requested
-            if (kafkaStreams1IsActive) {
-                assertThat(store1.get(key), is(notNullValue()));
-                assertThat(getStore(kafkaStreams2, storeQueryParam2).get(key), is(nullValue()));
-                final InvalidStateStoreException exception =
+        StoreQueryParameters.<ReadOnlyKeyValueStore<Integer, Integer>>fromNameAndType(TABLE_NAME, keyValueStore())
+            .withPartition(keyDontBelongPartition);
+        // Assert that key is not served when wrong specific partition is requested
+        // If kafkaStreams1 is active for keyPartition, kafkaStreams2 would be active for keyDontBelongPartition
+        // So, in that case, store3 would be null and the store4 would not return the value for key as wrong partition was requested
+        if (kafkaStreams1IsActive) {

Review Comment:
We cannot merge the first two if-statements, because in the first if-statement `store1` and `store2` are assigned and both variables are used in each of the branches of the second if-statement. As far as I see, we could merge the second and the third if-statement. I leave it up to you whether those two if-statements should be merged. I am fine either way, with a slight preference for merging them, but I also understand that this is not really needed in this PR.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
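The data-dependency argument above can be sketched in isolation. This is a hypothetical stand-alone illustration (all names are placeholders, not the actual test code): the first if-statement cannot be merged away because its assignments feed both branches later, while two consecutive if-statements over the same condition with no dependency between them can be collapsed into one.

```java
public class MergeIfSketch {
    // Hypothetical sketch of the structure under discussion; names are
    // placeholders, not the real StoreQueryIntegrationTest code.
    public static void main(final String[] args) {
        final boolean firstIsActive = true;

        // First if-statement: assigns store1 and store2. Both variables are
        // used in both branches below, so this one cannot be merged downward.
        final String store1;
        final String store2;
        if (firstIsActive) {
            store1 = "active-store";
            store2 = "standby-store";
        } else {
            store1 = "standby-store";
            store2 = "active-store";
        }

        // Second and third if-statements share the same condition and have no
        // data dependency between them, so they can be merged into a single if.
        final String checkedA;
        final String checkedB;
        if (firstIsActive) {
            checkedA = store1; // body of the former second if-statement
            checkedB = store2; // body of the former third if-statement
        } else {
            checkedA = store2;
            checkedB = store1;
        }

        System.out.println(checkedA + "," + checkedB);
    }
}
```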