mdedetrich commented on code in PR #12289:
URL: https://github.com/apache/kafka/pull/12289#discussion_r906835871
##########
streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java:
##########
@@ -210,39 +213,33 @@ public void shouldQuerySpecificActivePartitionStores()
throws Exception {
}
final StoreQueryParameters<ReadOnlyKeyValueStore<Integer,
Integer>> storeQueryParam2 =
- StoreQueryParameters.<ReadOnlyKeyValueStore<Integer,
Integer>>fromNameAndType(TABLE_NAME, keyValueStore())
- .withPartition(keyDontBelongPartition);
-
- try {
- // Assert that key is not served when wrong specific partition
is requested
- // If kafkaStreams1 is active for keyPartition, kafkaStreams2
would be active for keyDontBelongPartition
- // So, in that case, store3 would be null and the store4 would
not return the value for key as wrong partition was requested
- if (kafkaStreams1IsActive) {
- assertThat(store1.get(key), is(notNullValue()));
- assertThat(getStore(kafkaStreams2,
storeQueryParam2).get(key), is(nullValue()));
- final InvalidStateStoreException exception =
+ StoreQueryParameters.<ReadOnlyKeyValueStore<Integer,
Integer>>fromNameAndType(TABLE_NAME, keyValueStore())
+ .withPartition(keyDontBelongPartition);
+ // Assert that key is not served when wrong specific partition is
requested
+ // If kafkaStreams1 is active for keyPartition, kafkaStreams2
would be active for keyDontBelongPartition
+ // So, in that case, store3 would be null and the store4 would not
return the value for key as wrong partition was requested
+ if (kafkaStreams1IsActive) {
+ assertThat(store1.get(key), is(notNullValue()));
+ assertThat(getStore(kafkaStreams2, storeQueryParam2).get(key),
is(nullValue()));
+ final InvalidStateStoreException exception =
assertThrows(InvalidStateStoreException.class, () ->
getStore(kafkaStreams1, storeQueryParam2).get(key));
- assertThat(
+
+ assertThat(
exception.getMessage(),
containsString("The specified partition 1 for store
source-table does not exist.")
- );
- } else {
- assertThat(store2.get(key), is(notNullValue()));
- assertThat(getStore(kafkaStreams1,
storeQueryParam2).get(key), is(nullValue()));
- final InvalidStateStoreException exception =
+ );
+ } else {
+ assertThat(store2.get(key), is(notNullValue()));
+ assertThat(getStore(kafkaStreams1, storeQueryParam2).get(key),
is(nullValue()));
+ final InvalidStateStoreException exception =
assertThrows(InvalidStateStoreException.class, () ->
getStore(kafkaStreams2, storeQueryParam2).get(key));
- assertThat(
+ assertThat(
exception.getMessage(),
containsString("The specified partition 1 for store
source-table does not exist.")
- );
- }
- return true;
- } catch (final InvalidStateStoreException exception) {
- verifyRetrievableException(exception);
- LOG.info("Either streams wasn't running or a re-balancing took
place. Will try again.");
- return false;
+ );
}
- });
+ return true;
Review Comment:
The original reason for the return value was so that it could easily be
replaced with `until`. Regarding extending `retryOnExceptionWithTimeout()`:
`retryUntil` is hand-coded because it keeps the same `RuntimeException`
unwrapping that the original `until` did (see
https://github.com/apache/kafka/blob/4c9eeef5b2dff9a4f0977fbc5ac7eaaf930d0d0e/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java#L559-L563).
I don't know why it was originally done this way, but the intention was to
minimize the number of changes so as not to increase flakiness (though this
is a hunch). Are you happy with just removing the `return` value for now, or
do you also want to extend `retryOnExceptionWithTimeout()`?
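
To make the trade-off concrete, here is a minimal sketch of the kind of
hand-coded retry loop being discussed. It is not the code from this PR:
`RetrySketch`, the `Callable<Boolean>` signature, and the `timeoutMs` and
`pauseMs` parameters are hypothetical, and I am assuming "unwrapping" means
that a `RuntimeException` is rethrown unchanged while a checked exception is
wrapped in one.

```java
import java.util.concurrent.Callable;

// Hypothetical illustration only; names and signature are not from the PR.
final class RetrySketch {

    // Polls `condition` until it returns true or the deadline passes.
    // Returns whether the condition was met before the deadline.
    static boolean retryUntil(final Callable<Boolean> condition,
                              final long timeoutMs,
                              final long pauseMs) {
        final long deadline = System.currentTimeMillis() + timeoutMs;
        boolean success = false;
        while (!success && System.currentTimeMillis() < deadline) {
            try {
                success = condition.call();
                if (!success) {
                    Thread.sleep(pauseMs);
                }
            } catch (final RuntimeException e) {
                // Assumed behaviour: runtime exceptions propagate unchanged.
                throw e;
            } catch (final Exception e) {
                // Assumed behaviour: checked exceptions are wrapped.
                throw new RuntimeException(e);
            }
        }
        return success;
    }

    public static void main(final String[] args) {
        final int[] attempts = {0};
        // The condition becomes true on the third attempt.
        final boolean met = retryUntil(() -> ++attempts[0] >= 3, 2000L, 10L);
        System.out.println("met=" + met + " attempts=" + attempts[0]);
    }
}
```

If the `return` value is dropped from the test lambda, the condition type in
a helper like this would change from a boolean-returning callback to a
void one that only signals failure by throwing.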