Hi everyone,

Thanks a lot for all the feedback! Since there are no further concerns, I will open a vote on it.

Best,
Jing Zhang

On Tue, Jul 5, 2022 at 11:31 AM Jing Zhang <beyond1...@gmail.com> wrote:

> Hi ZhuZhu, Jiangjie,
>
> Thanks a lot for your feedback.
>
> I agree that it's better to support most existing events.
> I have updated the FLIP to cover how to deal with the
> RequestSplitEvent/SourceEventWrapper/ReaderRegistrationEvent.
>
> The ReportedWatermarkEvent is only used in watermark alignment.
> Watermark alignment is a new feature, and still evolving.
> Moreover, most users will not use this feature in batch cases.
> So I agree not to support it in speculative execution.
>
> Best,
> Jing Zhang
>
> On Tue, Jul 5, 2022 at 8:38 AM Becket Qin <becket....@gmail.com> wrote:
>
>> Yes, that sounds reasonable to me. That said, supporting custom events
>> might still be preferable if that does not complicate the design too much.
>> It would be good to avoid having a tricky feature availability matrix when
>> we add new features to the project.
>>
>> Thanks,
>>
>> Jiangjie (Becket) Qin
>>
>> On Mon, Jul 4, 2022 at 5:09 PM Zhu Zhu <reed...@gmail.com> wrote:
>>
>>> Hi Jiangjie,
>>>
>>> Yes, you are right that the goals of watermark alignment and speculative
>>> execution do not conflict. For the example you gave, we can make it
>>> work by only aligning watermarks for executions that are pipelined
>>> connected (i.e. in the same execution attempt level pipelined region).
>>> Even not considering speculative execution, it looks like a
>>> possible improvement of watermark alignment, for streaming jobs that
>>> contain embarrassingly parallel job vertices, so that a slow task
>>> does not cause unconnected tasks to be throttled.
>>>
>>> At the moment, given that it is not needed yet and to avoid further
>>> complicating things, I think it's fine to not support watermark
>>> alignment in speculative execution cases.
>>>
>>> WDYT?
>>>
>>> Thanks,
>>> Zhu
>>>
>>> On Mon, Jul 4, 2022 at 4:15 PM Becket Qin <becket....@gmail.com> wrote:
>>> >
>>> > Hi Zhu,
>>> >
>>> > I agree that if we are talking about a single execution region with
>>> > blocking shuffle, watermark alignment may not be that helpful as the
>>> > subtasks are running independently of each other.
>>> >
>>> > That said, I don't think watermark alignment and speculative execution
>>> > necessarily conflict with each other. The idea of watermark alignment is to
>>> > make sure the jobs run efficiently, regardless of whether or why the job
>>> > has performance issues. On the other hand, the purpose of speculative
>>> > execution is to find out whether the jobs have performance issues due to
>>> > slow tasks, and fix them.
>>> >
>>> > For example, a job has one task whose watermark is always lagging behind,
>>> > therefore it causes the other tasks to be throttled. The speculative
>>> > execution identified the slow task and decided to run it in another node,
>>> > thus unblocking the other subtasks.
>>> >
>>> > Thanks,
>>> >
>>> > Jiangjie (Becket) Qin
>>> >
>>> > On Mon, Jul 4, 2022 at 3:31 PM Zhu Zhu <reed...@gmail.com> wrote:
>>> >
>>> > > I had another thought and now I think watermark alignment is actually
>>> > > conceptually in conflict with speculative execution.
>>> > > This is because the idea of watermark alignment is to limit the progress
>>> > > of all sources to be around the progress of the slowest source in the
>>> > > watermark group. However, speculative execution's goal is to solve the
>>> > > slow task problem and it never wants to limit the progress of tasks with
>>> > > the progress of the slow task.
>>> > > Therefore, I think it's fine to not support watermark alignment. Instead,
>>> > > it should throw an error if watermark alignment is enabled in the case
>>> > > that speculative execution is enabled.
>>> > >
>>> > > Thanks,
>>> > > Zhu
>>> > >
>>> > > On Mon, Jul 4, 2022 at 2:34 PM Zhu Zhu <reed...@gmail.com> wrote:
>>> > > >
>>> > > > Thanks for updating the FLIP!
>>> > > >
>>> > > > I agree that at the moment users do not need watermark alignment (in
>>> > > > which case ReportedWatermarkEvent would happen) in batch cases.
>>> > > > However, I think the concept of watermark alignment does not conflict
>>> > > > with speculative execution. It can work with speculative execution with
>>> > > > a little extra effort, by sending the WatermarkAlignmentEvent to all
>>> > > > the current executions of each subtask.
>>> > > > Therefore, I prefer to support watermark alignment in case it will be
>>> > > > needed by batch jobs in the future.
>>> > > >
>>> > > > Thanks,
>>> > > > Zhu
>>> > > >
>>> > > > On Fri, Jul 1, 2022 at 6:09 PM Jing Zhang <beyond1...@gmail.com> wrote:
>>> > > > >
>>> > > > > Hi all,
>>> > > > > After an offline discussion with Jiangjie (Becket) Qin, Guowei, Zhuzhu,
>>> > > > > I've updated the FLIP-245[1] to include:
>>> > > > > 1. Complete the fault-tolerant processing flow.
>>> > > > > 2. Support for SourceEvent because it's useful for some user-defined
>>> > > > > sources which have a custom event protocol between reader and enumerator.
>>> > > > > 3. How to handle ReportedWatermarkEvent/ReaderRegistrationEvent messages.
>>> > > > >
>>> > > > > Please review the FLIP-245[1] again, looking forward to your feedback.
>>> > > > >
>>> > > > > [1]
>>> > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-245%3A+Source+Supports+Speculative+Execution+For+Batch+Job
>>> > > > >
>>> > > > > On Fri, Jul 1, 2022 at 6:02 PM Jing Zhang <beyond1...@gmail.com> wrote:
>>> > > > >
>>> > > > > > Hi Guowei,
>>> > > > > > Thanks a lot for your feedback.
>>> > > > > > Your advice is really helpful. I've updated the FLIP-245[1] to include
>>> > > > > > these parts.
>>> > > > > >
>>> > > > > > > First of all, please complete the fault-tolerant processing flow
>>> > > > > > > in the FLIP.
>>> > > > > >
>>> > > > > > After an execution is created and a source operator becomes ready to
>>> > > > > > receive events, subtaskReady is called, and SpeculativeSourceCoordinator
>>> > > > > > would store the mapping of SubtaskGateway to execution attempt in
>>> > > > > > SpeculativeSourceCoordinatorContext.
>>> > > > > > Then the source operator registers the reader to the coordinator, and
>>> > > > > > SpeculativeSourceCoordinator would store the mapping of source reader to
>>> > > > > > execution attempt in SpeculativeSourceCoordinatorContext.
>>> > > > > > If the execution goes through a failover, subtaskFailed is called, and
>>> > > > > > SpeculativeSourceCoordinator would clear information about this execution,
>>> > > > > > including source readers and SubtaskGateway.
>>> > > > > > If all the current executions of the execution vertex have failed,
>>> > > > > > subtaskReset would be called, and SpeculativeSourceCoordinator would clear
>>> > > > > > all information about these executions and add the splits back to the split
>>> > > > > > enumerator of the source.
>>> > > > > >
>>> > > > > > > Secondly the FLIP only says that user-defined events are not supported,
>>> > > > > > > but it does not explain how to deal with the existing
>>> > > > > > > ReportedWatermarkEvent/ReaderRegistrationEvent.
>>> > > > > >
>>> > > > > > For ReaderRegistrationEvent:
>>> > > > > > When the source operator registers the reader to the coordinator,
>>> > > > > > SpeculativeSourceCoordinator would also store the mapping of source reader
>>> > > > > > to execution attempt in SpeculativeSourceCoordinatorContext. Like
>>> > > > > > SourceCoordinator, it also needs to call SplitEnumerator#addReader to add a
>>> > > > > > new source reader.
>>> > > > > > Besides, in order to distinguish source readers between different
>>> > > > > > executions, 'ReaderInfo' needs to add an 'attemptId' field.
>>> > > > > >
>>> > > > > > For ReportedWatermarkEvent:
>>> > > > > > ReportedWatermarkEvent was introduced in 1.15 and is used to support
>>> > > > > > watermark alignment in streaming mode.
>>> > > > > > Speculative execution is only enabled in batch mode. Therefore,
>>> > > > > > SpeculativeSourceCoordinator would throw an exception if it receives a
>>> > > > > > ReportedWatermarkEvent.
>>> > > > > >
>>> > > > > > Besides, after an offline discussion with Jiangjie (Becket) Qin, I've
>>> > > > > > added support for SourceEvent because it's useful for some user-defined
>>> > > > > > sources which have a custom event protocol between reader and enumerator.
>>> > > > > >
>>> > > > > > Best,
>>> > > > > > Jing Zhang
>>> > > > > >
>>> > > > > > [1]
>>> > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-245%3A+Source+Supports+Speculative+Execution+For+Batch+Job
>>> > > > > >
>>> > > > > > On Wed, Jun 29, 2022 at 6:06 PM Guowei Ma <guowei....@gmail.com> wrote:
>>> > > > > >
>>> > > > > >> Hi, Jing
>>> > > > > >>
>>> > > > > >> Thanks a lot for writing this FLIP, which is very useful to Batch users.
>>> > > > > >> Currently I have only two small questions:
>>> > > > > >>
>>> > > > > >> 1. First of all, please complete the fault-tolerant processing flow in the
>>> > > > > >> FLIP. (Maybe you've already considered it, but it's better to explicitly
>>> > > > > >> give the specific solution in the FLIP.)
>>> > > > > >> For example, how to handle Source `Reader` in case of error. As far as I
>>> > > > > >> know, once the reader is unavailable, it will result in the inability to
>>> > > > > >> allocate a new split, which may be unacceptable in the case of speculative
>>> > > > > >> execution.
>>> > > > > >>
>>> > > > > >> 2. Secondly the FLIP only says that user-defined events are not supported,
>>> > > > > >> but it does not explain how to deal with the existing
>>> > > > > >> ReportedWatermarkEvent/ReaderRegistrationEvent. After all, in the case of
>>> > > > > >> speculative execution, there may be two "same" tasks being executed at the
>>> > > > > >> same time. If these events are repeated, whether they really have no effect
>>> > > > > >> on the execution of the job still needs a clear evaluation.
>>> > > > > >>
>>> > > > > >> Best,
>>> > > > > >> Guowei
>>> > > > > >>
>>> > > > > >> On Fri, Jun 24, 2022 at 5:41 PM Jing Zhang <beyond1...@gmail.com> wrote:
>>> > > > > >>
>>> > > > > >> > Hi all,
>>> > > > > >> > One major problem of Flink batch jobs is slow tasks running on hot/bad
>>> > > > > >> > nodes, resulting in very long execution time.
>>> > > > > >> >
>>> > > > > >> > In order to solve this problem, FLIP-168: Speculative Execution for Batch
>>> > > > > >> > Job[1] was introduced and approved recently.
>>> > > > > >> >
>>> > > > > >> > Here, Zhu Zhu and I propose to support speculative execution of sources as
>>> > > > > >> > one of the follow-ups of FLIP-168. You can find more details in FLIP-245[2].
>>> > > > > >> > Looking forward to your feedback.
>>> > > > > >> >
>>> > > > > >> > Best,
>>> > > > > >> > Jing Zhang
>>> > > > > >> >
>>> > > > > >> > [1]
>>> > > > > >> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-168%3A+Speculative+Execution+for+Batch+Job#FLIP168:SpeculativeExecutionforBatchJob-NointegrationwithFlink'swebUI
>>> > > > > >> >
>>> > > > > >> > [2]
>>> > > > > >> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-245%3A+Source+Supports+Speculative+Execution+For+Batch+Job
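
For readers following the thread, here is a minimal sketch of the bookkeeping described in the fault-tolerant processing flow quoted above (subtaskReady, reader registration, subtaskFailed, subtaskReset, and rejecting ReportedWatermarkEvent). It is not the FLIP-245 implementation and does not use Flink's classes: the class name SpeculativeCoordinatorSketch, its method signatures, the integer attempt ids, and the per-subtask split tracking are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for the coordinator bookkeeping; not Flink's real API. */
final class SpeculativeCoordinatorSketch {

    // Attempts of each subtask that are ready to receive events
    // (stands in for the SubtaskGateway-to-attempt mapping).
    private final Map<Integer, Set<Integer>> readyAttempts = new HashMap<>();
    // Attempts of each subtask whose source reader has registered.
    private final Map<Integer, Set<Integer>> registeredReaders = new HashMap<>();
    // Splits handed to each subtask; tracking per subtask is an assumption of this sketch.
    private final Map<Integer, List<String>> assignedSplits = new HashMap<>();
    // Splits handed back to the (hypothetical) split enumerator after a reset.
    private final List<String> returnedSplits = new ArrayList<>();

    void subtaskReady(int subtask, int attempt) {
        readyAttempts.computeIfAbsent(subtask, k -> new HashSet<>()).add(attempt);
    }

    void registerReader(int subtask, int attempt) {
        registeredReaders.computeIfAbsent(subtask, k -> new HashSet<>()).add(attempt);
    }

    void assignSplit(int subtask, String split) {
        assignedSplits.computeIfAbsent(subtask, k -> new ArrayList<>()).add(split);
    }

    /** One execution attempt failed: forget only that attempt. */
    void subtaskFailed(int subtask, int attempt) {
        readyAttempts.getOrDefault(subtask, new HashSet<>()).remove(attempt);
        registeredReaders.getOrDefault(subtask, new HashSet<>()).remove(attempt);
    }

    /** All attempts of the subtask failed: forget everything and give its splits back. */
    void subtaskReset(int subtask) {
        readyAttempts.remove(subtask);
        registeredReaders.remove(subtask);
        List<String> splits = assignedSplits.remove(subtask);
        if (splits != null) {
            returnedSplits.addAll(splits);
        }
    }

    /** Watermark alignment is out of scope here, so such events are rejected. */
    void handleReportedWatermark(int subtask, int attempt) {
        throw new UnsupportedOperationException(
                "ReportedWatermarkEvent is not supported with speculative execution");
    }

    /** Attempts of the subtask that currently have a registered reader. */
    Set<Integer> currentAttempts(int subtask) {
        return Set.copyOf(registeredReaders.getOrDefault(subtask, Set.of()));
    }

    List<String> returnedSplits() {
        return List.copyOf(returnedSplits);
    }
}
```

In this sketch, a failure of one attempt leaves the other attempt of the same subtask and the subtask's splits untouched; only subtaskReset returns the splits, which matches the distinction the thread draws between subtaskFailed and subtaskReset.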