GitHub user bjlovegithub commented on the issue:

    https://github.com/apache/flink/pull/2629
  
    Hi @tillrohrmann, thanks for your review ;D I will go through each of
your comments and update the PR later.
    Coming to the first part of the review: the first point is about using
`UNORDERED` mode together with `Watermark`. This combination is meaningless, of
course. Maybe the graph generator could print an error and stop compiling the
graph if `UNORDERED` mode and `Watermark` are enabled at the same time?
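A minimal sketch of the proposed fail-fast check, assuming a hypothetical `AsyncModeValidator.validate(mode, watermarksEnabled)` hook that the graph generator would call before building the graph. The enum and method names are illustrative only, not existing Flink APIs:

```java
import java.util.Objects;

// Hypothetical sketch: AsyncModeValidator, AsyncOutputMode and validate()
// are illustrative names, not part of Flink.
final class AsyncModeValidator {

    /** The two output modes discussed in this thread. */
    enum AsyncOutputMode { ORDERED, UNORDERED }

    private AsyncModeValidator() {}

    /**
     * Rejects the UNORDERED + watermark combination at graph-generation time,
     * so the job fails fast instead of running with a meaningless setup.
     */
    static void validate(AsyncOutputMode mode, boolean watermarksEnabled) {
        Objects.requireNonNull(mode, "mode");
        if (mode == AsyncOutputMode.UNORDERED && watermarksEnabled) {
            throw new IllegalStateException(
                "UNORDERED async mode cannot be combined with watermarks; "
                    + "use ORDERED mode or disable watermarks.");
        }
    }
}
```

Throwing an exception rather than only logging means the error surfaces when the graph is compiled, before any task is deployed.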
    
    Both of these modes are guaranteed by the `AsyncWaitOperator`. When the
checkpoint is taken for the chained operators and the snapshot is made for the
`AsyncWaitOperator`, it first tries to get all elements in the
`AsyncCollectorBuffer` by calling `getStreamElementsInBuffer()`. That method
first acquires the lock, which blocks the `Emitter` thread, and then sets a
flag named `isCheckpointing` that keeps the `Emitter` thread idle, so no
finished `AsyncCollector` is passed on to the next operator. Since
`snapshotState()` is called from the head operator to the tail operator, the
`Emitter` threads in the parent operators have already stopped by the time a
downstream operator is snapshotted, so all states are taken correctly.
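The pause-and-snapshot handshake described above can be sketched roughly as follows. This is a simplified stand-in, not the actual `AsyncCollectorBuffer`: the class `CheckpointableBuffer` and its methods `emitOne()`, `snapshotElements()` and `finishCheckpoint()` are hypothetical names, and a `ReentrantLock` with a `Condition` is assumed as the blocking mechanism:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch of the Emitter pause during a snapshot; the names are
// illustrative and do not mirror the real AsyncCollectorBuffer API.
final class CheckpointableBuffer<T> {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition resumed = lock.newCondition();
    private final Deque<T> finished = new ArrayDeque<>();
    private boolean isCheckpointing = false;

    /** Called when an async result completes. */
    void add(T element) {
        lock.lock();
        try {
            finished.addLast(element);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Called by the Emitter thread: takes one finished element to hand
     * downstream, but waits while a checkpoint is in progress.
     * Returns null if nothing is buffered.
     */
    T emitOne() throws InterruptedException {
        lock.lock();
        try {
            while (isCheckpointing) {
                resumed.await(); // Emitter stays idle during the snapshot
            }
            return finished.pollFirst();
        } finally {
            lock.unlock();
        }
    }

    /** Called from snapshotState(): pauses the Emitter and copies the buffer. */
    List<T> snapshotElements() {
        lock.lock();
        try {
            isCheckpointing = true;
            return new ArrayList<>(finished);
        } finally {
            lock.unlock();
        }
    }

    /** Called after the snapshot is taken so the Emitter can continue. */
    void finishCheckpoint() {
        lock.lock();
        try {
            isCheckpointing = false;
            resumed.signalAll();
        } finally {
            lock.unlock();
        }
    }
}
```

In this sketch the lock only guards one operator's buffer and flag, so it is private to each operator rather than shared across the whole chain.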
    
    I had considered using the checkpoint lock in the `Emitter` thread, but
after testing with multiple `AsyncWaitOperator`s chained together, the
`Emitter` threads could not fully utilize the parallelism, since they all have
to acquire the same lock while collecting outputs. One way to optimize this is
to add a conditional statement in `performCheckpoint()`: if there is an
`AsyncWaitOperator` in the operator chain, the barriers should be broadcast
after `checkpointState()`; otherwise, we can keep the original design.
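The proposed ordering switch in `performCheckpoint()` might look like the sketch below. `ChainedOperator`, `isAsyncWaitOperator()`, `op()` and the returned event list are hypothetical stand-ins used only to make the two orderings visible; they are not Flink's `StreamTask` or `OperatorChain` API:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the conditional barrier ordering; all types and
// methods here are illustrative stand-ins, not Flink classes.
final class CheckpointOrderingSketch {

    interface ChainedOperator {
        String name();
        boolean isAsyncWaitOperator();
    }

    /** Convenience factory for illustration. */
    static ChainedOperator op(String opName, boolean async) {
        return new ChainedOperator() {
            public String name() { return opName; }
            public boolean isAsyncWaitOperator() { return async; }
        };
    }

    /** Returns the order of checkpoint actions so both paths can be compared. */
    static List<String> performCheckpoint(List<ChainedOperator> chain, long checkpointId) {
        List<String> events = new ArrayList<>();
        boolean hasAsyncWait = chain.stream().anyMatch(ChainedOperator::isAsyncWaitOperator);

        if (hasAsyncWait) {
            // Snapshot first (head to tail) so buffered async results are
            // captured while Emitters are paused, then broadcast the barrier.
            checkpointState(chain, events);
            events.add("broadcast barrier " + checkpointId);
        } else {
            // Original design: broadcast the barrier, then snapshot.
            events.add("broadcast barrier " + checkpointId);
            checkpointState(chain, events);
        }
        return events;
    }

    private static void checkpointState(List<ChainedOperator> chain, List<String> events) {
        for (ChainedOperator op : chain) { // head operator first, tail last
            events.add("snapshot " + op.name());
        }
    }
}
```

Chains without an `AsyncWaitOperator` keep the barrier-first order, so the change would only affect chains that contain an async operator.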
    
    Finally, I will add more test cases based on the
`OneInputStreamTaskTestHarness`.
