Thanks for updating the FLIP!

I agree that, at the moment, users do not need watermark alignment (the
case in which a ReportedWatermarkEvent would be sent) in batch jobs.
However, I don't think the concept of watermark alignment conflicts with
speculative execution. It can work with speculative execution with a
little extra effort, namely by sending the WatermarkAlignmentEvent to all
current executions of each subtask.
Therefore, I would prefer to support watermark alignment in case batch
jobs need it in the future.
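
Here is a minimal sketch of the idea in plain Java. It does not use Flink's actual classes: AttemptGateway, AlignmentEvent and AlignmentBroadcaster are hypothetical names. It keeps one gateway per current execution attempt of each subtask and sends the same alignment event to all of them:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a per-attempt gateway; not Flink's SubtaskGateway API.
interface AttemptGateway {
    void sendEvent(Object event);
}

// Hypothetical stand-in for WatermarkAlignmentEvent, carrying the max allowed watermark.
final class AlignmentEvent {
    final long maxWatermark;

    AlignmentEvent(long maxWatermark) {
        this.maxWatermark = maxWatermark;
    }
}

final class AlignmentBroadcaster {
    // subtask index -> (attempt number -> gateway of that current execution)
    private final Map<Integer, Map<Integer, AttemptGateway>> gateways = new HashMap<>();

    void attemptReady(int subtask, int attempt, AttemptGateway gateway) {
        gateways.computeIfAbsent(subtask, k -> new HashMap<>()).put(attempt, gateway);
    }

    void attemptFailed(int subtask, int attempt) {
        Map<Integer, AttemptGateway> attempts = gateways.get(subtask);
        if (attempts != null) {
            attempts.remove(attempt);
        }
    }

    // Send one alignment event to every current execution of the subtask, so the
    // original and the speculative attempts are held back by the same bound.
    void announceAlignment(int subtask, long maxWatermark) {
        AlignmentEvent event = new AlignmentEvent(maxWatermark);
        for (AttemptGateway gateway : gateways.getOrDefault(subtask, Map.of()).values()) {
            gateway.sendEvent(event);
        }
    }
}
```

A failed attempt simply drops out of the map, so later alignment events only reach the executions that are still running.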

Thanks,
Zhu

Jing Zhang <beyond1...@gmail.com> wrote on Fri, Jul 1, 2022 at 18:09:
>
> Hi all,
> After an offline discussion with Jiangjie (Becket) Qin, Guowei and Zhuzhu,
> I've updated FLIP-245[1] to include:
> 1. The complete fault-tolerant processing flow.
> 2. Support for SourceEvent, because it's useful for user-defined
> sources that have a custom event protocol between reader and enumerator.
> 3. How to handle ReportedWatermarkEvent/ReaderRegistrationEvent messages.
>
> Please review FLIP-245[1] again. Looking forward to your feedback.
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-245%3A+Source+Supports+Speculative+Execution+For+Batch+Job
>
Jing Zhang <beyond1...@gmail.com> wrote on Fri, Jul 1, 2022 at 18:02:
>
> > Hi Guowei,
> > Thanks a lot for your feedback.
> > Your advice is really helpful. I've updated FLIP-245[1] to include
> > these parts.
> > > First of all, please complete the fault-tolerant processing flow in the
> > > FLIP.
> >
> > After an execution is created and its source operator becomes ready to
> > receive events, subtaskReady is called, and SpeculativeSourceCoordinator
> > would store the mapping between the SubtaskGateway and the execution
> > attempt in SpeculativeSourceCoordinatorContext.
> > When the source operator then registers its reader with the coordinator,
> > SpeculativeSourceCoordinator would store the mapping between the source
> > reader and the execution attempt in SpeculativeSourceCoordinatorContext.
> > If an execution fails over, subtaskFailed is called, and
> > SpeculativeSourceCoordinator would clear the information about that
> > execution, including its source reader and SubtaskGateway.
> > If all current executions of the execution vertex have failed,
> > subtaskReset would be called, and SpeculativeSourceCoordinator would clear
> > all information about these executions and add their splits back to the
> > source's split enumerator.
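
A minimal sketch of the bookkeeping in this flow, in plain Java. The class name, the String/Object placeholders for readers, gateways and splits, and the simplified method signatures are all assumptions for illustration, not Flink's OperatorCoordinator API:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of SpeculativeSourceCoordinatorContext bookkeeping.
final class SpeculativeContextSketch {
    // subtask -> attempt -> gateway (placeholder type)
    private final Map<Integer, Map<Integer, Object>> gateways = new HashMap<>();
    // subtask -> attempt -> location of the registered reader
    private final Map<Integer, Map<Integer, String>> readers = new HashMap<>();
    // subtask -> splits assigned to it (shared by all of its attempts)
    private final Map<Integer, List<String>> assignedSplits = new HashMap<>();
    // splits handed back to the split enumerator after a reset
    final List<String> splitsAddedBack = new ArrayList<>();

    void subtaskReady(int subtask, int attempt, Object gateway) {
        gateways.computeIfAbsent(subtask, k -> new HashMap<>()).put(attempt, gateway);
    }

    void registerReader(int subtask, int attempt, String location) {
        readers.computeIfAbsent(subtask, k -> new HashMap<>()).put(attempt, location);
    }

    void assignSplit(int subtask, String split) {
        assignedSplits.computeIfAbsent(subtask, k -> new ArrayList<>()).add(split);
    }

    // One attempt failed: forget only that attempt; other attempts keep their state.
    void subtaskFailed(int subtask, int attempt) {
        gateways.getOrDefault(subtask, new HashMap<>()).remove(attempt);
        readers.getOrDefault(subtask, new HashMap<>()).remove(attempt);
    }

    // All current attempts failed: drop everything and return the splits to the enumerator.
    void subtaskReset(int subtask) {
        gateways.remove(subtask);
        readers.remove(subtask);
        List<String> splits = assignedSplits.remove(subtask);
        if (splits != null) {
            splitsAddedBack.addAll(splits);
        }
    }

    int currentAttempts(int subtask) {
        return gateways.getOrDefault(subtask, new HashMap<>()).size();
    }
}
```

Presumably the reason for separating subtaskFailed from subtaskReset is that a single failed attempt must not return splits to the enumerator while another attempt of the same subtask may still be reading them.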
> >
> > > Secondly the FLIP only says that user-defined events are not supported,
> > > but it does not explain how to deal with the existing
> > > ReportedWatermarkEvent/ReaderRegistrationEvent.
> >
> > For ReaderRegistrationEvent:
> > When the source operator registers its reader with the coordinator,
> > SpeculativeSourceCoordinator would also store the mapping between the
> > source reader and the execution attempt in
> > SpeculativeSourceCoordinatorContext. Like SourceCoordinator, it also needs
> > to call SplitEnumerator#addReader to add the new source reader.
> > Besides, in order to distinguish the source readers of different
> > executions, 'ReaderInfo' needs an 'attemptId' field.
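
A sketch of what the extra field could look like, assuming a hypothetical value class ReaderInfoWithAttempt with an int attemptId next to the subtask id and location. Including the attempt in equals/hashCode keeps the readers of two concurrent attempts of the same subtask distinct:

```java
import java.util.Objects;

// Hypothetical value class: a ReaderInfo-like type plus the proposed attempt field.
final class ReaderInfoWithAttempt {
    private final int subtaskId;
    private final int attemptId;
    private final String location;

    ReaderInfoWithAttempt(int subtaskId, int attemptId, String location) {
        this.subtaskId = subtaskId;
        this.attemptId = attemptId;
        this.location = location;
    }

    int getSubtaskId() {
        return subtaskId;
    }

    int getAttemptId() {
        return attemptId;
    }

    String getLocation() {
        return location;
    }

    // Two readers are the same only if subtask, attempt and location all match.
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof ReaderInfoWithAttempt)) {
            return false;
        }
        ReaderInfoWithAttempt other = (ReaderInfoWithAttempt) o;
        return subtaskId == other.subtaskId
                && attemptId == other.attemptId
                && location.equals(other.location);
    }

    @Override
    public int hashCode() {
        return Objects.hash(subtaskId, attemptId, location);
    }
}
```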
> >
> > For ReportedWatermarkEvent:
> > ReportedWatermarkEvent was introduced in 1.15 to support watermark
> > alignment in streaming mode.
> > Speculative execution is only enabled in batch mode. Therefore,
> > SpeculativeSourceCoordinator would throw an exception if it receives a
> > ReportedWatermarkEvent.
> >
> > Besides, after an offline discussion with Jiangjie (Becket) Qin, I've
> > added support for SourceEvent, because it's useful for user-defined
> > sources that have a custom event protocol between reader and enumerator.
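
The event handling described in this mail can be sketched as a simple dispatch. The event classes below are simplified, hypothetical stand-ins for ReaderRegistrationEvent, SourceEventWrapper and ReportedWatermarkEvent, and dispatch is not Flink's handleEventFromOperator signature:

```java
// Hypothetical stand-in for ReaderRegistrationEvent, extended with the attempt.
final class ReaderRegistration {
    final int subtask;
    final int attempt;
    final String location;

    ReaderRegistration(int subtask, int attempt, String location) {
        this.subtask = subtask;
        this.attempt = attempt;
        this.location = location;
    }
}

// Hypothetical stand-in for SourceEventWrapper: a user-defined reader/enumerator event.
final class WrappedSourceEvent {
    final int subtask;
    final int attempt;
    final Object payload;

    WrappedSourceEvent(int subtask, int attempt, Object payload) {
        this.subtask = subtask;
        this.attempt = attempt;
        this.payload = payload;
    }
}

// Hypothetical stand-in for ReportedWatermarkEvent.
final class ReportedWatermark {
    final long watermark;

    ReportedWatermark(long watermark) {
        this.watermark = watermark;
    }
}

final class SpeculativeEventDispatcher {
    enum Action { ADD_READER, FORWARD_TO_ENUMERATOR }

    // Decides how an operator event is handled; this sketch only reports the action.
    Action dispatch(Object event) {
        if (event instanceof ReaderRegistration) {
            // In the proposal: record reader -> attempt, then call SplitEnumerator#addReader.
            return Action.ADD_READER;
        }
        if (event instanceof WrappedSourceEvent) {
            // Custom reader/enumerator protocol: the payload goes to the enumerator.
            return Action.FORWARD_TO_ENUMERATOR;
        }
        if (event instanceof ReportedWatermark) {
            // Watermark alignment is a streaming feature; speculative execution is batch-only.
            throw new UnsupportedOperationException(
                    "ReportedWatermarkEvent is not supported with speculative execution");
        }
        throw new IllegalArgumentException("Unexpected operator event: " + event);
    }
}
```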
> >
> > Best,
> > Jing Zhang
> >
> > [1]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-245%3A+Source+Supports+Speculative+Execution+For+Batch+Job
> >
> > Guowei Ma <guowei....@gmail.com> wrote on Wed, Jun 29, 2022 at 18:06:
> >
> >> Hi, Jing
> >>
> >> Thanks a lot for writing this FLIP, which is very useful for batch users.
> >> Currently I have only two small questions:
> >>
> >> 1. First of all, please complete the fault-tolerant processing flow in the
> >> FLIP. (Maybe you've already considered it, but it's better to explicitly
> >> give the specific solution in the FLIP.)
> >> For example, how is the source `Reader` handled in case of an error? As
> >> far as I know, once the reader is unavailable, no new split can be
> >> assigned, which may be unacceptable in the case of speculative execution.
> >>
> >> 2. Secondly, the FLIP only says that user-defined events are not
> >> supported, but it does not explain how to deal with the existing
> >> ReportedWatermarkEvent/ReaderRegistrationEvent. After all, in the case of
> >> speculative execution, two "identical" tasks may be running at the same
> >> time. If these events are duplicated, whether they really have no effect
> >> on the execution of the job still needs a clear evaluation.
> >>
> >> Best,
> >> Guowei
> >>
> >>
> >> On Fri, Jun 24, 2022 at 5:41 PM Jing Zhang <beyond1...@gmail.com> wrote:
> >>
> >> > Hi all,
> >> > One major problem of Flink batch jobs is slow tasks running on hot/bad
> >> > nodes, which results in very long execution times.
> >> >
> >> > To solve this problem, FLIP-168: Speculative Execution for Batch
> >> > Job[1] was recently introduced and approved.
> >> >
> >> > Here, Zhu Zhu and I propose supporting speculative execution of sources
> >> > as one of the follow-ups of FLIP-168. You can find more details in
> >> > FLIP-245[2].
> >> > Looking forward to your feedback.
> >> >
> >> > Best,
> >> > Jing Zhang
> >> >
> >> > [1]
> >> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-168%3A+Speculative+Execution+for+Batch+Job#FLIP168:SpeculativeExecutionforBatchJob-NointegrationwithFlink'swebUI
> >> >
> >> > [2]
> >> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-245%3A+Source+Supports+Speculative+Execution+For+Batch+Job
> >> >
> >>
> >
