mdedetrich commented on code in PR #12289: URL: https://github.com/apache/kafka/pull/12289#discussion_r906835167
########## streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java: ########## @@ -210,39 +213,33 @@ public void shouldQuerySpecificActivePartitionStores() throws Exception { } final StoreQueryParameters<ReadOnlyKeyValueStore<Integer, Integer>> storeQueryParam2 = - StoreQueryParameters.<ReadOnlyKeyValueStore<Integer, Integer>>fromNameAndType(TABLE_NAME, keyValueStore()) - .withPartition(keyDontBelongPartition); - - try { - // Assert that key is not served when wrong specific partition is requested - // If kafkaStreams1 is active for keyPartition, kafkaStreams2 would be active for keyDontBelongPartition - // So, in that case, store3 would be null and the store4 would not return the value for key as wrong partition was requested - if (kafkaStreams1IsActive) { - assertThat(store1.get(key), is(notNullValue())); - assertThat(getStore(kafkaStreams2, storeQueryParam2).get(key), is(nullValue())); - final InvalidStateStoreException exception = + StoreQueryParameters.<ReadOnlyKeyValueStore<Integer, Integer>>fromNameAndType(TABLE_NAME, keyValueStore()) + .withPartition(keyDontBelongPartition); + // Assert that key is not served when wrong specific partition is requested + // If kafkaStreams1 is active for keyPartition, kafkaStreams2 would be active for keyDontBelongPartition + // So, in that case, store3 would be null and the store4 would not return the value for key as wrong partition was requested + if (kafkaStreams1IsActive) { Review Comment: We could, I have a suspicion that its done this way so the test is easier to read since the flow is more linear but I am not against merging the statements if you think its better. ########## streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java: ########## @@ -535,17 +532,26 @@ public void shouldQueryStoresAfterAddingAndRemovingStreamThread() throws Excepti }); } + private Matcher<String> retrievableException() { + return is( + anyOf( + containsString("Cannot get state store source-table because the stream thread is PARTITIONS_ASSIGNED, not RUNNING"), + containsString("The state store, source-table, may have migrated to another instance"), + containsString("Cannot get state store source-table because the stream thread is STARTING, not RUNNING"), + containsString("The specified partition 1 for store source-table does not exist.") Review Comment: Agreed but if this is the case (and its a consistent bug) it would hit the timeout. Tbh investigating this was a bit out of my depth, maybe its wise to do a separate PR if we can find a better solution? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org