Hi all, I have been following this thread and it looks interesting. If I can be of any help, please let me know.
Thanks,
Teja

On Wed, Dec 12, 2018, 4:31 AM Kurt Young <ykt...@gmail.com> wrote:

Sounds great, thanks for the effort, Shuyi.

Best,
Kurt

On Wed, Dec 12, 2018 at 5:14 PM Shuyi Chen <suez1...@gmail.com> wrote:

Hi all,

I summarized the MVP based on the features that we agreed upon. For table update mode, custom watermark strategies, and timestamp extractors, I found there are still open discussions, so I decided to leave them out of the MVP. The row/map/array data types, I think, we can add as well if everyone agrees.

4) Event-Time Attributes and Watermarks
Cited from the SQL Server 2017 documentation (https://docs.microsoft.com/en-us/sql/relational-databases/tables/specify-computed-columns-in-a-table?view=sql-server-2017): "A computed column is a virtual column that is not physically stored in the table, unless the column is marked PERSISTED. A computed column expression can use data from other columns to calculate a value for the column to which it belongs." I think we can also introduce the PERSISTED keyword for computed columns to indicate that the field can be stored back to the table, i.e. ts AS SYSTEMROWTIME() PERSISTED.

3) SOURCE / SINK / BOTH
GRANT/REVOKE sounds like a more standard option than adding a property to CREATE TABLE to manage the ACL/permissions. The ACL can be stored somewhere in a database, and access to a dynamic table can be allowed/disallowed depending on whether the statement is an "INSERT INTO" or a "SELECT".

I can volunteer to turn the discussion into a FLIP. I can try to summarize the current discussion, and share edit permission with you to collaborate on the document. After we finalize the doc, we can publish it as a FLIP. What do you think?

Shuyi

On Tue, Dec 11, 2018 at 9:13 AM Timo Walther <twal...@apache.org> wrote:

Hi all,

thanks for summarizing the discussion, @Shuyi. I think we need to include the "table update mode" problem, as it might not be easy to change in the future. Regarding "support row/map/array data type", I don't see a reason why we should not support them now, as the data types are already included in the runtime. The "support custom timestamp extractor" item is solved by the computed columns approach. The "custom watermark strategy" can be added by supplying a class name as a parameter, in my opinion.

Regarding the comments of Lin and Jark:

@Lin: Instantiating a TableSource/Sink should not cost much, but we should not mix the catalog discussion and DDL at this point.

4) Event-Time Attributes and Watermarks
4.b) Regarding `ts AS SYSTEMROWTIME()` and Lin's comment about "will violate the rule": there is no explicit rule against doing so. Computed columns are also not standard compliant; if we can use information that is encoded in constraints, we should use it. Adding more and more top-level properties makes the interaction with connectors more difficult. An additional HEADER keyword sounds too connector-specific and also not SQL compliant to me.

3) SOURCE / SINK / BOTH
GRANT/REVOKE statements mutate an existing table, right? In my opinion, independent of SQL databases but focusing on Flink user requirements, a CREATE TABLE statement should be an immutable definition of a connection to an external system.
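(For reference, the DCL statements under discussion would look roughly like the following. This is a sketch using standard SQL GRANT/REVOKE syntax; the table and role names are made up, and whether Flink should manage such ACLs at all is exactly the open question here.)

```
-- allow a role to use the table as a source only
GRANT SELECT ON TABLE kafka_events TO ROLE data_science;
-- allow another role to use it as a sink
GRANT INSERT ON TABLE kafka_events TO ROLE etl_writer;
-- withdraw write access later without touching the CREATE TABLE definition
REVOKE INSERT ON TABLE kafka_events FROM ROLE etl_writer;
```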
7) Table Update Mode
As far as I can see, the only thing missing for enabling all table modes is the declaration of a change flag. We could introduce a new keyword here, similar to WATERMARK:

CREATE TABLE output_kafka_t1 (
  id bigint,
  msg varchar,
  CHANGE_FLAG FOR isRetraction
) WITH (
  type=kafka
  ,...
);

CREATE TABLE output_kafka_t1 (
  CHANGE_FLAG FOR isUpsert,
  id bigint,
  msg varchar,
  PRIMARY_KEY(id)
) WITH (
  type=kafka
  ,...
);

What do you think?

@Jark: We should definitely stage the discussions and mention the opinions and advantages/disadvantages that have already been proposed in the FLIP.

Regards,
Timo

On 10.12.18 at 08:10, Jark Wu wrote:

Hi all,

It's great to see we have an agreement on the MVP.

4.b) Ingesting and writing timestamps to systems.
I would treat the field as a physical column, not a virtual column. If we treat it as a computed column, it will be confusing that the behavior differs depending on whether the table is a source or a sink. When it is a physical column, the behavior can be unified. Then the problem is how to map the field to the Kafka message timestamp. One option is what Lin proposed above, which is also used in KSQL [1]. Another idea is introducing a HEADER column which strictly maps by name to the fields in the message header. For example:

CREATE TABLE output_kafka_t1 (
  id bigint,
  ts timestamp HEADER,
  msg varchar
) WITH (
  type=kafka
  ,...
);

This is used at Alibaba but not included in the DDL draft. It would further extend the SQL syntax, which is something we should be cautious about. What do you think about these two solutions?

4.d) Custom watermark strategies:
@Timo, I don't have a strong opinion on this.

3) SOURCE/SINK/BOTH
Agree with Lin, GRANT/REVOKE [SELECT|UPDATE] ON TABLE is a clean and standard way to manage permissions, which is also adopted by Hive [2] and many databases.

[1]: https://docs.confluent.io/current/ksql/docs/tutorials/examples.html
[2]: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=45876173#Hivedeprecatedauthorizationmode/LegacyMode-Grant/RevokePrivileges

@Timo, it would be great if someone can conclude the discussion and summarize it into a FLIP.
@Shuyi, thanks a lot for putting it all together. The google doc looks good to me, and I left some minor comments there.

Regarding the FLIP, I have some suggestions:
1. The FLIP can contain MILESTONE1 and FUTURE WORKS.
2. MILESTONE1 is the MVP. It describes the MVP DDL syntax.
3. Separate FUTURE WORKS into two parts: UNDER DISCUSSION and ADOPTED. We can easily derive MILESTONE2 from this when it is ready.

I summarized the Future Works based on Shuyi's work:

Adopted: (should be described in detail here...)
1. support data type nullability and precision.
2. comments on tables and columns.

Under Discussion: (should briefly describe the options...)
1. Ingesting and writing timestamps to systems.
2. support custom watermark strategies.
3. support table update mode.
4. support row/map/array data types.
5. support schema derivation.
6. support system-versioned temporal tables.
7. support table indexes.

We can continue the further discussion here, or separate it into another DISCUSS topic if it is a sophisticated problem such as Table Update Mode or Temporal Table.

Best,
Jark

On Mon, 10 Dec 2018 at 11:54, Lin Li <lincoln.8...@gmail.com> wrote:

hi all,
Thanks for your valuable input!

4) Event-Time Attributes and Watermarks:
4.b) @Fabian As you mentioned, using a computed column `ts AS SYSTEMROWTIME()` for writing out to a kafka table sink will violate the rule that computed fields are not emitted. Since the timestamp column in kafka's header area is a specific materialization protocol, why don't we treat it as a connector property? For example:
```
CREATE TABLE output_kafka_t1(
  id bigint,
  ts timestamp,
  msg varchar
) WITH (
  type=kafka,
  header.timestamp=ts
  ,...
);
```

4d) For custom watermark strategies
@Fabian Agree with you on opening another topic about this feature later.

3) SOURCE / SINK / BOTH
I think permissions and availability are two separate things. Permissions can be managed well by using GRANT/REVOKE (you can call it DCL) solutions, which are commonly used in different DBs. The permission part can be a new topic for later discussion, what do you think?

For the availability part, @Fabian @Timo I have another question: does instantiating a TableSource/Sink cost much or have some other downsides? IMO, creating a new source/sink object via the constructor does not seem costly. When receiving a DDL we should associate it with the catalog object (reusing an existing one or creating a new one). Have I missed something important?

5. Schema declaration:
@Timo yes, your concern about user convenience is very important. But I haven't seen a clear way to solve this so far. Shall we put it off and wait for more input from the community?

On Sat, Dec 8, 2018 at 4:27 PM, Shuyi Chen <suez1...@gmail.com> wrote:

Hi all,

Thanks a lot for the great discussion. I think we can continue the discussion here while carving out an MVP so that the community can start working on it. Based on the discussion so far, I try to summarize what we will do for the MVP:

MVP

1. support CREATE TABLE
2. support existing data types in Flink SQL, ignoring nullability and precision
3. support table comments and column comments
4. support table constraints PRIMARY KEY and UNIQUE
5. support table properties using key-value pairs
6. support PARTITIONED BY
7. support computed columns
8. support from-field and from-source timestamp extractors
9. support PERIODIC-ASCENDING, PERIODIC-BOUNDED, FROM-SOURCE watermark strategies
10. support a table property to allow explicit enforcement of read/write (source/sink) permission of a table
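(To make the MVP scope concrete, a table exercising most of these features might look like the sketch below. The exact keyword spellings, in particular the watermark clause and strategy names, are still being settled in this thread, so treat this as illustrative only.)

```
CREATE TABLE user_clicks (
  user_id bigint,                          -- 2: existing Flink SQL types
  page varchar,
  click_time timestamp,
  ts AS click_time,                        -- 7: computed column (trivial here)
  PRIMARY KEY (user_id),                   -- 4: table constraint
  WATERMARK FOR ts AS OFFSET '5' SECOND    -- 9: one of the built-in strategies
) PARTITIONED BY (page)                    -- 6: partitioned by
WITH (
  type=kafka                               -- 5: key-value table properties
  ,...
);
```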
I tried to put up the DDL grammar (https://docs.google.com/document/d/1ug1-aVBSCxZQk58kR-yaK2ETCgL3zg0eDUVGCnW2V9E/edit?usp=sharing) based on the MVP features above and the previous design docs. Please take a look and comment on it.

Also, I summarize the future improvements on CREATE TABLE as the following:
1. support table update mode
2. support data type nullability and precision
3. support row/map/array data types
4. support custom timestamp extractors and watermark strategies
5. support schema derivation
6. support system-versioned temporal tables
7. support table indexes

I suggest we first agree on the MVP feature list and the MVP grammar. Then we can either continue the discussion of the future improvements here, or create separate JIRAs for each item and discuss further in the JIRAs. What do you guys think?

Shuyi

On Fri, Dec 7, 2018 at 7:54 AM Timo Walther <twal...@apache.org> wrote:

Hi all,

I think we are making good progress. Thanks for all the feedback so far.

3. Sources/Sinks:
It seems that I cannot find supporters for an explicit SOURCE/SINK declaration, so I'm fine with not using those keywords. @Fabian: Maybe we don't have to change the TableFactory interface but can just provide some helper functions in the TableFactoryService. This would solve the availability problem, but the permission problem would still not be solved. If you are fine with it, we could introduce a property instead?

5. Schema declaration:
@Lin: We should find an agreement on this as it requires changes to the TableFactory interface. We should minimize changes to this interface because it is user-facing. Especially if format schema and table schema differ, the need for such a functionality is very important. Our goal is to connect to existing infrastructure. For example, if we are using Avro and the existing Avro format has enums but Flink SQL does not support enums, it would be helpful to let the Avro format derive a table schema. Otherwise you need to declare both schemas, which leads to CREATE TABLE statements of 400+ lines. I think the mentioned query:

CREATE TABLE (PRIMARY_KEY(a, c)) WITH (format.type = avro, format.schema-file = "/my/avrofile.avsc")

is fine and should only be valid if the schema contains no non-computed columns.

7. Table Update Mode:
After thinking about it again, I agree. The mode of the sinks can be derived from the query and the existence of a PRIMARY KEY declaration. But Fabian raised a very good point: how do we deal with sources? Shall we introduce a new keyword similar to WATERMARK such that an upsert/retract flag is not part of the visible schema?
4a. How to mark a field as an attribute?
@Jark: Thanks for the explanation of the WATERMARK clause semantics. This is a nice way of marking existing fields. This sounds good to me.

4c) WATERMARK as constraint
I'm fine with leaving the WATERMARK clause in the schema definition.

4d) Custom watermark strategies:
I would already think about custom watermark strategies, as the current descriptor design already supports this. ScalarFunctions don't work, as a PeriodicWatermarkAssigner has different semantics. Why not simply enter a full class name here, as it is done in the current design?

4.b) Ingesting and writing timestamps to systems (like Kafka)
@Fabian: Yes, your suggestion sounds good to me. This behavior would be similar to our current `timestamps: from-source` design.

Once our discussion has found a conclusion, I would like to volunteer and summarize the outcome of this mailing thread. It nicely aligns with the update work on the connector improvements document (which I wanted to do anyway) and the ongoing external catalog discussion. Furthermore, I would also want to propose how to change existing interfaces while keeping the DDL, connector improvements, and external catalog support in mind. Would that be ok for you?

Thanks,
Timo

On 07.12.18 at 14:48, Fabian Hueske wrote:

Hi all,

Thanks for the discussion. I'd like to share my point of view as well.

4) Event-Time Attributes and Watermarks:
4.a) I agree with Lin and Jark's proposal. Declaring a watermark on an attribute declares it as an event-time attribute.
4.b) Ingesting and writing timestamps to systems (like Kafka). We could use a special function like (ts AS SYSTEMROWTIME()). This function would indicate that we read the timestamp directly from the system (and not from the data). We could also write the field back to the system when emitting the table (violating the rule that computed fields are not emitted).
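(Spelled out, such a table definition might look like this sketch, assuming SYSTEMROWTIME() is the special marker function proposed above; everything else is ordinary DDL from this thread:)

```
CREATE TABLE kafka_t1 (
  id bigint,
  msg varchar,
  -- read from (and possibly written to) the Kafka message timestamp,
  -- not the payload
  ts AS SYSTEMROWTIME()
) WITH (
  type=kafka
  ,...
);
```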
4c) I would treat WATERMARK similar to a PRIMARY KEY or UNIQUE KEY constraint and therefore keep it in the schema definition.
4d) For custom watermark strategies, simple expressions or ScalarFunctions won't be sufficient. Sophisticated approaches could collect histograms, etc. But I think we can leave that out for later.

3) SOURCE / SINK / BOTH
As you said, there are two things to consider here: permission and availability of a TableSource/TableSink. I think that neither should be a reason to add a keyword at such a sensitive position. However, I also see Timo's point that it would be good to know up-front how a table can be used without trying to instantiate a TableSource/Sink for a query. Maybe we can extend the TableFactory such that it provides information about which sources/sinks it can provide.

7. Table Update Mode
Something that we definitely need to consider is how tables are ingested, i.e., append, retract, or upsert. Especially since upsert and retraction need a meta-data column that indicates whether an event is an insert (or upsert) or a delete change. This column needs to be identified somehow, most likely as part of the input format. Ideally, this column should not be part of the table schema (as it would always be true). Emitting tables is not so much of an issue, as the properties of the table tell us what to do (append-only/update, unique key y/n).

Best,
Fabian

On Fri, Dec 7, 2018 at 10:39 AM, Jark Wu <imj...@gmail.com> wrote:

Hi Timo,

Thanks for your quick feedback! Here are some of my thoughts:

Append, upsert, and retract mode on sinks is also a very complex problem. I think append/upsert/retract is a capability of a table; users do not need to specify whether a table is used for append, retraction, or upsert. The query can choose which mode the sink is in. If an unbounded GROUP BY is inserted into an append sink (a sink that only implements/supports append), an exception can be thrown. A more complex problem is: if we want to write retractions/upserts to Kafka, how do we encode the change flag (add or retract/delete) on the table? Maybe we should propose some protocol for the change flag encoding, but I don't have a clear idea about this right now.

3. Sources/Sinks: The source/sink tag is similar to the append/upsert/retract problem. Besides source/sink, we actually have stream source, stream sink, batch source, and batch sink, and the stream sink additionally comes in the append/upsert/retract modes. Should we put all these tags on CREATE TABLE? IMO, the table's capability is defined by the table itself; users do not need to specify it. If it is only a readable table, an exception can be thrown when writing to it. As the source/sink tag can be omitted in CREATE TABLE, could we skip it and only support CREATE TABLE in the first version, and add it back in the future when we really need it? This keeps the API compatible and makes sure the MVP covers only what we have considered clearly.

4a. How to mark a field as an attribute?
The watermark definition includes two parts: which field to use as the time attribute, and which generation strategy to use. When we want to mark the `ts` field as an attribute: WATERMARK FOR `ts` AS OFFSET '5' SECOND. If we have a POJO{id, user, ts} field named "pojo", we can mark it like this: WATERMARK FOR pojo.ts AS OFFSET '5' SECOND.
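(In a full table definition the nested variant might read as in the sketch below. Note the ROW type syntax is assumed here for the POJO; structured row types are listed as future work in this thread, so this example anticipates that:)

```
CREATE TABLE tracking (
  event_id bigint,
  -- nested record carrying the event timestamp; type syntax assumed
  pojo ROW<id bigint, user varchar, ts timestamp>,
  WATERMARK FOR pojo.ts AS OFFSET '5' SECOND
) WITH (
  type=kafka
  ,...
);
```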
4b. Timestamp written to the Kafka message header
Even though we can define multiple time attributes on a table, only one time attribute can be active/used in a query (in a stream). When we enable `writeTimestamp`, the only attribute active in the stream will be written to the Kafka message header. What I mean by the timestamp in StreamRecord is the time attribute in the stream.

4c. Yes. We introduced the WATERMARK keyword similar to the INDEX and PRIMARY KEY keywords.

@Timo, do you have any other advice or questions on the watermark syntax? For example, the built-in strategy name: "BOUNDED WITH OFFSET" vs. "OFFSET" vs. ...

Cheers,
Jark

On Fri, 7 Dec 2018 at 17:13, Lin Li <lincoln.8...@gmail.com> wrote:

Hi Timo,
Thanks for your feedback, here are some thoughts of mine:

3. Sources/Sinks:
"Let's assume an interactive CLI session, people should be able to list all source tables and sink tables to know upfront if they can use an INSERT INTO here or not."
This requirement can simply be resolved by a document that lists all supported source/sink/both connectors, and the sql-client can perform a quick check. It's only an implementation choice, not necessary for the syntax. For the connector implementation, a connector may implement one, some, or all of the [Stream|Batch]Source/[Stream|Batch]Sink traits, so we can derive the availability for any given query without the SOURCE/SINK keywords or specific table properties in the WITH clause. Since there's still indeterminacy, shall we skip these two keywords for the MVP DDL? We can have further discussion after users' feedback.

6. Partitioning and keys
Agree with you that we should raise the priority of table constraints and partitioned table support for better connectivity to Hive and Kafka. I'll add partitioned table syntax (compatible with Hive) into the DDL draft doc later [1].

5. Schema declaration
"if users want to declare computed columns they have a "schema" constraints but without columns
CREATE TABLE (PRIMARY_KEY(a, c)) WITH (format.type = avro, format.schema-file = "/my/avrofile.avsc")"

From my point of view, this DDL is invalid because the primary key constraint already references two columns whose types are unseen. And Xuefu pointed out an important matching problem, so let's put schema derivation aside as a follow-up extension?
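(For reference, the Hive-compatible partitioned table syntax mentioned in point 6 above would presumably look something like the following sketch; the details, in particular how partition columns relate to the schema, are exactly what the draft doc still needs to pin down:)

```
CREATE TABLE page_views (
  user_id bigint,
  page varchar
) PARTITIONED BY (dt varchar) WITH (
  type=hive
  ,...
);
```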
On Thu, Dec 6, 2018 at 6:05 PM, Timo Walther <twal...@apache.org> wrote:

Hi everyone,

great to have such a lively discussion. My next batch of feedback:

@Jark: We don't need to align the descriptor approach with SQL. I'm open to different approaches as long as we can serve a broad set of use cases and systems. The descriptor approach was a first attempt to cover all aspects and connector/format characteristics. Just another example that is missing in the DDL design: how can a user decide whether append, retraction, or upserts should be used to sink data into the target system? Do we want to define all these important properties in the big WITH property map? If yes, we are already close to the descriptor approach. Regarding the "standard way", most DDL languages have very custom syntax, so there is not a real "standard".

3. Sources/Sinks: @Lin: If a table has both read/write access it can be created using a regular CREATE TABLE (omitting a specific source/sink declaration). Regarding the transition from source/sink to both: yes, we would need to update the DDL and catalogs. But is this a problem? One also needs to add new queries that use the tables. @Xuefu: It is not only about security aspects. Especially for streaming use cases, not every connector can be used as a source easily. For example, a JDBC sink is easier than a JDBC source. Let's assume an interactive CLI session: people should be able to list all source tables and sink tables to know upfront whether they can use an INSERT INTO here or not.

6. Partitioning and keys: @Lin: I would like to include this in the design, given that Hive integration and Kafka key support are in the making/on our roadmap for this release.

5. Schema declaration: @Lin: You are right, it is not conflicting. I just wanted to raise the point that if users want to declare computed columns, they have "schema" constraints but without columns. Are we ok with a syntax like

CREATE TABLE (PRIMARY_KEY(a, c)) WITH (format.type = avro, format.schema-file = "/my/avrofile.avsc") ?

@Xuefu: Yes, you are right that an external schema might not exactly match, but this is true for both directions: table schema "derives" format schema, and format schema "derives" table schema.

7. Hive compatibility: @Xuefu: I agree that Hive is popular, but we should not just adopt everything from Hive, as their syntax is very batch-specific.
We should come up with a superset of historical and future requirements. Supporting Hive queries can be an intermediate layer on top of Flink's DDL.

4. Time attributes: @Lin: I'm fine with changing the TimestampExtractor interface, as this is also important for a better separation of the connector and table modules [1]. However, I'm wondering about watermark generation.

4a. Timestamps are in the schema twice:
@Jark: "existing field is Long/Timestamp, we can just use it as rowtime": yes, but we need to mark a field as such an attribute. What does the syntax for marking look like? Also in the case of timestamps that are nested in the schema?

4b. How can we write out a timestamp into the message header?
I agree to simply ignore computed columns when writing out. This is like the 'field-change: add' that I mentioned in the improvements document. @Jark: "then the timestamp in StreamRecord will be written to the Kafka message header": unfortunately, there is no timestamp in the stream record. Additionally, multiple time attributes can be in a schema. So we need a constraint that tells the sink which column to use (possibly a computed one as well)?

4c. Separate all time attribute concerns into a special clause next to the regular schema?
@Jark: I don't have a strong opinion on this. I just have the feeling that the "schema part" becomes quite messy, because the actual schema with types and fields is accompanied by so much metadata about timestamps, watermarks, keys, ... and we would need to introduce a new WATERMARK keyword within a schema that was close to standard up to this point.

Thanks everyone,
Timo

[1] https://issues.apache.org/jira/browse/FLINK-9461

On 06.12.18 at 07:08, Jark Wu wrote:

Hi Timo,

Thank you for the valuable feedback.

First of all, I think we don't need to align the SQL functionality with the Descriptor. Because SQL is a more standard API, we should be as cautious as possible when extending the SQL syntax. If something can be done in a standard way, we shouldn't introduce something new.

Here are some of my thoughts:

1. Scope: Agree.
2. Constraints: Agree.
4. Time attributes:
4a. Timestamps are in the schema twice.
If an existing field is Long/Timestamp, we can just use it as rowtime; nothing is defined twice.
If it is not a Long/Timestamp, we use a computed column to get an expected timestamp column to be the rowtime. Is this what you mean by defined twice? I don't think it is a problem; rather, it is an advantage, because it is easy to use: users do not need to consider whether to "replace the existing column" or "add a new column", and will not be confused about what the real schema is or what the index of the rowtime field in the schema is. Regarding the optimization: even if timestamps are in the schema twice, when the original timestamp is never used in the query, the projection pushdown optimization can cut this field as early as possible, which is exactly the same as "replacing the existing column" at runtime.

4b. How can we write out a timestamp into the message header?
That's a good point. I think a computed column is just a virtual column on the table which is only relevant for reading. If we want to write to a table with a computed column defined, we only need to provide the columns except the computed ones (see SQL Server [1]). The computed column is ignored in the INSERT statement. Getting back to the question, how can we write out a timestamp into the message header? IMO, we can provide a configuration to support this, such as `kafka.writeTimestamp=true`; then the timestamp in StreamRecord will be written to the Kafka message header. What do you think?

4c. Separate all time attribute concerns into a special clause next to the regular schema?
Separating the watermark into a special clause similar to PARTITIONED BY is also a good idea. Conceptually, it's fine to put the watermark inside or outside the schema part. But if we want to support multiple watermark definitions, it might be better to put it in the schema part. This is similar to index definitions, where we can define several indexes on a table in the schema part.

4d. How can people come up with a custom watermark strategy?
In most cases, the built-in strategies work well. If we need a custom one, we can use a scalar function which is restricted to only return a nullable Long, and use it in SQL like: WATERMARK FOR rowtime AS watermarkUdf(a, b, c). The `watermarkUdf` is a user-defined scalar function that accepts 3 parameters and returns a nullable Long, which can be used as a punctuated watermark assigner. Another choice is implementing a class extending `org.apache.flink.table.sources.wmstrategies.WatermarkStrategy` and using it in SQL: WATERMARK FOR rowtime AS 'com.my.MyWatermarkStrategy'. But if a scalar function can cover the requirements here, I would prefer it, because it keeps us standard compliant. BTW, this feature is not in the MVP; we can discuss it in more depth in the future when we need it.
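(As a concrete illustration of the scalar-function variant, a sketch; the class name and registration syntax are made up for the example, and Function DDL itself is only listed as a candidate feature in this thread:)

```
-- hypothetical UDF registration; the function returns a nullable BIGINT
-- that is interpreted as a punctuated watermark
CREATE FUNCTION watermarkUdf AS 'com.my.MyWatermarkScalarFunction';

CREATE TABLE events (
  a bigint,
  b varchar,
  c timestamp,
  rowtime timestamp,
  WATERMARK FOR rowtime AS watermarkUdf(a, b, c)
) WITH (
  type=kafka
  ,...
);
```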
5. Schema declaration:
I like the proposal to omit the schema if we can get the schema from external storage or a schema file. Actually, we have already encountered this requirement in our company.

+1 to @Xuefu that we should be as close as possible to Hive syntax while keeping to the SQL ANSI standard. This will make it more acceptable and reduce the learning cost for users.

[1]: https://docs.microsoft.com/en-us/sql/relational-databases/partitions/create-partitioned-tables-and-indexes?view=sql-server-2017

Best,
Jark

On Thu, 6 Dec 2018 at 12:09, Zhang, Xuefu <xuef...@alibaba-inc.com> wrote:

Hi Timo/Shuyi/Lin,

Thanks for the discussions. It seems that we are converging to something meaningful. Here are some of my thoughts:

1. +1 on MVP DDL.
3. Markers for source or sink seem to be more about permissions on tables, which belong in a security component. Unless the table is created differently based on source, sink, or both, it doesn't seem necessary to use these keywords to enforce permissions.
5. It might be okay if a schema declaration is always needed. While there might be some duplication sometimes, that's not always true. For example, the external schema may not exactly match the Flink schema, for instance in data types. Even so, a perfect match is not required. For instance, the external schema file may evolve while the table schema in Flink stays unchanged. A responsible reader should be able to scan the file based on the file schema and return the data based on the table schema.

Other aspects:

7. Hive compatibility. Since Flink SQL will soon be able to operate on Hive metadata and data, it's an add-on benefit if we can be compatible with Hive syntax/semantics while following the ANSI standard. At least we should be as close as possible.
Hive DDL can be found at https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL

Thanks,
Xuefu

------------------------------------------------------------------
Sender: Lin Li <lincoln.8...@gmail.com>
Sent at: 2018 Dec 6 (Thu) 10:49
Recipient: dev <dev@flink.apache.org>
Subject: Re: [DISCUSS] Flink SQL DDL Design

Hi Timo and Shuyi,
thanks for your feedback.

1. Scope
Agree with you, we should focus on the MVP DDL first.

2. Constraints
Yes, this can be a follow-up issue.

3. Sources/Sinks
If a TABLE has both read and write access requirements, should we declare it using `CREATE [SOURCE_SINK|BOTH] TABLE tableName ...`? A further question: if a TABLE t1 is first declared as read only (as a source table), and then for some new requirement t1 changes to a sink table, in this case we need to update both the DDL and the catalogs. Furthermore, let's think about BATCH queries, where updating one table in-place can be a common case, e.g.:
```
CREATE TABLE t1 (
  col1 varchar,
  col2 int,
  col3 varchar
  ...
);

INSERT [OVERWRITE] TABLE t1
AS
SELECT
  (some computing ...)
FROM t1;
```
So, let's forget these SOURCE/SINK keywords in DDL. For the validation purpose, we can find other ways.

4. Time attributes
As Shuyi mentioned before, there exists an `org.apache.flink.table.sources.tsextractors.TimestampExtractor` for custom-defined time attribute usage, but this expression-based class is more friendly to the Table API than to SQL.
```
/**
  * Provides an expression to extract the timestamp for a rowtime attribute.
  */
abstract class TimestampExtractor extends FieldComputer[Long] with Serializable {

  /** Timestamp extractors compute the timestamp as Long. */
  override def getReturnType: TypeInformation[Long] =
    Types.LONG.asInstanceOf[TypeInformation[Long]]
}
```
BTW, I think both the ScalarFunction and the TimestampExtractor express computing logic; the TimestampExtractor has no additional advantage in SQL scenarios.
6. Partitioning and keys
Primary Key is included in the Constraints part, and partitioned table support can be another topic later.

5. Schema declaration
Agree with you that we can do better schema derivation for user convenience, but this does not conflict with the syntax. Table properties can carry any useful information both for the users and the framework; I like your `contract name` proposal, e.g., `WITH (format.type = avro)`: the framework can recognize some `contract names` like `format.type`, `connector.type`, etc. And deriving the table schema from an existing schema file can also be handy, especially for a table with very many columns.

Regards,
Lin

On Wed, Dec 5, 2018 at 10:40 PM, Timo Walther <twal...@apache.org> wrote:

Hi Jark and Shuyi,

thanks for pushing the DDL efforts forward. I agree that we should aim to combine both Shuyi's design and your design.

Here are a couple of concerns that I think we should address in the design:

1. Scope: Let's focus on an MVP DDL for CREATE TABLE statements first. This topic already has enough potential for long discussions and is very helpful for users. We can discuss CREATE VIEW and CREATE FUNCTION afterwards, as they are not related to each other.

2. Constraints: I think we should consider things like nullability, VARCHAR length, and decimal scale and precision in the future, as they allow for nice optimizations. However, neither the translation nor the runtime operators support those features yet. I would not introduce an arbitrary default value but omit those parameters for now. This can be a follow-up issue once the basic DDL has been merged.

3. Sources/Sinks: We had a discussion about CREATE TABLE vs CREATE [SOURCE|SINK] TABLE before. In my opinion we should allow for these explicit declarations because in most production scenarios, teams have strict read/write access requirements. For example, a data science team should only consume from an event Kafka topic but should not accidentally write back to the single source of truth.
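(Under that proposal, the access intent would be fixed at definition time. A sketch of the syntax being debated; whether these keywords survive into the MVP is exactly what this thread is deciding:)

```
-- readable only: queries may SELECT from it, INSERT INTO fails upfront
CREATE SOURCE TABLE kafka_events (...) WITH (type=kafka, ...);

-- writable only: INSERT INTO is allowed, SELECT is rejected
CREATE SINK TABLE kafka_results (...) WITH (type=kafka, ...);
```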
4. Time attributes: In general, I like your computed columns approach because it makes defining rowtime attributes transparent and simple. However, there are downsides that we should discuss.
4a. Jark's current design means that timestamps are in the schema twice. The design that is mentioned in [1] makes this more flexible, as it allows either replacing an existing column or adding a computed column.
4b. We need to consider the zoo of storage systems that is out there right now. Take Kafka as an example: how can we write out a timestamp into the message header? We need to think of a reverse operation to a computed column.
4c. Does defining a watermark really fit into the schema part of a table? Shouldn't we separate all time attribute concerns into a special clause next to the regular schema, similar to how PARTITIONED BY does it in Hive?
4d. How can people come up with a custom watermark strategy? I guess this cannot be implemented in a scalar function and would require some new type of UDF?

6. Partitioning and keys: Another question that the DDL design should answer is how we express primary keys (for upserts) and partitioning keys (for Hive, Kafka message keys). All part of the table schema?

5. Schema declaration: I find it very annoying that we want to force people to declare all columns and types again even though this is usually already defined in some company-wide format. I know that catalog support will greatly improve this. But if no catalog is used, people need to manually define a schema with 50+ fields in a Flink DDL. What I actually promoted was having two ways of reading data:

1. Either the format derives its schema from the table schema:
CREATE TABLE (col INT) WITH (format.type = avro)

2. Or the table schema can be omitted and the format schema defines the table schema (+ time attributes):
CREATE TABLE WITH (format.type = avro, format.schema-file = "/my/avrofile.avsc")

Please let me know what you think about each item. I will try to incorporate your feedback in [1] this week.

Regards,
Timo

[1] https://docs.google.com/document/d/1Yaxp1UJUFW-peGLt8EIidwKIZEWrrA-pznWLuvaH39Y/edit#heading=h.41fd6rs7b3cf

On 05.12.18 at 13:01, Jark Wu wrote:

Hi Shuyi,

It's exciting to see we can make such great progress here.

Regarding the watermark:

Watermarks can be defined on any column (including computed columns) in the table schema.
The computed column can be computed from existing columns using built-in functions and *UserDefinedFunctions* (ScalarFunction). So IMO, it can work out for almost all scenarios, not only the common ones.

I don't think using a `TimestampExtractor` to support custom timestamp extractors in SQL is a good idea, because `TimestampExtractor` is not a SQL standard function. If we support `TimestampExtractor` in SQL, do we need to support CREATE FUNCTION for `TimestampExtractor`? I think `ScalarFunction` can do the same thing as `TimestampExtractor` but is more powerful and standard.

The core idea of the watermark definition syntax is that the schema part defines all the columns of the table; it is exactly what the query sees. The watermark part is something like a primary key definition or constraint on a SQL table: it has no side effect on the schema, it only defines the watermark strategy and makes some field the rowtime attribute field. If the rowtime field is not among the existing fields, we can use a computed column to generate it from other existing fields. The Descriptor Pattern API [1] is very useful when writing a Table API job, but it is not contradictory to the watermark DDL from my perspective.

[1]: https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connect.html#rowtime-attributes

Best,
Jark
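(Putting these two pieces together, deriving a rowtime from a raw field and declaring a watermark on it might read as follows. A sketch only; TO_TIMESTAMP stands in for whatever conversion function ends up being available:)

```
CREATE TABLE access_log (
  user_id bigint,
  log_time varchar,
  -- computed column turning the raw string into the rowtime field
  ts AS TO_TIMESTAMP(log_time),
  WATERMARK FOR ts AS OFFSET '5' SECOND
) WITH (
  type=kafka
  ,...
);
```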
On Wed, 5 Dec 2018 at 17:58, Shuyi Chen <suez1...@gmail.com> wrote:

Hi Jark and Shaoxuan,

Thanks a lot for the summary. I think we are making great progress here. Below are my thoughts.

*(1) Watermark definition
IMO, it's better to keep it consistent with the rowtime extractors and watermark strategies defined in https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connect.html#rowtime-attributes. Using built-in functions seems to be too much for most of the common scenarios.
*(2) CREATE SOURCE/SINK TABLE or CREATE TABLE
Actually, I think we can put the source/sink type info into the table properties, so we can use CREATE TABLE.
(3) View DDL with properties
We can remove the view properties section now for the MVP and add it back later if needed.
(4) Type definition
I agree we can put the type length or precision into future versions. As for the grammar difference: currently I am using the grammar of the Calcite type DDL, but since we'll extend the parser in Flink, we can definitely change it if needed.

Shuyi

On Tue, Dec 4, 2018 at 10:48 PM Jark Wu <imj...@gmail.com> wrote:

Hi Shaoxuan,

Thanks for pointing that out. Yes, the source/sink tag on CREATE TABLE is the other major difference.

Summarizing the main differences again:

*(1) watermark definition
*(2) CREATE SOURCE/SINK TABLE or CREATE TABLE
(3) View DDL with properties
(4) Type definition

Best,
Jark

On Wed, 5 Dec 2018 at 14:08, Shaoxuan Wang <wshaox...@gmail.com> wrote:

Hi Jark,
Thanks for the summary. Your plan for the first round of implementation of the DDL looks good to me. Have we reached agreement on simplifying/unifying "CREATE [SOURCE/SINK] TABLE" to "CREATE TABLE"? "Watermark definition" and "CREATE TABLE" are the major obstacles on the way to merging the two design proposals, FMPOV. @Shuyi, it would be great if you could spend time and respond to these two parts first.
Regards,
Shaoxuan

On Wed, Dec 5, 2018 at 12:20 PM Jark Wu <imj...@gmail.com> wrote:

Hi Shuyi,

It seems that you have reviewed the DDL doc [1] that Lin and I drafted. This doc covers all the features running at Alibaba, but some of the features might not be needed in the first version of Flink SQL DDL.

So my suggestion would be to focus on the MVP DDLs and reach agreement ASAP based on the DDL draft [1] and the DDL design [2] Shuyi proposed. And we can discuss the main differences one by one.
The following are the MVP DDLs that should be included in the first version, in my opinion (feedback is welcome):
(1) Table DDL:
  (1.1) Type definition
  (1.2) computed column definition
  (1.3) watermark definition
  (1.4) with properties
  (1.5) table constraint (primary key/unique)
  (1.6) column nullability (nice to have)
(2) View DDL
(3) Function DDL

The main differences between the two DDL docs (something may have been missed; feel free to point it out):
*(1.3) watermark*: this is the main and most important difference; it would be great if @Timo Walther <twal...@apache.org> @Fabian Hueske <fhue...@gmail.com> could give some feedback.
(1.1) Type definition:
  (a) Should VARCHAR carry a length, e.g. VARCHAR(128)?
      In most cases, the varchar length is not used because the values are stored as String in Flink. But it can be used for optimization in the future if we know the column is a fixed-length VARCHAR. So IMO, we can support VARCHAR with length in the future, and just VARCHAR in this version.
  (b) Should DECIMAL support custom scale and precision, e.g. DECIMAL(12, 5)?
      If we clearly know the scale and precision of the Decimal, we can have some optimization on serialization/deserialization. IMO, we can