Thanks Mang for updating!

Looks good to me!

Best,
Jingsong

On Wed, Jun 7, 2023 at 2:31 PM Mang Zhang <zhangma...@163.com> wrote:
>
> Hi Jingsong,
>
> >I have some doubts about the `TwoPhaseCatalogTable`. Generally, our
> >Flink design places execution in the TableFactory or directly in the
> >Catalog, so introducing an executable table makes me feel a bit
> >strange. (Spark is this style, but Flink may not be)
> On this issue, we introduce the executable logic commit/abort a bit of 
> strange on CatalogTable.
> After an offline discussion with yuxia, I tweaked the FLIP-305[1] scenario.
> The new solution is similar to the implementation of SupportsOverwrite,
> which introduces the SupportsStaging interface and infers whether 
> DynamicTableSink supports atomic ctas based on whether it implements the 
> SupportsStaging interface,
> and if so, it will get the StagedTable object from DynamicTableSink.
>
> For more implementation details, please see the FLIP-305 document.
>
> This is my poc commits 
> https://github.com/Tartarus0zm/flink/commit/025b30ad8f1a03e7738e9bb534e6e491c31990fa
>
>
> [1] 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-305%3A+Support+atomic+for+CREATE+TABLE+AS+SELECT%28CTAS%29+statement
>
>
> --
>
> Best regards,
>
> Mang Zhang
>
>
>
> At 2023-05-12 13:02:14, "Jingsong Li" <jingsongl...@gmail.com> wrote:
> >Hi Mang,
> >
> >Thanks for starting this FLIP.
> >
> >I have some doubts about the `TwoPhaseCatalogTable`. Generally, our
> >Flink design places execution in the TableFactory or directly in the
> >Catalog, so introducing an executable table makes me feel a bit
> >strange. (Spark is this style, but Flink may not be)
> >
> >And for `TwoPhase`, maybe `StagedXXX` like Spark is better?
> >
> >Best,
> >Jingsong
> >
> >On Wed, May 10, 2023 at 9:29 PM Mang Zhang <zhangma...@163.com> wrote:
> >>
> >> Hi Ron,
> >>
> >>
> >> First of all, thank you for your reply!
> >> After our offline communication, what you said is mainly in the 
> >> compilePlan scenario, but currently compilePlanSql does not support non 
> >> INSERT statements, otherwise it will throw an exception.
> >> >Unsupported SQL query! compilePlanSql() only accepts a single SQL 
> >> >statement of type INSERT
> >> But it's a good point that I will seriously consider.
> >> Non-atomic CTAS can be supported relatively easily;
> >> But atomic CTAS needs more adaptation work, so I'm going to leave it as is 
> >> and follow up with a separate issue to implement CTAS support for 
> >> compilePlanSql.
> >>
> >>
> >>
> >>
> >>
> >>
> >> --
> >>
> >> Best regards,
> >> Mang Zhang
> >>
> >>
> >>
> >>
> >>
> >> At 2023-04-23 17:52:07, "liu ron" <ron9....@gmail.com> wrote:
> >> >Hi, Mang
> >> >
> >> >I have a question about the implementation details. For the atomicity 
> >> >case,
> >> >since the target table is not created before the JobGraph is generated, 
> >> >but
> >> >then the target table is required to exist when optimizing plan to 
> >> >generate
> >> >the JobGraph. So how do you solve this problem?
> >> >
> >> >Best,
> >> >Ron
> >> >
> >> >yuxia <luoyu...@alumni.sjtu.edu.cn> 于2023年4月20日周四 09:35写道:
> >> >
> >> >> Share some insights about the new TwoPhaseCatalogTable proposed after
> >> >> offline discussion with Mang.
> >> >> The main or important reason is that the TwoPhaseCatalogTable enables
> >> >> external connectors to implement theirs own logic for commit / abort.
> >> >> In FLIP-218, for atomic CTAS, the Catalog will then just drop the table
> >> >> when the job fail. It's not ideal for it's too generic to work well.
> >> >> For example, some connectors will need to clean some temporary files in
> >> >> abort method. And the actual connector can know the specific logic for
> >> >> aborting.
> >> >>
> >> >> Best regards,
> >> >> Yuxia
> >> >>
> >> >>
> >> >> 发件人: "zhangmang1" <zhangma...@163.com>
> >> >> 收件人: "dev" <dev@flink.apache.org>, "Jing Ge" <j...@ververica.com>
> >> >> 抄送: "ron9 liu" <ron9....@gmail.com>, "lincoln 86xy" <
> >> >> lincoln.8...@gmail.com>, luoyu...@alumni.sjtu.edu.cn
> >> >> 发送时间: 星期三, 2023年 4 月 19日 下午 3:13:36
> >> >> 主题: Re:Re: [DISCUSS] FLIP-305: Support atomic for CREATE TABLE AS
> >> >> SELECT(CTAS) statement
> >> >>
> >> >> hi, Jing
> >> >> Thank you for your reply.
> >> >> >1. It looks like you found another way to design the atomic CTAS with 
> >> >> >new
> >> >> >serializable TwoPhaseCatalogTable instead of making Catalog 
> >> >> >serializable
> >> >> as
> >> >> >described in FLIP-218. Did I understand correctly?
> >> >> Yes, when I was implementing the FLIP-218 solution, I encountered 
> >> >> problems
> >> >> with Catalog/CatalogTable serialization deserialization, for example, 
> >> >> after
> >> >> deserialization CatalogTable could not be converted to Hive Table. Also,
> >> >> Catalog serialization is still a heavy operation, but it may not 
> >> >> actually
> >> >> be necessary, we just need Create Table.
> >> >> Therefore, the TwoPhaseCatalogTable program is proposed, which also
> >> >> facilitates the implementation of the subsequent data lake, ReplaceTable
> >> >> and other functions.
> >> >>
> >> >> >2. I am a little bit confused about the isStreamingMode parameter of
> >> >> >Catalog#twoPhaseCreateTable(...), since it is the selector 
> >> >> >argument(code
> >> >> >smell) we should commonly avoid in the public interface. According to 
> >> >> >the
> >> >> >FLIP,  isStreamingMode will be used by the Catalog to determine 
> >> >> >whether to
> >> >> >support atomic or not. With this selector argument, there will be two
> >> >> >different logics built within one method and it is hard to follow 
> >> >> >without
> >> >> >reading the code or the doc carefully(another concern is to keep the 
> >> >> >doc
> >> >> >and code alway be consistent) i.e. sometimes there will be no 
> >> >> >difference
> >> >> by
> >> >> >using true/false isStreamingMode, sometimes they are quite different -
> >> >> >atomic vs. non-atomic. Another question is, before we call
> >> >> >Catalog#twoPhaseCreateTable(...), we have to know the value of
> >> >> >isStreamingMode. In case only non-atomic is supported for streaming 
> >> >> >mode,
> >> >> >we could just follow FLIP-218 instead of (twistedly) calling
> >> >> >Catalog#twoPhaseCreateTable(...) with a false isStreamingMode. Did I 
> >> >> >miss
> >> >> >anything here?
> >> >> Here's what I think about this issue, atomic CTAS wants to be the 
> >> >> default
> >> >> behavior and only fall back to non-atomic CTAS if it's completely
> >> >> unattainable. Atomic CTAS will bring a better experience to users.
> >> >> Flink is already a stream batch unified engine, In our company kwai, 
> >> >> many
> >> >> users are also using flink to do batch data processing, but still 
> >> >> running
> >> >> in Stream mode.
> >> >> The boundary between stream and batch is gradually blurred, stream mode
> >> >> jobs may also FINISH, so I added the isStreamingMode parameter, this
> >> >> provides different atomicity implementations in Batch and Stream modes.
> >> >> Not only to determine if atomicity is supported, but also to help select
> >> >> different TwoPhaseCatalogTable implementations to provide different 
> >> >> levels
> >> >> of atomicity!
> >> >>
> >> >> Looking forward to more feedback.
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >> --
> >> >> Best regards,
> >> >> Mang Zhang
> >> >>
> >> >>
> >> >>
> >> >>
> >> >> At 2023-04-15 04:20:40, "Jing Ge" <j...@ververica.com.INVALID> wrote:
> >> >> >Hi Mang,
> >> >> >
> >> >> >This is the FLIP I was looking forward to after FLIP-218. Thanks for
> >> >> >driving it. I have two questions and would like to know your thoughts,
> >> >> >thanks:
> >> >> >
> >> >> >1. It looks like you found another way to design the atomic CTAS with 
> >> >> >new
> >> >> >serializable TwoPhaseCatalogTable instead of making Catalog 
> >> >> >serializable
> >> >> as
> >> >> >described in FLIP-218. Did I understand correctly?
> >> >> >2. I am a little bit confused about the isStreamingMode parameter of
> >> >> >Catalog#twoPhaseCreateTable(...), since it is the selector 
> >> >> >argument(code
> >> >> >smell) we should commonly avoid in the public interface. According to 
> >> >> >the
> >> >> >FLIP,  isStreamingMode will be used by the Catalog to determine 
> >> >> >whether to
> >> >> >support atomic or not. With this selector argument, there will be two
> >> >> >different logics built within one method and it is hard to follow 
> >> >> >without
> >> >> >reading the code or the doc carefully(another concern is to keep the 
> >> >> >doc
> >> >> >and code alway be consistent) i.e. sometimes there will be no 
> >> >> >difference
> >> >> by
> >> >> >using true/false isStreamingMode, sometimes they are quite different -
> >> >> >atomic vs. non-atomic. Another question is, before we call
> >> >> >Catalog#twoPhaseCreateTable(...), we have to know the value of
> >> >> >isStreamingMode. In case only non-atomic is supported for streaming 
> >> >> >mode,
> >> >> >we could just follow FLIP-218 instead of (twistedly) calling
> >> >> >Catalog#twoPhaseCreateTable(...) with a false isStreamingMode. Did I 
> >> >> >miss
> >> >> >anything here?
> >> >> >
> >> >> >Best regards,
> >> >> >Jing
> >> >> >
> >> >> >On Fri, Apr 14, 2023 at 1:55 PM yuxia <luoyu...@alumni.sjtu.edu.cn>
> >> >> wrote:
> >> >> >
> >> >> >> Hi, Mang.
> >> >> >> +1 for completing the support for atomicity of CTAS, this is very 
> >> >> >> useful
> >> >> >> in batch scenarios and integrate with the data lake which support
> >> >> >> transcation.
> >> >> >>
> >> >> >> I just have one question, IIUC, the DynamiacTableSink will need to 
> >> >> >> know
> >> >> >> it's for normal case or the atomicity with CTAS as well as neccessary
> >> >> >> context.
> >> >> >> Take jdbc catalog as an example, if it's CTAS with atomicity 
> >> >> >> supports,
> >> >> the
> >> >> >> jdbc DynamiacTableSink will write the temp table defined in the
> >> >> >> TwoPhaseCatalogTable which is different from normal case.
> >> >> >>
> >> >> >> How can the DynamiacTableSink can get it? Could you give some
> >> >> explanation
> >> >> >> or example in this FLIP?
> >> >> >>
> >> >> >>
> >> >> >> Best regards,
> >> >> >> Yuxia
> >> >> >>
> >> >> >> ----- 原始邮件 -----
> >> >> >> 发件人: "zhangmang1" <zhangma...@163.com>
> >> >> >> 收件人: "dev" <dev@flink.apache.org>, "ron9 liu" <ron9....@gmail.com>,
> >> >> >> "lincoln 86xy" <lincoln.8...@gmail.com>
> >> >> >> 发送时间: 星期五, 2023年 4 月 14日 下午 2:50:40
> >> >> >> 主题: Re:Re: [DISCUSS] FLIP-305: Support atomic for CREATE TABLE AS
> >> >> >> SELECT(CTAS) statement
> >> >> >>
> >> >> >> Hi, Lincoln and Ron
> >> >> >>
> >> >> >>
> >> >> >> Thank you for your reply.
> >> >> >> On the naming wise I think OK, the future expansion of new features 
> >> >> >> more
> >> >> >> uniform. I have updated the FLIP.
> >> >> >>
> >> >> >>
> >> >> >> About Hive support atomicity CTAS, Hive is rich in usage scenarios 
> >> >> >> and
> >> >> can
> >> >> >> be divided into three scenarios: 1. writing Hive tables 2. writing 
> >> >> >> Hive
> >> >> >> tables with speculative execution 3. writing Hive table with small 
> >> >> >> file
> >> >> >> merge
> >> >> >>
> >> >> >>
> >> >> >> The main purpose of FLIP-305 is to implement support for CTAS 
> >> >> >> atomicity
> >> >> in
> >> >> >> the Flink framework,
> >> >> >> so I only poc to verify the first scenario of writing to the Hive 
> >> >> >> table,
> >> >> >> and we can subsequently split the sub-task to support the other two
> >> >> >> scenarios.
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >> --
> >> >> >>
> >> >> >> Best regards,
> >> >> >> Mang Zhang
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >> At 2023-04-13 12:27:24, "Lincoln Lee" <lincoln.8...@gmail.com> wrote:
> >> >> >> >Hi, Mang
> >> >> >> >
> >> >> >> >+1 for completing the support for atomicity of CTAS, this is very
> >> >> useful
> >> >> >> in
> >> >> >> >batch scenarios.
> >> >> >> >
> >> >> >> >I have two questions:
> >> >> >> >1. naming wise:
> >> >> >> >  a) can we rename the `Catalog#getTwoPhaseCommitCreateTable` to
> >> >> >> >`Catalog#twoPhaseCreateTable` (and we may add
> >> >> >> >twoPhaseReplaceTable/twoPhaseCreateOrReplaceTable later)
> >> >> >> >  b) for the `TwoPhaseCommitCatalogTable`, may it be better using
> >> >> >> >`TwoPhaseCatalogTable`?
> >> >> >> >  c) `TwoPhaseCommitCatalogTable#beginTransaction`, the word
> >> >> 'transaction'
> >> >> >> >in the method name, which may remind users of the relevance of
> >> >> transaction
> >> >> >> >support (however, it is not strictly so), so I suggest changing it 
> >> >> >> >to
> >> >> >> >`begin`
> >> >> >> >2. Has this design been validated by any relevant Poc on hive or 
> >> >> >> >other
> >> >> >> >catalogs?
> >> >> >> >
> >> >> >> >Best,
> >> >> >> >Lincoln Lee
> >> >> >> >
> >> >> >> >
> >> >> >> >liu ron <ron9....@gmail.com> 于2023年4月13日周四 10:17写道:
> >> >> >> >
> >> >> >> >> Hi, Mang
> >> >> >> >> Atomicity is very important for CTAS, especially for batch jobs. 
> >> >> >> >> This
> >> >> >> FLIP
> >> >> >> >> is a continuation of FLIP-218, which is valuable for CTAS.
> >> >> >> >> I just have one question, in the Motivation part of FLIP-218, we
> >> >> >> mentioned
> >> >> >> >> three levels of atomicity semantics, can this current design do 
> >> >> >> >> the
> >> >> >> same as
> >> >> >> >> Spark's DataSource V2, which can guarantee both atomicity and
> >> >> isolation,
> >> >> >> >> for example, can it be done by writing to Hive tables using CTAS?
> >> >> >> >>
> >> >> >> >> Best,
> >> >> >> >> Ron
> >> >> >> >>
> >> >> >> >> Mang Zhang <zhangma...@163.com> 于2023年4月10日周一 11:03写道:
> >> >> >> >>
> >> >> >> >> > Hi, everyone
> >> >> >> >> >
> >> >> >> >> >
> >> >> >> >> >
> >> >> >> >> >
> >> >> >> >> > I'd like to start a discussion about FLIP-305: Support atomic 
> >> >> >> >> > for
> >> >> >> CREATE
> >> >> >> >> > TABLE AS SELECT(CTAS) statement [1].
> >> >> >> >> >
> >> >> >> >> >
> >> >> >> >> >
> >> >> >> >> >
> >> >> >> >> > CREATE TABLE AS SELECT(CTAS) statement has been support, but 
> >> >> >> >> > it's
> >> >> not
> >> >> >> >> > atomic. It will create the table first before job running. If 
> >> >> >> >> > the
> >> >> job
> >> >> >> >> > execution fails, or is cancelled, the table will not be dropped.
> >> >> >> >> >
> >> >> >> >> >
> >> >> >> >> >
> >> >> >> >> >
> >> >> >> >> > So I want Flink to support atomic CTAS, where only the table is
> >> >> >> created
> >> >> >> >> > when the Job succeeds. Improve user experience.
> >> >> >> >> >
> >> >> >> >> >
> >> >> >> >> >
> >> >> >> >> >
> >> >> >> >> > Looking forward to your feedback.
> >> >> >> >> >
> >> >> >> >> >
> >> >> >> >> >
> >> >> >> >> >
> >> >> >> >> > [1]
> >> >> >> >> >
> >> >> >> >>
> >> >> >>
> >> >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-305%3A+Support+atomic+for+CREATE+TABLE+AS+SELECT%28CTAS%29+statement
> >> >> >> >> >
> >> >> >> >> >
> >> >> >> >> >
> >> >> >> >> >
> >> >> >> >> >
> >> >> >> >> >
> >> >> >> >> >
> >> >> >> >> >
> >> >> >> >> >
> >> >> >> >> >
> >> >> >> >> > --
> >> >> >> >> >
> >> >> >> >> > Best regards,
> >> >> >> >> > Mang Zhang
> >> >> >> >>
> >> >> >>
> >> >>
> >> >>

Reply via email to