Hi Mang,


Thanks for the clarification. I am trying to understand your thoughts. Do you
actually mean boundedness[1] rather than the execution mode[2]? I.e., will
atomic CTAS be supported only for bounded data?
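
To make the question concrete, here is a minimal Java sketch of the reading I
have in mind: the catalog decides from the boundedness of the input rather than
from a boolean isStreamingMode selector. All names below (InputBoundedness,
TwoPhaseTable, SketchCatalog, BoundedOnlyCatalog, CtasSketch) are hypothetical
stand-ins for illustration only. They are not the FLIP's interfaces and not
existing Flink API.

```java
import java.util.Optional;

/** Hypothetical: whether the input of the CTAS query is finite. */
enum InputBoundedness { BOUNDED, UNBOUNDED }

/** Hypothetical stand-in for the TwoPhaseCatalogTable discussed in the FLIP. */
interface TwoPhaseTable {
    void begin();   // before the job starts
    void commit();  // after the job finished successfully
    void abort();   // after the job failed or was cancelled
}

/** Hypothetical catalog hook without a boolean selector argument. */
interface SketchCatalog {
    /** Returns a two-phase table if atomic CTAS is possible, otherwise empty. */
    Optional<TwoPhaseTable> twoPhaseCreateTable(String tablePath, InputBoundedness boundedness);
}

/** Records lifecycle calls so the behaviour can be observed. */
final class RecordingTable implements TwoPhaseTable {
    final StringBuilder log = new StringBuilder();
    public void begin() { log.append("begin;"); }
    public void commit() { log.append("commit;"); }
    public void abort() { log.append("abort;"); }
}

/** Assumption for this sketch: atomicity is offered for bounded input only. */
final class BoundedOnlyCatalog implements SketchCatalog {
    final RecordingTable table = new RecordingTable();

    public Optional<TwoPhaseTable> twoPhaseCreateTable(String tablePath, InputBoundedness boundedness) {
        return boundedness == InputBoundedness.BOUNDED
                ? Optional.<TwoPhaseTable>of(table)
                : Optional.<TwoPhaseTable>empty();
    }
}

final class CtasSketch {
    /** Returns "COMMITTED", "ABORTED" or "NON_ATOMIC" depending on the path taken. */
    static String run(SketchCatalog catalog, InputBoundedness boundedness, Runnable job) {
        Optional<TwoPhaseTable> maybeTable = catalog.twoPhaseCreateTable("db.t", boundedness);
        if (!maybeTable.isPresent()) {
            // Non-atomic fallback: the table would be created up front and
            // not cleaned up if the job fails.
            job.run();
            return "NON_ATOMIC";
        }
        TwoPhaseTable table = maybeTable.get();
        table.begin();
        try {
            job.run();
            table.commit();
            return "COMMITTED";
        } catch (RuntimeException e) {
            // A real implementation would likely rethrow after cleaning up.
            table.abort();
            return "ABORTED";
        }
    }
}
```

With this shape the caller never passes a mode flag. An empty result simply
means "fall back to non-atomic CTAS", and a catalog that can offer atomicity for
finishing streaming jobs could return a different implementation.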



Best regards,

Jing



[1] https://flink.apache.org/what-is-flink/flink-architecture/

[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution_mode/#execution-mode-batchstreaming

On Wed, Apr 19, 2023 at 9:14 AM Mang Zhang <zhangma...@163.com> wrote:

> hi, Jing
>
> Thank you for your reply.
>
> >1. It looks like you found another way to design the atomic CTAS with new
> >serializable TwoPhaseCatalogTable instead of making Catalog serializable as
> >described in FLIP-218. Did I understand correctly?
> Yes. When I was implementing the FLIP-218 solution, I ran into problems
> with Catalog/CatalogTable serialization and deserialization. For example,
> after deserialization the CatalogTable could not be converted to a Hive
> Table. Catalog serialization is also a heavy operation that may not
> actually be necessary, since all we need is to create the table.
> Therefore I propose the TwoPhaseCatalogTable approach, which also makes it
> easier to implement later features such as data lake integration and
> ReplaceTable.
>
> >2. I am a little bit confused about the isStreamingMode parameter of
> >Catalog#twoPhaseCreateTable(...), since it is the selector argument(code
> >smell) we should commonly avoid in the public interface. According to the
> >FLIP,  isStreamingMode will be used by the Catalog to determine whether to
> >support atomic or not. With this selector argument, there will be two
> >different logics built within one method and it is hard to follow without
> >reading the code or the doc carefully (another concern is keeping the doc
> >and code always consistent), i.e. sometimes there will be no difference by
> >using true/false isStreamingMode, sometimes they are quite different -
> >atomic vs. non-atomic. Another question is, before we call
> >Catalog#twoPhaseCreateTable(...), we have to know the value of
> >isStreamingMode. In case only non-atomic is supported for streaming mode,
> >we could just follow FLIP-218 instead of (twistedly) calling
> >Catalog#twoPhaseCreateTable(...) with a false isStreamingMode. Did I miss
> >anything here?
>
> Here is my thinking on this issue: atomic CTAS should be the default
> behavior, falling back to non-atomic CTAS only if atomicity is completely
> unattainable. Atomic CTAS gives users a better experience.
> Flink is already a unified stream and batch engine. At our company, Kwai,
> many users run batch data processing with Flink but still run the jobs in
> streaming mode.
> The boundary between stream and batch is gradually blurring, and streaming
> mode jobs may also FINISH, so I added the isStreamingMode parameter to
> provide different atomicity implementations in batch and streaming modes.
> It is used not only to determine whether atomicity is supported, but also
> to select different TwoPhaseCatalogTable implementations that provide
> different levels of atomicity.
>
> Looking forward to more feedback.
>
>
>
> --
>
> Best regards,
>
> Mang Zhang
>
>
>
> At 2023-04-15 04:20:40, "Jing Ge" <j...@ververica.com.INVALID> wrote:
> >Hi Mang,
> >
> >This is the FLIP I was looking forward to after FLIP-218. Thanks for
> >driving it. I have two questions and would like to know your thoughts,
> >thanks:
> >
> >1. It looks like you found another way to design the atomic CTAS with new
> >serializable TwoPhaseCatalogTable instead of making Catalog serializable as
> >described in FLIP-218. Did I understand correctly?
> >2. I am a little bit confused about the isStreamingMode parameter of
> >Catalog#twoPhaseCreateTable(...), since it is the selector argument(code
> >smell) we should commonly avoid in the public interface. According to the
> >FLIP,  isStreamingMode will be used by the Catalog to determine whether to
> >support atomic or not. With this selector argument, there will be two
> >different logics built within one method and it is hard to follow without
> >reading the code or the doc carefully (another concern is keeping the doc
> >and code always consistent), i.e. sometimes there will be no difference by
> >using true/false isStreamingMode, sometimes they are quite different -
> >atomic vs. non-atomic. Another question is, before we call
> >Catalog#twoPhaseCreateTable(...), we have to know the value of
> >isStreamingMode. In case only non-atomic is supported for streaming mode,
> >we could just follow FLIP-218 instead of (twistedly) calling
> >Catalog#twoPhaseCreateTable(...) with a false isStreamingMode. Did I miss
> >anything here?
> >
> >Best regards,
> >Jing
> >
> >On Fri, Apr 14, 2023 at 1:55 PM yuxia <luoyu...@alumni.sjtu.edu.cn> wrote:
> >
> >> Hi, Mang.
> >> +1 for completing the support for atomicity of CTAS. This is very useful
> >> in batch scenarios and for integrating with data lakes that support
> >> transactions.
> >>
> >> I just have one question. IIUC, the DynamicTableSink will need to know
> >> whether it is handling the normal case or atomic CTAS, along with the
> >> necessary context.
> >> Take the JDBC catalog as an example: for CTAS with atomicity support, the
> >> JDBC DynamicTableSink will write to the temp table defined in the
> >> TwoPhaseCatalogTable, which differs from the normal case.
> >>
> >> How can the DynamicTableSink get this information? Could you give some
> >> explanation or an example in this FLIP?
> >>
> >>
> >> Best regards,
> >> Yuxia
> >>
> >> ----- Original Message -----
> >> From: "zhangmang1" <zhangma...@163.com>
> >> To: "dev" <dev@flink.apache.org>, "ron9 liu" <ron9....@gmail.com>,
> >> "lincoln 86xy" <lincoln.8...@gmail.com>
> >> Sent: Friday, April 14, 2023, 2:50:40 PM
> >> Subject: Re:Re: [DISCUSS] FLIP-305: Support atomic for CREATE TABLE AS
> >> SELECT(CTAS) statement
> >>
> >> Hi, Lincoln and Ron
> >>
> >>
> >> Thank you for your reply.
> >> The naming suggestions look good to me; they will keep future extensions
> >> more uniform. I have updated the FLIP.
> >>
> >>
> >> Regarding atomic CTAS support for Hive: Hive has rich usage scenarios,
> >> which can be divided into three cases: 1. writing Hive tables, 2. writing
> >> Hive tables with speculative execution, 3. writing Hive tables with small
> >> file merging.
> >>
> >>
> >> The main purpose of FLIP-305 is to implement support for CTAS atomicity in
> >> the Flink framework,
> >> so I have only done a PoC to verify the first scenario, writing to a Hive
> >> table. We can subsequently split out sub-tasks to support the other two
> >> scenarios.
> >>
> >> --
> >>
> >> Best regards,
> >> Mang Zhang
> >>
> >>
> >>
> >>
> >>
> >> At 2023-04-13 12:27:24, "Lincoln Lee" <lincoln.8...@gmail.com> wrote:
> >> >Hi, Mang
> >> >
> >> >+1 for completing the support for atomicity of CTAS, this is very useful
> >> >in batch scenarios.
> >> >
> >> >I have two questions:
> >> >1. naming wise:
> >> >  a) can we rename the `Catalog#getTwoPhaseCommitCreateTable` to
> >> >`Catalog#twoPhaseCreateTable` (and we may add
> >> >twoPhaseReplaceTable/twoPhaseCreateOrReplaceTable later)
> >> >  b) for the `TwoPhaseCommitCatalogTable`, may it be better using
> >> >`TwoPhaseCatalogTable`?
> >> >  c) `TwoPhaseCommitCatalogTable#beginTransaction`, the word 'transaction'
> >> >in the method name, which may remind users of the relevance of transaction
> >> >support (however, it is not strictly so), so I suggest changing it to
> >> >`begin`
> >> >2. Has this design been validated by any relevant PoC on Hive or other
> >> >catalogs?
> >> >
> >> >Best,
> >> >Lincoln Lee
> >> >
> >> >
> >> >On Thu, Apr 13, 2023 at 10:17 AM liu ron <ron9....@gmail.com> wrote:
> >> >
> >> >> Hi, Mang
> >> >> Atomicity is very important for CTAS, especially for batch jobs. This
> >> >> FLIP is a continuation of FLIP-218, which is valuable for CTAS.
> >> >> I just have one question: in the Motivation part of FLIP-218, we
> >> >> mentioned three levels of atomicity semantics. Can the current design
> >> >> do the same as Spark's DataSource V2, which can guarantee both atomicity
> >> >> and isolation? For example, can it be done by writing to Hive tables
> >> >> using CTAS?
> >> >>
> >> >> Best,
> >> >> Ron
> >> >>
> >> >> On Mon, Apr 10, 2023 at 11:03 AM Mang Zhang <zhangma...@163.com> wrote:
> >> >>
> >> >> > Hi, everyone
> >> >> >
> >> >> >
> >> >> >
> >> >> >
> >> >> > I'd like to start a discussion about FLIP-305: Support atomic for
> >> >> > CREATE TABLE AS SELECT(CTAS) statement [1].
> >> >> >
> >> >> >
> >> >> >
> >> >> >
> >> >> > The CREATE TABLE AS SELECT(CTAS) statement is already supported, but it
> >> >> > is not atomic. It creates the table before the job runs. If the job
> >> >> > execution fails or is cancelled, the table is not dropped.
> >> >> >
> >> >> >
> >> >> >
> >> >> >
> >> >> > So I want Flink to support atomic CTAS, where the table is created only
> >> >> > when the job succeeds. This improves the user experience.
> >> >> >
> >> >> >
> >> >> >
> >> >> >
> >> >> > Looking forward to your feedback.
> >> >> >
> >> >> >
> >> >> >
> >> >> >
> >> >> > [1]
> >> >> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-305%3A+Support+atomic+for+CREATE+TABLE+AS+SELECT%28CTAS%29+statement
> >> >> >
> >> >> >
> >> >> >
> >> >> >
> >> >> >
> >> >> >
> >> >> >
> >> >> >
> >> >> >
> >> >> >
> >> >> > --
> >> >> >
> >> >> > Best regards,
> >> >> > Mang Zhang
> >> >>
> >>
>
>
