Sounds great, thanks for the effort, Shuyi. Best, Kurt
On Wed, Dec 12, 2018 at 5:14 PM Shuyi Chen <suez1...@gmail.com> wrote:
> Hi all,
>
> I summarized the MVP based on the features that we agreed upon. For table update mode and custom watermark strategy and ts extractor, I found there are still some discussions, so I decided to leave them out of the MVP. For row/map/array data types, I think we can add them as well if everyone agrees.
>
> 4) Event-Time Attributes and Watermarks
> Cited from the SQL Server 2017 document (
> https://docs.microsoft.com/en-us/sql/relational-databases/tables/specify-computed-columns-in-a-table?view=sql-server-2017
> ): "A computed column is a virtual column that is not physically stored in the table, unless the column is marked PERSISTED. A computed column expression can use data from other columns to calculate a value for the column to which it belongs." I think we can also introduce the PERSISTED keyword for computed columns to indicate that the field can be stored back to the table, i.e. ts AS SYSTEMROWTIME() PERSISTED.
>
> 3) SOURCE / SINK / BOTH
> GRANT/REVOKE sounds like a more standard option than adding a property to CREATE TABLE to manage the ACL/permission. The ACL can be stored somewhere in a database, and access to a dynamic table can be allowed/disallowed depending on whether it's an "INSERT INTO" or a "SELECT".
>
> I can volunteer to write the discussion up as a FLIP. I can try to summarize the current discussion and share edit permission with you to collaborate on the documents. After we finalize the doc, we can publish it as a FLIP. What do you think?
>
> Shuyi
>
> On Tue, Dec 11, 2018 at 9:13 AM Timo Walther <twal...@apache.org> wrote:
>
> > Hi all,
> >
> > thanks for summarizing the discussion @Shuyi. I think we need to include the "table update mode" problem as it might not be changed easily in the future. Regarding "support row/map/array data type", I don't see a problem why we should not support them now, as the data types are already included in the runtime. The "support custom timestamp extractor" is solved by the computed columns approach. The "custom watermark strategy" can be added by supplying a class name as a parameter in my opinion.
> >
> > Regarding the comments of Lin and Jark:
> >
> > @Lin: Instantiating a TableSource/Sink should not cost much, but we should not mix the catalog discussion and DDL at this point.
> >
> > 4) Event-Time Attributes and Watermarks
> > 4.b) Regarding `ts AS SYSTEMROWTIME()` and Lin's comment about "will violate the rule": there is no explicit rule that forbids doing so. Computed columns are also not standard compliant; if we can use information that is encoded in constraints, we should use it. Adding more and more top-level properties makes the interaction with connectors more difficult. An additional HEADER keyword sounds too connector-specific and also not SQL compliant to me.
> >
> > 3) SOURCE / SINK / BOTH
> > GRANT/REVOKE mutate an existing table, right? In my opinion, independent of SQL databases but focusing on Flink user requirements, a CREATE TABLE statement should be an immutable definition of a connection to an external system.
> >
> > 7) Table Update Mode
> > As far as I can see, the only thing missing for enabling all table modes is the declaration of a change flag. We could introduce a new keyword here similar to WATERMARK:
> >
> > CREATE TABLE output_kafka_t1(
> >   id bigint,
> >   msg varchar,
> >   CHANGE_FLAG FOR isRetraction
> > ) WITH (
> >   type=kafka
> >   ,...
> > );
> >
> > CREATE TABLE output_kafka_t1(
> >   CHANGE_FLAG FOR isUpsert
> >   id bigint,
> >   msg varchar,
> >   PRIMARY_KEY(id)
> > ) WITH (
> >   type=kafka
> >   ,...
> > );
> >
> > What do you think?
> >
> > @Jark: We should definitely stage the discussions and mention the opinions and advantages/disadvantages that have been proposed already in the FLIP.
> >
> > Regards,
> > Timo
> >
> > On 10.12.18 at 08:10, Jark Wu wrote:
> > > Hi all,
> > >
> > > It's great to see we have an agreement on the MVP.
> > >
> > > 4.b) Ingesting and writing timestamps to systems.
> > > I would treat the field as a physical column, not a virtual column. If we treat it as a computed column, it will be confusing that the behavior is different depending on whether it is a source or a sink. When it is a physical column, the behavior can be unified. Then the problem is how to map the field to the Kafka message timestamp. One option is what Lin proposed above, which is also used in KSQL[1]. Another idea is introducing a HEADER column which strictly maps by name to the fields in the message header. For example,
> > >
> > > CREATE TABLE output_kafka_t1(
> > >   id bigint,
> > >   ts timestamp HEADER,
> > >   msg varchar
> > > ) WITH (
> > >   type=kafka
> > >   ,...
> > > );
> > >
> > > This is used in Alibaba but not included in the DDL draft. It will further extend the SQL syntax, which we should be cautious about. What do you think about these two solutions?
> > >
> > > 4.d) Custom watermark strategies:
> > > @Timo, I don't have a strong opinion on this.
> > >
> > > 3) SOURCE/SINK/BOTH
> > > Agree with Lin, GRANT/REVOKE [SELECT|UPDATE] ON TABLE is a clean and standard way to manage the permission, which is also adopted by HIVE[2] and many databases.
> > >
> > > [1]: https://docs.confluent.io/current/ksql/docs/tutorials/examples.html
> > > [2]: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=45876173#Hivedeprecatedauthorizationmode/LegacyMode-Grant/RevokePrivileges
> > >
> > > @Timo, it's great if someone can conclude the discussion and summarize it into a FLIP.
> > > @Shuyi, Thanks a lot for putting it all together. The google doc looks good to me, and I left some minor comments there.
> > >
> > > Regarding the FLIP, I have some suggestions:
> > > 1. The FLIP can contain MILESTONE1 and FUTURE WORKS.
> > > 2. The MILESTONE1 is the MVP. It describes the MVP DDL syntax.
> > > 3. Separate FUTURE WORKS into two parts: UNDER DISCUSSION and ADOPTED. We can derive MILESTONE2 from this easily when it is ready.
> > >
> > > I summarized the Future Works based on Shuyi's work:
> > >
> > > Adopted: (Should be described in detail here...)
> > > 1. support data type nullability and precision.
> > > 2. comment on table and columns.
> > >
> > > Under Discussion: (Should briefly describe some options...)
> > > 1. Ingesting and writing timestamps to systems.
> > > 2. support custom watermark strategy.
> > > 3. support table update mode
> > > 4. support row/map/array data type
> > > 5. support schema derivation
> > > 6. support system versioned temporal table
> > > 7. support table index
> > >
> > > We can continue the further discussion here, or split it into another DISCUSS topic if it is a sophisticated problem such as Table Update Mode or Temporal Table.
> > > Best,
> > > Jark
> > >
> > > On Mon, 10 Dec 2018 at 11:54, Lin Li <lincoln.8...@gmail.com> wrote:
> > >
> > >> hi all,
> > >> Thanks for your valuable input!
> > >>
> > >> 4) Event-Time Attributes and Watermarks:
> > >> 4.b) @Fabian As you mentioned, using a computed column `ts AS SYSTEMROWTIME()` for writing out to a Kafka table sink will violate the rule that computed fields are not emitted. Since the timestamp column in Kafka's header area is a specific materialization protocol, why don't we treat it as a connector property? For example:
> > >> ```
> > >> CREATE TABLE output_kafka_t1(
> > >>   id bigint,
> > >>   ts timestamp,
> > >>   msg varchar
> > >> ) WITH (
> > >>   type=kafka,
> > >>   header.timestamp=ts
> > >>   ,...
> > >> );
> > >> ```
> > >>
> > >> 4d) For custom watermark strategies
> > >> @Fabian Agree with you on opening another topic about this feature later.
> > >>
> > >> 3) SOURCE / SINK / BOTH
> > >> I think the permissions and availabilities are two separate things. Permissions can be managed well by using GRANT/REVOKE (you can call it DCL) solutions which are commonly used in different DBs. The permission part can be a new topic for later discussion, what do you think?
> > >>
> > >> For the availabilities, @Fabian @Timo I've another question: does instantiating a TableSource/Sink cost much or have some other downsides? IMO, creating a new source/sink object via the constructor seems not costly. When receiving a DDL we should associate it with the catalog object (reusing an existing one or creating a new one). Have I missed something important?
> > >>
> > >> 5. Schema declaration:
> > >> @Timo yes, your concern about the user convenience is very important. But I haven't seen a clear way to solve this so far. Shall we postpone it and wait for more input from the community?
> > >>
> > >> On Sat, Dec 8, 2018 at 4:27 PM, Shuyi Chen <suez1...@gmail.com> wrote:
> > >>
> > >>> Hi all,
> > >>>
> > >>> Thanks a lot for the great discussion. I think we can continue the discussion here while carving out an MVP that the community can start working on. Based on the discussion so far, I'll try to summarize what we will do for the MVP:
> > >>>
> > >>> MVP
> > >>>
> > >>> 1. support CREATE TABLE
> > >>> 2. support existing data types in Flink SQL, ignoring nullability and precision
> > >>> 3. support table comments and column comments
> > >>> 4. support table constraint PRIMARY KEY and UNIQUE
> > >>> 5. support table properties using key-value pairs
> > >>> 6. support partitioned by
> > >>> 7. support computed column
> > >>> 8. support from-field and from-source timestamp extractors
> > >>> 9. support PERIODIC-ASCENDING, PERIODIC-BOUNDED, FROM-SOURCE watermark strategies.
> > >>> 10. support a table property to allow explicit enforcement of read/write (source/sink) permission of a table
> > >>>
> > >>> I tried to put up the DDL grammar (
> > >>> https://docs.google.com/document/d/1ug1-aVBSCxZQk58kR-yaK2ETCgL3zg0eDUVGCnW2V9E/edit?usp=sharing
> > >>> ) based on the MVP features above and the previous design docs. Please take a look and comment on it.
> > >>>
> > >>> Also, I summarize the future improvements on CREATE TABLE as follows:
> > >>> 1. support table update mode
> > >>> 2. support data type nullability and precision
> > >>> 3. support row/map/array data type
> > >>> 4. support custom timestamp extractor and watermark strategy
> > >>> 5. support schema derivation
> > >>> 6. support system versioned temporal table
> > >>> 7. support table index
> > >>>
> > >>> I suggest we first agree on the MVP feature list and the MVP grammar. And then we can either continue the discussion of the future improvements here, or create separate JIRAs for each item and discuss further in the JIRA. What do you guys think?
> > >>>
> > >>> Shuyi
> > >>>
> > >>> On Fri, Dec 7, 2018 at 7:54 AM Timo Walther <twal...@apache.org> wrote:
> > >>>
> > >>>> Hi all,
> > >>>>
> > >>>> I think we are making good progress. Thanks for all the feedback so far.
> > >>>>
> > >>>> 3. Sources/Sinks:
> > >>>> It seems that I can not find supporters for explicit SOURCE/SINK declaration, so I'm fine with not using those keywords. @Fabian: Maybe we don't have to change the TableFactory interface but just provide some helper functions in the TableFactoryService. This would solve the availability problem, but the permission problem would still not be solved. If you are fine with it, we could introduce a property instead?
> > >>>>
> > >>>> 5. Schema declaration:
> > >>>> @Lin: We should find an agreement on this as it requires changes to the TableFactory interface. We should minimize changes to this interface because it is user-facing. Especially if format schema and table schema differ, the need for such functionality is very important. Our goal is to connect to existing infrastructure. For example, if we are using Avro and the existing Avro format has enums but Flink SQL does not support enums, it would be helpful to let the Avro format derive a table schema. Otherwise you need to declare both schemas, which leads to CREATE TABLE statements of 400+ lines.
> > >>>> I think the mentioned query:
> > >>>> CREATE TABLE (PRIMARY_KEY(a, c)) WITH (format.type = avro, format.schema-file = "/my/avrofile.avsc")
> > >>>> is fine and should only be valid if the schema contains no non-computed columns.
> > >>>>
> > >>>> 7. Table Update Mode:
> > >>>> After thinking about it again, I agree. The mode of the sinks can be derived from the query and the existence of a PRIMARY KEY declaration. But Fabian raised a very good point. How do we deal with sources? Shall we introduce a new keyword similar to WATERMARK such that an upsert/retract flag is not part of the visible schema?
> > >>>>
> > >>>> 4a. How to mark a field as attribute?
> > >>>> @Jark: Thanks for the explanation of the WATERMARK clause semantics. This is a nice way of marking existing fields. This sounds good to me.
> > >>>>
> > >>>> 4c) WATERMARK as constraint
> > >>>> I'm fine with leaving the WATERMARK clause in the schema definition.
> > >>>>
> > >>>> 4d) Custom watermark strategies:
> > >>>> I would already think about custom watermark strategies as the current descriptor design already supports this. ScalarFunctions don't work, as a PeriodicWatermarkAssigner has different semantics. Why not simply enter a full class name here, as it is done in the current design?
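> > >>>> Just as a minimal sketch of what that could look like, reusing the proposed WATERMARK clause (the strategy class name is only illustrative):
> > >>>>
> > >>>> CREATE TABLE t1 (
> > >>>>   id bigint,
> > >>>>   ts timestamp,
> > >>>>   WATERMARK FOR ts AS 'com.my.MyWatermarkStrategy'
> > >>>> ) WITH (
> > >>>>   type=kafka
> > >>>> );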
> > >>>>
> > >>>> 4.b) Ingesting and writing timestamps to systems (like Kafka)
> > >>>> @Fabian: Yes, your suggestion sounds good to me. This behavior would be similar to our current `timestamps: from-source` design.
> > >>>>
> > >>>> Once our discussion has found a conclusion, I would like to volunteer and summarize the outcome of this mailing thread. It nicely aligns with the update work on the connector improvements document (that I wanted to do anyway) and the ongoing external catalog discussion. Furthermore, I would also want to propose how to change existing interfaces by keeping the DDL, connector improvements, and external catalog support in mind. Would that be ok for you?
> > >>>>
> > >>>> Thanks,
> > >>>> Timo
> > >>>>
> > >>>>
> > >>>> On 07.12.18 at 14:48, Fabian Hueske wrote:
> > >>>>> Hi all,
> > >>>>>
> > >>>>> Thanks for the discussion. I'd like to share my point of view as well.
> > >>>>>
> > >>>>> 4) Event-Time Attributes and Watermarks:
> > >>>>> 4.a) I agree with Lin and Jark's proposal. Declaring a watermark on an attribute declares it as an event-time attribute.
> > >>>>> 4.b) Ingesting and writing timestamps to systems (like Kafka). We could use a special function like (ts AS SYSTEMROWTIME()). This function will indicate that we read the timestamp directly from the system (and not the data). We can also write the field back to the system when emitting the table (violating the rule that computed fields are not emitted).
> > >>>>> 4c) I would treat WATERMARK similar to a PRIMARY KEY or UNIQUE KEY constraint and therefore keep it in the schema definition.
> > >>>>> 4d) For custom watermark strategies, simple expressions or ScalarFunctions won't be sufficient. Sophisticated approaches could collect histograms, etc. But I think we can leave that out for later.
> > >>>>>
> > >>>>> 3) SOURCE / SINK / BOTH
> > >>>>> As you said, there are two things to consider here: permission and availability of a TableSource/TableSink. I think that neither should be a reason to add a keyword at such a sensitive position. However, I also see Timo's point that it would be good to know up-front how a table can be used without trying to instantiate a TableSource/Sink for a query. Maybe we can extend the TableFactory such that it provides information about which sources/sinks it can provide.
> > >>>>>
> > >>>>> 7. Table Update Mode
> > >>>>> Something that we definitely need to consider is how tables are ingested, i.e., append, retract or upsert. Especially since upsert and retraction need a meta-data column that indicates whether an event is an insert (or upsert) or a delete change. This column needs to be identified somehow, most likely as part of the input format. Ideally, this column should not be part of the table schema (as it would always be true). Emitting tables is not so much of an issue as the properties of the table tell us what to do (append-only/update, unique key y/n).
> > >>>>>
> > >>>>> Best,
> > >>>>> Fabian
> > >>>>>
> > >>>>>
> > >>>>> On Fri, Dec 7, 2018 at 10:39 AM, Jark Wu <imj...@gmail.com> wrote:
> > >>>>>> Hi Timo,
> > >>>>>>
> > >>>>>> Thanks for your quick feedback! Here are some of my thoughts:
> > >>>>>>
> > >>>>>> Append, upsert, retract mode on sinks is also a very complex problem. I think append/upsert/retract is the ability of a table; users do not need to specify whether a table is used for append or retraction or upsert. The query can choose which mode the sink is in. If an unbounded groupby is inserted into an append sink (the sink only implements/supports append), an exception can be thrown. A more complex problem is, if we want to write retractions/upserts to Kafka, how to encode the change flag (add or retract/delete) on the table? Maybe we should propose some protocol for the change flag encoding, but I don't have a clear idea about this right now.
> > >>>>>>
> > >>>>>> 3. Sources/Sinks: The source/sink tag is similar to the append/upsert/retract problem. Besides source/sink, actually we have stream source, stream sink, batch source, batch sink, and the stream sink also includes the three modes append/upsert/retract. Should we put all the tags on the CREATE TABLE? IMO, the table's ability is defined by the table itself; users do not need to specify it. If it is only a readable table, an exception can be thrown when writing to it. As the source/sink tag can be omitted in CREATE TABLE, could we skip it and only support CREATE TABLE in the first version, and add it back in the future when we really need it? It keeps the API compatible and makes sure the MVP is what we have considered clearly.
> > >>>>>>
> > >>>>>> 4a. How to mark a field as attribute?
> > >>>>>> The watermark definition includes two parts: which field to use as the time attribute and which generation strategy to use. When we want to mark the `ts` field as the attribute: WATERMARK FOR `ts` AS OFFSET '5' SECOND. If we have a POJO{id, user, ts} field named "pojo", we can mark it like this: WATERMARK FOR pojo.ts AS OFFSET '5' SECOND
> > >>>>>>
> > >>>>>> 4b. timestamp write to Kafka message header
> > >>>>>> Even though we can define multiple time attributes on a table, only one time attribute can be activated/used in a query (in a stream). When we enable `writeTimestamp`, the only attribute activated in the stream will be written to the Kafka message header. What I mean is that the timestamp in StreamRecord is the time attribute in the stream.
> > >>>>>>
> > >>>>>> 4c. Yes. We introduced the WATERMARK keyword similar to the INDEX, PRIMARY KEY keywords.
> > >>>>>>
> > >>>>>> @Timo, Do you have any other advice or questions on the watermark syntax? For example, the builtin strategy name: "BOUNDED WITH OFFSET" VS "OFFSET" VS ...
> > >>>>>>
> > >>>>>> Cheers,
> > >>>>>> Jark
> > >>>>>>
> > >>>>>> On Fri, 7 Dec 2018 at 17:13, Lin Li <lincoln.8...@gmail.com> wrote:
> > >>>>>>
> > >>>>>>> Hi Timo,
> > >>>>>>> Thanks for your feedback, here are some thoughts of mine:
> > >>>>>>>
> > >>>>>>> 3. Sources/Sinks:
> > >>>>>>> "Let's assume an interactive CLI session, people should be able to list all source tables and sink tables to know upfront if they can use an INSERT INTO here or not."
> > >>>>>>> This requirement can be simply resolved by a document that lists all supported source/sink/both connectors, and the sql-client can perform a quick check. It's only an implementation choice, not necessary for the syntax. For the connector implementation, a connector may implement one or some or all of the [Stream|Batch]Source/[Stream|Batch]Sink traits; we can derive the availability for any given query without the SOURCE/SINK keywords or specific table properties in the WITH clause. Since there's still indeterminacy, shall we skip these two keywords for the MVP DDL? We can discuss further after users' feedback.
> > >>>>>>>
> > >>>>>>> 6. Partitioning and keys
> > >>>>>>> Agree with you on raising the priority of table constraints and partitioned table support for better connectivity to Hive and Kafka. I'll add partitioned table syntax (compatible with Hive) into the DDL draft doc later[1].
> > >>>>>>>
> > >>>>>>> 5. Schema declaration
> > >>>>>>> "if users want to declare computed columns they have "schema" constraints but without columns
> > >>>>>>> CREATE TABLE (PRIMARY_KEY(a, c)) WITH (format.type = avro, format.schema-file = "/my/avrofile.avsc")"
> > >>>>>>> From my point of view, this DDL is invalid because the primary key constraint already references two columns but their types are unseen. And Xuefu pointed out an important matching problem, so let's put schema derivation as a follow-up extension?
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> On Thu, Dec 6, 2018 at 6:05 PM, Timo Walther <twal...@apache.org> wrote:
> > >>>>>>>
> > >>>>>>>> Hi everyone,
> > >>>>>>>>
> > >>>>>>>> great to have such a lively discussion. My next batch of feedback:
> > >>>>>>>>
> > >>>>>>>> @Jark: We don't need to align the descriptor approach with SQL. I'm open for different approaches as long as we can serve a broad set of use cases and systems. The descriptor approach was a first attempt to cover all aspects and connector/format characteristics. Just another example that is missing in the DDL design: How can a user decide if append, retraction, or upserts should be used to sink data into the target system? Do we want to define all these important properties in the big WITH property map? If yes, we are already close to the descriptor approach. Regarding the "standard way", most DDL languages have very custom syntax, so there is not a real "standard".
> > >>>>>>>>
> > >>>>>>>> 3. Sources/Sinks: @Lin: If a table has both read/write access it can be created using a regular CREATE TABLE (omitting a specific source/sink) declaration. Regarding the transition from source/sink to both, yes, we would need to update the DDL and catalogs.
> > >>>>>>>> But is this a problem? One also needs to add new queries that use the tables. @Xuefu: It is not only about security aspects. Especially for streaming use cases, not every connector can be used as a source easily. For example, a JDBC sink is easier than a JDBC source. Let's assume an interactive CLI session, people should be able to list all source tables and sink tables to know upfront if they can use an INSERT INTO here or not.
> > >>>>>>>>
> > >>>>>>>> 6. Partitioning and keys: @Lin: I would like to include this in the design given that Hive integration and Kafka key support are in the making/are on our roadmap for this release.
> > >>>>>>>>
> > >>>>>>>> 5. Schema declaration: @Lin: You are right, it is not conflicting. I just wanted to raise the point because if users want to declare computed columns they have "schema" constraints but without columns. Are we ok with a syntax like ...
> > >>>>>>>> CREATE TABLE (PRIMARY_KEY(a, c)) WITH (format.type = avro, format.schema-file = "/my/avrofile.avsc") ?
> > >>>>>>>> @Xuefu: Yes, you are right that an external schema might not exactly match, but this is true for both directions: table schema "derives" format schema and format schema "derives" table schema.
> > >>>>>>>>
> > >>>>>>>> 7. Hive compatibility: @Xuefu: I agree that Hive is popular, but we should not just adopt everything from Hive as their syntax is very batch-specific. We should come up with a superset of historical and future requirements. Supporting Hive queries can be an intermediate layer on top of Flink's DDL.
> > >>>>>>>>
> > >>>>>>>> 4. Time attributes: @Lin: I'm fine with changing the TimestampExtractor interface as this is also important for better separation of connector and table module [1]. However, I'm wondering about watermark generation.
> > >>>>>>>> 4a. timestamps are in the schema twice:
> > >>>>>>>> @Jark: "existing field is Long/Timestamp, we can just use it as rowtime": yes, but we need to mark a field as such an attribute. What does the syntax for marking look like? Also in case of timestamps that are nested in the schema?
> > >>>>>>>>
> > >>>>>>>> 4b. how can we write out a timestamp into the message header?:
> > >>>>>>>> I agree to simply ignore computed columns when writing out. This is like 'field-change: add' that I mentioned in the improvements document. @Jark: "then the timestamp in StreamRecord will be written to the Kafka message header": Unfortunately, there is no timestamp in the stream record. Additionally, multiple time attributes can be in a schema. So we need a constraint that tells the sink which column to use (possibly computed as well)?
> > >>>>>>>>
> > >>>>>>>> 4c. separate all time attribute concerns into a special clause next to the regular schema?
> > >>>>>>>> @Jark: I don't have a strong opinion on this.
> > >>>>>>>> I just have the feeling that the "schema part" becomes quite messy because the actual schema with types and fields is accompanied by so much metadata about timestamps, watermarks, keys, ... and we would need to introduce a new WATERMARK keyword within a schema that was close to standard up to this point.
> > >>>>>>>>
> > >>>>>>>> Thanks everyone,
> > >>>>>>>> Timo
> > >>>>>>>>
> > >>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-9461
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> On 06.12.18 at 07:08, Jark Wu wrote:
> > >>>>>>>>> Hi Timo,
> > >>>>>>>>>
> > >>>>>>>>> Thank you for the valuable feedback.
> > >>>>>>>>>
> > >>>>>>>>> First of all, I think we don't need to align the SQL functionality to the Descriptor. Because SQL is a more standard API, we should be as cautious as possible about extending the SQL syntax. If something can be done in a standard way, we shouldn't introduce something new.
> > >>>>>>>>>
> > >>>>>>>>> Here are some of my thoughts:
> > >>>>>>>>>
> > >>>>>>>>> 1. Scope: Agree.
> > >>>>>>>>> 2. Constraints: Agree.
> > >>>>>>>>> 4. Time attributes:
> > >>>>>>>>> 4a. timestamps are in the schema twice.
> > >>>>>>>>> If an existing field is Long/Timestamp, we can just use it as rowtime; it is not defined twice. If it is not a Long/Timestamp, we use a computed column to get an expected timestamp column to be the rowtime. Is this what you mean by defined twice? I don't think it is a problem, but rather an advantage, because it is easy to use: users do not need to consider whether to "replace the existing column" or "add a new column", and will not be confused about what the real schema is or what the index of the rowtime in the schema is. Regarding the optimization, even if timestamps are in the schema twice, when the original timestamp is never used in a query, the projection pushdown optimization can cut this field as early as possible, which is exactly the same as "replacing the existing column" at runtime.
> > >>>>>>>>>
> > >>>>>>>>> 4b. how can we write out a timestamp into the message header?
> > >>>>>>>>> That's a good point. I think a computed column is just a virtual column on the table which is only relevant to reading. If we want to write to a table with computed columns defined, we only need to provide the columns except the computed columns (see SQL Server [1]). The computed column is ignored in the insert statement. Getting back to the question, how can we write out a timestamp into the message header? IMO, we can provide a configuration to support this, such as `kafka.writeTimestamp=true`; then the timestamp in StreamRecord will be written to the Kafka message header. What do you think?
> > >>>>>>>>>
> > >>>>>>>>> 4c. separate all time attribute concerns into a special clause next to the regular schema?
> > >>>>>>>>> Separating watermark into a special clause similar to PARTITIONED BY is also a good idea. Conceptually, it's fine to put the watermark inside the schema part or outside it. But if we want to support multiple watermark definitions, maybe it would be better to put it in the schema part. It is similar to index definitions: we can define several indexes on a table in the schema part.
> > >>>>>>>>>
> > >>>>>>>>> 4d. How can people come up with a custom watermark strategy?
> > >>>>>>>>> In most cases, the built-in strategies work well. If we need a custom one, we can use a scalar function which is restricted to only returning a nullable Long, and use it in SQL like: WATERMARK for rowtime AS watermarkUdf(a, b, c). The `watermarkUdf` is a user-defined scalar function that accepts 3 parameters and returns a nullable Long, which can be used as a punctuated watermark assigner. Another choice is implementing a class extending `org.apache.flink.table.sources.wmstrategies.WatermarkStrategy` and using it in SQL: WATERMARK for rowtime AS 'com.my.MyWatermarkStrategy'. But if a scalar function can cover the requirements here, I would prefer it, because it keeps things standard compliant. BTW, this feature is not in the MVP; we can discuss it in more depth in the future when we need it.
> > >>>>>>>>>
> > >>>>>>>>> 5. Schema declaration:
> > >>>>>>>>> I like the proposal to omit the schema if we can get the schema from external storage or a schema file. Actually, we have already encountered this requirement in our company.
> > >>>>>>>>>
> > >>>>>>>>> +1 to @Xuefu that we should be as close as possible to Hive syntax while keeping the SQL ANSI standard. This will make it more acceptable and reduce the learning cost for users.
> > >>>>>>>>>
> > >>>>>>>>> [1]: https://docs.microsoft.com/en-us/sql/relational-databases/partitions/create-partitioned-tables-and-indexes?view=sql-server-2017
> > >>>>>>>>>
> > >>>>>>>>> Best,
> > >>>>>>>>> Jark
> > >>>>>>>>>
> > >>>>>>>>> On Thu, 6 Dec 2018 at 12:09, Zhang, Xuefu <xuef...@alibaba-inc.com> wrote:
> > >>>>>>>>>> Hi Timo/Shuyi/Lin,
> > >>>>>>>>>>
> > >>>>>>>>>> Thanks for the discussions. It seems that we are converging to something meaningful. Here are some of my thoughts:
> > >>>>>>>>>>
> > >>>>>>>>>> 1. +1 on MVP DDL
> > >>>>>>>>>> 3. Markers for source or sink seem more about permissions on tables that belong to a security component. Unless the table is created differently based on source, sink, or both, it doesn't seem necessary to use these keywords to enforce permissions.
> > >>>>>>>>>> 5. It might be okay if schema declaration is always needed. While there might be some duplication sometimes, it's not always true.
> > >>>>>>>>>> For example, the external schema may not exactly match the Flink schema. For instance, data types. Even if so, a perfect match is not required. For instance, the external schema file may evolve while the table schema in Flink stays unchanged. A responsible reader should be able to scan the file based on the file schema and return the data based on the table schema.
> > >>>>>>>>>>
> > >>>>>>>>>> Other aspects:
> > >>>>>>>>>>
> > >>>>>>>>>> 7. Hive compatibility. Since Flink SQL will soon be able to operate on Hive metadata and data, it's an add-on benefit if we can be compatible with Hive syntax/semantics while following the ANSI standard. At least we should be as close as possible. Hive DDL can be found at https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL
> > >>>>>>>>>>
> > >>>>>>>>>> Thanks,
> > >>>>>>>>>> Xuefu
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> ------------------------------------------------------------------
> > >>>>>>>>>> Sender:Lin Li <lincoln.8...@gmail.com>
> > >>>>>>>>>> Sent at:2018 Dec 6 (Thu) 10:49
> > >>>>>>>>>> Recipient:dev <dev@flink.apache.org>
> > >>>>>>>>>> Subject:Re: [DISCUSS] Flink SQL DDL Design
> > >>>>>>>>>>
> > >>>>>>>>>> Hi Timo and Shuyi,
> > >>>>>>>>>> thanks for your feedback.
> > >>>>>>>>>>
> > >>>>>>>>>> 1. Scope
> > >>>>>>>>>> agree with you we should focus on the MVP DDL first.
> > >>>>>>>>>>
> > >>>>>>>>>> 2. Constraints
> > >>>>>>>>>> yes, this can be a follow-up issue.
> > >>>>>>>>>>
> > >>>>>>>>>> 3. Sources/Sinks
> > >>>>>>>>>> If a TABLE has both read/write access requirements, should we declare it using `CREATE [SOURCE_SINK|BOTH] TABLE tableName ...`? A further question: if a TABLE t1 is first declared as read-only (as a source table), and then for some new requirement t1 changes to a sink table, in this case we need to update both the DDL and catalogs. Furthermore, let's think about the BATCH query; updating one table in-place can be a common case.
> > >>>>>>>>>> e.g.,
> > >>>>>>>>>> ```
> > >>>>>>>>>> CREATE TABLE t1 (
> > >>>>>>>>>>   col1 varchar,
> > >>>>>>>>>>   col2 int,
> > >>>>>>>>>>   col3 varchar
> > >>>>>>>>>>   ...
> > >>>>>>>>>> );
> > >>>>>>>>>>
> > >>>>>>>>>> INSERT [OVERWRITE] TABLE t1
> > >>>>>>>>>> AS
> > >>>>>>>>>> SELECT
> > >>>>>>>>>>   (some computing ...)
> > >>>>>>>>>> FROM t1;
> > >>>>>>>>>> ```
> > >>>>>>>>>> So, let's forget these SOURCE/SINK keywords in DDL. For validation purposes, we can find other ways.
> > >>>>>>>>>>
> > >>>>>>>>>> 4. Time attributes
> > >>>>>>>>>> As Shuyi mentioned before, there exists an `org.apache.flink.table.sources.tsextractors.TimestampExtractor` for custom-defined time attribute usage, but this expression-based class is more friendly to the Table API than to SQL.
> > >>>>>>>>>> ```
> > >>>>>>>>>> /**
> > >>>>>>>>>>   * Provides an expression to extract the timestamp for a rowtime attribute.
> > >>>>>>>>>>   */
> > >>>>>>>>>> abstract class TimestampExtractor extends FieldComputer[Long] with Serializable {
> > >>>>>>>>>>
> > >>>>>>>>>>   /** Timestamp extractors compute the timestamp as Long. */
> > >>>>>>>>>>   override def getReturnType: TypeInformation[Long] = Types.LONG.asInstanceOf[TypeInformation[Long]]
> > >>>>>>>>>> }
> > >>>>>>>>>> ```
> > >>>>>>>>>> BTW, I think both the scalar function and the TimestampExtractor express computing logic; the TimestampExtractor has no additional advantage in SQL scenarios.
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> 6. Partitioning and keys
> > >>>>>>>>>> Primary key is included in the constraint part, and partitioned table support can be another topic later.
> > >>>>>>>>>>
> > >>>>>>>>>> 5. Schema declaration
> > >>>>>>>>>> Agree with you that we can do better schema derivation for user convenience, but this does not conflict with the syntax. Table properties can carry any useful information for both the users and the framework. I like your `contract name` proposal, e.g., `WITH (format.type = avro)`; the framework can recognize some `contract name`s like `format.type`, `connector.type`, etc. Also, deriving the table schema from an existing schema file can be handy, especially for one with many table columns.
> > >>>>>>>>>>
> > >>>>>>>>>> Regards,
> > >>>>>>>>>> Lin
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> On Wed, Dec 5, 2018 at 10:40 PM, Timo Walther <twal...@apache.org> wrote:
> > >>>>>>>>>>
> > >>>>>>>>>>> Hi Jark and Shuyi,
> > >>>>>>>>>>>
> > >>>>>>>>>>> thanks for pushing the DDL efforts forward. I agree that we should aim to combine both Shuyi's design and your design.
> > >>>>>>>>>>>
> > >>>>>>>>>>> Here are a couple of concerns that I think we should address in the design:
> > >>>>>>>>>>>
> > >>>>>>>>>>> 1. Scope: Let's focus on an MVP DDL for CREATE TABLE statements first. I think this topic has already enough potential for long discussions and is very helpful for users. We can discuss CREATE VIEW and CREATE FUNCTION afterwards as they are not related to each other.
> > >>>>>>>>>>>
> > >>>>>>>>>>> 2. Constraints: I think we should consider things like nullability, VARCHAR length, and decimal scale and precision in the future as they allow for nice optimizations. However, both the translation and the runtime operators do not support those features yet. I would not introduce an arbitrary default value but omit those parameters for now. This can be a follow-up issue once the basic DDL has been merged.
> > >>>>>>>>>>>
> > >>>>>>>>>>> 3. Sources/Sinks: We had a discussion about CREATE TABLE vs CREATE [SOURCE|SINK|] TABLE before. In my opinion we should allow for these explicit declarations because in most production scenarios, teams have strict read/write access requirements.
> > >>>>>>>>>>> For example, a data science team should only consume from an event Kafka topic but should not accidentally write back to the single source of truth.
> > >>>>>>>>>>>
> > >>>>>>>>>>> 4. Time attributes: In general, I like your computed columns approach because it makes defining rowtime attributes transparent and simple. However, there are downsides that we should discuss.
> > >>>>>>>>>>> 4a. Jark's current design means that timestamps are in the schema twice. The design that is mentioned in [1] makes this more flexible as it either allows replacing an existing column or adding a computed column.
> > >>>>>>>>>>> 4b. We need to consider the zoo of storage systems that is out there right now. Take Kafka as an example: how can we write out a timestamp into the message header? We need to think of a reverse operation to a computed column.
> > >>>>>>>>>>> 4c. Does defining a watermark really fit into the schema part of a table? Shouldn't we separate all time attribute concerns into a special clause next to the regular schema, similar to how PARTITIONED BY does it in Hive?
> > >>>>>>>>>>> 4d. How can people come up with a custom watermark strategy? I guess this can not be implemented in a scalar function and would require some new type of UDF?
> > >>>>>>>>>>>
> > >>>>>>>>>>> 6. Partitioning and keys: Another question that the DDL design should answer is how we express primary keys (for upserts) and partitioning keys (for Hive, Kafka message keys). All part of the table schema?
> > >>>>>>>>>>>
> > >>>>>>>>>>> 5. Schema declaration: I find it very annoying that we want to force people to declare all columns and types again even though this is usually already defined in some company-wide format. I know that catalog support will greatly improve this. But if no catalog is used, people need to manually define a schema with 50+ fields in a Flink DDL. What I actually promoted was having two ways of reading data:
> > >>>>>>>>>>>
> > >>>>>>>>>>> 1. Either the format derives its schema from the table schema.
> > >>>>>>>>>>> CREATE TABLE (col INT) WITH (format.type = avro)
> > >>>>>>>>>>>
> > >>>>>>>>>>> 2. Or the table schema can be omitted and the format schema defines the table schema (+ time attributes).
> > >>>>>>>>>>> CREATE TABLE WITH (format.type = avro, format.schema-file = "/my/avrofile.avsc")
> > >>>>>>>>>>>
> > >>>>>>>>>>> Please let me know what you think about each item. I will try to incorporate your feedback in [1] this week.
> > >>>>>>>>>>>
> > >>>>>>>>>>> Regards,
> > >>>>>>>>>>> Timo
> > >>>>>>>>>>>
> > >>>>>>>>>>> [1] https://docs.google.com/document/d/1Yaxp1UJUFW-peGLt8EIidwKIZEWrrA-pznWLuvaH39Y/edit#heading=h.41fd6rs7b3cf
> > >>>>>>>>>>>
> > >>>>>>>>>>> On 05.12.18 at 13:01, Jark Wu wrote:
> > >>>>>>>>>>>> Hi Shuyi,
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> It's exciting to see we can make such great progress here.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Regarding the watermark:
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Watermarks can be defined on any column (including computed columns) in the table schema. The computed column can be computed from existing columns using built-in functions and *UserDefinedFunctions* (ScalarFunction). So IMO, it can work out for almost all scenarios, not only common ones.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> I don't think using a `TimestampExtractor` to support custom timestamp extraction in SQL is a good idea, because `TimestampExtractor` is not a SQL standard function. If we support `TimestampExtractor` in SQL, do we need to support CREATE FUNCTION for `TimestampExtractor`? I think `ScalarFunction` can do the same thing as `TimestampExtractor` but is more powerful and standard.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> The core idea of the watermark definition syntax is that the schema part defines all the columns of the table; it is exactly what the query sees. The watermark part is something like a primary key definition or constraint on a SQL table: it has no side effect on the schema, it only defines what the watermark strategy is and which field is the rowtime attribute field. If the rowtime field is not among the existing fields, we can use a computed column to generate it from other existing fields. The Descriptor Pattern API [1] is very useful when writing a Table API job, but is not contradictory to the Watermark DDL from my perspective.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> [1]: https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connect.html#rowtime-attributes
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Best,
> > >>>>>>>>>>>> Jark
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> On Wed, 5 Dec 2018 at 17:58, Shuyi Chen <suez1...@gmail.com> wrote:
> > >>>>>>>>>>>>> Hi Jark and Shaoxuan,
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Thanks a lot for the summary. I think we are making great progress here. Below are my thoughts.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> *(1) watermark definition
> > >>>>>>>>>>>>> IMO, it's better to keep it consistent with the rowtime extractors and watermark strategies defined in
> > >>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connect.html#rowtime-attributes
> > >>>>>>>>>>>>> .
> > >>>>>>>>>>>>> Using built-in functions seems to be too much for most of the common scenarios.
> > >>>>>>>>>>>>> *(2) CREATE SOURCE/SINK TABLE or CREATE TABLE
> > >>>>>>>>>>>>> Actually, I think we can put the source/sink type info into the table properties, so we can use CREATE TABLE.
> > >>>>>>>>>>>>> (3) View DDL with properties
> > >>>>>>>>>>>>> We can remove the view properties section now for the MVP and add it back later if needed.
> > >>>>>>>>>>>>> (4) Type Definition
> > >>>>>>>>>>>>> I agree we can put the type length or precision into future versions. As for the grammar difference, currently I am using the grammar in the Calcite type DDL, but since we'll extend the parser in Flink, we can definitely change it if needed.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Shuyi
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> On Tue, Dec 4, 2018 at 10:48 PM Jark Wu <imj...@gmail.com> wrote:
> > >>>>>>>>>>>>>> Hi Shaoxuan,
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Thanks for pointing that out. Yes, the source/sink tag on create table is another major difference.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> To summarize the main differences again:
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> *(1) watermark definition
> > >>>>>>>>>>>>>> *(2) CREATE SOURCE/SINK TABLE or CREATE TABLE
> > >>>>>>>>>>>>>> (3) View DDL with properties
> > >>>>>>>>>>>>>> (4) Type Definition
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Best,
> > >>>>>>>>>>>>>> Jark
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> On Wed, 5 Dec 2018 at 14:08, Shaoxuan Wang <wshaox...@gmail.com> wrote:
> > >>>>>>>>>>>>>>> Hi Jark,
> > >>>>>>>>>>>>>>> Thanks for the summary. Your plan for the 1st round implementation of the DDL looks good to me.
> > >>>>>>>>>>>>>>> Have we reached an agreement on simplifying/unifying "create [source/sink] table" to "create table"? "Watermark definition" and "create table" are the major obstacles on the way to merging the two design proposals, FMPOV. @Shuyi, it would be great if you could spend time and respond to these two parts first.
> > >>>>>>>>>>>>>>> Regards,
> > >>>>>>>>>>>>>>> Shaoxuan
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> On Wed, Dec 5, 2018 at 12:20 PM Jark Wu <imj...@gmail.com> wrote:
> > >>>>>>>>>>>>>>>> Hi Shuyi,
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> It seems that you have reviewed the DDL doc [1] that Lin and I drafted. This doc covers all the features running in Alibaba. But some of the features might not be needed in the first version of Flink SQL DDL.
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> So my suggestion would be to focus on the MVP DDLs and reach agreement ASAP based on the DDL draft [1] and the DDL design [2] Shuyi proposed.
> > >>>>>>>>>>>>>>>> And we can discuss the main differences one by one.
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> The following are the MVP DDLs that should be included in the first version in my opinion (feedback is welcome):
> > >>>>>>>>>>>>>>>> (1) Table DDL:
> > >>>>>>>>>>>>>>>>     (1.1) Type definition
> > >>>>>>>>>>>>>>>>     (1.2) computed column definition
> > >>>>>>>>>>>>>>>>     (1.3) watermark definition
> > >>>>>>>>>>>>>>>>     (1.4) with properties
> > >>>>>>>>>>>>>>>>     (1.5) table constraint (primary key/unique)
> > >>>>>>>>>>>>>>>>     (1.6) column nullability (nice to have)
> > >>>>>>>>>>>>>>>> (2) View DDL
> > >>>>>>>>>>>>>>>> (3) Function DDL
> > >>>>>>>>>>>>>>>> (a combined sketch of the table DDL parts follows below)
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> The main differences between the two DDL docs (something may be missed, feel free to point it out):
> > >>>>>>>>>>>>>>>> *(1.3) watermark*: this is the main and the most important difference; it would be great if @Timo Walther <twal...@apache.org> @Fabian Hueske <fhue...@gmail.com> could give some feedback.
> > >>>>>>>>>>>>>>>> (1.1) Type definition:
> > >>>>>>>>>>>>>>>>     (a) Should VARCHAR carry a length, e.g. VARCHAR(128)? In most cases, the varchar length is not used because values are stored as String in Flink. But it can be used for optimization in the future if we know the column is a fixed-length VARCHAR. So IMO, we can support VARCHAR with length in the future, and just VARCHAR in this version.
> > >>>>>>>>>>>>>>>>     (b) Should DECIMAL support custom scale and precision, e.g. DECIMAL(12, 5)? If we clearly know the scale and precision of the Decimal, we can have some optimization on serialization/deserialization. IMO, we can just support DECIMAL in this version, which means DECIMAL(38, 18) as the default, and support custom scale and precision in the future.
> > >>>>>>>>>>>>>>>> (2) View DDL: Do we need WITH properties in the View DDL (proposed in doc[2])? What are the properties on the view used for?
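> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> To make the table DDL parts (1.1)-(1.5) concrete, here is a minimal sketch using the watermark syntax from our draft (table, field, and property names are only illustrative):
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> CREATE TABLE user_actions (
> > >>>>>>>>>>>>>>>>   user_id bigint,
> > >>>>>>>>>>>>>>>>   action varchar,
> > >>>>>>>>>>>>>>>>   ts timestamp,
> > >>>>>>>>>>>>>>>>   rowtime AS ts,
> > >>>>>>>>>>>>>>>>   WATERMARK FOR rowtime AS BOUNDED WITH OFFSET '5' SECOND,
> > >>>>>>>>>>>>>>>>   PRIMARY KEY (user_id)
> > >>>>>>>>>>>>>>>> ) WITH (
> > >>>>>>>>>>>>>>>>   type=kafka
> > >>>>>>>>>>>>>>>> );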
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> The features that could be supported and discussed in the future:
> > >>>>>>>>>>>>>>>> (1) period definition on table
> > >>>>>>>>>>>>>>>> (2) Type DDL
> > >>>>>>>>>>>>>>>> (3) Index DDL
> > >>>>>>>>>>>>>>>> (4) Library DDL
> > >>>>>>>>>>>>>>>> (5) Drop statement
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> [1] Flink DDL draft by Lin and Jark:
> > >>>>>>>>>>>>>>>> https://docs.google.com/document/d/1o16jC-AxnZoxMfHQptkKQkSC6ZDDBRhKg6gm8VGnY-k/edit#
> > >>>>>>>>>>>>>>>> [2] Flink SQL DDL design by Shuyi:
> > >>>>>>>>>>>>>>>> https://docs.google.com/document/d/1TTP-GCC8wSsibJaSUyFZ_5NBAHYEB1FVmPpP7RgDGBA/edit#
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Cheers,
> > >>>>>>>>>>>>>>>> Jark
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> On Thu, 29 Nov 2018 at 16:13, Shaoxuan Wang <wshaox...@gmail.com> wrote:
> > >>>>>>>>>>>>>>>>> Sure Shuyi,
> > >>>>>>>>>>>>>>>>> What I hope is that we can reach an agreement on the DDL grammar as soon as possible. There are a few differences between your proposal and ours. Once Lin and Jark propose our design, we can quickly discuss those differences and see how far we are from a unified design.
> > >>>>>>>>>>>>>>>>> WRT the external catalog, I think it is an orthogonal topic; we can design it in parallel. I believe @Xuefu, @Bowen are already working on it. We should/will definitely involve them in reviewing the final design of the DDL implementation. I would suggest we give the DDL implementation a higher priority, as it is a crucial component for the user experience of the SQL CLI.
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Regards,
> > >>>>>>>>>>>>>>>>> Shaoxuan
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> On Thu, Nov 29, 2018 at 6:56 AM Shuyi Chen <suez1...@gmail.com> wrote:
> > >>>>>>>>>>>>>>>>>> Thanks a lot, Shaoxuan, Jark and Lin. We should definitely collaborate here; we also have our own DDL implementation running in production for almost 2 years at Uber. With the joint experience from both companies, we can definitely make the Flink SQL DDL better.
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> As @shaoxuan suggests, Jark can come up with a doc that talks about the current DDL design in Alibaba, and we can discuss and merge them into one, make it a FLIP, and plan the tasks for implementation. Also, we should take into account the new external catalog effort in the design.
What do you guys think?

Shuyi

On Wed, Nov 28, 2018 at 6:45 AM Jark Wu <imj...@gmail.com> wrote:

Hi Shaoxuan,

I think summarizing it into a google doc is a good idea. We will prepare it in the next few days.

Thanks,
Jark

On Wed, Nov 28, 2018 at 9:17 PM, Shaoxuan Wang <wshaox...@gmail.com> wrote:

Hi Lin and Jark,
Thanks for sharing those details. Could you please consider summarizing your DDL design into a google doc?
We can still continue the discussions on Shuyi's proposal, but having a separate google doc will make it easy for the devs to understand, comment on and discuss your proposed DDL implementation.

Regards,
Shaoxuan

On Wed, Nov 28, 2018 at 7:39 PM Jark Wu <imj...@gmail.com> wrote:

Hi Shuyi,

Thanks for bringing up this discussion and the awesome work! I have left some comments in the doc.

I want to share something more about the watermark definition we learned at Alibaba.

1. A table should be able to accept multiple watermark definitions, because a table may have more than one rowtime field. For example, one rowtime field may come from an existing field but be missing in some records, while another is the ingestion timestamp in Kafka but not very accurate. In this case, the user may define two rowtime fields with watermarks in the table and choose one depending on the situation.

2. A watermark strategy always works together with a rowtime field.

Based on the two points mentioned above, I think we should combine the watermark strategy and the rowtime field selection
(i.e. which existing field is used to generate the watermark) in one clause, so that we can define multiple watermarks in one table.

Here I will share the watermark syntax used at Alibaba (simply modified):

watermarkDefinition:
    WATERMARK [watermarkName] FOR <rowtime_field> AS wm_strategy

wm_strategy:
      BOUNDED WITH OFFSET 'string' timeUnit
    | ASCENDING

The WATERMARK keyword starts a watermark definition. The FOR keyword defines which existing field is used to generate the watermark; this field should already exist in the schema (we can use a computed column to derive it from other fields). The AS keyword defines the watermark strategy, such as BOUNDED WITH OFFSET (which covers almost all requirements) or ASCENDING.

When the expected rowtime field does not exist in the schema, we can use the computed-column syntax to derive it from other existing fields using built-in functions or user-defined functions. So the rowtime/watermark definition doesn't need to care about a "field-change" strategy (replace/add/from-field). The proctime field can also be defined as a computed column, such as pt AS PROCTIME(), which defines a proctime field named "pt" in the schema.
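To make the two points above concrete, a table with two rowtime fields, two watermark definitions and a proctime column might look like this sketch (the field names, the SYSTEMROWTIME() ingestion-time function and the MILLISECOND time-unit spelling are assumptions for illustration, not settled syntax):

CREATE TABLE orders (
    order_id BIGINT,
    event_time TIMESTAMP,            -- rowtime from the payload, missing in some records
    ingest_time AS SYSTEMROWTIME(),  -- ingestion-time rowtime derived via a computed column
    pt AS PROCTIME(),                -- proctime field "pt" as a computed column
    WATERMARK wm_event FOR event_time AS BOUNDED WITH OFFSET '5000' MILLISECOND,
    WATERMARK wm_ingest FOR ingest_time AS ASCENDING
) WITH (
    type = 'kafka'
);

A query could then pick whichever rowtime/watermark pair fits its accuracy requirements.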
Looking forward to working with you guys!

Best,
Jark Wu

On Wed, Nov 28, 2018 at 6:33 PM, Lin Li <lincoln.8...@gmail.com> wrote:

@Shuyi
Thanks for the proposal! We have a simple DDL implementation (extending Calcite's parser) which has been running on production for almost two years and works well. I think the most valuable lessons we learned are keeping simplicity and standard compliance.
Here's the approximate grammar, FYI:

CREATE TABLE

CREATE TABLE tableName(
    columnDefinition [, columnDefinition]*
    [ computedColumnDefinition [, computedColumnDefinition]* ]
    [ tableConstraint [, tableConstraint]* ]
    [ tableIndex [, tableIndex]* ]
    [ PERIOD FOR SYSTEM_TIME ]
    [ WATERMARK watermarkName FOR rowTimeColumn AS withOffset(rowTimeColumn, offset) ]
) [ WITH ( tableOption [, tableOption]* ) ] [ ; ]

columnDefinition ::=
    columnName dataType [ NOT NULL ]

dataType ::=
      VARCHAR
    | BOOLEAN
    | TINYINT
    | SMALLINT
    | INT
    | BIGINT
    | FLOAT
    | DECIMAL
    | DOUBLE
    | DATE
    | TIME
    | TIMESTAMP
    | VARBINARY

computedColumnDefinition ::=
    columnName AS computedColumnExpression

tableConstraint ::=
    { PRIMARY KEY | UNIQUE } (columnName [, columnName]* )

tableIndex ::=
    [ UNIQUE ] INDEX indexName (columnName [, columnName]* )

rowTimeColumn ::=
    columnName

tableOption ::=
    property=value

offset ::=
    positive integer (unit: ms)

CREATE VIEW

CREATE VIEW viewName
    [ ( columnName [, columnName]* ) ]
AS queryStatement;

CREATE FUNCTION

CREATE FUNCTION functionName
    AS 'className';

className ::=
    fully qualified name
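As a quick illustration of this grammar, a set of statements written against it might look like the following (all names, the DATE_FORMAT function and the option values are invented for the example):

CREATE TABLE shipments (
    shipment_id BIGINT NOT NULL,
    item VARCHAR,
    ship_time TIMESTAMP,
    ship_day AS DATE_FORMAT(ship_time, 'yyyy-MM-dd'),  -- computed column
    PRIMARY KEY (shipment_id),
    UNIQUE INDEX idx_item (item),
    WATERMARK wm FOR ship_time AS withOffset(ship_time, 5000)
) WITH (
    type = 'kafka'
);

CREATE VIEW shipped_items (shipment_id, item) AS
    SELECT shipment_id, item FROM shipments;

CREATE FUNCTION myToUpper AS 'com.example.udf.ToUpper';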
On Wed, Nov 28, 2018 at 3:28 AM, Shuyi Chen <suez1...@gmail.com> wrote:

Thanks a lot, Timo and Xuefu. Yes, I think we can finalize the design doc first and start the implementation without the unified connector API being ready, by skipping some features.

Xuefu, I like the idea of making Flink-specific properties generic key-value pairs, so that integration with Hive DDL (or others, e.g. Beam DDL) becomes easier.

I'll run a final pass over the design doc and finalize the design in the next few days, and then we can start creating tasks and collaborate on the implementation. Thanks a lot for all the comments and inputs.

Cheers!
Shuyi

On Tue, Nov 27, 2018 at 7:02 AM Zhang, Xuefu <xuef...@alibaba-inc.com> wrote:

Yeah! I agree with Timo that the DDL can actually proceed without being blocked by the connector API. We can leave the unknowns out while defining the basic syntax.

@Shuyi

As commented in the doc, I think we can probably stick with a simple syntax with general properties, without extending the syntax so much that it mimics the descriptor API.

Part of our effort on the Flink-Hive integration is also to make the DDL syntax compatible with Hive's. The one in the current proposal seems to make our effort more challenging.

We can help and collaborate. At this moment, I think we can finalize the proposal and then divide the tasks for better collaboration. Please let me know if there are any questions or suggestions.
Thanks,
Xuefu

------------------------------------------------------------------
Sender: Timo Walther <twal...@apache.org>
Sent at: 2018 Nov 27 (Tue) 16:21
Recipient: dev <dev@flink.apache.org>
Subject: Re: [DISCUSS] Flink SQL DDL Design

Thanks for offering your help here, Xuefu. It would be great to move these efforts forward. I agree that the DDL is somehow related to the unified connector API design, but we can also start with the basic functionality now and evolve the DDL during this release and the next releases.

For example, we could identify an MVP DDL syntax that skips defining key constraints and maybe even time attributes. This DDL could be used for batch use cases, ETL, and materializing SQL queries (no time operations like windows).
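Such an MVP table would contain plain column types only, and a query could be materialized into it with a plain INSERT INTO; a minimal sketch (the table names and the connector property are invented):

-- no key constraints, no time attributes
CREATE TABLE page_views (
    page VARCHAR,
    views BIGINT
) WITH (
    type = 'filesystem'
);

-- batch/ETL-style materialization of a SQL query
INSERT INTO page_views
SELECT page, COUNT(*) FROM raw_logs GROUP BY page;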
The unified connector API is high on our priority list for the 1.8 release. I will try to update the document by the middle of next week.

Regards,

Timo

On Nov 27, 2018 at 08:08, Shuyi Chen wrote:

Thanks a lot, Xuefu. I was busy with some other stuff for the last 2 weeks, but we are definitely interested in moving this forward. I think once the unified connector API design [1] is done, we can finalize the DDL design as well and start creating concrete subtasks to collaborate on the implementation with the community.

Shuyi

[1]
https://docs.google.com/document/d/1Yaxp1UJUFW-peGLt8EIidwKIZEWrrA-pznWLuvaH39Y/edit?usp=sharing

On Mon, Nov 26, 2018 at 7:01 PM Zhang, Xuefu <xuef...@alibaba-inc.com> wrote:

Hi Shuyi,

I'm wondering if you folks still have the bandwidth to work on this. We have some dedicated resources and would like to move this forward. We can collaborate.

Thanks,

Xuefu

------------------------------------------------------------------
From: wenlong.lwl <wenlong88....@gmail.com>
Date: Nov 5, 2018, 11:15:35
To: <dev@flink.apache.org>
Subject: Re: [DISCUSS] Flink SQL DDL Design

Hi Shuyi, thanks for the proposal.

I have two concerns about the table DDL:

1. How about removing the source/sink mark from the DDL? It is not necessary: the framework can determine whether the referred table is a source or a sink from the context of the query using the table. This makes it more convenient for users to define a table that can be both a source and a sink, and more convenient for the catalog to persist and manage the meta info (see the sketch after these points).
2. How about keeping just one flat string map as the parameters of a table? For example:

create table Kafka10SourceTable (
    intField INTEGER,
    stringField VARCHAR(128),
    longField BIGINT,
    rowTimeField TIMESTAMP
) with (
    connector.type = 'kafka',
    connector.property-version = '1',
    connector.version = '0.10',
    connector.properties.topic = 'test-kafka-topic',
    connector.properties.startup-mode = 'latest-offset',
    connector.properties.specific-offset = 'offset',
    format.type = 'json',
    format.properties.version = '1',
    format.derive-schema = 'true'
);

Because:

1. In a TableFactory, what the user works with is a string map of properties, so defining the parameters as a string map is the closest match to how users actually use them.

2. The table descriptor can be extended by users, as is done for Kafka and JSON, which means the parameter keys in the connector or format scope can differ between implementations. We cannot restrict the keys to a fixed set, so we would need a map in the connector scope and another map in the connector.properties scope. Why not just give the user a single map and let them put the parameters in whatever format they like? That is also the simplest way to implement the DDL parser.
3. Whether we can define a format clause or not depends on the implementation of the connector. Using a separate clause in the DDL may create the misunderstanding that we can combine connectors with arbitrary formats, which may not actually work.
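As a sketch of the first point above, a single definition with no SOURCE/SINK keyword could serve both roles depending on how a query uses it (the names are invented):

CREATE TABLE t (
    id BIGINT,
    msg VARCHAR
) WITH (
    connector.type = 'kafka'
);

-- here the framework would treat t as a source ...
SELECT id, msg FROM t;

-- ... and here as a sink
INSERT INTO t SELECT id, msg FROM another_table;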
On Sun, 4 Nov 2018 at 18:25, Dominik Wosiński <wos...@gmail.com> wrote:

+1, thanks for the proposal.

I guess this is a long-awaited change. It can vastly increase the functionality of the SQL Client, as it will be possible to use complex extensions like, for example, those provided by Apache Bahir [1].

Best Regards,
Dom.

[1]
https://github.com/apache/bahir-flink

On Sat, 3 Nov 2018 at 17:17, Rong Rong <walter...@gmail.com> wrote:

+1. Thanks for putting the proposal together, Shuyi.

DDL has been brought up a couple of times previously [1, 2]. Utilizing DDL will definitely be a great extension to the current Flink SQL, systematically supporting some of the previously proposed features such as [3]. It will also be beneficial to see the document closely aligned with the previous discussion of the unified SQL connector API [4].

I also left a few comments on the doc. Looking forward to the alignment with the other couple of efforts, and to contributing to them!

Best,
Rong

[1]
http://mail-archives.apache.org/mod_mbox/flink-dev/201805.mbox/%3CCAMZk55ZTJA7MkCK1Qu4gLPu1P9neqCfHZtTcgLfrFjfO4Xv5YQ%40mail.gmail.com%3E
[2]
http://mail-archives.apache.org/mod_mbox/flink-dev/201810.mbox/%3CDC070534-0782-4AFD-8A85-8A82B384B8F7%40gmail.com%3E
[3]
https://issues.apache.org/jira/browse/FLINK-8003
[4]
http://mail-archives.apache.org/mod_mbox/flink-dev/201810.mbox/%3c6676cb66-6f31-23e1-eff5-2e9c19f88...@apache.org%3E

On Fri, Nov 2, 2018 at 10:22 AM Bowen Li <bowenl...@gmail.com> wrote:

Thanks Shuyi!

I left some comments there. I think the designs of the SQL DDL and the Flink-Hive integration/external catalog enhancements will work closely with each other. I hope we are well aligned on the directions of the two designs, and I look forward to working with you guys on both!

Bowen

On Thu, Nov 1, 2018 at 10:57 PM Shuyi Chen <suez1...@gmail.com> wrote:

Hi everyone,

SQL DDL support has been a long-time ask from the community. Currently, Flink SQL supports only DML (e.g. SELECT and INSERT statements). In its current form, Flink SQL users still need to define/create table sources and sinks programmatically in Java/Scala.
Also, in the SQL Client, without DDL support, the current implementation does not allow dynamic creation of tables, types or functions with SQL, which adds friction to its adoption.

I drafted a design doc [1] with a few other community members that proposes the design and implementation for adding DDL support in Flink. The initial design considers DDL for tables, views, types, libraries and functions. It would be great to get feedback on the design from the community, and to align it with the latest efforts on the unified SQL connector API [2] and the Flink-Hive integration [3].

Any feedback is highly appreciated.

Thanks
Shuyi Chen

[1]
https://docs.google.com/document/d/1TTP-GCC8wSsibJaSUyFZ_5NBAHYEB1FVmPpP7RgDGBA/edit?usp=sharing
[2]
https://docs.google.com/document/d/1Yaxp1UJUFW-peGLt8EIidwKIZEWrrA-pznWLuvaH39Y/edit?usp=sharing
[3]
https://docs.google.com/document/d/1SkppRD_rE3uOKSN-LuZCqn4f7dz0zW5aa6T_hBZq5_o/edit?usp=sharing

--
"So you have to trust that the dots will somehow connect in your future."