lindong28 commented on PR #21690: URL: https://github.com/apache/flink/pull/21690#issuecomment-1386793709
Thanks for the detailed explanation @pnowojski :) Let me provide comments below.

> It would be strange for me that in order to fully implement this interface and use it as it's designed, one would have to ask "mailbox" or anything else.

With the approach using `CanEmitBatchOfRecordsChecker`, the implementation of the `DataOutput` interface does not rely on `mailbox`. Yes, the caller would have to call `CanEmitBatchOfRecordsChecker#check` to use it as designed.

With the approach using `boolean DataOutput#emitRecord`, the implementation would now have to rely on `mailbox` etc. And the caller needs to check the returned boolean and understand its semantics (the complexity depends on how we define those semantics). Also note that `SourceOperator#emitNext` is still checking things like `shouldWaitForAlignment` at every step in order to use `DataOutput` as designed. So I am not sure this approach is any less strange.

> This pattern would be also consistent with returned status from the PushingAsyncDataInput#emitNext method

I guess that probably depends on how we explain the semantics of the returned boolean. Maybe we can discuss more after seeing the proposed Javadoc for this method.

> And on top of that, it would save us one extra method call, so should be slightly faster.

Hmm... I am not sure this approach can be faster. It seems that the same number of calls are made on the per-record runtime path. Though the `SourceOperator` will no longer call that supplier, `DataOutput#emitRecord()` would now additionally call that supplier, right?

BTW, I do agree that implementations should just emit a record and check whether they can immediately emit the next one. That is the key motivation of [FLINK-30533](https://issues.apache.org/jira/browse/FLINK-30533). But it seems that stream processing needs to do a lot of control flow work (e.g. new source split added/removed) and has to periodically check control flow messages. Batch processing does not have this issue.
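
To make the comparison of the two call patterns concrete, here is a minimal sketch. It does not use the real Flink classes: `PlainOutput`, `CanEmitChecker`, `SignallingOutput`, and `CountingSupplier` are hypothetical stand-ins for `DataOutput`, `CanEmitBatchOfRecordsChecker`, and the mailbox-backed supplier, and the loops are a simplified assumption of what the caller would do. It only counts how often the supplier is invoked per record under each approach.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BooleanSupplier;

public class EmitLoopSketch {

    /** Approach A: the output knows nothing about the mailbox (hypothetical stand-in). */
    interface PlainOutput<T> {
        void emitRecord(T record);
    }

    /** Approach A: separate checker the caller must consult (hypothetical stand-in). */
    interface CanEmitChecker {
        boolean check();
    }

    /** Approach B: emitRecord reports whether another record may be emitted right away. */
    interface SignallingOutput<T> {
        boolean emitRecord(T record);
    }

    /** Stand-in for the mailbox-backed supplier; returns false on the n-th call. */
    static final class CountingSupplier implements BooleanSupplier {
        int calls = 0;
        private final int stopAtCall;

        CountingSupplier(int stopAtCall) {
            this.stopAtCall = stopAtCall;
        }

        @Override
        public boolean getAsBoolean() {
            calls++;
            return calls < stopAtCall;
        }
    }

    /** Caller-side loop for approach A: emit, then ask the checker. */
    static <T> int emitWithChecker(List<T> records, PlainOutput<T> output, CanEmitChecker checker) {
        int emitted = 0;
        for (T record : records) {
            output.emitRecord(record);
            emitted++;
            if (!checker.check()) {
                break;
            }
        }
        return emitted;
    }

    /** Caller-side loop for approach B: the output itself consults the supplier. */
    static <T> int emitWithReturnValue(List<T> records, SignallingOutput<T> output) {
        int emitted = 0;
        for (T record : records) {
            emitted++;
            if (!output.emitRecord(record)) {
                break;
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("a", "b", "c", "d", "e");

        CountingSupplier supplierA = new CountingSupplier(3);
        List<String> sinkA = new ArrayList<>();
        PlainOutput<String> outputA = r -> sinkA.add(r);
        CanEmitChecker checker = supplierA::getAsBoolean;
        int emittedA = emitWithChecker(records, outputA, checker);

        CountingSupplier supplierB = new CountingSupplier(3);
        List<String> sinkB = new ArrayList<>();
        SignallingOutput<String> outputB = r -> {
            sinkB.add(r);
            return supplierB.getAsBoolean();
        };
        int emittedB = emitWithReturnValue(records, outputB);

        System.out.println("A: emitted=" + emittedA + ", supplier calls=" + supplierA.calls);
        System.out.println("B: emitted=" + emittedB + ", supplier calls=" + supplierB.calls);
    }
}
```

In this simplified model the supplier is invoked once per emitted record in both variants; the difference is only whether the caller or the output implementation makes that call. Whether that carries over to the real operator chain would have to be confirmed by a benchmark.
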
I also plan to see if there is a way to further mitigate this overhead in the future.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at: us...@infra.apache.org