Hi Guowei,

Thanks a lot for your feedback. Your advice is really helpful. I've updated FLIP-245[1] to include these parts.

> First of all, please complete the fault-tolerant processing flow in the FLIP.

After an execution is created and its source operator becomes ready to receive events, subtaskReady is called, and SpeculativeSourceCoordinator would store the mapping from SubtaskGateway to execution attempt in SpeculativeSourceCoordinatorContext. The source operator then registers its reader with the coordinator, and SpeculativeSourceCoordinator would store the mapping from source reader to execution attempt in SpeculativeSourceCoordinatorContext.

If the execution goes through a failover, subtaskFailed is called, and SpeculativeSourceCoordinator would clear the information about that execution, including its source reader and SubtaskGateway. If all current executions of the execution vertex have failed, subtaskReset is called, and SpeculativeSourceCoordinator would clear all information about these executions and add their splits back to the split enumerator of the source.

> Secondly the FLIP only says that user-defined events are not supported, but it does not explain how to deal with the existing ReportedWatermarkEvent/ReaderRegistrationEvent.

For ReaderRegistrationEvent:
When the source operator registers its reader with the coordinator, SpeculativeSourceCoordinator would also store the mapping from source reader to execution attempt in SpeculativeSourceCoordinatorContext. Like SourceCoordinator, it also needs to call SplitEnumerator#addReader to add the new source reader. In addition, to distinguish the source readers of different executions, 'ReaderInfo' needs a new 'attemptId' field.

For ReportedWatermarkEvent:
ReportedWatermarkEvent was introduced in 1.15 to support watermark alignment in streaming mode. Speculative execution is only enabled in batch mode. Therefore, SpeculativeSourceCoordinator would throw an exception if it receives a ReportedWatermarkEvent.

In addition, after an offline discussion with Jiangjie (Becket) Qin, I've added support for SourceEvent, because it is useful for user-defined sources that have a custom event protocol between reader and enumerator.

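To make the flow described above easier to follow, here is a minimal sketch in plain Java. It is not the implementation in the FLIP: the class SpeculativeContextSketch, its method signatures, and the use of strings for gateways, reader locations, and splits are all hypothetical. Only the callback names subtaskReady, subtaskFailed, and subtaskReset come from the description above. The sketch also assumes that assigned splits are tracked per subtask rather than per attempt.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch, not Flink code: plain maps stand in for the state that
// SpeculativeSourceCoordinatorContext is described as keeping.
class SpeculativeContextSketch {
    // subtask index -> (attempt number -> gateway placeholder)
    private final Map<Integer, Map<Integer, String>> gateways = new HashMap<>();
    // subtask index -> (attempt number -> reader location placeholder)
    private final Map<Integer, Map<Integer, String>> readers = new HashMap<>();
    // subtask index -> splits handed out (assumption: tracked per subtask)
    private final Map<Integer, List<String>> assignedSplits = new HashMap<>();
    // stand-in for the split enumerator's pool of returned splits
    private final List<String> enumeratorSplits = new ArrayList<>();

    // An attempt is ready to receive events: remember its gateway.
    public void subtaskReady(int subtask, int attempt, String gateway) {
        gateways.computeIfAbsent(subtask, k -> new HashMap<>()).put(attempt, gateway);
    }

    // Reader registration: remember which attempt the reader belongs to.
    public void registerReader(int subtask, int attempt, String location) {
        readers.computeIfAbsent(subtask, k -> new HashMap<>()).put(attempt, location);
    }

    public void assignSplit(int subtask, String split) {
        assignedSplits.computeIfAbsent(subtask, k -> new ArrayList<>()).add(split);
    }

    // One attempt failed: forget only that attempt's reader and gateway.
    public void subtaskFailed(int subtask, int attempt) {
        Map<Integer, String> g = gateways.get(subtask);
        if (g != null) {
            g.remove(attempt);
        }
        Map<Integer, String> r = readers.get(subtask);
        if (r != null) {
            r.remove(attempt);
        }
    }

    // All attempts failed: clear everything and give the splits back.
    public void subtaskReset(int subtask) {
        gateways.remove(subtask);
        readers.remove(subtask);
        List<String> splits = assignedSplits.remove(subtask);
        if (splits != null) {
            enumeratorSplits.addAll(splits);
        }
    }

    // Watermark alignment is streaming-only, so the batch-only sketch rejects it.
    public void onReportedWatermarkEvent() {
        throw new UnsupportedOperationException(
                "ReportedWatermarkEvent is not expected in batch mode");
    }

    public Set<Integer> readerAttempts(int subtask) {
        Map<Integer, String> empty = Collections.emptyMap();
        return new HashSet<>(readers.getOrDefault(subtask, empty).keySet());
    }

    public Set<Integer> gatewayAttempts(int subtask) {
        Map<Integer, String> empty = Collections.emptyMap();
        return new HashSet<>(gateways.getOrDefault(subtask, empty).keySet());
    }

    public List<String> splitsReturnedToEnumerator() {
        return new ArrayList<>(enumeratorSplits);
    }
}
```

In this sketch, a single failed attempt leaves the other attempt's reader and gateway in place, and the splits only go back to the enumerator once subtaskReset is called.
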
Best,
Jing Zhang

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-245%3A+Source+Supports+Speculative+Execution+For+Batch+Job

Guowei Ma <guowei....@gmail.com> wrote on Wed, Jun 29, 2022 at 18:06:

> Hi, Jing
>
> Thanks a lot for writing this FLIP, which is very useful to Batch users.
> Currently I have only two small questions:
>
> 1. First of all, please complete the fault-tolerant processing flow in the
> FLIP. (Maybe you've already considered it, but it's better to explicitly
> give the specific solution in the FLIP.)
> For example, how to handle Source `Reader` in case of error. As far as I
> know, once the reader is unavailable, it will result in the inability to
> allocate a new split, which may be unacceptable in the case of speculative
> execution.
>
> 2. Secondly the FLIP only says that user-defined events are not supported,
> but it does not explain how to deal with the existing
> ReportedWatermarkEvent/ReaderRegistrationEvent. After all, in the case of
> speculative execution, there may be two "same" tasks being executed at the
> same time. If these events are repeated, whether they really have no effect
> on the execution of the job, there is still a clear evaluation.
>
> Best,
> Guowei
>
>
> On Fri, Jun 24, 2022 at 5:41 PM Jing Zhang <beyond1...@gmail.com> wrote:
>
> > Hi all,
> > One major problem of Flink batch jobs is slow tasks running on hot/bad
> > nodes, resulting in very long execution time.
> >
> > In order to solve this problem, FLIP-168: Speculative Execution for Batch
> > Job[1] is introduced and approved recently.
> >
> > Here, Zhu Zhu and I propose to support speculative execution of sources as
> > one of follow up of FLIP-168. You could find more details in FLIP-245[2].
> > Looking forward to your feedback.
> >
> > Best,
> > Jing Zhang
> >
> > [1]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-168%3A+Speculative+Execution+for+Batch+Job#FLIP168:SpeculativeExecutionforBatchJob-NointegrationwithFlink'swebUI
> >
> > [2]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-245%3A+Source+Supports+Speculative+Execution+For+Batch+Job