Hi Jingsong,
Thank you for your reply! We introduced `TwoPhaseCatalogTable` for two reasons:
1. The `TwoPhaseCatalogTable` of each data source can support more operations. Through the Catalog alone, we can only do a simple create table and drop table, which is not flexible enough. For example, a connector may need to delete a temporary directory, or use a table rename in a relational database to implement atomic semantics in Flink.
2. It facilitates subsequent extensions, such as support for REPLACE TABLE and for data lake storage.

>And for `TwoPhase`, maybe `StagedXXX` like Spark is better?

Regarding naming: at first we used `StagedCatalogTable`, but after offline discussions with yuxia and Lincoln, we noted that Flink already has TwoPhaseCommittingSink/TwoPhaseCommitSinkFunction, so we changed the name to `TwoPhaseCatalogTable` to keep the naming consistent.

--
Best regards,
Mang Zhang

At 2023-05-12 13:02:14, "Jingsong Li" wrote:
>Hi Mang,
>
>Thanks for starting this FLIP.
>
>I have some doubts about the `TwoPhaseCatalogTable`. Generally, our
>Flink design places execution in the TableFactory or directly in the
>Catalog, so introducing an executable table feels a bit
>strange to me. (Spark uses this style, but Flink may not.)
>
>And for `TwoPhase`, maybe `StagedXXX` like Spark is better?
>
>Best,
>Jingsong
>
>On Wed, May 10, 2023 at 9:29 PM Mang Zhang wrote:
>>
>> Hi Ron,
>>
>> First of all, thank you for your reply!
>> After our offline discussion, I understand that your point mainly concerns the
>> compilePlan scenario. However, compilePlanSql currently does not support
>> non-INSERT statements and throws an exception for them:
>> >Unsupported SQL query! compilePlanSql() only accepts a single SQL statement
>> >of type INSERT
>> But it's a good point, and I will consider it seriously.
>> Non-atomic CTAS can be supported relatively easily,
>> but atomic CTAS needs more adaptation work, so I'm going to leave it as is
>> and follow up with a separate issue to implement CTAS support for
>> compilePlanSql.
>>
>> --
>> Best regards,
>> Mang Zhang
>>
>> At 2023-04-23 17:52:07, "liu ron" wrote:
>> >Hi, Mang
>> >
>> >I have a question about the implementation details. In the atomic case,
>> >the target table is not created before the JobGraph is generated, yet
>> >it must exist when the plan is optimized to generate the JobGraph.
>> >How do you solve this problem?
>> >
>> >Best,
>> >Ron
>> >
>> >yuxia wrote on Thursday, April 20, 2023 at 09:35:
>> >
>> >> Sharing some insights about the new TwoPhaseCatalogTable, proposed after an
>> >> offline discussion with Mang.
>> >> The main reason is that the TwoPhaseCatalogTable enables
>> >> external connectors to implement their own commit / abort logic.
>> >> In FLIP-218, for atomic CTAS, the Catalog simply drops the table
>> >> when the job fails. That is not ideal, because it is too generic to work well.
>> >> For example, some connectors need to clean up temporary files in the
>> >> abort method, and only the actual connector knows the specific logic for
>> >> aborting.
>> >>
>> >> Best regards,
>> >> Yuxia
>> >>
>> >> From: "zhangmang1"
>> >> To: "dev", "Jing Ge"
>> >> Cc: "ron9 liu", "lincoln 86xy"
>> >> Sent: Wednesday, April 19, 2023 3:13:36 PM
>> >> Subject: Re:Re: [DISCUSS] FLIP-305: Support atomic for CREATE TABLE AS
>> >> SELECT(CTAS) statement
>> >>
>> >> hi, Jing
>> >> Thank you for your reply.
>> >> >1. It looks like you found another way to design the atomic CTAS with a new
>> >> >serializable TwoPhaseCatalogTable instead of making Catalog serializable as
>> >> >described in FLIP-218. Did I understand correctly?
>> >> Yes. When I was implementing the FLIP-218 solution, I ran into problems
>> >> with Catalog/CatalogTable serialization and deserialization. For example,
>> >> after deserialization the CatalogTable could not be converted to a Hive Table.
>> >> Also, serializing the Catalog is a heavy operation, and it may not actually
>> >> be necessary, since we only need to create the table.
>> >> Therefore, I proposed the TwoPhaseCatalogTable approach, which also
>> >> makes it easier to implement subsequent features such as data lake support
>> >> and REPLACE TABLE.
>> >>
>> >> >2. I am a little bit confused about the isStreamingMode parameter of
>> >> >Catalog#twoPhaseCreateTable(...), since it is a selector argument (a code
>> >> >smell) that we should generally avoid in a public interface. According to the
>> >> >FLIP, isStreamingMode will be used by the Catalog to determine whether
>> >> >to support atomicity or not. With this selector argument, there will be two
>> >> >different logics built into one method, and it is hard to follow without
>> >> >reading the code or the doc carefully (another concern is keeping the doc
>> >> >and code always consistent), i.e. sometimes there will be no difference
>> >> >between a true and a false isStreamingMode, and sometimes they are quite
>> >> >different: atomic vs. non-atomic. Another question: before we call
>> >> >Catalog#twoPhaseCreateTable(...), we have to know the value of
>> >> >isStreamingMode. In case only non-atomic is supported for streaming mode,
>> >> >we could just follow FLIP-218 instead of (twistedly) calling
>> >> >Catalog#twoPhaseCreateTable(...) with a false isStreamingMode. Did I miss
>> >> >anything here?
>> >> Here's what I think about this issue: atomic CTAS should be the default
>> >> behavior, falling back to non-atomic CTAS only when atomicity is completely
>> >> unattainable. Atomic CTAS will bring a better experience to users.
>> >> Flink is already a unified stream and batch engine. At our company, Kwai, many
>> >> users use Flink for batch data processing but still run their jobs
>> >> in streaming mode.
>> >> The boundary between streaming and batch is gradually blurring, and
>> >> streaming-mode jobs may also reach the FINISHED state, so I added the
>> >> isStreamingMode parameter to provide different atomicity implementations
>> >> in batch and streaming modes. It is used not only to determine whether
>> >> atomicity is supported, but also to help select different
>> >> TwoPhaseCatalogTable implementations that provide different levels
>> >> of atomicity.
>> >>
>> >> Looking forward to more feedback.
>> >>
>> >> --
>> >> Best regards,
>> >> Mang Zhang
>> >>
>> >> At 2023-04-15 04:20:40, "Jing Ge" wrote:
>> >> >Hi Mang,
>> >> >
>> >> >This is the FLIP I was looking forward to after FLIP-218. Thanks for
>> >> >driving it. I have two questions and would like to know your thoughts,
>> >> >thanks:
>> >> >
>> >> >1. It looks like you found another way to design the atomic CTAS with a new
>> >> >serializable TwoPhaseCatalogTable instead of making Catalog serializable as
>> >> >described in FLIP-218. Did I understand correctly?
>> >> >2. I am a little bit confused about the isStreamingMode parameter of
>> >> >Catalog#twoPhaseCreateTable(...), since it is a selector argument (a code
>> >> >smell) that we should generally avoid in a public interface. According to the
>> >> >FLIP, isStreamingMode will be used by the Catalog to determine whether
>> >> >to support atomicity or not. With this selector argument, there will be two
>> >> >different logics built into one method, and it is hard to follow without
>> >> >reading the code or the doc carefully (another concern is keeping the doc
>> >> >and code always consistent), i.e. sometimes there will be no difference
>> >> >between a true and a false isStreamingMode, and sometimes they are quite
>> >> >different: atomic vs. non-atomic. Another question: before we call
>> >> >Catalog#twoPhaseCreateTable(...), we have to know the value of
>> >> >isStreamingMode. In case only non-atomic is supported for streaming mode,
>> >> >we could just follow FLIP-218 instead of (twistedly) calling
>> >> >Catalog#twoPhaseCreateTable(...) with a false isStreamingMode. Did I miss
>> >> >anything here?
>> >> >
>> >> >Best regards,
>> >> >Jing
>> >> >
>> >> >On Fri, Apr 14, 2023 at 1:55 PM yuxia wrote:
>> >> >
>> >> >> Hi, Mang.
>> >> >> +1 for completing the support for atomicity of CTAS. This is very useful
>> >> >> in batch scenarios and for integrating with data lakes that support
>> >> >> transactions.
>> >> >>
>> >> >> I just have one question: IIUC, the DynamicTableSink will need to know
>> >> >> whether it is used in the normal case or for atomic CTAS, as well as the
>> >> >> necessary context.
>> >> >> Take the JDBC catalog as an example: for CTAS with atomicity support,
>> >> >> the JDBC DynamicTableSink will write to the temp table defined in the
>> >> >> TwoPhaseCatalogTable, which differs from the normal case.
>> >> >>
>> >> >> How can the DynamicTableSink get this? Could you give some explanation
>> >> >> or an example in this FLIP?
>> >> >>
>> >> >> Best regards,
>> >> >> Yuxia
>> >> >>
>> >> >> ----- Original Message -----
>> >> >> From: "zhangmang1"
>> >> >> To: "dev", "ron9 liu",
>> >> >> "lincoln 86xy"
>> >> >> Sent: Friday, April 14, 2023 2:50:40 PM
>> >> >> Subject: Re:Re: [DISCUSS] FLIP-305: Support atomic for CREATE TABLE AS
>> >> >> SELECT(CTAS) statement
>> >> >>
>> >> >> Hi, Lincoln and Ron
>> >> >>
>> >> >> Thank you for your reply.
>> >> >> The naming suggestions look good to me and will make future feature
>> >> >> extensions more uniform. I have updated the FLIP.
>> >> >>
>> >> >> Regarding atomic CTAS support for Hive: Hive has rich usage scenarios,
>> >> >> which can be divided into three cases:
>> >> >> 1. writing Hive tables;
>> >> >> 2. writing Hive tables with speculative execution;
>> >> >> 3. writing Hive tables with small-file merging.
>> >> >>
>> >> >> The main purpose of FLIP-305 is to implement support for CTAS atomicity in
>> >> >> the Flink framework, so my PoC only verifies the first scenario, writing to
>> >> >> a Hive table. We can subsequently split out sub-tasks to support the other
>> >> >> two scenarios.
>> >> >>
>> >> >> --
>> >> >> Best regards,
>> >> >> Mang Zhang
>> >> >>
>> >> >> At 2023-04-13 12:27:24, "Lincoln Lee" wrote:
>> >> >> >Hi, Mang
>> >> >> >
>> >> >> >+1 for completing the support for atomicity of CTAS. This is very useful in
>> >> >> >batch scenarios.
>> >> >> >
>> >> >> >I have two questions:
>> >> >> >1. Naming:
>> >> >> > a) Can we rename `Catalog#getTwoPhaseCommitCreateTable` to
>> >> >> >`Catalog#twoPhaseCreateTable` (and perhaps add
>> >> >> >twoPhaseReplaceTable/twoPhaseCreateOrReplaceTable later)?
>> >> >> > b) For `TwoPhaseCommitCatalogTable`, would `TwoPhaseCatalogTable`
>> >> >> >be better?
>> >> >> > c) For `TwoPhaseCommitCatalogTable#beginTransaction`, the word 'transaction'
>> >> >> >in the method name may suggest to users that transactions are
>> >> >> >supported (which is not strictly the case), so I suggest renaming it to
>> >> >> >`begin`.
>> >> >> >2. Has this design been validated by a PoC on Hive or other
>> >> >> >catalogs?
>> >> >> >
>> >> >> >Best,
>> >> >> >Lincoln Lee
>> >> >> >
>> >> >> >liu ron wrote on Thursday, April 13, 2023 at 10:17:
>> >> >> >
>> >> >> >> Hi, Mang
>> >> >> >> Atomicity is very important for CTAS, especially for batch jobs. This FLIP
>> >> >> >> is a continuation of FLIP-218, which is valuable for CTAS.
>> >> >> >> I just have one question: in the Motivation section of FLIP-218, we
>> >> >> >> mentioned three levels of atomicity semantics. Can the current design
>> >> >> >> match Spark's DataSource V2, which can guarantee both atomicity and
>> >> >> >> isolation? For example, can that be achieved when writing to Hive tables
>> >> >> >> with CTAS?
>> >> >> >>
>> >> >> >> Best,
>> >> >> >> Ron
>> >> >> >>
>> >> >> >> Mang Zhang wrote on Monday, April 10, 2023 at 11:03:
>> >> >> >>
>> >> >> >> > Hi, everyone
>> >> >> >> >
>> >> >> >> > I'd like to start a discussion about FLIP-305: Support atomic for CREATE
>> >> >> >> > TABLE AS SELECT(CTAS) statement [1].
>> >> >> >> >
>> >> >> >> > The CREATE TABLE AS SELECT (CTAS) statement is already supported, but it
>> >> >> >> > is not atomic: the table is created before the job runs. If the job
>> >> >> >> > execution fails or is cancelled, the table is not dropped.
>> >> >> >> >
>> >> >> >> > So I want Flink to support atomic CTAS, where the table is only created
>> >> >> >> > when the job succeeds, to improve the user experience.
>> >> >> >> >
>> >> >> >> > Looking forward to your feedback.
>> >> >> >> >
>> >> >> >> > [1]
>> >> >> >> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-305%3A+Support+atomic+for+CREATE+TABLE+AS+SELECT%28CTAS%29+statement
>> >> >> >> >
>> >> >> >> > --
>> >> >> >> > Best regards,
>> >> >> >> > Mang Zhang
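The following is a minimal, self-contained sketch of the two-phase CTAS lifecycle discussed in this thread. It is an illustration, not the FLIP-305 API.

- It assumes a hypothetical `TwoPhaseCatalogTable` with `begin()`, `commit()` and `abort()`, using the method names from Lincoln's and yuxia's messages. The real interface may differ.
- `writeTarget()`, `FakeDatabase`, `RenameOnCommitTable`, `runCtas` and the `_tmp` staging suffix are all assumptions made up for this example.
- It follows Mang's example of a relational database that writes to a temporary table and renames it on commit.
- The target table only becomes visible if the job succeeds. On failure, the table's own `abort()` logic cleans up the staging table, rather than the Catalog issuing a generic drop as in FLIP-218.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Consumer;

class AtomicCtasSketch {

    /** Hypothetical CTAS driver: returns true if the job succeeded and was committed. */
    static boolean runCtas(TwoPhaseCatalogTable table, Consumer<String> job) {
        table.begin(); // phase 1: prepare a staging area before the job starts
        try {
            // Assumption: the sink is told which (staging) table to write to.
            job.accept(table.writeTarget());
        } catch (RuntimeException e) {
            table.abort(); // connector-specific cleanup instead of a generic DROP TABLE
            return false;
        }
        table.commit(); // phase 2: publish the result only after the job succeeds
        return true;
    }

    public static void main(String[] args) {
        FakeDatabase db = new FakeDatabase();
        boolean ok = runCtas(new RenameOnCommitTable(db, "orders"),
                name -> db.insert(name, "row-1"));
        System.out.println(ok + " " + db.tableNames());

        boolean failed = runCtas(new RenameOnCommitTable(db, "payments"), name -> {
            db.insert(name, "row-1");
            throw new IllegalStateException("simulated job failure");
        });
        System.out.println(failed + " " + db.tableNames());
    }
}

/** Hypothetical interface for illustration only; not Flink's actual API. */
interface TwoPhaseCatalogTable {
    String writeTarget(); // table the sink writes to while the job runs
    void begin();         // called before the job starts
    void commit();        // called after the job finishes successfully
    void abort();         // called if the job fails or is cancelled
}

/** In-memory stand-in for a relational database: table name -> rows. */
class FakeDatabase {
    private final Map<String, List<String>> tables = new HashMap<>();

    void createTable(String name) { tables.put(name, new ArrayList<>()); }
    void dropTable(String name) { tables.remove(name); }
    void renameTable(String from, String to) { tables.put(to, tables.remove(from)); }
    void insert(String table, String row) { tables.get(table).add(row); }
    List<String> rows(String table) { return tables.get(table); }
    Set<String> tableNames() { return new TreeSet<>(tables.keySet()); }
}

/** Writes into a staging table and renames it to the target name on commit. */
class RenameOnCommitTable implements TwoPhaseCatalogTable {
    private final FakeDatabase db;
    private final String targetName;
    private final String stagingName;

    RenameOnCommitTable(FakeDatabase db, String targetName) {
        this.db = db;
        this.targetName = targetName;
        this.stagingName = targetName + "_tmp"; // hypothetical naming scheme
    }

    public String writeTarget() { return stagingName; }
    public void begin() { db.createTable(stagingName); }
    public void commit() { db.renameTable(stagingName, targetName); }
    public void abort() { db.dropTable(stagingName); }
}
```

Running `main` prints `true [orders]` and then `false [orders]`. The failed `payments` job leaves neither a target table nor a staging table behind. Because cleanup lives in each table implementation, a filesystem-based connector could instead delete its temporary directory in `abort()`.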
