mdedetrich commented on code in PR #12289: URL: https://github.com/apache/kafka/pull/12289#discussion_r906835871
##########
streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java:
##########
@@ -210,39 +213,33 @@ public void shouldQuerySpecificActivePartitionStores() throws Exception {
             }
 
             final StoreQueryParameters<ReadOnlyKeyValueStore<Integer, Integer>> storeQueryParam2 =
-                StoreQueryParameters.<ReadOnlyKeyValueStore<Integer, Integer>>fromNameAndType(TABLE_NAME, keyValueStore())
-                    .withPartition(keyDontBelongPartition);
-
-            try {
-                // Assert that key is not served when wrong specific partition is requested
-                // If kafkaStreams1 is active for keyPartition, kafkaStreams2 would be active for keyDontBelongPartition
-                // So, in that case, store3 would be null and the store4 would not return the value for key as wrong partition was requested
-                if (kafkaStreams1IsActive) {
-                    assertThat(store1.get(key), is(notNullValue()));
-                    assertThat(getStore(kafkaStreams2, storeQueryParam2).get(key), is(nullValue()));
-                    final InvalidStateStoreException exception =
+            StoreQueryParameters.<ReadOnlyKeyValueStore<Integer, Integer>>fromNameAndType(TABLE_NAME, keyValueStore())
+                .withPartition(keyDontBelongPartition);
+            // Assert that key is not served when wrong specific partition is requested
+            // If kafkaStreams1 is active for keyPartition, kafkaStreams2 would be active for keyDontBelongPartition
+            // So, in that case, store3 would be null and the store4 would not return the value for key as wrong partition was requested
+            if (kafkaStreams1IsActive) {
+                assertThat(store1.get(key), is(notNullValue()));
+                assertThat(getStore(kafkaStreams2, storeQueryParam2).get(key), is(nullValue()));
+                final InvalidStateStoreException exception =
                         assertThrows(InvalidStateStoreException.class, () -> getStore(kafkaStreams1, storeQueryParam2).get(key));
-                    assertThat(
+
+                assertThat(
                         exception.getMessage(),
                         containsString("The specified partition 1 for store source-table does not exist.")
-                    );
-                } else {
-                    assertThat(store2.get(key), is(notNullValue()));
-                    assertThat(getStore(kafkaStreams1, storeQueryParam2).get(key), is(nullValue()));
-                    final InvalidStateStoreException exception =
+                );
+            } else {
+                assertThat(store2.get(key), is(notNullValue()));
+                assertThat(getStore(kafkaStreams1, storeQueryParam2).get(key), is(nullValue()));
+                final InvalidStateStoreException exception =
                         assertThrows(InvalidStateStoreException.class, () -> getStore(kafkaStreams2, storeQueryParam2).get(key));
-                    assertThat(
+                assertThat(
                         exception.getMessage(),
                         containsString("The specified partition 1 for store source-table does not exist.")
-                    );
-                }
-                return true;
-            } catch (final InvalidStateStoreException exception) {
-                verifyRetrievableException(exception);
-                LOG.info("Either streams wasn't running or a re-balancing took place. Will try again.");
-                return false;
+                );
             }
-        });
+            return true;

Review Comment:
The original reason for the return value was so that you can easily replace it with `until`. Regarding extending `retryOnExceptionWithTimeout()`: `retryUntil` is hand-coded because it keeps the same `RuntimeException` unwrapping that the original `until` did (see https://github.com/apache/kafka/blob/4c9eeef5b2dff9a4f0977fbc5ac7eaaf930d0d0e/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java#L559-L563). I don't know the original reason why it was done this way, but the intention was to minimize the number of changes so as not to increase the flakiness (though this is a hunch). Are you happy with just removing the `return` value for now, or do you also want to extend `retryOnExceptionWithTimeout()`?
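To make the trade-off concrete, here is a rough sketch of what a hand-coded retry helper without a return value could look like. This is not the code from the PR or from Kafka's test utilities: `RetrySketch`, `RetriableException`, `Assertions`, and the `retryUntil` signature shown here are all hypothetical names, and the exception handling (rethrow unchecked exceptions unchanged, wrap checked ones once) is only one plausible reading of the "unwrapping" behavior mentioned above.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class RetrySketch {

    /** Hypothetical stand-in for a retriable failure such as InvalidStateStoreException. */
    static final class RetriableException extends RuntimeException {
        RetriableException(final String message) {
            super(message);
        }
    }

    /** Hypothetical assertion block; it returns nothing and may throw checked exceptions. */
    @FunctionalInterface
    interface Assertions {
        void run() throws Exception;
    }

    /** Re-runs the assertions until they pass or the timeout expires (assumed semantics). */
    static void retryUntil(final long timeoutMs, final long pauseMs, final Assertions assertions) {
        final long deadline = System.currentTimeMillis() + timeoutMs;
        while (true) {
            try {
                assertions.run();
                return; // all assertions passed, no boolean result needed
            } catch (final RetriableException e) {
                if (System.currentTimeMillis() >= deadline) {
                    throw e; // out of time: surface the last retriable failure
                }
            } catch (final RuntimeException e) {
                throw e; // other unchecked failures propagate without an extra wrapper
            } catch (final Exception e) {
                throw new RuntimeException(e); // checked failures are wrapped once
            }
            pause(pauseMs);
        }
    }

    private static void pause(final long pauseMs) {
        try {
            Thread.sleep(pauseMs);
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    public static void main(final String[] args) {
        final AtomicInteger attempts = new AtomicInteger();
        // Fails twice with a retriable exception, then succeeds on the third attempt.
        retryUntil(5_000L, 10L, () -> {
            if (attempts.incrementAndGet() < 3) {
                throw new RetriableException("store not ready yet");
            }
        });
        System.out.println("attempts = " + attempts.get()); // prints "attempts = 3"
    }
}
```

With a shape like this the test body needs neither `return true` nor a `try`/`catch` of its own, which is roughly what removing the `return` value would amount to.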