showuon commented on pull request #9733:
URL: https://github.com/apache/kafka/pull/9733#issuecomment-748849742


   @mjsax, as mentioned in 
https://github.com/apache/kafka/pull/9690#issuecomment-747243284, I've updated 
the commit. Here is what changed:
   1. Before my change, when we started a stream while the other one was 
already running, we only called `waitForRunning` for both streams. But we 
actually expect more than one rebalance here (unstable + stable assignments), 
so the subsequent test steps could run under the wrong stream state. For 
example, when stream1 starts, it might go through 
   ```
   [CREATED -> REBALANCING], [REBALANCING -> RUNNING](unstable), [RUNNING -> 
REBALANCING], [REBALANCING -> RUNNING](stable)
   ```
   So, if we only `waitForRunning`, we might return just after the unstable 
rebalance. Although we have `waitForStableAssignment`, it still doesn't 
guarantee that the stable rebalance has already completed. That is, we need to 
wait for a specific number of `[REBALANCING -> RUNNING]` state transitions so 
that we can be sure all necessary rebalances have completed. That number can be 
counted by the `onAssignmentComplete` callback. 
   
   Note: I only call `waitForNumRebalancingToRunning` for the newly started 
stream, because I found that sometimes, if the stream runs fast enough, the 
already running stream does not go through the expected number of 
`[REBALANCING -> RUNNING]` state transitions. The reason is this line:
   ```
   [appDir2-StreamThread-1] State transition from PARTITIONS_ASSIGNED to 
PARTITIONS_ASSIGNED 
   ```
   Basically, the already running stream thread should go through these state 
changes: `[RUNNING to PARTITIONS_REVOKED]`, `[PARTITIONS_REVOKED to 
PARTITIONS_ASSIGNED](unstable)`, `[PARTITIONS_ASSIGNED to RUNNING] + [RUNNING 
to PARTITIONS_ASSIGNED](stable)`, `[PARTITIONS_ASSIGNED to RUNNING]`. Because 
it needs the extra PARTITIONS_REVOKED step, it might receive the second 
assignment while it is still in PARTITIONS_ASSIGNED, so it never passes through 
RUNNING in between. 
   
   2. Fail the test, via the `hasUnexpectedError` flag, when too many 
exceptions are thrown or an unexpected exception is thrown.
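
   For illustration, the waiting described in item 1 could be sketched roughly 
as below. This is a minimal, self-contained sketch and not the code in this PR: 
`RebalanceCounter` and its `State` enum are hypothetical stand-ins (the enum 
only mirrors the states named above), `onChange` copies the argument order of 
`KafkaStreams.StateListener#onChange(newState, oldState)`, and the expected 
count is passed in as a plain parameter rather than derived from 
`onAssignmentComplete`.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper, not part of Kafka or of this PR.
final class RebalanceCounter {
    // Stand-in for KafkaStreams.State, reduced to the states discussed above.
    enum State { CREATED, REBALANCING, RUNNING }

    private final AtomicInteger rebalancingToRunning = new AtomicInteger();

    // Same argument order as KafkaStreams.StateListener#onChange.
    void onChange(State newState, State oldState) {
        if (oldState == State.REBALANCING && newState == State.RUNNING) {
            rebalancingToRunning.incrementAndGet();
        }
    }

    int count() {
        return rebalancingToRunning.get();
    }

    // Polls until at least `expected` [REBALANCING -> RUNNING] transitions
    // were observed; returns false on timeout or interruption.
    boolean waitForNumRebalancingToRunning(int expected, long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (count() < expected) {
            if (System.currentTimeMillis() >= deadline) {
                return false;
            }
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }
}
```

   With such a counter registered on the newly started stream, a test would 
wait for two transitions (unstable + stable) instead of returning on the first 
RUNNING state.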
   
   Thank you.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

