showuon commented on a change in pull request #9733:
URL: https://github.com/apache/kafka/pull/9733#discussion_r548399660



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##########
@@ -246,18 +261,19 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception {
             assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS);
             waitForRunning(stateTransitions1);
 
-            streams2Alpha = getKafkaStreams("appDir2", StreamsConfig.EXACTLY_ONCE);
+            streams2Alpha = getKafkaStreams(APP_DIR_2, StreamsConfig.EXACTLY_ONCE);
             streams2Alpha.setStateListener(
                 (newState, oldState) -> stateTransitions2.add(KeyValue.pair(oldState, newState))
             );
             stateTransitions1.clear();
 
-            assignmentListener.prepareForRebalance();
+            prevNumAssignments = assignmentListener.prepareForRebalance();
             streams2Alpha.cleanUp();
             streams2Alpha.start();
             assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS);
+            expectedNumAssignments = assignmentListener.numTotalAssignments() - prevNumAssignments;
+            waitForNumRebalancingToRunning(stateTransitions2, expectedNumAssignments);
             waitForRunning(stateTransitions1);

Review comment:
       Thanks for your questions. My answers are below:
   > Also, your PR does not update all stages when we wait for state changes. Why? Would we not need to apply the logic each time?
   
   --> Yes, I should do that. So far I have only focused on the case of one newly started stream plus one already-running stream, which is where most of the failures occur. But you're right, I should apply the change to all stages.
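   
   To apply the same wait at every stage, the check could live in one reusable helper. Below is a minimal sketch only, not the helper in this PR: `State` and `Transition` are hypothetical, simplified stand-ins for `KafkaStreams.State` and `KeyValue<State, State>`, and the 10 ms polling interval is an arbitrary choice. It counts the recorded `[REBALANCING -> RUNNING]` transitions and waits until that count reaches the expected number of assignments (in the diff above, `numTotalAssignments() - prevNumAssignments`).
   
   ```java
   import java.time.Duration;
   import java.util.List;
   import java.util.concurrent.CopyOnWriteArrayList;

   public class RebalanceWaitSketch {

       // Hypothetical stand-in for KafkaStreams.State (only the states used here).
       enum State { CREATED, REBALANCING, RUNNING }

       // Hypothetical stand-in for KeyValue<State, State>: one (oldState, newState) pair.
       static final class Transition {
           final State oldState;
           final State newState;

           Transition(State oldState, State newState) {
               this.oldState = oldState;
               this.newState = newState;
           }
       }

       // Counts the recorded REBALANCING -> RUNNING transitions.
       static long countRebalancingToRunning(List<Transition> transitions) {
           return transitions.stream()
               .filter(t -> t.oldState == State.REBALANCING && t.newState == State.RUNNING)
               .count();
       }

       // Polls until at least `expected` REBALANCING -> RUNNING transitions were recorded,
       // or throws an AssertionError once `timeout` has elapsed.
       static void waitForNumRebalancingToRunning(List<Transition> transitions,
                                                  long expected,
                                                  Duration timeout) throws InterruptedException {
           final long deadline = System.nanoTime() + timeout.toNanos();
           while (countRebalancingToRunning(transitions) < expected) {
               if (System.nanoTime() - deadline > 0) {
                   throw new AssertionError("Expected " + expected
                       + " REBALANCING -> RUNNING transitions but saw "
                       + countRebalancingToRunning(transitions));
               }
               Thread.sleep(10); // arbitrary polling interval
           }
       }

       public static void main(String[] args) throws InterruptedException {
           // Thread-safe list: a real state listener is invoked from a stream thread.
           List<Transition> transitions = new CopyOnWriteArrayList<>();
           transitions.add(new Transition(State.CREATED, State.REBALANCING));
           transitions.add(new Transition(State.REBALANCING, State.RUNNING));
           transitions.add(new Transition(State.RUNNING, State.REBALANCING));
           transitions.add(new Transition(State.REBALANCING, State.RUNNING));

           // In the test, the expected count would be numTotalAssignments() - prevNumAssignments.
           waitForNumRebalancingToRunning(transitions, 2, Duration.ofSeconds(1));
           System.out.println("observed " + countRebalancingToRunning(transitions)
               + " REBALANCING -> RUNNING transitions");
       }
   }
   ```
   
   Polling a thread-safe list (here a `CopyOnWriteArrayList`) keeps the sketch safe when the state listener appends from a stream thread while the test thread reads.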
   
   > If your reasoning is right, do we need to use `waitForNumRebalancingToRunning` here, too? If there are multiple rebalances, both clients would participate?
   
   --> Good question! I explained this in a previous comment (https://github.com/apache/kafka/pull/9733#issuecomment-748849742), but I can explain it again, since I don't think the reason for this change came across clearly.
   
   I only call `waitForNumRebalancingToRunning` for the newly started stream, not for the already-running stream, because I found that if the streams run fast enough, the already-running stream sometimes does not go through the expected number of `[REBALANCING -> RUNNING]` state transitions. The reason is this log line:
   ```
   [appDir2-StreamThread-1] State transition from PARTITIONS_ASSIGNED to PARTITIONS_ASSIGNED
   ```
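   Basically, the already-running stream thread should go through these state changes:
   1. `[RUNNING to PARTITIONS_REVOKED]`
   2. `[PARTITIONS_REVOKED to PARTITIONS_ASSIGNED]` (unstable assignment)
   3. `[PARTITIONS_ASSIGNED to RUNNING]`
   4. `[RUNNING to PARTITIONS_ASSIGNED]` (stable assignment)
   5. `[PARTITIONS_ASSIGNED to RUNNING]`
   
   Because it needs the extra PARTITIONS_REVOKED step, it can still be in PARTITIONS_ASSIGNED from the first (unstable) assignment when the second (stable) one arrives, which produces the PARTITIONS_ASSIGNED to PARTITIONS_ASSIGNED transition in the log above. In that case the thread never passes through RUNNING between the two assignments, so the stream client does not transition to RUNNING as many times as we expected.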
   
   Does that make sense? Any suggestions?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
