Hi, Jark
Thanks for your reply and suggestions!
>With this interface, we can easily support LEVEL-2 semantics by calling
>`Catalog#dropTable` in the
>`JobListener#onJobFailed`. We can also support LEVEL-3 by introducing
>`StagingTableCatalog` like Spark,
>calling `StagedTable#commitStagedChanges()` in `JobListener#onJobFinished`
>and
>calling StagedTable#abortStagedChanges() in `JobListener#onJobFailed`.
I think we can achieve this in stages, supporting LEVEL-2 in the first step.
There are still two questions that need to be discussed:
1. Should the new CTAS API be introduced in `Table` or in `TableEnvironment`?
2. Everyone seems to agree that the drop table operation should run in the JM,
but the details still need to be designed together with the runtime. This
requires a new hook mechanism that is called on the JM side, and we need some
help from @gaoyunhaii and @Kevin.yingjie for that.
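
To make the discussion of the JM-side hook more concrete, here is a rough sketch of how LEVEL-2 and LEVEL-3 could be layered on top of such a hook. Everything in it is an assumption for illustration only: `JobStatusHook`, `ToyCatalog`, `level2Hook`, `level3Hook` and `finishJob` are hypothetical names and not existing Flink interfaces, and the toy catalog only tracks which table names are visible. The real interface would have to be designed with the runtime.

```java
import java.util.HashSet;
import java.util.Set;

public class CtasHookSketch {

    /** Hypothetical JM-side hook; NOT an existing Flink interface. */
    interface JobStatusHook {
        void onFinished();
        void onFailed();
    }

    /** Toy stand-in for a catalog: it only tracks the visible table names. */
    static class ToyCatalog {
        final Set<String> visibleTables = new HashSet<>();

        void createTable(String name) {
            visibleTables.add(name);
        }

        void dropTable(String name) {
            visibleTables.remove(name);
        }
    }

    /** LEVEL-2: the table is created up front and dropped if the job fails. */
    static JobStatusHook level2Hook(ToyCatalog catalog, String table) {
        catalog.createTable(table); // visible immediately, so atomic but not isolated
        return new JobStatusHook() {
            @Override
            public void onFinished() {
                // nothing to do, the table stays
            }

            @Override
            public void onFailed() {
                catalog.dropTable(table); // roll back
            }
        };
    }

    /** LEVEL-3: the table is only staged and becomes visible on success. */
    static JobStatusHook level3Hook(ToyCatalog catalog, String table) {
        return new JobStatusHook() {
            @Override
            public void onFinished() {
                catalog.createTable(table); // commit the staged table
            }

            @Override
            public void onFailed() {
                // abort: nothing was ever published, so nothing to undo
            }
        };
    }

    /** Simulates the JM invoking the hook once the job reaches a terminal state. */
    static void finishJob(JobStatusHook hook, boolean succeeded) {
        if (succeeded) {
            hook.onFinished();
        } else {
            hook.onFailed();
        }
    }

    public static void main(String[] args) {
        ToyCatalog catalog = new ToyCatalog();

        JobStatusHook l2 = level2Hook(catalog, "t_level2");
        System.out.println("LEVEL-2 visible while running: " + catalog.visibleTables.contains("t_level2"));
        finishJob(l2, false);
        System.out.println("LEVEL-2 visible after failure: " + catalog.visibleTables.contains("t_level2"));

        JobStatusHook l3 = level3Hook(catalog, "t_level3");
        System.out.println("LEVEL-3 visible while running: " + catalog.visibleTables.contains("t_level3"));
        finishJob(l3, true);
        System.out.println("LEVEL-3 visible after success: " + catalog.visibleTables.contains("t_level3"));
    }
}
```

The point of the sketch is only that both levels share the same hook shape: LEVEL-2 creates the table eagerly and compensates in `onFailed`, while LEVEL-3 defers publication to `onFinished`. Because the hook would run in the JM, this would still work for jobs submitted in detached mode.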

--

Best regards,
Mang Zhang





At 2022-05-18 22:53:43, "Jark Wu" <imj...@gmail.com> wrote:
>Hi Mang,
>
>Thanks for proposing this, CTAS is a very important API for batch users.
>
>I think the key problem of this FLIP is the ACID semantics of the CTAS
>operation.
>We care most about two parts of the semantics:
>1) Atomicity: the created table should be rolled back if the write
>fails.
>2) Isolation: the created table shouldn't be visible before the write is
>successful (i.e. no read-uncommitted behavior).
>
>From your investigation, it seems that:
>- Flink (your FLIP): none of them.   ==> LEVEL-1
>- Spark DataSource v1: is atomic (can roll back), but is not isolated. ==>
>LEVEL-2
>- Spark DataSource v2: guarantees both of them.  ==> LEVEL-3
>- Hive MR: guarantees both of them. ==> LEVEL-3
>
>In order to support higher ACID semantics, I agree with Godfrey that we
>need some hooks in JM
>which can be called when the job is finished or failed/canceled. It might
>look like
>`StreamExecutionEnvironment#registerJobListener(JobListener)`,
>but JobListener is called on the
>client side. What we need is an interface called on the JM side, because
>the job can be submitted in
>detached mode.
>
>With this interface, we can easily support LEVEL-2 semantics by calling
>`Catalog#dropTable` in the
>`JobListener#onJobFailed`. We can also support LEVEL-3 by introducing
>`StagingTableCatalog` like Spark,
>calling `StagedTable#commitStagedChanges()` in `JobListener#onJobFinished`
>and
>calling StagedTable#abortStagedChanges() in `JobListener#onJobFailed`.
>
>Best,
>Jark
>
>
>On Wed, 18 May 2022 at 12:29, godfrey he <godfre...@gmail.com> wrote:
>
>> Hi Mang,
>>
>> Thanks for driving this FLIP.
>>
>> Please follow the FLIP template[1] style: `Syntax` is part of
>> the `Public API Changes` section.
>> `Program research` and `Implementation Plan` are part of the `Proposed
>> Changes` section;
>> alternatively, move `Program research` to the appendix.
>>
>> > Providing methods that are used to execute CTAS for Table API users.
>> We should introduce `createTable` in `Table` instead of `TableEnvironment`.
>> Because all table operations are defined in `Table`, see:
>> Table#executeInsert,
>> Table#insertInto, etc.
>> About the method name, I prefer to use `createTableAs`.
>>
>> > TableSink needs to provide the CleanUp API, developers implement as
>> needed.
>> I think it's hard for TableSink to implement a clean up operation. For
>> file system sink,
>> the data can be written to a temporary directory, but for key/value
>> sinks, it's hard to
>> remove the written keys, unless the sink records all written keys.
>>
>> > Do not do drop table operations in the framework, drop table is
>> implemented in
>> TableSink according to the needs of specific TableSink
>> The TM process may crash at any time, and the drop operation will not
>> be executed any more.
>>
>> How about doing the drop table operation and the data cleanup in the
>> catalog?
>> As for where to execute the drop operation, one approach is in the
>> client, the other is in the JM.
>> 1. In the client: this requires the client to stay alive until the job
>> has finished or failed.
>> 2. In the JM: this requires the JM to provide some interfaces/hooks
>> that the planner
>> implements, so that the logic is executed in the JM.
>> I prefer approach two, but it requires a more detailed design together
>> with the runtime, cc @gaoyunhaii, @kevin.yingjie
>>
>>
>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP+Template
>>
>> Best,
>> Godfrey
>>
>>
>> On Fri, 6 May 2022 at 11:24, Mang Zhang <zhangma...@163.com> wrote:
>>
>> >
>> > Hi, Yuxia
>> > Thanks for your reply!
>> > About question 1: we will not support it. FLIP-218[1] aims to reduce
>> the complexity of user DDL and make it easier to use. I have
>> never encountered this case in big data scenarios.
>> > About question 2: we will provide a public API like the following:
>> `public void cleanUp();`
>> >
>> >       Regarding the mechanism of cleanUp, we need advice from people
>> who are familiar with the runtime module; this is what we
>> need to focus on.
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> > --
>> >
>> > Best regards,
>> > Mang Zhang
>> >
>> >
>> >
>> >
>> >
>> > At 2022-04-29 17:00:03, "yuxia" <luoyu...@alumni.sjtu.edu.cn> wrote:
>> > >Thanks for driving this work; it will be a useful feature.
>> > >About FLIP-218, I have some questions.
>> > >
>> > >1: Does our CTAS syntax support specifying the target table's schema, including
>> column names and data types? I think it may be a useful feature in case we want
>> to change the data types in the target table instead of always copying the source
>> table's schema. It would be more flexible with this feature.
>> > >Btw, MySQL's "CREATE TABLE ... SELECT Statement"[1] supports this
>> feature.
>> > >
>> > >2: It seems this will require the sink to implement a public interface to drop
>> the table, so what will the interface look like?
>> > >
>> > >[1] https://dev.mysql.com/doc/refman/8.0/en/create-table-select.html
>> > >
>> > >Best regards,
>> > >Yuxia
>> > >
>> > >----- Original Message -----
>> > >From: "Mang Zhang" <zhangma...@163.com>
>> > >To: "dev" <dev@flink.apache.org>
>> > >Sent: Thursday, 28 April 2022, 4:57:24 PM
>> > >Subject: [DISCUSS] FLIP-218: Support SELECT clause in CREATE TABLE(CTAS)
>> > >
>> > >Hi, everyone
>> > >
>> > >
>> > >I would like to open a discussion on supporting the SELECT clause in CREATE
>> TABLE (CTAS).
>> > >With the development of business and the enhancement of Flink SQL
>> capabilities, queries are becoming more and more complex.
>> > >Currently the user needs to use a CREATE TABLE statement to create the
>> target table first, and then execute an INSERT statement.
>> > >However, the target table may have many columns, which brings a lot
>> of work outside the business logic to the user.
>> > >At the same time, the user has to ensure that the schema of the created
>> target table is consistent with the schema of the query result.
>> > >Using a CTAS syntax like that of Hive/Spark can make this much easier for the user.
>> > >
>> > >
>> > >
>> > >You can find more details in FLIP-218[1]. Looking forward to your
>> feedback.
>> > >
>> > >
>> > >
>> > >[1]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-218%3A+Support+SELECT+clause+in+CREATE+TABLE(CTAS)
>> > >
>> > >
>> > >
>> > >
>> > >--
>> > >
>> > >Best regards,
>> > >Mang Zhang
>> >
>> >
>>
