Hi Mang,

Thanks for clarifying it. I am trying to understand your thoughts. Do you
actually mean the boundedness [1] instead of the execution modes [2]? I.e.,
atomic CTAS will only be supported for bounded data?

Best regards,
Jing

[1] https://flink.apache.org/what-is-flink/flink-architecture/
[2] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution_mode/#execution-mode-batchstreaming

On Wed, Apr 19, 2023 at 9:14 AM Mang Zhang <zhangma...@163.com> wrote:

> hi, Jing
>
> Thank you for your reply.
>
> >1. It looks like you found another way to design the atomic CTAS with new
> >serializable TwoPhaseCatalogTable instead of making Catalog serializable as
> >described in FLIP-218. Did I understand correctly?
> Yes, when I was implementing the FLIP-218 solution, I encountered problems
> with Catalog/CatalogTable serialization and deserialization; for example,
> after deserialization, CatalogTable could not be converted to a Hive Table.
> Also, Catalog serialization is still a heavy operation, but it may not
> actually be necessary; we just need Create Table.
> Therefore, the TwoPhaseCatalogTable approach is proposed, which also
> facilitates the implementation of subsequent data lake, ReplaceTable and
> other functions.
>
> >2. I am a little bit confused about the isStreamingMode parameter of
> >Catalog#twoPhaseCreateTable(...), since it is the selector argument (code
> >smell) we should commonly avoid in the public interface. According to the
> >FLIP, isStreamingMode will be used by the Catalog to determine whether to
> >support atomic or not. With this selector argument, there will be two
> >different logics built within one method and it is hard to follow without
> >reading the code or the doc carefully (another concern is to keep the doc
> >and code always consistent), i.e. sometimes there will be no difference by
> >using true/false isStreamingMode, sometimes they are quite different -
> >atomic vs. non-atomic.
> >Another question is, before we call
> >Catalog#twoPhaseCreateTable(...), we have to know the value of
> >isStreamingMode. In case only non-atomic is supported for streaming mode,
> >we could just follow FLIP-218 instead of (twistedly) calling
> >Catalog#twoPhaseCreateTable(...) with a false isStreamingMode. Did I miss
> >anything here?
>
> Here's what I think about this issue: atomic CTAS should be the default
> behavior and only fall back to non-atomic CTAS if atomicity is completely
> unattainable. Atomic CTAS will bring a better experience to users.
> Flink is already a unified stream and batch engine. In our company, Kwai,
> many users also use Flink for batch data processing, but still run in
> Stream mode.
> The boundary between stream and batch is gradually blurring, and stream mode
> jobs may also FINISH, so I added the isStreamingMode parameter; this
> provides different atomicity implementations in Batch and Stream modes.
> It is used not only to determine whether atomicity is supported, but also to
> help select different TwoPhaseCatalogTable implementations that provide
> different levels of atomicity!
>
> Looking forward to more feedback.
>
> --
> Best regards,
> Mang Zhang
>
> At 2023-04-15 04:20:40, "Jing Ge" <j...@ververica.com.INVALID> wrote:
> >Hi Mang,
> >
> >This is the FLIP I was looking forward to after FLIP-218. Thanks for
> >driving it. I have two questions and would like to know your thoughts,
> >thanks:
> >
> >1. It looks like you found another way to design the atomic CTAS with new
> >serializable TwoPhaseCatalogTable instead of making Catalog serializable as
> >described in FLIP-218. Did I understand correctly?
> >2. I am a little bit confused about the isStreamingMode parameter of
> >Catalog#twoPhaseCreateTable(...), since it is the selector argument (code
> >smell) we should commonly avoid in the public interface. According to the
> >FLIP, isStreamingMode will be used by the Catalog to determine whether to
> >support atomic or not.
> >With this selector argument, there will be two
> >different logics built within one method and it is hard to follow without
> >reading the code or the doc carefully (another concern is to keep the doc
> >and code always consistent), i.e. sometimes there will be no difference by
> >using true/false isStreamingMode, sometimes they are quite different -
> >atomic vs. non-atomic. Another question is, before we call
> >Catalog#twoPhaseCreateTable(...), we have to know the value of
> >isStreamingMode. In case only non-atomic is supported for streaming mode,
> >we could just follow FLIP-218 instead of (twistedly) calling
> >Catalog#twoPhaseCreateTable(...) with a false isStreamingMode. Did I miss
> >anything here?
> >
> >Best regards,
> >Jing
> >
> >On Fri, Apr 14, 2023 at 1:55 PM yuxia <luoyu...@alumni.sjtu.edu.cn> wrote:
> >
> >> Hi, Mang.
> >> +1 for completing the support for atomicity of CTAS, this is very useful
> >> in batch scenarios and integrates with data lakes that support
> >> transactions.
> >>
> >> I just have one question, IIUC, the DynamicTableSink will need to know
> >> whether it's for the normal case or for CTAS with atomicity, as well as
> >> the necessary context.
> >> Take the JDBC catalog as an example: if it's CTAS with atomicity support,
> >> the JDBC DynamicTableSink will write to the temp table defined in the
> >> TwoPhaseCatalogTable, which is different from the normal case.
> >>
> >> How can the DynamicTableSink get it? Could you give some explanation
> >> or example in this FLIP?
> >>
> >> Best regards,
> >> Yuxia
> >>
> >> ----- Original Message -----
> >> From: "zhangmang1" <zhangma...@163.com>
> >> To: "dev" <dev@flink.apache.org>, "ron9 liu" <ron9....@gmail.com>,
> >> "lincoln 86xy" <lincoln.8...@gmail.com>
> >> Sent: Friday, April 14, 2023, 2:50:40 PM
> >> Subject: Re:Re: [DISCUSS] FLIP-305: Support atomic for CREATE TABLE AS
> >> SELECT(CTAS) statement
> >>
> >> Hi, Lincoln and Ron
> >>
> >> Thank you for your reply.
> >> On the naming, I think it's OK; it makes future expansion with new
> >> features more uniform. I have updated the FLIP.
> >>
> >> About Hive support for atomic CTAS: Hive is rich in usage scenarios,
> >> which can be divided into three:
> >> 1. writing Hive tables
> >> 2. writing Hive tables with speculative execution
> >> 3. writing Hive tables with small file merge
> >>
> >> The main purpose of FLIP-305 is to implement support for CTAS atomicity in
> >> the Flink framework,
> >> so I only did a PoC to verify the first scenario of writing to the Hive
> >> table, and we can subsequently split out sub-tasks to support the other
> >> two scenarios.
> >>
> >> --
> >>
> >> Best regards,
> >> Mang Zhang
> >>
> >> At 2023-04-13 12:27:24, "Lincoln Lee" <lincoln.8...@gmail.com> wrote:
> >> >Hi, Mang
> >> >
> >> >+1 for completing the support for atomicity of CTAS, this is very useful
> >> >in batch scenarios.
> >> >
> >> >I have two questions:
> >> >1. naming wise:
> >> > a) can we rename the `Catalog#getTwoPhaseCommitCreateTable` to
> >> >`Catalog#twoPhaseCreateTable` (and we may add
> >> >twoPhaseReplaceTable/twoPhaseCreateOrReplaceTable later)
> >> > b) for the `TwoPhaseCommitCatalogTable`, may it be better using
> >> >`TwoPhaseCatalogTable`?
> >> > c) `TwoPhaseCommitCatalogTable#beginTransaction`, the word 'transaction'
> >> >in the method name, which may remind users of the relevance of transaction
> >> >support (however, it is not strictly so), so I suggest changing it to
> >> >`begin`
> >> >2. Has this design been validated by any relevant PoC on Hive or other
> >> >catalogs?
> >> >
> >> >Best,
> >> >Lincoln Lee
> >> >
> >> >
> >> >liu ron <ron9....@gmail.com> wrote on Thu, Apr 13, 2023 at 10:17:
> >> >
> >> >> Hi, Mang
> >> >> Atomicity is very important for CTAS, especially for batch jobs. This
> >> >> FLIP is a continuation of FLIP-218, which is valuable for CTAS.
> >> >> I just have one question: in the Motivation part of FLIP-218, we
> >> >> mentioned three levels of atomicity semantics. Can this current design
> >> >> do the same as Spark's DataSource V2, which can guarantee both atomicity
> >> >> and isolation? For example, can it be done by writing to Hive tables
> >> >> using CTAS?
> >> >>
> >> >> Best,
> >> >> Ron
> >> >>
> >> >> Mang Zhang <zhangma...@163.com> wrote on Mon, Apr 10, 2023 at 11:03:
> >> >>
> >> >> > Hi, everyone
> >> >> >
> >> >> > I'd like to start a discussion about FLIP-305: Support atomic for
> >> >> > CREATE TABLE AS SELECT(CTAS) statement [1].
> >> >> >
> >> >> > CREATE TABLE AS SELECT(CTAS) statement has been supported, but it's
> >> >> > not atomic. It creates the table before the job runs. If the job
> >> >> > execution fails or is cancelled, the table will not be dropped.
> >> >> >
> >> >> > So I want Flink to support atomic CTAS, where the table is created
> >> >> > only when the job succeeds, improving the user experience.
> >> >> >
> >> >> > Looking forward to your feedback.
> >> >> >
> >> >> > [1]
> >> >> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-305%3A+Support+atomic+for+CREATE+TABLE+AS+SELECT%28CTAS%29+statement
> >> >> >
> >> >> > --
> >> >> >
> >> >> > Best regards,
> >> >> > Mang Zhang
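
For readers following the thread, the behavior requested in the original proposal (the table becomes visible only if the job succeeds) can be illustrated with a minimal Java sketch. Everything in it is a hypothetical stand-in and not Flink's actual API: TwoPhaseTableSketch only borrows the begin/commit/abort idea from the TwoPhaseCatalogTable naming discussed above, and InMemoryCatalogSketch and AtomicCtasSketch are invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the two-phase table idea in the thread; not Flink's API. */
interface TwoPhaseTableSketch {
    void begin();   // called before the job starts
    void commit();  // called only when the job finishes successfully
    void abort();   // called when the job fails or is cancelled
}

/** Hypothetical in-memory catalog, used only to make the sketch runnable. */
final class InMemoryCatalogSketch {
    private final List<String> tables = new ArrayList<>();

    boolean tableExists(String name) {
        return tables.contains(name);
    }

    TwoPhaseTableSketch twoPhaseCreateTable(String name) {
        return new TwoPhaseTableSketch() {
            @Override
            public void begin() {
                // A real catalog might prepare a staging location here (assumption).
            }

            @Override
            public void commit() {
                // The table becomes visible only at commit time.
                tables.add(name);
            }

            @Override
            public void abort() {
                // Nothing was published, so there is nothing to drop in this sketch.
            }
        };
    }
}

/** Hypothetical driver: runs the "job" between begin() and commit()/abort(). */
final class AtomicCtasSketch {
    static boolean runCtas(InMemoryCatalogSketch catalog, String table, Runnable job) {
        TwoPhaseTableSketch twoPhase = catalog.twoPhaseCreateTable(table);
        twoPhase.begin();
        try {
            job.run();
            twoPhase.commit();
            return true;
        } catch (RuntimeException e) {
            twoPhase.abort();
            return false;
        }
    }
}
```

In this sketch a failing job leaves no table behind, which is the difference from the non-atomic CTAS described in the proposal, where the table is created before the job runs and is not dropped on failure.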