showuon commented on a change in pull request #9733: URL: https://github.com/apache/kafka/pull/9733#discussion_r548399660
########## File path: streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java ##########

@@ -246,18 +261,19 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception {
     assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS);
     waitForRunning(stateTransitions1);
-    streams2Alpha = getKafkaStreams("appDir2", StreamsConfig.EXACTLY_ONCE);
+    streams2Alpha = getKafkaStreams(APP_DIR_2, StreamsConfig.EXACTLY_ONCE);
     streams2Alpha.setStateListener(
         (newState, oldState) -> stateTransitions2.add(KeyValue.pair(oldState, newState))
     );
     stateTransitions1.clear();
-    assignmentListener.prepareForRebalance();
+    prevNumAssignments = assignmentListener.prepareForRebalance();
     streams2Alpha.cleanUp();
     streams2Alpha.start();
     assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS);
+    expectedNumAssignments = assignmentListener.numTotalAssignments() - prevNumAssignments;
+    waitForNumRebalancingToRunning(stateTransitions2, expectedNumAssignments);
     waitForRunning(stateTransitions1);

Review comment:

Thanks for your questions. My answers are below:

> Also, your PR does not update all stages when we wait for state changes. Why? Would we not need to apply the logic each time?

--> Yes, I should do that. So far I have focused only on the case of one newly started stream plus one already-running stream, which is where most of the failures happen. But you're right: I should apply the change to all stages.

> If your reasoning is right, do we need to use waitForNumRebalancingToRunning here, too? If there are multiple rebalances, both clients would participate?

--> Good question! I explained this in a previous comment (https://github.com/apache/kafka/pull/9733#issuecomment-748849742), but since it may not have been clear why I made this change, let me explain it again.
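For reference, the PR's actual `waitForNumRebalancingToRunning` helper is not shown in this hunk. Assuming it polls the recorded `(oldState, newState)` pairs until at least `expectedNumAssignments` transitions from `REBALANCING` to `RUNNING` have been seen, a minimal sketch might look like this. `State`, `Transition`, and `countRebalancingToRunning` are hypothetical stand-ins for `KafkaStreams.State`, `KeyValue`, and the test's internals, and the assignment counts in `main` are made-up values.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class RebalanceWaitSketch {

    // Hypothetical stand-in for KafkaStreams.State (only the states used here).
    enum State { CREATED, REBALANCING, RUNNING }

    // Hypothetical stand-in for KeyValue.pair(oldState, newState).
    static final class Transition {
        final State oldState;
        final State newState;

        Transition(State oldState, State newState) {
            this.oldState = oldState;
            this.newState = newState;
        }
    }

    // Counts REBALANCING -> RUNNING transitions; iteration is synchronized on the
    // list because the state listener may append from another thread.
    static int countRebalancingToRunning(List<Transition> transitions) {
        int count = 0;
        synchronized (transitions) {
            for (Transition t : transitions) {
                if (t.oldState == State.REBALANCING && t.newState == State.RUNNING) {
                    count++;
                }
            }
        }
        return count;
    }

    // Hypothetical helper: polls until at least `expected` REBALANCING -> RUNNING
    // transitions are recorded, failing once maxWaitMs has elapsed.
    static void waitForNumRebalancingToRunning(List<Transition> transitions,
                                               int expected,
                                               long maxWaitMs) throws InterruptedException {
        long deadline = System.currentTimeMillis() + maxWaitMs;
        while (countRebalancingToRunning(transitions) < expected) {
            if (System.currentTimeMillis() > deadline) {
                throw new AssertionError("Expected " + expected
                    + " REBALANCING -> RUNNING transitions, saw "
                    + countRebalancingToRunning(transitions));
            }
            Thread.sleep(10);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        List<Transition> transitions = Collections.synchronizedList(new ArrayList<>());
        transitions.add(new Transition(State.CREATED, State.REBALANCING));
        transitions.add(new Transition(State.REBALANCING, State.RUNNING));
        transitions.add(new Transition(State.RUNNING, State.REBALANCING));
        transitions.add(new Transition(State.REBALANCING, State.RUNNING));

        // Mirrors the diff: expected = total assignments after the rebalance minus before.
        int prevNumAssignments = 3;   // made-up value standing in for prepareForRebalance()
        int numTotalAssignments = 5;  // made-up value standing in for numTotalAssignments()
        int expectedNumAssignments = numTotalAssignments - prevNumAssignments;

        waitForNumRebalancingToRunning(transitions, expectedNumAssignments, 1_000L);
        System.out.println("observed " + countRebalancingToRunning(transitions) + " transitions");
    }
}
```

The sketch waits for "at least" the expected count rather than an exact count, so late extra rebalances do not fail the wait; whether the real helper behaves the same way is an assumption.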
I call `waitForNumRebalancingToRunning` only for the newly started stream, not for the already-running stream, because I found that when the streams run fast enough, the already-running stream sometimes does not record the expected number of `[REBALANCING -> RUNNING]` state transitions. The reason is this log line:

```
[appDir2-StreamThread-1] State transition from PARTITIONS_ASSIGNED to PARTITIONS_ASSIGNED
```

Basically, the already-running stream thread should go through these state changes:

- `[RUNNING to PARTITIONS_REVOKED]`
- `[PARTITIONS_REVOKED to PARTITIONS_ASSIGNED]` (unstable)
- `[PARTITIONS_ASSIGNED to RUNNING]`
- `[RUNNING to PARTITIONS_ASSIGNED]` (stable)
- `[PARTITIONS_ASSIGNED to RUNNING]`

Because it needs the extra PARTITIONS_REVOKED step, it can end up with two consecutive PARTITIONS_ASSIGNED states, with no RUNNING in between. That's why the stream client doesn't transition to RUNNING as many times as we expected.

Does that make sense?
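The effect of the `PARTITIONS_ASSIGNED to PARTITIONS_ASSIGNED` transition can be illustrated with a small model. This is a simplified, hypothetical sketch, not Kafka's state-machine code: it assumes a single stream thread per client and that the client is `RUNNING` exactly when that thread is `RUNNING`, and `REBALANCING` otherwise. It counts client-level `REBALANCING -> RUNNING` transitions for the expected thread path and for the fast path.

```java
import java.util.List;

public class ThreadTransitionSketch {

    // Simplified subset of stream-thread states (hypothetical model, not Kafka's code).
    enum ThreadState { RUNNING, PARTITIONS_REVOKED, PARTITIONS_ASSIGNED }

    enum ClientState { REBALANCING, RUNNING }

    // Single-thread assumption: the client is RUNNING only when its one thread is.
    static ClientState clientStateFor(ThreadState s) {
        return s == ThreadState.RUNNING ? ClientState.RUNNING : ClientState.REBALANCING;
    }

    // Counts client-level REBALANCING -> RUNNING transitions for a sequence of thread states.
    static int countClientRebalancingToRunning(List<ThreadState> threadStates) {
        int count = 0;
        ClientState previous = null;
        for (ThreadState s : threadStates) {
            ClientState current = clientStateFor(s);
            if (previous == ClientState.REBALANCING && current == ClientState.RUNNING) {
                count++;
            }
            previous = current;
        }
        return count;
    }

    public static void main(String[] args) {
        // Expected path: unstable assignment, RUNNING, then stable assignment, RUNNING.
        List<ThreadState> expected = List.of(
            ThreadState.RUNNING,
            ThreadState.PARTITIONS_REVOKED,
            ThreadState.PARTITIONS_ASSIGNED,   // unstable
            ThreadState.RUNNING,
            ThreadState.PARTITIONS_ASSIGNED,   // stable
            ThreadState.RUNNING);

        // Fast path: the stable assignment arrives before the thread reaches RUNNING,
        // i.e. "State transition from PARTITIONS_ASSIGNED to PARTITIONS_ASSIGNED".
        List<ThreadState> fast = List.of(
            ThreadState.RUNNING,
            ThreadState.PARTITIONS_REVOKED,
            ThreadState.PARTITIONS_ASSIGNED,   // unstable
            ThreadState.PARTITIONS_ASSIGNED,   // stable, no RUNNING in between
            ThreadState.RUNNING);

        System.out.println(countClientRebalancingToRunning(expected)); // 2
        System.out.println(countClientRebalancingToRunning(fast));     // 1
    }
}
```

In this model the fast path yields one fewer `REBALANCING -> RUNNING` transition than the number of assignments, which is consistent with why waiting for that count on the already-running client can time out.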