Thanks for updating the FLIP! I agree that, at the moment, users do not need watermark alignment (the case in which ReportedWatermarkEvent would be sent) in batch jobs. However, I think the concept of watermark alignment does not conflict with speculative execution. It can work with speculative execution with a little extra effort, by sending the WatermarkAlignmentEvent to all the current executions of each subtask. Therefore, I prefer to support watermark alignment in case batch jobs need it in the future.
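The "little extra effort" can be illustrated with a minimal Java sketch of the fan-out. It does not use Flink's real classes: `AlignmentEvent`, `AttemptGateway` and `AlignmentBroadcaster` are hypothetical stand-ins for WatermarkAlignmentEvent, a per-attempt SubtaskGateway and the coordinator's bookkeeping, and attempts are assumed to be identified by an integer attempt number. The only point shown is that the coordinator keeps one gateway per current execution and sends each alignment event to all of them, instead of to a single gateway per subtask.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a watermark alignment event (not Flink's API);
// only the upper bound on the reader's watermark is modeled.
final class AlignmentEvent {
    final long maxWatermark;

    AlignmentEvent(long maxWatermark) {
        this.maxWatermark = maxWatermark;
    }
}

// Hypothetical gateway to one execution attempt; it only records what it receives.
final class AttemptGateway {
    final int attemptNumber;
    final List<AlignmentEvent> received = new ArrayList<>();

    AttemptGateway(int attemptNumber) {
        this.attemptNumber = attemptNumber;
    }

    void send(AlignmentEvent event) {
        received.add(event);
    }
}

// Keeps one gateway per current attempt of each subtask (hypothetical model).
final class AlignmentBroadcaster {
    private final Map<Integer, Map<Integer, AttemptGateway>> gateways = new HashMap<>();

    void attemptReady(int subtask, AttemptGateway gateway) {
        gateways.computeIfAbsent(subtask, k -> new HashMap<>())
                .put(gateway.attemptNumber, gateway);
    }

    void attemptFailed(int subtask, int attemptNumber) {
        Map<Integer, AttemptGateway> attempts = gateways.get(subtask);
        if (attempts != null) {
            attempts.remove(attemptNumber);
        }
    }

    // Sends the same event to every current attempt of the subtask, so the
    // original and speculative attempts are held to the same bound.
    // Returns the number of attempts that received it.
    int broadcast(int subtask, AlignmentEvent event) {
        Map<Integer, AttemptGateway> attempts = gateways.getOrDefault(subtask, Map.of());
        for (AttemptGateway gateway : attempts.values()) {
            gateway.send(event);
        }
        return attempts.size();
    }
}
```

In this sketch a failed attempt is simply removed from the map, so later events only reach attempts that are still running.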
Thanks,
Zhu

Jing Zhang <beyond1...@gmail.com> wrote on Fri, Jul 1, 2022 at 18:09:
>
> Hi all,
> After an offline discussion with Jiangjie (Becket) Qin, Guowei, and Zhu Zhu,
> I've updated FLIP-245[1] to include:
> 1. The complete fault-tolerant processing flow.
> 2. Support for SourceEvent, because it's useful for some user-defined
> sources which have a custom event protocol between reader and enumerator.
> 3. How to handle ReportedWatermarkEvent/ReaderRegistrationEvent messages.
>
> Please review FLIP-245[1] again; looking forward to your feedback.
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-245%3A+Source+Supports+Speculative+Execution+For+Batch+Job
>
> Jing Zhang <beyond1...@gmail.com> wrote on Fri, Jul 1, 2022 at 18:02:
>
> > Hi Guowei,
> > Thanks a lot for your feedback.
> > Your advice is really helpful. I've updated FLIP-245[1] to include
> > these parts.
> > > First of all, please complete the fault-tolerant processing flow in the FLIP.
> >
> > After an execution is created and its source operator becomes ready to
> > receive events, subtaskReady is called, and SpeculativeSourceCoordinator
> > would store the mapping of SubtaskGateway to execution attempt in
> > SpeculativeSourceCoordinatorContext.
> > Then the source operator registers its reader with the coordinator, and
> > SpeculativeSourceCoordinator would store the mapping of source reader to
> > execution attempt in SpeculativeSourceCoordinatorContext.
> > If an execution fails, subtaskFailed is called, and
> > SpeculativeSourceCoordinator would clear the information about this
> > execution, including its source reader and SubtaskGateway.
> > If all the current executions of the execution vertex have failed,
> > subtaskReset is called, and SpeculativeSourceCoordinator would clear all
> > information about these executions and add their splits back to the
> > split enumerator of the source.
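The flow above amounts to per-attempt bookkeeping, which can be modeled in a few lines. The following is a minimal Java sketch under stated assumptions, not the FLIP's implementation: `SpeculativeBookkeepingSketch` and `ReaderInfoSketch` are hypothetical, gateways are represented by strings, and the splits assigned to a subtask are assumed to be shared by all of its attempts and tracked per subtask.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical reader record. 'attemptNumber' plays the role of the proposed
// 'attemptId' field in ReaderInfo, so readers of different attempts of the
// same subtask can be told apart.
final class ReaderInfoSketch {
    final int subtask;
    final int attemptNumber;
    final String location;

    ReaderInfoSketch(int subtask, int attemptNumber, String location) {
        this.subtask = subtask;
        this.attemptNumber = attemptNumber;
        this.location = location;
    }
}

// Hypothetical model of the state a speculative source coordinator context
// would keep; not Flink's actual classes.
final class SpeculativeBookkeepingSketch {
    // subtask -> attempt -> gateway handle (a string stands in for SubtaskGateway)
    final Map<Integer, Map<Integer, String>> gateways = new HashMap<>();
    // subtask -> attempt -> registered reader
    final Map<Integer, Map<Integer, ReaderInfoSketch>> readers = new HashMap<>();
    // Assumption: splits are assigned per subtask and shared by all its attempts.
    final Map<Integer, List<String>> assignedSplits = new HashMap<>();
    // Stand-in for splits handed back to the split enumerator.
    final List<String> returnedToEnumerator = new ArrayList<>();

    // subtaskReady: an attempt's source operator can now receive events.
    void subtaskReady(int subtask, int attempt, String gateway) {
        gateways.computeIfAbsent(subtask, k -> new HashMap<>()).put(attempt, gateway);
    }

    // Reader registration; the real coordinator would also call
    // SplitEnumerator#addReader at this point.
    void registerReader(ReaderInfoSketch reader) {
        readers.computeIfAbsent(reader.subtask, k -> new HashMap<>())
                .put(reader.attemptNumber, reader);
    }

    void assignSplit(int subtask, String split) {
        assignedSplits.computeIfAbsent(subtask, k -> new ArrayList<>()).add(split);
    }

    // subtaskFailed: forget only this attempt's gateway and reader.
    void subtaskFailed(int subtask, int attempt) {
        removeAttempt(gateways, subtask, attempt);
        removeAttempt(readers, subtask, attempt);
    }

    // subtaskReset: all current attempts failed, so drop everything for the
    // subtask and hand its splits back to the enumerator.
    void subtaskReset(int subtask) {
        gateways.remove(subtask);
        readers.remove(subtask);
        List<String> splits = assignedSplits.remove(subtask);
        if (splits != null) {
            returnedToEnumerator.addAll(splits);
        }
    }

    int liveAttempts(int subtask) {
        return gateways.getOrDefault(subtask, Map.of()).size();
    }

    private static <V> void removeAttempt(
            Map<Integer, Map<Integer, V>> bySubtask, int subtask, int attempt) {
        Map<Integer, V> attempts = bySubtask.get(subtask);
        if (attempts == null) {
            return;
        }
        attempts.remove(attempt);
        if (attempts.isEmpty()) {
            bySubtask.remove(subtask);
        }
    }
}
```

In this sketch, keeping per-attempt state apart from per-subtask split assignments is what lets one failed attempt be dropped without returning splits while another attempt of the same subtask is still running.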
> >
> > > Secondly, the FLIP only says that user-defined events are not supported,
> > > but it does not explain how to deal with the existing
> > > ReportedWatermarkEvent/ReaderRegistrationEvent.
> >
> > For ReaderRegistrationEvent:
> > When the source operator registers its reader with the coordinator,
> > SpeculativeSourceCoordinator would also store the mapping of source reader
> > to execution attempt in SpeculativeSourceCoordinatorContext. Like
> > SourceCoordinator, it also needs to call SplitEnumerator#addReader to add
> > the new source reader.
> > In addition, to distinguish the source readers of different executions,
> > 'ReaderInfo' needs a new 'attemptId' field.
> >
> > For ReportedWatermarkEvent:
> > ReportedWatermarkEvent was introduced in 1.15 to support watermark
> > alignment in streaming mode.
> > Speculative execution is only enabled in batch mode. Therefore,
> > SpeculativeSourceCoordinator would throw an exception if it receives a
> > ReportedWatermarkEvent.
> >
> > Besides, after an offline discussion with Jiangjie (Becket) Qin, I've added
> > support for SourceEvent, because it's useful for some user-defined sources
> > which have a custom event protocol between reader and enumerator.
> >
> > Best,
> > Jing Zhang
> >
> > [1]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-245%3A+Source+Supports+Speculative+Execution+For+Batch+Job
> >
> > Guowei Ma <guowei....@gmail.com> wrote on Wed, Jun 29, 2022 at 18:06:
> >
> >> Hi, Jing
> >>
> >> Thanks a lot for writing this FLIP, which is very useful to batch users.
> >> Currently I have only two small questions:
> >>
> >> 1. First of all, please complete the fault-tolerant processing flow in the
> >> FLIP. (Maybe you've already considered it, but it's better to give the
> >> specific solution explicitly in the FLIP.)
> >> For example, how is the source `Reader` handled in case of an error?
> >> As far as I know, once the reader is unavailable, no new splits can be
> >> allocated to it, which may be unacceptable in the case of speculative
> >> execution.
> >>
> >> 2. Secondly, the FLIP only says that user-defined events are not supported,
> >> but it does not explain how to deal with the existing
> >> ReportedWatermarkEvent/ReaderRegistrationEvent. After all, in the case of
> >> speculative execution, there may be two "same" tasks being executed at the
> >> same time. If these events are duplicated, there still needs to be a clear
> >> evaluation of whether they really have no effect on the execution of the job.
> >>
> >> Best,
> >> Guowei
> >>
> >>
> >> On Fri, Jun 24, 2022 at 5:41 PM Jing Zhang <beyond1...@gmail.com> wrote:
> >>
> >> > Hi all,
> >> > One major problem of Flink batch jobs is slow tasks running on hot/bad
> >> > nodes, resulting in very long execution times.
> >> >
> >> > To solve this problem, FLIP-168: Speculative Execution for Batch
> >> > Job[1] was recently introduced and approved.
> >> >
> >> > Here, Zhu Zhu and I propose to support speculative execution of sources
> >> > as a follow-up of FLIP-168. You can find more details in FLIP-245[2].
> >> > Looking forward to your feedback.
> >> >
> >> > Best,
> >> > Jing Zhang
> >> >
> >> > [1]
> >> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-168%3A+Speculative+Execution+for+Batch+Job#FLIP168:SpeculativeExecutionforBatchJob-NointegrationwithFlink'swebUI
> >> >
> >> > [2]
> >> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-245%3A+Source+Supports+Speculative+Execution+For+Batch+Job