Hi Biao,

While I would rather not add new features to (to-be) deprecated APIs, I
would be +0 on this.

Best regards,

Martijn

On Thu, 12 Jan 2023 at 08:42, Biao Liu <mmyy1...@gmail.com> wrote:

> Hi Martijn,
>
> Thanks for your feedback!
>
> Yes, we propose to support speculative execution for SinkFunction.
> 1. From the perspective of compatibility, SinkFunction is the original
> Sink API. There are lots of implementations based on SinkFunction, not
> only in the official Flink codebase but also in users' private codebases,
> so the compatibility concern is more serious than it is for Sink V1. Of
> course, we hope users will migrate their legacy implementations to the new
> interface. However, migration is always hard.
> 2. From the perspective of cost, we don't need to do much extra work to
> support speculative execution for SinkFunction. All we need to do is check
> whether the SinkFunction implementation implements
> SupportsConcurrentExecutionAttempts or not. The rest of the work is the
> same as for Sink V2.
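A minimal sketch of the check described in point 2, assuming a plain marker-interface test. The real `SinkFunction` and `SupportsConcurrentExecutionAttempts` live in Flink and the real `SinkFunction` has more members; to stay self-contained, the sketch declares simplified stand-ins with the same simple names. `SpeculativeSinkCheck`, `AttemptIsolatingSink` and `LegacySink` are hypothetical.

```java
// Simplified stand-ins for the Flink types named in the thread (not the real API).
interface SupportsConcurrentExecutionAttempts {}

interface SinkFunction<IN> {
    void invoke(IN value) throws Exception;
}

final class SpeculativeSinkCheck {
    private SpeculativeSinkCheck() {}

    // A sink opts in to speculative execution only by implementing the marker
    // interface; every other SinkFunction keeps the old single-attempt behaviour.
    static boolean allowsSpeculativeExecution(SinkFunction<?> sink) {
        return sink instanceof SupportsConcurrentExecutionAttempts;
    }
}

// Hypothetical sink that isolates each attempt's output and therefore opts in.
final class AttemptIsolatingSink
        implements SinkFunction<String>, SupportsConcurrentExecutionAttempts {
    @Override
    public void invoke(String value) {
        // Write value into an attempt-specific location (omitted in this sketch).
    }
}

// Hypothetical legacy sink that writes to a shared location and does not opt in.
final class LegacySink implements SinkFunction<String> {
    @Override
    public void invoke(String value) {
        // Write value into a shared location (omitted in this sketch).
    }
}
```

Because the decision is a single type check on the user's class, existing SinkFunction implementations are unaffected unless their authors explicitly add the marker.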
>
> To summarize, supporting speculative execution for SinkFunction is cheap,
> and it may allow more existing jobs to run with speculative execution.
>
> Thanks,
> Biao /'bɪ.aʊ/
>
>
>
> On Wed, 11 Jan 2023 at 21:22, Martijn Visser <martijnvis...@apache.org> wrote:
>
> > Hi Biao,
> >
> > Apologies for jumping in late. My only question is about SinkFunction:
> > does this imply that we want to add support for this to the SinkFunction?
> > If so, I would not be in favour of that, since we would like to deprecate
> > the SinkFunction in favour of SinkV2 (I actually thought that was already
> > the case).
> >
> > Besides that, I have no other comments.
> >
> > Best regards,
> >
> > Martijn
> >
> > On Wed, Jan 4, 2023 at 7:28 AM Jing Zhang <beyond1...@gmail.com> wrote:
> >
> > > Hi Biao,
> > >
> > > Thanks for the explanation.
> > >
> > > +1 for the proposal.
> > >
> > > Best,
> > > Jing Zhang
> > >
> > > On Wed, 4 Jan 2023 at 12:11, Lijie Wang <wangdachui9...@gmail.com> wrote:
> > >
> > > > Hi Biao,
> > > >
> > > > Thanks for the explanation of how SinkV2 knows the right subtask
> > > > attempt. I have no more questions. +1 for the proposal.
> > > >
> > > > Best,
> > > > Lijie
> > > >
> > > > On Wed, 28 Dec 2022 at 17:22, Biao Liu <mmyy1...@gmail.com> wrote:
> > > >
> > > > > Thanks for all your feedback!
> > > > >
> > > > > To @Yuxia,
> > > > >
> > > > > > What does the sink need to do to isolate data produced by
> > > > > > speculative executions? IIUC, if a task fails over, it also
> > > > > > generates a new attempt. Does that make a difference in isolating
> > > > > > the produced data?
> > > > >
> > > > >
> > > > > Yes, there is a difference from the task failover scenario: the
> > > > > attempt number matters more for speculative execution than for
> > > > > failover, because in the failover scenario only one instance of a
> > > > > subtask runs at any given time.
> > > > >
> > > > > Let's take FileSystemOutputFormat as an example. For the failover
> > > > > scenario, the temporary directory that stores the produced data can be
> > > > > something like "$root_dir/task-$taskNumber/". During initialization,
> > > > > the subtask deletes and re-creates this temporary directory.
> > > > >
> > > > > However, this does not work in the speculative execution scenario,
> > > > > because several attempts of the same subtask might be running at the
> > > > > same time. These attempts might delete, re-create and write to the
> > > > > same temporary directory concurrently. The temporary directory should
> > > > > instead be something like
> > > > > "$root_dir/task-$taskNumber/attempt-$attemptNumber". That is why the
> > > > > attempt number needs to be exposed to the Sink implementation, so that
> > > > > it can isolate the data of each attempt.
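To make the two layouts concrete, here is a sketch assuming the temporary directories are plain `java.nio.file` paths. `AttemptDirs`, `failoverDir` and `speculativeDir` are hypothetical helpers, not FileSystemOutputFormat's actual code. The point it shows: under the per-task layout two concurrent attempts of subtask 3 share one directory, while under the per-attempt layout they get disjoint ones.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

final class AttemptDirs {
    private AttemptDirs() {}

    // Failover-only layout: one directory per subtask. Deleting and re-creating
    // it on start-up is safe only while at most one attempt runs at a time.
    static Path failoverDir(String rootDir, int taskNumber) {
        return Paths.get(rootDir, "task-" + taskNumber);
    }

    // Speculative layout: one directory per (subtask, attempt) pair, so an
    // attempt only ever deletes, re-creates and writes its own directory.
    static Path speculativeDir(String rootDir, int taskNumber, int attemptNumber) {
        return Paths.get(rootDir, "task-" + taskNumber, "attempt-" + attemptNumber);
    }
}
```

Nesting the attempt directory under the per-task directory keeps all attempts of one subtask in one place, which makes it easy to delete the losing attempts' data later.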
> > > > >
> > > > >
> > > > > To @Lijie,
> > > > >
> > > > > > I have a question about this: does SinkV2 need to do the same thing?
> > > > >
> > > > >
> > > > > Actually, yes.
> > > > >
> > > > > > Should we/users do it in the committer? If yes, how does the
> > > > > > committer know which one is the right subtask attempt?
> > > > >
> > > > >
> > > > > Yes, we/users should do it in the committer.
> > > > >
> > > > > In the current design, the Committer of Sink V2 gets the "which one is
> > > > > the right subtask attempt" information from the "committable data"
> > > > > produced by the SinkWriter. Take the FileSink as an example: the
> > > > > "committable data" sent to the Committer contains the full paths of
> > > > > the files produced by the SinkWriter. Users could also pass the
> > > > > attempt number from the SinkWriter to the Committer through the
> > > > > "committable data".
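A minimal sketch of passing the attempt number through the committable, assuming a file-based sink. `AttemptCommittable`, `SketchWriter` and `SketchCommitter` are hypothetical; they are not Flink's `SinkWriter`/`Committer` interfaces or FileSink's committable type. It also assumes, as the design above implies, that the committables reaching the committer belong to the attempt the JM accepted, so the committer never has to work out the right attempt on its own.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical committable: the full file path plus the attempt that wrote it.
record AttemptCommittable(int subtaskIndex, int attemptNumber, String filePath) {}

final class SketchWriter {
    private final String rootDir;
    private final int subtaskIndex;
    private final int attemptNumber;
    private final List<String> writtenFiles = new ArrayList<>();

    SketchWriter(String rootDir, int subtaskIndex, int attemptNumber) {
        this.rootDir = rootDir;
        this.subtaskIndex = subtaskIndex;
        this.attemptNumber = attemptNumber;
    }

    // Each attempt writes only below its own attempt directory.
    void write(String fileName) {
        writtenFiles.add(rootDir + "/task-" + subtaskIndex
                + "/attempt-" + attemptNumber + "/" + fileName);
    }

    // Emit one committable per file, tagged with this attempt's number.
    List<AttemptCommittable> prepareCommit() {
        List<AttemptCommittable> out = new ArrayList<>();
        for (String path : writtenFiles) {
            out.add(new AttemptCommittable(subtaskIndex, attemptNumber, path));
        }
        return out;
    }
}

final class SketchCommitter {
    private final List<String> committed = new ArrayList<>();

    // The committer publishes exactly the files named in the committables it
    // receives; the attempt identity travels inside each committable.
    void commit(List<AttemptCommittable> committables) {
        for (AttemptCommittable c : committables) {
            committed.add(c.filePath());
        }
    }

    List<String> committedFiles() {
        return List.copyOf(committed);
    }
}
```

Carrying the full path, rather than only the attempt number, means the committer needs no knowledge of the writer's directory layout.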
> > > > >
> > > > > In the "Rejected Alternatives -> Introduce a way to clean leaked data
> > > > > of Sink V2" section of the FLIP document, we discussed some of the
> > > > > reasons why we didn't provide an API like the one for OutputFormat.
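For comparison, the OutputFormat route (Lijie's message quoted further down) gives the master side `int getFinishedAttempt(int subtaskIndex)`. Below is a sketch of how a finalize step could use it, assuming the per-attempt layout "$root_dir/task-$taskNumber/attempt-$attemptNumber". `FinishedAttemptContext` is a simplified stand-in: only `getFinishedAttempt` comes from the thread, while `getParallelism` and `FinalizeSketch.selectFinishedDirs` are assumptions of this sketch.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in; only getFinishedAttempt is named in the thread.
interface FinishedAttemptContext {
    int getParallelism();

    int getFinishedAttempt(int subtaskIndex);
}

final class FinalizeSketch {
    private FinalizeSketch() {}

    // For every subtask, keep only the directory of the attempt the JM marked
    // as finished; directories of other attempts are candidates for cleanup.
    static List<Path> selectFinishedDirs(String rootDir, FinishedAttemptContext ctx) {
        List<Path> dirs = new ArrayList<>();
        for (int i = 0; i < ctx.getParallelism(); i++) {
            dirs.add(Paths.get(rootDir, "task-" + i, "attempt-" + ctx.getFinishedAttempt(i)));
        }
        return dirs;
    }
}
```

Here the master looks up the winning attempt, whereas in the Sink V2 design the committables themselves identify it.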
> > > > >
> > > > > To @Jing Zhang,
> > > > >
> > > > > > I have a question about this: Speculative execution of Committer will
> > > > > > be disabled.
> > > > > >
> > > > > > I agree with your point and I have seen similar requirements to
> > > > > > disable speculative execution for specified operators.
> > > > > >
> > > > > > However, the requirement is not supported currently. I think there
> > > > > > should be some place to describe how to support it.
> > > > >
> > > > >
> > > > > In this FLIP's design, speculative execution of the Committer of Sink
> > > > > V2 will always be disabled by Flink. This is not optional, and users
> > > > > cannot change it.
> > > > > As you said, "disable speculative execution for specified operators"
> > > > > is not supported in this FLIP, because it is a bit out of the scope of
> > > > > "Sink Supports Speculative Execution For Batch Job". I think it's
> > > > > better to start another FLIP to discuss it, perhaps titled
> > > > > "Fine-grained control of enabling speculative execution for
> > > > > operators". There we can discuss how to enable or disable speculative
> > > > > execution for specified operators, including the Committer and the
> > > > > pre/post-committer of Sink V2.
> > > > >
> > > > > What do you think?
> > > > >
> > > > > Thanks,
> > > > > Biao /'bɪ.aʊ/
> > > > >
> > > > >
> > > > >
> > > > > On Wed, 28 Dec 2022 at 11:30, Jing Zhang <beyond1...@gmail.com> wrote:
> > > > >
> > > > > > Hi Biao,
> > > > > >
> > > > > > Thanks for driving this FLIP. Supporting speculative execution of
> > > > > > sinks is meaningful and important.
> > > > > >
> > > > > > I have a question about this: Speculative execution of Committer will
> > > > > > be disabled.
> > > > > >
> > > > > > I agree with your point and I have seen similar requirements to
> > > > > > disable speculative execution for specified operators.
> > > > > >
> > > > > > However, the requirement is not supported currently. I think there
> > > > > > should be some place to describe how to support it.
> > > > > >
> > > > > > Best,
> > > > > > Jing Zhang
> > > > > >
> > > > > > On Tue, 27 Dec 2022 at 18:51, Lijie Wang <wangdachui9...@gmail.com> wrote:
> > > > > >
> > > > > > > Hi Biao,
> > > > > > >
> > > > > > > Thanks for driving this FLIP.
> > > > > > > This FLIP introduces "int getFinishedAttempt(int subtaskIndex)" for
> > > > > > > OutputFormat, so that it knows which subtask attempt the JM marked
> > > > > > > as finished and can commit the right data.
> > > > > > > I have a question about this: does SinkV2 need to do the same thing?
> > > > > > > Should we/users do it in the committer? If yes, how does the
> > > > > > > committer know which one is the right subtask attempt?
> > > > > > >
> > > > > > > Best,
> > > > > > > Lijie
> > > > > > >
> > > > > > > On Tue, 27 Dec 2022 at 10:01, yuxia <luoyu...@alumni.sjtu.edu.cn> wrote:
> > > > > > >
> > > > > > > > Hi Biao,
> > > > > > > > Thanks for driving this FLIP.
> > > > > > > > After a quick look at this FLIP, I have a question about "expose
> > > > > > > > the attempt number which can be used to isolate data produced by
> > > > > > > > speculative executions with the same subtask id".
> > > > > > > > What does the sink need to do to isolate data produced by
> > > > > > > > speculative executions? IIUC, if a task fails over, it also
> > > > > > > > generates a new attempt. Does that make a difference in isolating
> > > > > > > > the produced data?
> > > > > > > >
> > > > > > > > Best regards,
> > > > > > > > Yuxia
> > > > > > > >
> > > > > > > > ----- Original Message -----
> > > > > > > > From: "Biao Liu" <mmyy1...@gmail.com>
> > > > > > > > To: "dev" <dev@flink.apache.org>
> > > > > > > > Sent: Thursday, 22 Dec 2022 8:16:21 PM
> > > > > > > > Subject: [DISCUSS] FLIP-281 Sink Supports Speculative Execution For Batch Job
> > > > > > > >
> > > > > > > > Hi everyone,
> > > > > > > >
> > > > > > > > I would like to start a discussion about making Sink support
> > > > > > > > speculative execution for batch jobs. This proposal is a
> > > > > > > > follow-up of "FLIP-168: Speculative Execution For Batch Job"[1].
> > > > > > > > Speculative execution is very valuable for batch jobs, and the
> > > > > > > > support will be more complete once Sink supports speculative
> > > > > > > > execution too. Please find more details in the FLIP document [2].
> > > > > > > >
> > > > > > > > Looking forward to your feedback.
> > > > > > > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-168%3A+Speculative+Execution+for+Batch+Job
> > > > > > > > [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-281+Sink+Supports+Speculative+Execution+For+Batch+Job
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Biao /'bɪ.aʊ/
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
