Hi Biao,

While I would rather not add new features to (to-be) deprecated features, I would be +0 for this.
Best regards,
Martijn

On Thu, 12 Jan 2023 at 08:42, Biao Liu <mmyy1...@gmail.com> wrote:

> Hi Martijn,
>
> Thanks for your feedback!
>
> Yes, we propose to support speculative execution for SinkFunction.
> 1. From the perspective of compatibility, SinkFunction is the original
> Sink implementation. There are many implementations based on
> SinkFunction, not only in the official Flink codebase but also in users'
> private codebases. It's a more serious issue than Sink V1. Of course we hope
> users can migrate their legacy implementations to the new interface. However,
> migration is always hard.
> 2. From the perspective of cost, we don't need to do much extra work to
> support speculative execution for SinkFunction. All we need to do is check
> whether the SinkFunction implementation implements
> SupportsConcurrentExecutionAttempts or not. The rest of the work is the
> same as for Sink V2.
>
> To summarize, it's cheap to support speculative execution for SinkFunction,
> and it may allow more existing scenarios to run with speculative execution.
>
> Thanks,
> Biao /'bɪ.aʊ/
>
> On Wed, 11 Jan 2023 at 21:22, Martijn Visser <martijnvis...@apache.org> wrote:
>
> > Hi Biao,
> >
> > Apologies for jumping in late. My only question is about SinkFunction:
> > does this imply that we want to add support for this to the SinkFunction?
> > If so, I would not be in favour of that, since we would like to deprecate
> > the SinkFunction in favour of SinkV2 (I actually thought that was already
> > the case).
> >
> > Besides that, I have no other comments.
> >
> > Best regards,
> >
> > Martijn
> >
> > On Wed, Jan 4, 2023 at 7:28 AM Jing Zhang <beyond1...@gmail.com> wrote:
> >
> > > Hi Biao,
> > >
> > > Thanks for the explanation.
> > >
> > > +1 for the proposal.
> > >
> > > Best,
> > > Jing Zhang
> > >
> > > On Wed, 4 Jan 2023 at 12:11, Lijie Wang <wangdachui9...@gmail.com> wrote:
> > >
> > > > Hi Biao,
> > > >
> > > > Thanks for the explanation of how SinkV2 knows the right subtask
> > > > attempt. I have no more questions, +1 for the proposal.
> > > >
> > > > Best,
> > > > Lijie
> > > >
> > > > On Wed, 28 Dec 2022 at 17:22, Biao Liu <mmyy1...@gmail.com> wrote:
> > > >
> > > > > Thanks for all your feedback!
> > > > >
> > > > > To @Yuxia,
> > > > >
> > > > > > What is the sink expected to do to isolate data produced by
> > > > > > speculative executions? IIUC, if the task fails over, it also
> > > > > > generates a new attempt. Does it make a difference in isolating
> > > > > > the data produced?
> > > > >
> > > > > Yes, there is a difference from the task failover scenario. The
> > > > > attempt number is more necessary for speculative execution than for
> > > > > failover, because in the failover scenario only one instance of a
> > > > > subtask can be running at a time.
> > > > >
> > > > > Let's take FileSystemOutputFormat as an example. For the failover
> > > > > scenario, the temporary directory to store produced data can be
> > > > > something like "$root_dir/task-$taskNumber/". At the initialization
> > > > > phase, the subtask deletes and re-creates the temporary directory.
> > > > >
> > > > > However, in the speculative execution scenario this does not work,
> > > > > because several attempts of the same subtask might be running at the
> > > > > same time. These attempts might delete, re-create and write the same
> > > > > temporary directory at the same time. The correct temporary directory
> > > > > should be like "$root_dir/task-$taskNumber/attempt-$attemptNumber".
> > > > > So it's necessary to expose the attempt number to the Sink
> > > > > implementation to do the data isolation.
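
The two directory layouts described in the FileSystemOutputFormat example above can be sketched as follows. This is only an illustration, not Flink code: the class AttemptScopedPaths and its method names are hypothetical, and only the path patterns ("task-$taskNumber" and "attempt-$attemptNumber") are taken from the message.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

/** Hypothetical helper for illustration; not part of Flink. */
final class AttemptScopedPaths {

    private AttemptScopedPaths() {}

    /** Failover-only layout: one directory per subtask, shared by every attempt. */
    static Path taskDir(Path rootDir, int taskNumber) {
        return rootDir.resolve("task-" + taskNumber);
    }

    /** Speculative-execution layout: one directory per (subtask, attempt) pair. */
    static Path attemptDir(Path rootDir, int taskNumber, int attemptNumber) {
        return taskDir(rootDir, taskNumber).resolve("attempt-" + attemptNumber);
    }

    public static void main(String[] args) {
        Path root = Paths.get("tmp-root"); // assumed root directory name
        // Two concurrent attempts of subtask 3 get distinct directories,
        // so neither can delete or overwrite the other's output.
        System.out.println(attemptDir(root, 3, 0));
        System.out.println(attemptDir(root, 3, 1));
    }
}
```

With the per-attempt layout, deleting and re-creating a directory at initialization only touches the data of that one attempt.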
> > > > >
> > > > > To @Lijie,
> > > > >
> > > > > > I have a question about this: does SinkV2 need to do the same thing?
> > > > >
> > > > > Actually, yes.
> > > > >
> > > > > > Should we/users do it in the committer? If yes, how does the
> > > > > > committer know which one is the right subtask attempt?
> > > > >
> > > > > Yes, we/users should do it in the committer.
> > > > >
> > > > > In the current design, the Committer of Sink V2 should get the "which
> > > > > one is the right subtask attempt" information from the "committable
> > > > > data" produced by SinkWriter. Let's take the FileSink as an example:
> > > > > the "committable data" sent to the Committer contains the full path of
> > > > > the files produced by SinkWriter. Users could also pass the attempt
> > > > > number through "committable data" from SinkWriter to Committer.
> > > > >
> > > > > In the "Rejected Alternatives -> Introduce a way to clean leaked data
> > > > > of Sink V2" section of the FLIP document, we discussed some of the
> > > > > reasons that we didn't provide an API like the one for OutputFormat.
> > > > >
> > > > > To @Jing Zhang,
> > > > >
> > > > > > I have a question about this: Speculative execution of Committer
> > > > > > will be disabled.
> > > > > >
> > > > > > I agree with your point and I saw the similar requirements to
> > > > > > disable speculative execution for specified operators.
> > > > > >
> > > > > > However the requirement is not supported currently. I think there
> > > > > > should be some place to describe how to support it.
> > > > >
> > > > > In this FLIP design, the speculative execution of the Committer of
> > > > > Sink V2 will be disabled by Flink. It's not optional; users cannot
> > > > > change it.
> > > > > And as you said, "disable speculative execution for specified
> > > > > operators" is not supported in the FLIP.
> > > > > That is because it's a bit out of the scope of this FLIP, "Sink
> > > > > Supports Speculative Execution For Batch Job". I think it's better to
> > > > > start another FLIP to discuss it. "Fine-grained control of enabling
> > > > > speculative execution for operators" can be the title of that FLIP.
> > > > > And we can discuss there how to enable or disable speculative
> > > > > execution for specified operators, including the Committer and
> > > > > pre/post-committer of Sink V2.
> > > > >
> > > > > What do you think?
> > > > >
> > > > > Thanks,
> > > > > Biao /'bɪ.aʊ/
> > > > >
> > > > > On Wed, 28 Dec 2022 at 11:30, Jing Zhang <beyond1...@gmail.com> wrote:
> > > > >
> > > > > > Hi Biao,
> > > > > >
> > > > > > Thanks for driving this FLIP. Supporting speculative execution
> > > > > > of sinks is meaningful.
> > > > > >
> > > > > > I have a question about this: Speculative execution of Committer
> > > > > > will be disabled.
> > > > > >
> > > > > > I agree with your point and I saw the similar requirements to
> > > > > > disable speculative execution for specified operators.
> > > > > >
> > > > > > However the requirement is not supported currently. I think there
> > > > > > should be some place to describe how to support it.
> > > > > >
> > > > > > Best,
> > > > > > Jing Zhang
> > > > > >
> > > > > > On Tue, 27 Dec 2022 at 18:51, Lijie Wang <wangdachui9...@gmail.com> wrote:
> > > > > >
> > > > > > > Hi Biao,
> > > > > > >
> > > > > > > Thanks for driving this FLIP.
> > > > > > > In this FLIP, it introduces "int getFinishedAttempt(int
> > > > > > > subtaskIndex)" for OutputFormat to know which subtask attempt is
> > > > > > > the one marked as finished by JM and commit the right data.
> > > > > > > I have a question about this: does SinkV2 need to do the same
> > > > > > > thing? Should we/users do it in the committer?
> > > > > > > If yes, how does the committer know which
> > > > > > > one is the right subtask attempt?
> > > > > > >
> > > > > > > Best,
> > > > > > > Lijie
> > > > > > >
> > > > > > > On Tue, 27 Dec 2022 at 10:01, yuxia <luoyu...@alumni.sjtu.edu.cn> wrote:
> > > > > > >
> > > > > > > > Hi Biao,
> > > > > > > > Thanks for driving this FLIP.
> > > > > > > > After a quick look at this FLIP, I have a question about
> > > > > > > > "expose the attempt number which can be used to isolate data
> > > > > > > > produced by speculative executions with the same subtask id".
> > > > > > > > What is the sink expected to do to isolate data produced by
> > > > > > > > speculative executions? IIUC, if the task fails over, it also
> > > > > > > > generates a new attempt. Does it make a difference in isolating
> > > > > > > > the data produced?
> > > > > > > >
> > > > > > > > Best regards,
> > > > > > > > Yuxia
> > > > > > > >
> > > > > > > > ----- Original Message -----
> > > > > > > > From: "Biao Liu" <mmyy1...@gmail.com>
> > > > > > > > To: "dev" <dev@flink.apache.org>
> > > > > > > > Sent: Thursday, 22 December 2022, 8:16:21 PM
> > > > > > > > Subject: [DISCUSS] FLIP-281 Sink Supports Speculative Execution For Batch Job
> > > > > > > >
> > > > > > > > Hi everyone,
> > > > > > > >
> > > > > > > > I would like to start a discussion about making Sink support
> > > > > > > > speculative execution for batch jobs. This proposal is a
> > > > > > > > follow-up to "FLIP-168: Speculative Execution For Batch
> > > > > > > > Job"[1]. Speculative execution is very meaningful for batch
> > > > > > > > jobs, and it would be more complete after supporting
> > > > > > > > speculative execution of Sink. Please find more details in the
> > > > > > > > FLIP document [2].
> > > > > > > >
> > > > > > > > Looking forward to your feedback.
> > > > > > > >
> > > > > > > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-168%3A+Speculative+Execution+for+Batch+Job
> > > > > > > > [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-281+Sink+Supports+Speculative+Execution+For+Batch+Job
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Biao /'bɪ.aʊ/
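
The check Biao describes for SinkFunction in his 12 Jan message (look at whether the implementation carries the SupportsConcurrentExecutionAttempts marker) might look roughly like the sketch below. Everything here is a stand-in so the example compiles without Flink: the marker interface is re-declared locally under the name used in the thread, and SimpleSink, LegacySink, AttemptAwareSink and SpeculationCheck are hypothetical names.

```java
/** Local stand-in for the marker interface named in the thread (assumption: it has no methods). */
interface SupportsConcurrentExecutionAttempts {}

/** Hypothetical minimal sink contract, used only for this illustration. */
interface SimpleSink {
    void write(String record);
}

/** A legacy sink that has not opted in to concurrent attempts. */
final class LegacySink implements SimpleSink {
    @Override
    public void write(String record) {
        // no-op for the sketch
    }
}

/** A sink that declares it can tolerate several attempts of one subtask at once. */
final class AttemptAwareSink implements SimpleSink, SupportsConcurrentExecutionAttempts {
    @Override
    public void write(String record) {
        // no-op for the sketch
    }
}

/** Hypothetical helper showing the marker-interface check. */
final class SpeculationCheck {

    private SpeculationCheck() {}

    /** Returns true only if the sink has opted in through the marker interface. */
    static boolean supportsSpeculativeExecution(Object sink) {
        return sink instanceof SupportsConcurrentExecutionAttempts;
    }

    public static void main(String[] args) {
        System.out.println(supportsSpeculativeExecution(new LegacySink()));
        System.out.println(supportsSpeculativeExecution(new AttemptAwareSink()));
    }
}
```

Because the opt-in is a marker interface, existing implementations keep their current behaviour unless their authors add it explicitly.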