Hi all,

After an offline discussion with Jiangjie (Becket) Qin, Guowei, and Zhuzhu, I've updated FLIP-245[1] to include:

1. The complete fault-tolerant processing flow.
2. Support for SourceEvent, because it is useful for user-defined sources that have a custom event protocol between reader and enumerator.
3. How ReportedWatermarkEvent/ReaderRegistrationEvent messages are handled.
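As a rough illustration of the fault-tolerant flow in item 1 (detailed in the quoted reply below), here is a minimal sketch of the per-attempt bookkeeping. It is not code from the FLIP or from Flink: the class name, the method signatures, the use of plain strings for gateways, readers and splits, and the choice to track assigned splits per subtask are all assumptions made for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the coordinator context bookkeeping; not Flink's API.
final class SpeculativeBookkeepingSketch {
    // subtask index -> (attempt number -> gateway placeholder)
    private final Map<Integer, Map<Integer, String>> gateways = new HashMap<>();
    // subtask index -> (attempt number -> reader location placeholder)
    private final Map<Integer, Map<Integer, String>> readers = new HashMap<>();
    // Assumption: splits are tracked per subtask, shared by all of its attempts.
    private final Map<Integer, List<String>> assignedSplits = new HashMap<>();
    // Splits still held by the (simulated) split enumerator.
    private final Deque<String> pendingSplits;

    SpeculativeBookkeepingSketch(List<String> splits) {
        this.pendingSplits = new ArrayDeque<>(splits);
    }

    // Called once an attempt's source operator is ready to receive events.
    void subtaskReady(int subtask, int attempt, String gateway) {
        gateways.computeIfAbsent(subtask, k -> new HashMap<>()).put(attempt, gateway);
    }

    // Called when the attempt's reader registers with the coordinator.
    void registerReader(int subtask, int attempt, String location) {
        readers.computeIfAbsent(subtask, k -> new HashMap<>()).put(attempt, location);
    }

    // Hands the next pending split to the subtask, or returns null if none is left.
    String assignNextSplit(int subtask) {
        String split = pendingSplits.poll();
        if (split != null) {
            assignedSplits.computeIfAbsent(subtask, k -> new ArrayList<>()).add(split);
        }
        return split;
    }

    // One attempt failed: forget only that attempt's gateway and reader.
    void subtaskFailed(int subtask, int attempt) {
        Map<Integer, String> g = gateways.get(subtask);
        if (g != null) {
            g.remove(attempt);
        }
        Map<Integer, String> r = readers.get(subtask);
        if (r != null) {
            r.remove(attempt);
        }
    }

    // All attempts failed: clear everything and give the splits back to the enumerator.
    void subtaskReset(int subtask) {
        gateways.remove(subtask);
        readers.remove(subtask);
        List<String> splits = assignedSplits.remove(subtask);
        if (splits != null) {
            pendingSplits.addAll(splits);
        }
    }

    int liveReaders(int subtask) {
        return readers.getOrDefault(subtask, Collections.<Integer, String>emptyMap()).size();
    }

    int pendingSplitCount() {
        return pendingSplits.size();
    }
}
```

In this sketch a single failed attempt leaves the other attempt's reader registered, and only subtaskReset returns the subtask's splits to the pending queue.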
Please review FLIP-245[1] again. Looking forward to your feedback.

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-245%3A+Source+Supports+Speculative+Execution+For+Batch+Job

Jing Zhang <beyond1...@gmail.com> wrote on Fri, Jul 1, 2022 at 18:02:

> Hi Guowei,
> Thanks a lot for your feedback.
> Your advice is really helpful. I've updated FLIP-245[1] to include
> these parts.
>
> First of all, please complete the fault-tolerant processing flow in the
> FLIP.
>
> After an execution is created and a source operator becomes ready to
> receive events, subtaskReady is called, and SpeculativeSourceCoordinator
> would store the mapping of SubtaskGateway to execution attempt in
> SpeculativeSourceCoordinatorContext.
> Then the source operator registers the reader with the coordinator, and
> SpeculativeSourceCoordinator would store the mapping of source reader to
> execution attempt in SpeculativeSourceCoordinatorContext.
> If the execution goes through a failover, subtaskFailed is called, and
> SpeculativeSourceCoordinator would clear the information about this
> execution, including its source reader and SubtaskGateway.
> If all the current executions of the execution vertex have failed,
> subtaskReset would be called, and SpeculativeSourceCoordinator would clear
> all information about these executions and add their splits back to the
> split enumerator of the source.
>
>
> Secondly the FLIP only says that user-defined events are not supported,
> but it does not explain how to deal with the existing
> ReportedWatermarkEvent/ReaderRegistrationEvent.
>
> For ReaderRegistrationEvent:
> When the source operator registers the reader with the coordinator,
> SpeculativeSourceCoordinator would also store the mapping of source reader
> to execution attempt in SpeculativeSourceCoordinatorContext. Like
> SourceCoordinator, it also needs to call SplitEnumerator#addReader to add
> the new source reader.
> Besides, in order to distinguish source readers of different
> executions, 'ReaderInfo' needs an additional 'attemptId' field.
>
> For ReportedWatermarkEvent:
> ReportedWatermarkEvent was introduced in 1.15 to support
> watermark alignment in streaming mode.
> Speculative execution is only enabled in batch mode. Therefore,
> SpeculativeSourceCoordinator would throw an exception if it receives a
> ReportedWatermarkEvent.
>
> Besides, after an offline discussion with Jiangjie (Becket) Qin, I've added
> support for SourceEvent because it's useful for some user-defined sources
> which have a custom event protocol between reader and enumerator.
>
> Best,
> Jing Zhang
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-245%3A+Source+Supports+Speculative+Execution+For+Batch+Job
>
> Guowei Ma <guowei....@gmail.com> wrote on Wed, Jun 29, 2022 at 18:06:
>
>> Hi, Jing
>>
>> Thanks a lot for writing this FLIP, which is very useful to batch users.
>> Currently I have only two small questions:
>>
>> 1. First of all, please complete the fault-tolerant processing flow in the
>> FLIP. (Maybe you've already considered it, but it's better to explicitly
>> give the specific solution in the FLIP.)
>> For example, how to handle the source `Reader` in case of error. As far as I
>> know, once the reader is unavailable, a new split cannot be allocated,
>> which may be unacceptable in the case of speculative execution.
>>
>> 2. Secondly the FLIP only says that user-defined events are not supported,
>> but it does not explain how to deal with the existing
>> ReportedWatermarkEvent/ReaderRegistrationEvent. After all, in the case of
>> speculative execution, there may be two "same" tasks being executed at the
>> same time. If these events are repeated, a clear evaluation is still
>> needed of whether they really have no effect on the execution of the job.
>>
>> Best,
>> Guowei
>>
>>
>> On Fri, Jun 24, 2022 at 5:41 PM Jing Zhang <beyond1...@gmail.com> wrote:
>>
>> > Hi all,
>> > One major problem of Flink batch jobs is slow tasks running on hot/bad
>> > nodes, resulting in very long execution times.
>> >
>> > To solve this problem, FLIP-168: Speculative Execution for Batch
>> > Job[1] was introduced and approved recently.
>> >
>> > Here, Zhu Zhu and I propose to support speculative execution of sources
>> > as one of the follow-ups to FLIP-168. You can find more details in
>> > FLIP-245[2].
>> > Looking forward to your feedback.
>> >
>> > Best,
>> > Jing Zhang
>> >
>> > [1]
>> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-168%3A+Speculative+Execution+for+Batch+Job#FLIP168:SpeculativeExecutionforBatchJob-NointegrationwithFlink'swebUI
>> >
>> > [2]
>> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-245%3A+Source+Supports+Speculative+Execution+For+Batch+Job
>> >
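
To make the event handling discussed in this thread concrete, here is a minimal sketch of how a coordinator might dispatch the three event kinds: register a reader together with its attempt, reject ReportedWatermarkEvent because watermark alignment is streaming-only, and forward custom source events to the enumerator. Every class, field, and method name below is a hypothetical stand-in chosen for illustration; none of it is taken from the FLIP or from Flink's actual classes, and the enumerator is simulated by a list of recorded calls.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical dispatch sketch; the event types are simplified stand-ins.
final class SpeculativeEventHandlingSketch {
    // Assumption: every event carries the subtask index and the attempt number.
    abstract static class Event {
        final int subtask;
        final int attempt;

        Event(int subtask, int attempt) {
            this.subtask = subtask;
            this.attempt = attempt;
        }
    }

    static final class ReaderRegistration extends Event {
        final String location;

        ReaderRegistration(int subtask, int attempt, String location) {
            super(subtask, attempt);
            this.location = location;
        }
    }

    static final class ReportedWatermark extends Event {
        final long watermark;

        ReportedWatermark(int subtask, int attempt, long watermark) {
            super(subtask, attempt);
            this.watermark = watermark;
        }
    }

    static final class CustomSourceEvent extends Event {
        final String payload;

        CustomSourceEvent(int subtask, int attempt, String payload) {
            super(subtask, attempt);
            this.payload = payload;
        }
    }

    // Records what would be passed on to the split enumerator.
    final List<String> enumeratorCalls = new ArrayList<>();

    void handle(Event event) {
        if (event instanceof ReaderRegistration) {
            ReaderRegistration r = (ReaderRegistration) event;
            // The attempt number distinguishes readers of the same subtask.
            enumeratorCalls.add("addReader subtask=" + r.subtask
                    + " attempt=" + r.attempt + " location=" + r.location);
        } else if (event instanceof ReportedWatermark) {
            // Watermark alignment is a streaming feature; speculation is batch-only.
            throw new UnsupportedOperationException(
                    "ReportedWatermark is not expected in batch speculative execution");
        } else if (event instanceof CustomSourceEvent) {
            CustomSourceEvent c = (CustomSourceEvent) event;
            enumeratorCalls.add("sourceEvent subtask=" + c.subtask
                    + " attempt=" + c.attempt + " payload=" + c.payload);
        } else {
            throw new IllegalArgumentException("Unknown event type: " + event.getClass());
        }
    }

    public static void main(String[] args) {
        SpeculativeEventHandlingSketch coordinator = new SpeculativeEventHandlingSketch();
        coordinator.handle(new ReaderRegistration(0, 0, "host-a"));
        coordinator.handle(new ReaderRegistration(0, 1, "host-b"));
        coordinator.handle(new CustomSourceEvent(0, 1, "ping"));
        for (String call : coordinator.enumeratorCalls) {
            System.out.println(call);
        }
    }
}
```

Carrying the attempt number on every event is the design choice that lets two concurrent attempts of the same subtask be told apart, which is the role the thread assigns to the proposed 'attemptId' field.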