Thanks Mang for updating! Looks good to me!
Best,
Jingsong

On Wed, Jun 7, 2023 at 2:31 PM Mang Zhang <zhangma...@163.com> wrote:
>
> Hi Jingsong,
>
> >I have some doubts about the `TwoPhaseCatalogTable`. Generally, our
> >Flink design places execution in the TableFactory or directly in the
> >Catalog, so introducing an executable table makes me feel a bit
> >strange. (Spark is this style, but Flink may not be)
> On this issue: introducing the executable commit/abort logic on
> CatalogTable is indeed a bit strange.
> After an offline discussion with yuxia, I reworked the FLIP-305 [1] design.
> The new solution is similar to the implementation of SupportsOverwrite:
> it introduces the SupportsStaging interface and infers whether a
> DynamicTableSink supports atomic CTAS based on whether it implements
> SupportsStaging. If it does, the StagedTable object is obtained from the
> DynamicTableSink.
>
> For more implementation details, please see the FLIP-305 document.
>
> Here is my PoC commit:
> https://github.com/Tartarus0zm/flink/commit/025b30ad8f1a03e7738e9bb534e6e491c31990fa
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-305%3A+Support+atomic+for+CREATE+TABLE+AS+SELECT%28CTAS%29+statement
>
> --
>
> Best regards,
>
> Mang Zhang
>
> At 2023-05-12 13:02:14, "Jingsong Li" <jingsongl...@gmail.com> wrote:
> >Hi Mang,
> >
> >Thanks for starting this FLIP.
> >
> >I have some doubts about the `TwoPhaseCatalogTable`. Generally, our
> >Flink design places execution in the TableFactory or directly in the
> >Catalog, so introducing an executable table makes me feel a bit
> >strange. (Spark is this style, but Flink may not be)
> >
> >And for `TwoPhase`, maybe `StagedXXX` like Spark is better?
> >
> >Best,
> >Jingsong
> >
> >On Wed, May 10, 2023 at 9:29 PM Mang Zhang <zhangma...@163.com> wrote:
> >>
> >> Hi Ron,
> >>
> >> First of all, thank you for your reply!
> >> After our offline discussion, I understand that your question is mainly
> >> about the compilePlan scenario. Currently compilePlanSql does not support
> >> non-INSERT statements and throws an exception for them:
> >> >Unsupported SQL query! compilePlanSql() only accepts a single SQL
> >> >statement of type INSERT
> >> But it's a good point that I will seriously consider.
> >> Non-atomic CTAS can be supported relatively easily,
> >> but atomic CTAS needs more adaptation work, so I'm going to leave it as is
> >> and follow up with a separate issue to implement CTAS support for
> >> compilePlanSql.
> >>
> >> --
> >>
> >> Best regards,
> >> Mang Zhang
> >>
> >> At 2023-04-23 17:52:07, "liu ron" <ron9....@gmail.com> wrote:
> >> >Hi, Mang
> >> >
> >> >I have a question about the implementation details. In the atomicity
> >> >case, the target table is not created before the JobGraph is generated,
> >> >but the target table is required to exist when optimizing the plan to
> >> >generate the JobGraph. So how do you solve this problem?
> >> >
> >> >Best,
> >> >Ron
> >> >
> >> >yuxia <luoyu...@alumni.sjtu.edu.cn> wrote on Thu, Apr 20, 2023 at 09:35:
> >> >
> >> >> Sharing some insights about the new TwoPhaseCatalogTable proposed after
> >> >> an offline discussion with Mang.
> >> >> The main reason is that the TwoPhaseCatalogTable enables
> >> >> external connectors to implement their own logic for commit / abort.
> >> >> In FLIP-218, for atomic CTAS, the Catalog just drops the table
> >> >> when the job fails. That is not ideal, because it is too generic to work
> >> >> well. For example, some connectors need to clean up temporary files in
> >> >> the abort method, and only the actual connector knows the specific logic
> >> >> for aborting.
> >> >>
> >> >> Best regards,
> >> >> Yuxia
> >> >>
> >> >> From: "zhangmang1" <zhangma...@163.com>
> >> >> To: "dev" <dev@flink.apache.org>, "Jing Ge" <j...@ververica.com>
> >> >> Cc: "ron9 liu" <ron9....@gmail.com>, "lincoln 86xy" <lincoln.8...@gmail.com>, luoyu...@alumni.sjtu.edu.cn
> >> >> Sent: Wednesday, April 19, 2023, 3:13:36 PM
> >> >> Subject: Re:Re: [DISCUSS] FLIP-305: Support atomic for CREATE TABLE AS SELECT(CTAS) statement
> >> >>
> >> >> hi, Jing
> >> >> Thank you for your reply.
> >> >> >1. It looks like you found another way to design the atomic CTAS with a
> >> >> >new serializable TwoPhaseCatalogTable instead of making Catalog
> >> >> >serializable as described in FLIP-218. Did I understand correctly?
> >> >> Yes. When I was implementing the FLIP-218 solution, I encountered problems
> >> >> with Catalog/CatalogTable serialization and deserialization; for example,
> >> >> after deserialization a CatalogTable could not be converted to a Hive Table.
> >> >> Also, Catalog serialization is still a heavy operation, and it may not
> >> >> actually be necessary, since we just need Create Table.
> >> >> Therefore, the TwoPhaseCatalogTable approach is proposed, which also
> >> >> facilitates the subsequent implementation of data lake, ReplaceTable,
> >> >> and other functions.
> >> >>
> >> >> >2. I am a little bit confused about the isStreamingMode parameter of
> >> >> >Catalog#twoPhaseCreateTable(...), since it is a selector argument (code
> >> >> >smell) that we should commonly avoid in public interfaces. According to
> >> >> >the FLIP, isStreamingMode will be used by the Catalog to determine
> >> >> >whether to support atomic or not.
> >> >> >With this selector argument, there will be two
> >> >> >different logics built within one method and it is hard to follow without
> >> >> >reading the code or the doc carefully (another concern is keeping the doc
> >> >> >and code always consistent), i.e. sometimes there will be no difference
> >> >> >by using true/false isStreamingMode, sometimes they are quite different -
> >> >> >atomic vs. non-atomic. Another question is, before we call
> >> >> >Catalog#twoPhaseCreateTable(...), we have to know the value of
> >> >> >isStreamingMode. In case only non-atomic is supported for streaming mode,
> >> >> >we could just follow FLIP-218 instead of (twistedly) calling
> >> >> >Catalog#twoPhaseCreateTable(...) with a false isStreamingMode. Did I miss
> >> >> >anything here?
> >> >> Here's what I think about this issue: atomic CTAS should be the default
> >> >> behavior, and we should only fall back to non-atomic CTAS if atomicity is
> >> >> completely unattainable. Atomic CTAS will bring a better experience to users.
> >> >> Flink is already a unified stream and batch engine. In our company, Kwai,
> >> >> many users are also using Flink to do batch data processing, but still
> >> >> running in stream mode.
> >> >> The boundary between stream and batch is gradually blurring, and stream mode
> >> >> jobs may also FINISH, so I added the isStreamingMode parameter, which
> >> >> allows different atomicity implementations in batch and stream modes.
> >> >> It is used not only to determine whether atomicity is supported, but also
> >> >> to help select different TwoPhaseCatalogTable implementations that provide
> >> >> different levels of atomicity!
> >> >>
> >> >> Looking forward to more feedback.
> >> >>
> >> >> --
> >> >> Best regards,
> >> >> Mang Zhang
> >> >>
> >> >> At 2023-04-15 04:20:40, "Jing Ge" <j...@ververica.com.INVALID> wrote:
> >> >> >Hi Mang,
> >> >> >
> >> >> >This is the FLIP I was looking forward to after FLIP-218. Thanks for
> >> >> >driving it. I have two questions and would like to know your thoughts,
> >> >> >thanks:
> >> >> >
> >> >> >1. It looks like you found another way to design the atomic CTAS with a
> >> >> >new serializable TwoPhaseCatalogTable instead of making Catalog
> >> >> >serializable as described in FLIP-218. Did I understand correctly?
> >> >> >2. I am a little bit confused about the isStreamingMode parameter of
> >> >> >Catalog#twoPhaseCreateTable(...), since it is a selector argument (code
> >> >> >smell) that we should commonly avoid in public interfaces. According to
> >> >> >the FLIP, isStreamingMode will be used by the Catalog to determine
> >> >> >whether to support atomic or not. With this selector argument, there will
> >> >> >be two different logics built within one method and it is hard to follow
> >> >> >without reading the code or the doc carefully (another concern is keeping
> >> >> >the doc and code always consistent), i.e. sometimes there will be no
> >> >> >difference by using true/false isStreamingMode, sometimes they are quite
> >> >> >different - atomic vs. non-atomic. Another question is, before we call
> >> >> >Catalog#twoPhaseCreateTable(...), we have to know the value of
> >> >> >isStreamingMode. In case only non-atomic is supported for streaming mode,
> >> >> >we could just follow FLIP-218 instead of (twistedly) calling
> >> >> >Catalog#twoPhaseCreateTable(...) with a false isStreamingMode. Did I miss
> >> >> >anything here?
> >> >> >
> >> >> >Best regards,
> >> >> >Jing
> >> >> >
> >> >> >On Fri, Apr 14, 2023 at 1:55 PM yuxia <luoyu...@alumni.sjtu.edu.cn> wrote:
> >> >> >
> >> >> >> Hi, Mang.
> >> >> >> +1 for completing the support for atomicity of CTAS, this is very useful
> >> >> >> in batch scenarios and for integrating with data lakes that support
> >> >> >> transactions.
> >> >> >>
> >> >> >> I just have one question. IIUC, the DynamicTableSink will need to know
> >> >> >> whether it's the normal case or CTAS with atomicity, as well as the
> >> >> >> necessary context.
> >> >> >> Take the JDBC catalog as an example: for CTAS with atomicity support,
> >> >> >> the JDBC DynamicTableSink will write to the temp table defined in the
> >> >> >> TwoPhaseCatalogTable, which is different from the normal case.
> >> >> >>
> >> >> >> How can the DynamicTableSink get it? Could you give some explanation
> >> >> >> or an example in this FLIP?
> >> >> >>
> >> >> >> Best regards,
> >> >> >> Yuxia
> >> >> >>
> >> >> >> ----- Original Message -----
> >> >> >> From: "zhangmang1" <zhangma...@163.com>
> >> >> >> To: "dev" <dev@flink.apache.org>, "ron9 liu" <ron9....@gmail.com>, "lincoln 86xy" <lincoln.8...@gmail.com>
> >> >> >> Sent: Friday, April 14, 2023, 2:50:40 PM
> >> >> >> Subject: Re:Re: [DISCUSS] FLIP-305: Support atomic for CREATE TABLE AS SELECT(CTAS) statement
> >> >> >>
> >> >> >> Hi, Lincoln and Ron
> >> >> >>
> >> >> >> Thank you for your reply.
> >> >> >> On the naming, I think it's OK; it makes future feature extensions
> >> >> >> more uniform. I have updated the FLIP.
> >> >> >>
> >> >> >> About Hive support for atomic CTAS: Hive is rich in usage scenarios,
> >> >> >> which can be divided into three: 1. writing Hive tables 2. writing Hive
> >> >> >> tables with speculative execution 3.
> >> >> >> writing Hive tables with small file merge
> >> >> >>
> >> >> >> The main purpose of FLIP-305 is to implement support for CTAS atomicity
> >> >> >> in the Flink framework,
> >> >> >> so I only did a PoC to verify the first scenario of writing to a Hive
> >> >> >> table, and we can subsequently split out sub-tasks to support the other
> >> >> >> two scenarios.
> >> >> >>
> >> >> >> --
> >> >> >>
> >> >> >> Best regards,
> >> >> >> Mang Zhang
> >> >> >>
> >> >> >> At 2023-04-13 12:27:24, "Lincoln Lee" <lincoln.8...@gmail.com> wrote:
> >> >> >> >Hi, Mang
> >> >> >> >
> >> >> >> >+1 for completing the support for atomicity of CTAS, this is very
> >> >> >> >useful in batch scenarios.
> >> >> >> >
> >> >> >> >I have two questions:
> >> >> >> >1. naming wise:
> >> >> >> > a) can we rename the `Catalog#getTwoPhaseCommitCreateTable` to
> >> >> >> >`Catalog#twoPhaseCreateTable` (and we may add
> >> >> >> >twoPhaseReplaceTable/twoPhaseCreateOrReplaceTable later)
> >> >> >> > b) for the `TwoPhaseCommitCatalogTable`, may it be better to use
> >> >> >> >`TwoPhaseCatalogTable`?
> >> >> >> > c) `TwoPhaseCommitCatalogTable#beginTransaction`: the word
> >> >> >> >'transaction' in the method name may suggest to users that it implies
> >> >> >> >transaction support (however, it is not strictly so), so I suggest
> >> >> >> >changing it to `begin`
> >> >> >> >2. Has this design been validated by any relevant PoC on Hive or other
> >> >> >> >catalogs?
> >> >> >> >
> >> >> >> >Best,
> >> >> >> >Lincoln Lee
> >> >> >> >
> >> >> >> >liu ron <ron9....@gmail.com> wrote on Thu, Apr 13, 2023 at 10:17:
> >> >> >> >
> >> >> >> >> Hi, Mang
> >> >> >> >> Atomicity is very important for CTAS, especially for batch jobs.
> >> >> >> >> This FLIP is a continuation of FLIP-218, which is valuable for CTAS.
> >> >> >> >> I just have one question: in the Motivation part of FLIP-218, we
> >> >> >> >> mentioned three levels of atomicity semantics. Can this current
> >> >> >> >> design do the same as Spark's DataSource V2, which can guarantee
> >> >> >> >> both atomicity and isolation? For example, can it be done by writing
> >> >> >> >> to Hive tables using CTAS?
> >> >> >> >>
> >> >> >> >> Best,
> >> >> >> >> Ron
> >> >> >> >>
> >> >> >> >> Mang Zhang <zhangma...@163.com> wrote on Mon, Apr 10, 2023 at 11:03:
> >> >> >> >>
> >> >> >> >> > Hi, everyone
> >> >> >> >> >
> >> >> >> >> > I'd like to start a discussion about FLIP-305: Support atomic for
> >> >> >> >> > CREATE TABLE AS SELECT(CTAS) statement [1].
> >> >> >> >> >
> >> >> >> >> > The CREATE TABLE AS SELECT (CTAS) statement is already supported,
> >> >> >> >> > but it's not atomic. It creates the table before the job runs. If
> >> >> >> >> > the job execution fails, or is cancelled, the table will not be
> >> >> >> >> > dropped.
> >> >> >> >> >
> >> >> >> >> > So I want Flink to support atomic CTAS, where the table is only
> >> >> >> >> > created when the job succeeds. This improves the user experience.
> >> >> >> >> >
> >> >> >> >> > Looking forward to your feedback.
> >> >> >> >> >
> >> >> >> >> > [1]
> >> >> >> >> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-305%3A+Support+atomic+for+CREATE+TABLE+AS+SELECT%28CTAS%29+statement
> >> >> >> >> >
> >> >> >> >> > --
> >> >> >> >> >
> >> >> >> >> > Best regards,
> >> >> >> >> > Mang Zhang
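
For readers following the thread, the SupportsStaging idea from the most recent reply (a sink opts into atomic CTAS by implementing an interface, and the planner obtains a StagedTable from it) could be sketched roughly as below. This is only a simplified, self-contained illustration and not the FLIP-305 code: the interfaces are local stand-ins that merely borrow the names used in the thread, the method name `applyStaging` and the helpers `stagedTableFor` and `runCtas` are assumptions made for the example, and the real commit/abort calls would be driven by job status rather than a try/catch. Please see the FLIP-305 document for the actual signatures.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

public class StagingSketch {

    /** Simplified stand-in for the StagedTable discussed in the thread. */
    interface StagedTable {
        void begin();   // called before the job runs
        void commit();  // called when the job succeeds
        void abort();   // called when the job fails or is cancelled
    }

    /** Simplified stand-in for SupportsStaging; `applyStaging` is an assumed name. */
    interface SupportsStaging {
        StagedTable applyStaging();
    }

    /** Simplified stand-in for DynamicTableSink. */
    interface DynamicTableSink {}

    /** Hypothetical StagedTable that only records which callbacks were invoked. */
    static final class RecordingStagedTable implements StagedTable {
        final List<String> events = new ArrayList<>();
        @Override public void begin() { events.add("begin"); }
        @Override public void commit() { events.add("commit"); }
        @Override public void abort() { events.add("abort"); }
    }

    /** Hypothetical sink that supports atomic CTAS. */
    static final class StagingSink implements DynamicTableSink, SupportsStaging {
        final RecordingStagedTable table = new RecordingStagedTable();
        @Override public StagedTable applyStaging() { return table; }
    }

    /** Hypothetical sink without staging support (non-atomic CTAS only). */
    static final class PlainSink implements DynamicTableSink {}

    /** Infers atomic CTAS support from whether the sink implements SupportsStaging. */
    static Optional<StagedTable> stagedTableFor(DynamicTableSink sink) {
        if (sink instanceof SupportsStaging) {
            return Optional.of(((SupportsStaging) sink).applyStaging());
        }
        return Optional.empty();
    }

    /** Runs the job and commits or aborts the staged table; returns true on success. */
    static boolean runCtas(DynamicTableSink sink, Runnable job) {
        Optional<StagedTable> staged = stagedTableFor(sink);
        staged.ifPresent(StagedTable::begin);
        try {
            job.run();
        } catch (RuntimeException e) {
            staged.ifPresent(StagedTable::abort);
            return false;
        }
        staged.ifPresent(StagedTable::commit);
        return true;
    }

    public static void main(String[] args) {
        StagingSink ok = new StagingSink();
        runCtas(ok, () -> {});
        System.out.println(ok.table.events);      // prints [begin, commit]

        StagingSink failing = new StagingSink();
        runCtas(failing, () -> { throw new IllegalStateException("job failed"); });
        System.out.println(failing.table.events); // prints [begin, abort]
    }
}
```

A sink that does not implement the interface (PlainSink here) simply yields no StagedTable, which corresponds to the fallback to non-atomic CTAS mentioned earlier in the thread.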