Hi, Jark

Thanks for your reply and suggestions!

> With this interface, we can easily support LEVEL-2 semantics by calling
> `Catalog#dropTable` in the `JobListener#onJobFailed`. We can also support
> LEVEL-3 by introducing `StagingTableCatalog` like Spark, calling
> `StagedTable#commitStagedChanges()` in `JobListener#onJobFinished` and
> calling `StagedTable#abortStagedChanges()` in `JobListener#onJobFailed`.

I think we can achieve this in stages: in the first step, we support LEVEL-2.
There are still two questions that need to be discussed:
1. Should the new CTAS API be introduced in `Table` or in `TableEnvironment`?
2. There is consensus that the drop table operation should run in the JM, but it needs a more detailed design on the runtime side. Adding a new hook mechanism that is called on the JM side needs help from @gaoyunhaii and @Kevin.yingjie.
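To make the difference between the two levels concrete, here is a minimal, self-contained Java sketch. Everything in it is hypothetical: `JobTerminationHook`, `SimpleCatalog`, `DropOnFailureHook`, `StagedTable` and `StagedCommitHook` are illustrative names, not existing Flink APIs. The `commitStagedChanges`/`abortStagedChanges` method names only mirror the ones quoted above. The sketch assumes that the JM calls exactly one hook method when the job reaches a terminal state.

```java
import java.util.HashSet;
import java.util.Set;

public class CtasHookSketch {

    // Hypothetical JM-side hook (NOT an existing Flink API). The JM would call
    // exactly one of these methods when the job reaches a terminal state, so it
    // also works for jobs submitted in detached mode.
    interface JobTerminationHook {
        void onFinished();
        void onFailed(Throwable cause);
        void onCanceled();
    }

    // Hypothetical, heavily simplified catalog that only tracks table names.
    static class SimpleCatalog {
        private final Set<String> tables = new HashSet<>();
        void createTable(String name) { tables.add(name); }
        void dropTable(String name) { tables.remove(name); }
        boolean tableExists(String name) { return tables.contains(name); }
    }

    // LEVEL-2 (atomic, not isolated): the table already exists while the job
    // runs and is dropped again if the job fails or is canceled.
    static class DropOnFailureHook implements JobTerminationHook {
        private final SimpleCatalog catalog;
        private final String table;

        DropOnFailureHook(SimpleCatalog catalog, String table) {
            this.catalog = catalog;
            this.table = table;
        }

        @Override public void onFinished() { /* success: keep the table */ }
        @Override public void onFailed(Throwable cause) { catalog.dropTable(table); }
        @Override public void onCanceled() { catalog.dropTable(table); }
    }

    // LEVEL-3 (atomic and isolated): the table is registered in the catalog
    // only when the staged changes are committed.
    static class StagedTable {
        private final SimpleCatalog catalog;
        private final String name;
        private boolean aborted = false;

        StagedTable(SimpleCatalog catalog, String name) {
            this.catalog = catalog;
            this.name = name;
        }

        void commitStagedChanges() { catalog.createTable(name); }

        // A real implementation would also delete the staged data; here nothing
        // was published to the catalog, so we only record the abort.
        void abortStagedChanges() { aborted = true; }

        boolean isAborted() { return aborted; }
    }

    static class StagedCommitHook implements JobTerminationHook {
        private final StagedTable staged;

        StagedCommitHook(StagedTable staged) { this.staged = staged; }

        @Override public void onFinished() { staged.commitStagedChanges(); }
        @Override public void onFailed(Throwable cause) { staged.abortStagedChanges(); }
        @Override public void onCanceled() { staged.abortStagedChanges(); }
    }

    public static void main(String[] args) {
        SimpleCatalog catalog = new SimpleCatalog();

        // LEVEL-2: created up front, rolled back when the job fails.
        catalog.createTable("sink_a");
        JobTerminationHook level2 = new DropOnFailureHook(catalog, "sink_a");
        level2.onFailed(new RuntimeException("write failed"));
        System.out.println("sink_a exists after failure: " + catalog.tableExists("sink_a"));

        // LEVEL-3: invisible while the job runs, published only on success.
        JobTerminationHook level3 = new StagedCommitHook(new StagedTable(catalog, "sink_b"));
        System.out.println("sink_b visible while running: " + catalog.tableExists("sink_b"));
        level3.onFinished();
        System.out.println("sink_b visible after finish: " + catalog.tableExists("sink_b"));
    }
}
```

Running `main` prints `false`, `false`, `true`. `sink_a` is gone after the failure, and `sink_b` only appears after `onFinished()`. In both levels the rollback lives in a catalog-facing hook executed by the JM rather than in the sink. It therefore still runs if a TM crashes, and it does not depend on the client staying alive.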
--
Best regards,
Mang Zhang

At 2022-05-18 22:53:43, "Jark Wu" <imj...@gmail.com> wrote:
>Hi Mang,
>
>Thanks for proposing this, CTAS is a very important API for batch users.
>
>I think the key problem of this FLIP is the ACID semantics of the CTAS
>operation. We care most about two parts of the semantics:
>1) Atomicity: the created table should be rolled back if the write fails.
>2) Isolation: the created table shouldn't be visible before the write is
>successful (read uncommitted).
>
>From your investigation, it seems that:
>- Flink (your FLIP): none of them. ==> LEVEL-1
>- Spark DataSource v1: is atomic (can roll back), but is not isolated. ==> LEVEL-2
>- Spark DataSource v2: guarantees both of them. ==> LEVEL-3
>- Hive MR: guarantees both of them. ==> LEVEL-3
>
>In order to support higher ACID semantics, I agree with Godfrey that we
>need some hooks in the JM which can be called when the job is finished or
>failed/canceled. It might look like
>`StreamExecutionEnvironment#registerJobListener(JobListener)`, but
>JobListener is called on the client side. What we need is an interface
>called on the JM side, because the job can be submitted in detached mode.
>
>With this interface, we can easily support LEVEL-2 semantics by calling
>`Catalog#dropTable` in the `JobListener#onJobFailed`. We can also support
>LEVEL-3 by introducing `StagingTableCatalog` like Spark, calling
>`StagedTable#commitStagedChanges()` in `JobListener#onJobFinished` and
>calling `StagedTable#abortStagedChanges()` in `JobListener#onJobFailed`.
>
>Best,
>Jark
>
>
>On Wed, 18 May 2022 at 12:29, godfrey he <godfre...@gmail.com> wrote:
>
>> Hi Mang,
>>
>> Thanks for driving this FLIP.
>>
>> Please follow the FLIP template[1] style: the `Syntax` section is part of
>> the `Public API Changes` section. 'Program research' and 'Implementation
>> Plan' should be part of the `Proposed Changes` section, or 'Program
>> research' could be moved to the appendix.
>>
>> > Providing methods that are used to execute CTAS for Table API users.
>> We should introduce `createTable` in `Table` instead of `TableEnvironment`,
>> because all table operations are defined in `Table`, see:
>> `Table#executeInsert`, `Table#insertInto`, etc.
>> As for the method name, I prefer `createTableAs`.
>>
>> > TableSink needs to provide the CleanUp API, which developers implement
>> > as needed.
>> I think it's hard for TableSink to implement a cleanup operation. For a
>> file system sink, the data can be written to a temporary directory, but
>> for key/value sinks it's hard to remove the written keys unless the sink
>> records all of them.
>>
>> > Do not do drop table operations in the framework; drop table is
>> > implemented in TableSink according to the needs of the specific TableSink.
>> The TM process may crash at any time, and then the drop operation will
>> never be executed.
>>
>> How about we do the drop table operation and the data cleanup in the
>> catalog? As for where to execute the drop operation, one approach is in
>> the client, the other is in the JM:
>> 1. In the client: this requires the client to stay alive until the job
>> has finished or failed.
>> 2. In the JM: this requires the JM to provide some interfaces/hooks that
>> the planner implements, so that the code is executed in the JM.
>> I prefer approach 2, but it requires a more detailed design with the
>> runtime side, @gaoyunhaii, @kevin.yingjie.
>>
>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP+Template
>>
>> Best,
>> Godfrey
>>
>> On Fri, May 6, 2022 at 11:24, Mang Zhang <zhangma...@163.com> wrote:
>>
>> > Hi, Yuxia
>> > Thanks for your reply!
>> > About question 1: we will not support it. FLIP-218[1] aims to reduce the
>> > complexity of user DDL and make it easier to use. I have never
>> > encountered this case in big data scenarios.
>> > About question 2: we will provide a public API like the one below:
>> >   public void cleanUp();
>> >
>> > Regarding the mechanism of cleanUp, people who are familiar with the
>> > runtime module need to provide professional advice; this is what we
>> > need to focus on.
>> >
>> > --
>> > Best regards,
>> > Mang Zhang
>> >
>> > At 2022-04-29 17:00:03, "yuxia" <luoyu...@alumni.sjtu.edu.cn> wrote:
>> > >Thanks for driving this work, it will be a useful feature.
>> > >I have some questions about FLIP-218.
>> > >
>> > >1: Does our CTAS syntax support specifying the target table's schema,
>> > >including column names and data types? I think it may be a useful
>> > >feature in case we want to change the data types in the target table
>> > >instead of always copying the source table's schema. It'll be more
>> > >flexible with this feature.
>> > >Btw, MySQL's "CREATE TABLE ... SELECT Statement"[1] supports this feature.
>> > >
>> > >2: It seems the sink will be required to implement a public interface
>> > >to drop the table, so what will the interface look like?
>> > >
>> > >[1] https://dev.mysql.com/doc/refman/8.0/en/create-table-select.html
>> > >
>> > >Best regards,
>> > >Yuxia
>> > >
>> > >----- Original Message -----
>> > >From: "Mang Zhang" <zhangma...@163.com>
>> > >To: "dev" <dev@flink.apache.org>
>> > >Sent: Thursday, April 28, 2022 4:57:24 PM
>> > >Subject: [DISCUSS] FLIP-218: Support SELECT clause in CREATE TABLE(CTAS)
>> > >
>> > >Hi, everyone
>> > >
>> > >I would like to open a discussion about supporting the SELECT clause in
>> > >CREATE TABLE (CTAS).
>> > >With the development of business and the enhancement of Flink SQL
>> > >capabilities, queries become more and more complex.
>> > >Currently, the user needs to use a CREATE TABLE statement to create the
>> > >target table first, and then execute an INSERT statement.
>> > >However, the target table may have many columns, which brings the user
>> > >a lot of work outside the business logic.
>> > >At the same time, the user has to ensure that the schema of the created
>> > >target table is consistent with the schema of the query result.
>> > >Using a CTAS syntax like Hive/Spark can greatly facilitate the user.
>> > >
>> > >You can find more details in FLIP-218[1]. Looking forward to your
>> > >feedback.
>> > >
>> > >[1]
>> > >https://cwiki.apache.org/confluence/display/FLINK/FLIP-218%3A+Support+SELECT+clause+in+CREATE+TABLE(CTAS)
>> > >
>> > >--
>> > >Best regards,
>> > >Mang Zhang