Hi all, thanks for your valuable input!

4) Event-Time Attributes and Watermarks:
4.b) @Fabian As you mentioned, using a computed column `ts AS SYSTEMROWTIME()` for writing out to a Kafka table sink would violate the rule that computed fields are not emitted. Since the timestamp column in Kafka's header area is a specific materialization protocol, why don't we treat it as a connector property? For example:
```
CREATE TABLE output_kafka_t1 (
  id bigint,
  ts timestamp,
  msg varchar
) WITH (
  type = kafka,
  header.timestamp = ts,
  ...
);
```
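To make 4.b concrete, below is a minimal sketch (plain Java, heavily simplified; the `header.timestamp` property handling, the class, and the row representation are hypothetical illustrations, and only Kafka's `ProducerRecord(topic, partition, timestamp, key, value)` constructor is a real API) of how a sink configured with such a property could materialize the designated column into the Kafka record timestamp instead of the serialized payload:

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical sketch: a sink that honors "header.timestamp = ts" by
// writing the designated column into Kafka's record timestamp metadata.
public class TimestampHeaderSink {

    private final KafkaProducer<byte[], byte[]> producer;
    private final String topic;
    private final int tsIndex; // position of the column named by 'header.timestamp'

    public TimestampHeaderSink(Properties kafkaProps, String topic, int tsIndex) {
        this.producer = new KafkaProducer<>(kafkaProps);
        this.topic = topic;
        this.tsIndex = tsIndex;
    }

    // 'row' stands in for a table row here; row[tsIndex] holds epoch millis.
    public void emit(Object[] row, byte[] serializedPayload) {
        Long ts = (Long) row[tsIndex];
        // ProducerRecord(topic, partition, timestamp, key, value): the
        // timestamp goes into Kafka's record metadata, not into the value.
        producer.send(new ProducerRecord<>(topic, null, ts, null, serializedPayload));
    }
}
```

Whether `ts` is then also kept in the serialized value would be part of the materialization protocol to agree on; the sketch only shows that a connector property gives the table factory enough information to wire this up without a computed column, so the "computed fields are not emitted" rule stays intact.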
4d) For custom watermark strategies: @Fabian agreed, let's open another topic about this feature later.

3) SOURCE / SINK / BOTH: I think permissions and availability are two separate things. Permissions can be managed well by the GRANT/REVOKE (you can call it DCL) solutions commonly used in different DBs. The permission part can be a new topic for later discussion, what do you think? For availability, @Fabian @Timo I have another question: does instantiating a TableSource/Sink cost much, or does it have other downsides? IMO, creating a new source/sink object via the constructor does not seem costly. When receiving a DDL statement, we should associate it with the catalog object (reusing an existing one or creating a new one). Am I missing something important?

5. Schema declaration: @Timo yes, your concern about user convenience is very important. But I haven't seen a clear way to solve this so far. Shall we defer it and wait for more input from the community?

Shuyi Chen <suez1...@gmail.com> wrote on Saturday, December 8, 2018 at 4:27 PM: > Hi all, > > Thanks a lot for the great discussion. I think we can continue the > discussion here while carving out a MVP so that the community can start > working on. Based on the discussion so far, I try to summarize what we will > do for the MVP: > > MVP > > 1. support CREATE TABLE > 2. support exisiting data type in Flink SQL, ignore nullability and > precision > 3. support table comments and column comments > 4. support table constraint PRIMARY KEY and UNIQUE > 5. support table properties using key-value pairs > 6. support partitioned by > 7. support computed column > 8. support from-field and from-source timestamp extractors > 9. support PERIODIC-ASCENDING, PERIODIC-BOUNDED, FROM-SOURCE watermark > strategies. > 10. support a table property to allow explicit enforcement of > read/write(source/sink) permission of a table > > I try to put up the DDL grammar ( > > https://docs.google.com/document/d/1ug1-aVBSCxZQk58kR-yaK2ETCgL3zg0eDUVGCnW2V9E/edit?usp=sharing > ) > based on the MVP features above and the previous design docs. Please take a > look and comment on it. > > > Also, I summarize the future Improvement on CREATE TABLE as the followings: > > 1. support table update mode > 2. support data type nullability and precision > 3. support row/map/array data type > 4. support custom timestamp extractor and watermark strategy > 5. support schema derivation > 6. support system versioned temporal table > 7. support table index > > I suggest we first agree on the MVP feature list and the MVP grammar. And > then we can either continue the discussion of the future improvements here, > or create separate JIRAs for each item and discuss further in the JIRA. > What do you guys think? > > Shuyi > > On Fri, Dec 7, 2018 at 7:54 AM Timo Walther <twal...@apache.org> wrote: > > > Hi all, > > > > I think we are making good progress. Thanks for all the feedback so far. > > > > 3. Sources/Sinks: > > It seems that I can not find supporters for explicit SOURCE/SINK > > declaration so I'm fine with not using those keywords. > > @Fabian: Maybe we don't haven have to change the TableFactory interface > > but just provide some helper functions in the TableFactoryService. This > > would solve the availability problem, but the permission problem would > > still not be solved. If you are fine with it, we could introduce a > > property instead? > > > > 5. Schema declaration: > > @Lin: We should find an agreement on this as it requires changes to the > > TableFactory interface.
We should minimize changes to this interface > > because it is user-facing. Especially, if format schema and table schema > > differ, the need for such a functionality is very important. Our goal is > > to connect to existing infrastructure. For example, if we are using Avro > > and the existing Avro format has enums but Flink SQL does not support > > enums, it would be helpful to let the Avro format derive a table schema. > > Otherwise your need to declare both schemas which leads to CREATE TABLE > > statements of 400 lines+. > > I think the mentioned query: > > CREATE TABLE (PRIMARY_KEY(a, c)) WITH (format.type = avro, > > format.schema-file = "/my/avrofile.avsc") > > is fine and should only be valid if the schema contains no non-computed > > columns. > > > > 7. Table Update Mode: > > After thinking about it again, I agree. The mode of the sinks can be > > derived from the query and the existence of a PRIMARY KEY declaration. > > But Fabian raised a very good point. How do we deal with sources? Shall > > we introduce a new keywords similar to WATERMARKS such that a > > upsert/retract flag is not part of the visible schema? > > > > 4a. How to mark a field as attribute? > > @Jark: Thanks for the explanation of the WATERMARK clause semantics. > > This is a nice way of marking existing fields. This sounds good to me. > > > > 4c) WATERMARK as constraint > > I'm fine with leaving the WATERMARK clause in the schema definition. > > > > 4d) Custom watermark strategies: > > I would already think about custom watermark strategies as the current > > descriptor design already supports this. ScalarFunction's don't work as > > a PeriodicWatermarkAssigner has different semantics. Why not simply > > entering the a full class name here as it is done in the current design? > > > > 4.b) Ingesting and writing timestamps to systems (like Kafka) > > @Fabian: Yes, your suggestion sounds good to me. This behavior would be > > similar to our current `timestamps: from-source` design. > > > > Once our discussion has found a conclusion, I would like to volunteer > > and summarize the outcome of this mailing thread. It nicely aligns with > > the update work on the connector improvements document (that I wanted to > > do anyway) and the ongoing external catalog discussion. Furthermore, I > > would also want to propose how to change existing interfaces by keeping > > the DDL, connector improvements, and external catalog support in mind. > > Would that be ok for you? > > > > Thanks, > > Timo > > > > > > > > Am 07.12.18 um 14:48 schrieb Fabian Hueske: > > > Hi all, > > > > > > Thanks for the discussion. > > > I'd like to share my point of view as well. > > > > > > 4) Event-Time Attributes and Watermarks: > > > 4.a) I agree with Lin and Jark's proposal. Declaring a watermark on an > > > attribute declares it as an event-time attribute. > > > 4.b) Ingesting and writing timestamps to systems (like Kafka). We could > > use > > > a special function like (ts AS SYSTEMROWTIME()). This function will > > > indicate that we read the timestamp directly from the system (and not > the > > > data). We can also write the field back to the system when emitting the > > > table (violating the rule that computed fields are not emitted). > > > 4c) I would treat WATERMARK similar to a PRIMARY KEY or UNIQUE KEY > > > constraint and therefore keep it in the schema definition. > > > 4d) For custom watermark strategies, a simple expressions or > > > ScalarFunctions won't be sufficient. 
Sophisticated approaches could > > collect > > > histograms, etc. But I think we can leave that out for later. > > > > > > 3) SOURCE / SINK / BOTH > > > As you said, there are two things to consider here: permission and > > > availability of a TableSource/TableSink. > > > I think that neither should be a reason to add a keyword at such a > > > sensitive position. > > > However, I also see Timo's point that it would be good to know up-front > > how > > > a table can be used without trying to instantiate a TableSource/Sink > for > > a > > > query. > > > Maybe we can extend the TableFactory such that it provides information > > > about which sources/sinks it can provide. > > > > > > 7. Table Update Mode > > > Something that we definitely need to consider is how tables are > ingested, > > > i.e., append, retract or upsert. > > > Especially, since upsert and retraction need a meta-data column that > > > indicates whether an event is an insert (or upsert) or a delete change. > > > This column needs to be identified somehow, most likely as part of the > > > input format. Ideally, this column should not be part of the table > schema > > > (as it would be always true). > > > Emitting tables is not so much of an issue as the properties of the > table > > > tell use what to do (append-only/update, unique key y/n). > > > > > > Best, > > > Fabian > > > > > > > > > Am Fr., 7. Dez. 2018 um 10:39 Uhr schrieb Jark Wu <imj...@gmail.com>: > > > > > >> Hi Timo, > > >> > > >> Thanks for your quickly feedback! Here are some of my thoughts: > > >> > > >> Append, upserts, retract mode on sinks is also a very complex > problem. I > > >> think append/upserts/retract is the ability of a table, user do not > > need to > > >> specify a table is used for append or retraction or upsert. The query > > can > > >> choose which mode the sink is. If an unbounded groupby is inserted > into > > an > > >> append sink (the sink only implements/supports append), an exception > > can be > > >> thrown. A more complex problem is, if we want to write > > retractions/upserts > > >> to Kafka, how to encode the change flag (add or retract/delete) on the > > >> table? Maybe we should propose some protocal for the change flag > > encoding, > > >> but I don't have a clear idea about this right now. > > >> > > >> 3. Sources/Sinks: The source/sink tag is similar to the > > >> append/upsert/retract problem. Besides source/sink, actully we have > > stream > > >> source, stream sink, batch source, batch sink, and the stream sink > also > > >> include append/upsert/retract three modes. Should we put all the tags > on > > >> the CREATE TABLE? IMO, the table's ability is defined by the table > > itself, > > >> user do not need to specify it. If it is only a readable table, an > > >> exception can be thrown when write to it. As the source/sink tag can > be > > >> omitted in CREATE TABLE, could we skip it and only support CREATE > TABLE > > in > > >> the first version, and add it back in the future when we really need > > it? It > > >> keeps API compatible and make sure the MVP is what we consider > clearly. > > >> > > >> 4a. How to mark a field as attribute? > > >> The watermark definition includes two parts: use which field as time > > >> attribute and use what generate strategy. > > >> When we want to mark `ts` field as attribute: WATERMARK FOR `ts` AS > > OFFSET > > >> '5' SECOND. > > >> If we have a POJO{id, user, ts} field named "pojo", we can mark it > like > > >> this: WATERMARK FOR pojo.ts AS OFFSET '5' SECOND > > >> > > >> 4b. 
timestamp write to Kafka message header > > >> Even though we can define multiple time attribute on a table, only one > > time > > >> attribute can be actived/used in a query (in a stream). When we enable > > >> `writeTiemstamp`, the only attribute actived in the stream will be > > write to > > >> Kafka message header. What I mean the timestmap in StreamRecord is the > > time > > >> attribute in the stream. > > >> > > >> 4c. Yes. We introduced the WATERMARK keyword similar to the INDEX, > > PRIMARY > > >> KEY keywords. > > >> > > >> @Timo, Do you have any other advice or questions on the watermark > > syntax ? > > >> For example, the builtin strategy name: "BOUNDED WITH OFFSET" VS > > "OFFSET" > > >> VS ... > > >> > > >> > > >> Cheers, > > >> Jark > > >> > > >> On Fri, 7 Dec 2018 at 17:13, Lin Li <lincoln.8...@gmail.com> wrote: > > >> > > >>> Hi Timo, > > >>> Thanks for your feedback, here's some thoughts of mine: > > >>> > > >>> 3. Sources/Sinks: > > >>> "Let's assume an interactive CLI session, people should be able to > list > > >> all > > >>> source table and sink tables to know upfront if they can use an > INSERT > > >> INTO > > >>> here or not." > > >>> This requirement can be simply resolved by a document that list all > > >>> supported source/sink/both connectors and the sql-client can perform > a > > >>> quick check. It's only an implementation choice, not necessary for > the > > >>> syntax. > > >>> For connector implementation, a connector may implement one or some > or > > >> all > > >>> of the [Stream|Batch]Source/[Stream|Batch]Sink traits, we can derive > > the > > >>> availability for any give query without the SOURCE/SINk keywords or > > >>> specific table properties in WITH clause. > > >>> Since there's still indeterminacy, shall we skip these two keywords > for > > >> the > > >>> MVP DDL? We can make further discussion after users' feedback. > > >>> > > >>> 6. Partitioning and keys > > >>> Agree with you that raise the priority of table constraint and > > >> partitioned > > >>> table support for better connectivity to Hive and Kafka. I'll add > > >>> partitioned table syntax(compatible to hive) into the DDL Draft doc > > >>> later[1]. > > >>> > > >>> 5. Schema declaration > > >>> "if users want to declare computed columns they have a "schema" > > >> constraints > > >>> but without columns > > >>> CREATE TABLE (PRIMARY_KEY(a, c)) WITH (format.type = avro, > > >>> format.schema-file = "/my/avrofile.avsc") " > > >>> > > >>> From the point of my view, this ddl is invalid because the primary > key > > >>> constraint already references two columns but types unseen. > > >>> And Xuefu pointed a important matching problem, so let's put schema > > >>> derivation as a follow-up extension ? > > >>> > > >>> > > >>> > > >>> > > >>> Timo Walther <twal...@apache.org> 于2018年12月6日周四 下午6:05写道: > > >>> > > >>>> Hi everyone, > > >>>> > > >>>> great to have such a lively discussion. My next batch of feedback: > > >>>> > > >>>> @Jark: We don't need to align the descriptor approach with SQL. I'm > > >> open > > >>>> for different approaches as long as we can serve a broad set of use > > >>>> cases and systems. The descriptor approach was a first attempt to > > cover > > >>>> all aspects and connector/format characteristics. Just another > > example, > > >>>> that is missing in the DDL design: How can a user decide if append, > > >>>> retraction, or upserts should be used to sink data into the target > > >>>> system? 
Do we want to define all these improtant properties in the > big > > >>>> WITH property map? If yes, we are already close to the descriptor > > >>>> approach. Regarding the "standard way", most DDL languages have very > > >>>> custom syntax so there is not a real "standard". > > >>>> > > >>>> 3. Sources/Sinks: @Lin: If a table has both read/write access it can > > be > > >>>> created using a regular CREATE TABLE (omitting a specific > source/sink) > > >>>> declaration. Regarding the transition from source/sink to both, yes > we > > >>>> would need to update the a DDL and catalogs. But is this a problem? > > One > > >>>> also needs to add new queries that use the tables. @Xuefu: It is not > > >>>> only about security aspects. Especially for streaming use cases, not > > >>>> every connector can be used as a source easily. For example, a JDBC > > >> sink > > >>>> is easier than a JDBC source. Let's assume an interactive CLI > session, > > >>>> people should be able to list all source table and sink tables to > know > > >>>> upfront if they can use an INSERT INTO here or not. > > >>>> > > >>>> 6. Partitioning and keys: @Lin: I would like to include this in the > > >>>> design given that Hive integration and Kafka key support are in the > > >>>> making/are on our roadmap for this release. > > >>>> > > >>>> 5. Schema declaration: @Lin: You are right it is not conflicting. I > > >> just > > >>>> wanted to raise the point because if users want to declare computed > > >>>> columns they have a "schema" constraints but without columns. Are we > > ok > > >>>> with a syntax like ... > > >>>> CREATE TABLE (PRIMARY_KEY(a, c)) WITH (format.type = avro, > > >>>> format.schema-file = "/my/avrofile.avsc") ? > > >>>> @Xuefu: Yes, you are right that an external schema might not excatly > > >>>> match but this is true for both directions: > > >>>> table schema "derives" format schema and format schema "derives" > table > > >>>> schema. > > >>>> > > >>>> 7. Hive compatibility: @Xuefu: I agree that Hive is popular but we > > >>>> should not just adopt everything from Hive as there syntax is very > > >>>> batch-specific. We should come up with a superset of historical and > > >>>> future requirements. Supporting Hive queries can be an intermediate > > >>>> layer on top of Flink's DDL. > > >>>> > > >>>> 4. Time attributes: @Lin: I'm fine with changing the > > TimestampExtractor > > >>>> interface as this is also important for better separation of > connector > > >>>> and table module [1]. However, I'm wondering about watermark > > >> generation. > > >>>> 4a. timestamps are in the schema twice: > > >>>> @Jark: "existing field is Long/Timestamp, we can just use it as > > >>>> rowtime": yes, but we need to mark a field as such an attribute. How > > >>>> does the syntax for marking look like? Also in case of timestamps > that > > >>>> are nested in the schema? > > >>>> > > >>>> 4b. how can we write out a timestamp into the message header?: > > >>>> I agree to simply ignore computed columns when writing out. This is > > >> like > > >>>> 'field-change: add' that I mentioned in the improvements document. > > >>>> @Jark: "then the timestmap in StreamRecord will be write to Kafka > > >>>> message header": Unfortunately, there is no timestamp in the stream > > >>>> record. Additionally, multiple time attributes can be in a schema. > So > > >> we > > >>>> need a constraint that tells the sink which column to use (possibly > > >>>> computed as well)? > > >>>> > > >>>> 4c. 
separate all time attribute concerns into a special clause next > to > > >>>> the regular schema? > > >>>> @Jark: I don't have a strong opinion on this. I just have the > feeling > > >>>> that the "schema part" becomes quite messy because the actual schema > > >>>> with types and fields is accompanied by so much metadata about > > >>>> timestamps, watermarks, keys,... and we would need to introduce a > new > > >>>> WATERMARK keyword within a schema that was close to standard up to > > this > > >>>> point. > > >>>> > > >>>> Thanks everyone, > > >>>> Timo > > >>>> > > >>>> [1] https://issues.apache.org/jira/browse/FLINK-9461 > > >>>> > > >>>> > > >>>> > > >>>> Am 06.12.18 um 07:08 schrieb Jark Wu: > > >>>>> Hi Timo, > > >>>>> > > >>>>> Thank you for the valuable feedbacks. > > >>>>> > > >>>>> First of all, I think we don't need to align the SQL functionality > to > > >>>>> Descriptor. Because SQL is a more standard API, we should be as > > >>> cautious > > >>>> as > > >>>>> possible to extend the SQL syntax. If something can be done in a > > >>> standard > > >>>>> way, we shouldn't introduce something new. > > >>>>> > > >>>>> Here are some of my thoughts: > > >>>>> > > >>>>> 1. Scope: Agree. > > >>>>> 2. Constraints: Agree. > > >>>>> 4. Time attributes: > > >>>>> 4a. timestamps are in the schema twice. > > >>>>> If an existing field is Long/Timestamp, we can just use it as > > >>>> rowtime, > > >>>>> no twice defined. If it is not a Long/Timestamp, we use computed > > >> column > > >>>> to > > >>>>> get an expected timestamp column to be rowtime, is this what you > mean > > >>>>> defined twice? But I don't think it is a problem, but an > advantages, > > >>>>> because it is easy to use, user do not need to consider whether to > > >>>> "replace > > >>>>> the existing column" or "add a new column", he will not be confused > > >>>> what's > > >>>>> the real schema is, what's the index of rowtime in the schema? > > >>> Regarding > > >>>> to > > >>>>> the optimization, even if timestamps are in schema twice, when the > > >>>> original > > >>>>> timestamp is never used in query, then the projection pushdown > > >>>> optimization > > >>>>> can cut this field as early as possible, which is exactly the same > as > > >>>>> "replacing the existing column" in runtime. > > >>>>> > > >>>>> 4b. how can we write out a timestamp into the message header? > > >>>>> That's a good point. I think computed column is just a > virtual > > >>>> column > > >>>>> on table which is only relative to reading. If we want to write to > a > > >>>> table > > >>>>> with computed column defined, we only need to provide the columns > > >>> except > > >>>>> computed columns (see SQL Server [1]). The computed column is > ignored > > >>> in > > >>>>> the insert statement. Get back to the question, how can we write > out > > >> a > > >>>>> timestamp into the message header? IMO, we can provide a > > >> configuration > > >>> to > > >>>>> support this, such as `kafka.writeTimestamp=true`, then the > timestmap > > >>> in > > >>>>> StreamRecord will be write to Kafka message header. What do you > > >> think? > > >>>>> 4c. separate all time attribute concerns into a special > clause > > >>> next > > >>>> to > > >>>>> the regular schema? > > >>>>> Separating watermark into a special clause similar to > > >> PARTITIONED > > >>>> BY is > > >>>>> also a good idea. Conceptually, it's fine to put watermark in > schema > > >>> part > > >>>>> or out schema part. 
But if we want to support multiple watermark > > >>>>> definition, maybe it would be better to put it in schema part. It > is > > >>>>> similar to Index Definition that we can define several indexes on a > > >>> table > > >>>>> in schema part. > > >>>>> > > >>>>> 4d. How can people come up with a custom watermark strategy? > > >>>>> In most cases, the built-in strategy can works good. If we > need > > >> a > > >>>>> custom one, we can use a scalar function which restrict to only > > >> return > > >>> a > > >>>>> nullable Long, and use it in SQL like: WATERMARK for rowtime AS > > >>>>> watermarkUdf(a, b, c). The `watermarkUdf` is a user-defined scalar > > >>>> function > > >>>>> accepts 3 parameters and return a nullable Long which can be used > as > > >>>>> punctuated watermark assigner. Another choice is implementing a > class > > >>>>> extending the > > >>>>> `org.apache.flink.table.sources.wmstrategies.WatermarkStrategy` and > > >> use > > >>>> it > > >>>>> in SQL: WATERMARK for rowtime AS 'com.my.MyWatermarkStrategy'. But > if > > >>>>> scalar function can cover the requirements here, I would prefer it > > >>> here, > > >>>>> because it keeps standard compliant. BTW, this feature is not in > MVP, > > >>> we > > >>>>> can discuss it more depth in the future when we need it. > > >>>>> > > >>>>> 5. Schema declaration: > > >>>>> I like the proposal to omit the schema if we can get the schema > from > > >>>>> external storage or something schema file. Actually, we have > already > > >>>>> encountered this requirement in out company. > > >>>>> > > >>>>> > > >>>>> +1 to @Xuefu that we should be as close as possible to Hive syntax > > >>> while > > >>>>> keeping SQL ANSI standard. This will make it more acceptable and > > >> reduce > > >>>> the > > >>>>> learning cost for user. > > >>>>> > > >>>>> [1]: > > >>>>> > > >> > > > https://docs.microsoft.com/en-us/sql/relational-databases/partitions/create-partitioned-tables-and-indexes?view=sql-server-2017 > > >>>>> Best, > > >>>>> Jark > > >>>>> > > >>>>> On Thu, 6 Dec 2018 at 12:09, Zhang, Xuefu <xuef...@alibaba-inc.com > > > > >>>> wrote: > > >>>>>> Hi Timo/Shuyi/Lin, > > >>>>>> > > >>>>>> Thanks for the discussions. It seems that we are converging to > > >>> something > > >>>>>> meaningful. Here are some of my thoughts: > > >>>>>> > > >>>>>> 1. +1 on MVP DDL > > >>>>>> 3. Markers for source or sink seem more about permissions on > tables > > >>> that > > >>>>>> belong to a security component. Unless the table is created > > >>> differently > > >>>>>> based on source, sink, or both, it doesn't seem necessary to use > > >> these > > >>>>>> keywords to enforce permissions. > > >>>>>> 5. It might be okay if schema declaration is always needed. While > > >>> there > > >>>>>> might be some duplication sometimes, it's not always true. For > > >>> example, > > >>>>>> external schema may not be exactly matching Flink schema. For > > >>> instance, > > >>>>>> data types. Even if so, perfect match is not required. For > instance, > > >>> the > > >>>>>> external schema file may evolve while table schema in Flink may > stay > > >>>>>> unchanged. A responsible reader should be able to scan the file > > >> based > > >>> on > > >>>>>> file schema and return the data based on table schema. > > >>>>>> > > >>>>>> Other aspects: > > >>>>>> > > >>>>>> 7. Hive compatibility. 
Since Flink SQL will soon be able to > operate > > >> on > > >>>>>> Hive metadata and data, it's an add-on benefit if we can be > > >> compatible > > >>>> with > > >>>>>> Hive syntax/semantics while following ANSI standard. At least we > > >>> should > > >>>> be > > >>>>>> as close as possible. Hive DDL can found at > > >>>>>> > https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL > > >>>>>> > > >>>>>> Thanks, > > >>>>>> Xuefu > > >>>>>> > > >>>>>> > > >>>>>> > > >>>>>> ------------------------------------------------------------------ > > >>>>>> Sender:Lin Li <lincoln.8...@gmail.com> > > >>>>>> Sent at:2018 Dec 6 (Thu) 10:49 > > >>>>>> Recipient:dev <dev@flink.apache.org> > > >>>>>> Subject:Re: [DISCUSS] Flink SQL DDL Design > > >>>>>> > > >>>>>> Hi Timo and Shuyi, > > >>>>>> thanks for your feedback. > > >>>>>> > > >>>>>> 1. Scope > > >>>>>> agree with you we should focus on the MVP DDL first. > > >>>>>> > > >>>>>> 2. Constraints > > >>>>>> yes, this can be a follow-up issue. > > >>>>>> > > >>>>>> 3. Sources/Sinks > > >>>>>> If a TABLE has both read/write access requirements, should we > > >> declare > > >>> it > > >>>>>> using > > >>>>>> `CREATE [SOURCE_SINK|BOTH] TABLE tableName ...` ? A further > > >> question, > > >>>> if a > > >>>>>> TABLE > > >>>>>> t1 firstly declared as read only (as a source table), then for > some > > >>> new > > >>>>>> requirements > > >>>>>> t1 will change to a sink table, in this case we need updating > both > > >>> the > > >>>> DDL > > >>>>>> and catalogs. > > >>>>>> Further more, let's think about the BATCH query, update one table > > >>>> in-place > > >>>>>> can be a common case. > > >>>>>> e.g., > > >>>>>> ``` > > >>>>>> CREATE TABLE t1 ( > > >>>>>> col1 varchar, > > >>>>>> col2 int, > > >>>>>> col3 varchar > > >>>>>> ... > > >>>>>> ); > > >>>>>> > > >>>>>> INSERT [OVERWRITE] TABLE t1 > > >>>>>> AS > > >>>>>> SELECT > > >>>>>> (some computing ...) > > >>>>>> FROM t1; > > >>>>>> ``` > > >>>>>> So, let's forget these SOURCE/SINK keywords in DDL. For the > > >> validation > > >>>>>> purpose, we can find out other ways. > > >>>>>> > > >>>>>> 4. Time attributes > > >>>>>> As Shuyi mentioned before, there exists an > > >>>>>> `org.apache.flink.table.sources.tsextractors.TimestampExtractor` > for > > >>>> custom > > >>>>>> defined time attributes usage, but this expression based class is > > >> more > > >>>>>> friendly for table api not the SQL. > > >>>>>> ``` > > >>>>>> /** > > >>>>>> * Provides the an expression to extract the timestamp for a > > >> rowtime > > >>>>>> attribute. > > >>>>>> */ > > >>>>>> abstract class TimestampExtractor extends FieldComputer[Long] with > > >>>>>> Serializable { > > >>>>>> > > >>>>>> /** Timestamp extractors compute the timestamp as Long. */ > > >>>>>> override def getReturnType: TypeInformation[Long] = > > >>>>>> Types.LONG.asInstanceOf[TypeInformation[Long]] > > >>>>>> } > > >>>>>> ``` > > >>>>>> BTW, I think both the Scalar function and the TimestampExtractor > are > > >>>>>> expressing computing logic, the TimestampExtractor has no more > > >>>> advantage in > > >>>>>> SQL scenarios. > > >>>>>> > > >>>>>> > > >>>>>> 6. Partitioning and keys > > >>>>>> Primary Key is included in Constraint part, and partitioned table > > >>>> support > > >>>>>> can be another topic later. > > >>>>>> > > >>>>>> 5. Schema declaration > > >>>>>> Agree with you that we can do better schema derivation for user > > >>>>>> convenience, but this is not conflict with the syntax. 
> > >>>>>> Table properties can carry any useful informations both for the > > >> users > > >>>> and > > >>>>>> the framework, I like your `contract name` proposal, > > >>>>>> e.g., `WITH (format.type = avro)`, the framework can recognize > some > > >>>>>> `contract name` like `format.type`, `connector.type` and etc. > > >>>>>> And also derive the table schema from an existing schema file can > be > > >>>> handy > > >>>>>> especially one with too many table columns. > > >>>>>> > > >>>>>> Regards, > > >>>>>> Lin > > >>>>>> > > >>>>>> > > >>>>>> Timo Walther <twal...@apache.org> 于2018年12月5日周三 下午10:40写道: > > >>>>>> > > >>>>>>> Hi Jark and Shuyi, > > >>>>>>> > > >>>>>>> thanks for pushing the DDL efforts forward. I agree that we > should > > >>> aim > > >>>>>>> to combine both Shuyi's design and your design. > > >>>>>>> > > >>>>>>> Here are a couple of concerns that I think we should address in > the > > >>>>>> design: > > >>>>>>> 1. Scope: Let's focuses on a MVP DDL for CREATE TABLE statements > > >>> first. > > >>>>>>> I think this topic has already enough potential for long > > >> discussions > > >>>> and > > >>>>>>> is very helpful for users. We can discuss CREATE VIEW and CREATE > > >>>>>>> FUNCTION afterwards as they are not related to each other. > > >>>>>>> > > >>>>>>> 2. Constraints: I think we should consider things like > nullability, > > >>>>>>> VARCHAR length, and decimal scale and precision in the future as > > >> they > > >>>>>>> allow for nice optimizations. However, since both the translation > > >> and > > >>>>>>> runtime operators do not support those features. I would not > > >>> introduce > > >>>> a > > >>>>>>> arbitrary default value but omit those parameters for now. This > can > > >>> be > > >>>> a > > >>>>>>> follow-up issue once the basic DDL has been merged. > > >>>>>>> > > >>>>>>> 3. Sources/Sinks: We had a discussion about CREATE TABLE vs > CREATE > > >>>>>>> [SOURCE|SINK|] TABLE before. In my opinion we should allow for > > >> these > > >>>>>>> explicit declaration because in most production scenarios, teams > > >> have > > >>>>>>> strict read/write access requirements. For example, a data > science > > >>> team > > >>>>>>> should only consume from a event Kafka topic but should not > > >>> accidently > > >>>>>>> write back to the single source of truth. > > >>>>>>> > > >>>>>>> 4. Time attributes: In general, I like your computed columns > > >> approach > > >>>>>>> because it makes defining a rowtime attributes transparent and > > >>> simple. > > >>>>>>> However, there are downsides that we should discuss. > > >>>>>>> 4a. Jarks current design means that timestamps are in the schema > > >>> twice. > > >>>>>>> The design that is mentioned in [1] makes this more flexible as > it > > >>>>>>> either allows to replace an existing column or add a computed > > >> column. > > >>>>>>> 4b. We need to consider the zoo of storage systems that is out > > >> there > > >>>>>>> right now. Take Kafka as an example, how can we write out a > > >> timestamp > > >>>>>>> into the message header? We need to think of a reverse operation > > >> to a > > >>>>>>> computed column. > > >>>>>>> 4c. Does defining a watermark really fit into the schema part of > a > > >>>>>>> table? Shouldn't we separate all time attribute concerns into a > > >>> special > > >>>>>>> clause next to the regular schema, similar how PARTITIONED BY > does > > >> it > > >>>> in > > >>>>>>> Hive? > > >>>>>>> 4d. How can people come up with a custom watermark strategy? 
I > > >> guess > > >>>>>>> this can not be implemented in a scalar function and would > require > > >>> some > > >>>>>>> new type of UDF? > > >>>>>>> > > >>>>>>> 6. Partitioning and keys: Another question that the DDL design > > >> should > > >>>>>>> answer is how do we express primary keys (for upserts), > > >> partitioning > > >>>>>>> keys (for Hive, Kafka message keys). All part of the table > schema? > > >>>>>>> > > >>>>>>> 5. Schema declaration: I find it very annoying that we want to > > >> force > > >>>>>>> people to declare all columns and types again even though this is > > >>>>>>> usually already defined in some company-wide format. I know that > > >>>> catalog > > >>>>>>> support will greatly improve this. But if no catalog is used, > > >> people > > >>>>>>> need to manually define a schema with 50+ fields in a Flink DDL. > > >>> What I > > >>>>>>> actually promoted having two ways of reading data: > > >>>>>>> > > >>>>>>> 1. Either the format derives its schema from the table schema. > > >>>>>>> CREATE TABLE (col INT) WITH (format.type = avro) > > >>>>>>> > > >>>>>>> 2. Or the table schema can be omitted and the format schema > defines > > >>> the > > >>>>>>> table schema (+ time attributes). > > >>>>>>> CREATE TABLE WITH (format.type = avro, format.schema-file = > > >>>>>>> "/my/avrofile.avsc") > > >>>>>>> > > >>>>>>> Please let me know what you think about each item. I will try to > > >>>>>>> incorporate your feedback in [1] this week. > > >>>>>>> > > >>>>>>> Regards, > > >>>>>>> Timo > > >>>>>>> > > >>>>>>> [1] > > >>>>>>> > > >>>>>>> > > >> > > > https://docs.google.com/document/d/1Yaxp1UJUFW-peGLt8EIidwKIZEWrrA-pznWLuvaH39Y/edit#heading=h.41fd6rs7b3cf > > >>>>>>> Am 05.12.18 um 13:01 schrieb Jark Wu: > > >>>>>>>> Hi Shuyi, > > >>>>>>>> > > >>>>>>>> It's exciting to see we can make such a great progress here. > > >>>>>>>> > > >>>>>>>> Regarding to the watermark: > > >>>>>>>> > > >>>>>>>> Watermarks can be defined on any columns (including > > >> computed-column) > > >>>> in > > >>>>>>>> table schema. > > >>>>>>>> The computed column can be computed from existing columns using > > >>>> builtin > > >>>>>>>> functions and *UserDefinedFunctions* (ScalarFunction). > > >>>>>>>> So IMO, it can work out for almost all the scenarios not only > > >> common > > >>>>>>>> scenarios. > > >>>>>>>> > > >>>>>>>> I don't think using a `TimestampExtractor` to support custom > > >>> timestamp > > >>>>>>>> extractor in SQL is a good idea. Because `TimestampExtractor` > > >>>>>>>> is not a SQL standard function. If we support > `TimestampExtractor` > > >>> in > > >>>>>>> SQL, > > >>>>>>>> do we need to support CREATE FUNCTION for `TimestampExtractor`? > > >>>>>>>> I think `ScalarFunction` can do the same thing with > > >>>>>> `TimestampExtractor` > > >>>>>>>> but more powerful and standard. > > >>>>>>>> > > >>>>>>>> The core idea of the watermark definition syntax is that the > > >> schema > > >>>>>> part > > >>>>>>>> defines all the columns of the table, it is exactly what the > query > > >>>>>> sees. > > >>>>>>>> The watermark part is something like a primary key definition or > > >>>>>>> constraint > > >>>>>>>> on SQL Table, it has no side effect on the schema, only defines > > >> what > > >>>>>>>> watermark strategy is and makes which field as the rowtime > > >> attribute > > >>>>>>> field. > > >>>>>>>> If the rowtime field is not in the existing fields, we can use > > >>>> computed > > >>>>>>>> column > > >>>>>>>> to generate it from other existing fields. 
The Descriptor > Pattern > > >>> API > > >>>>>> [1] > > >>>>>>>> is very useful when writing a Table API job, but is not > > >>> contradictory > > >>>>>> to > > >>>>>>>> the > > >>>>>>>> Watermark DDL from my perspective. > > >>>>>>>> > > >>>>>>>> [1]: > > >>>>>>>> > > >> > > > https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connect.html#rowtime-attributes > > >>>>>>>> . > > >>>>>>>> > > >>>>>>>> Best, > > >>>>>>>> Jark > > >>>>>>>> > > >>>>>>>> On Wed, 5 Dec 2018 at 17:58, Shuyi Chen <suez1...@gmail.com> > > >> wrote: > > >>>>>>>>> Hi Jark and Shaoxuan, > > >>>>>>>>> > > >>>>>>>>> Thanks a lot for the summary. I think we are making great > > >> progress > > >>>>>> here. > > >>>>>>>>> Below are my thoughts. > > >>>>>>>>> > > >>>>>>>>> *(1) watermark definition > > >>>>>>>>> IMO, it's better to keep it consistent with the rowtime > > >> extractors > > >>>> and > > >>>>>>>>> watermark strategies defined in > > >>>>>>>>> > > >>>>>>>>> > > >> > > > https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connect.html#rowtime-attributes > > >>>>>>>>> . > > >>>>>>>>> Using built-in functions seems to be too much for most of the > > >>> common > > >>>>>>>>> scenarios. > > >>>>>>>>> *(2) CREATE SOURCE/SINK TABLE or CREATE TABLE > > >>>>>>>>> Actually, I think we can put the source/sink type info into the > > >>> table > > >>>>>>>>> properties, so we can use CREATE TABLE. > > >>>>>>>>> (3) View DDL with properties > > >>>>>>>>> We can remove the view properties section now for the MVP and > add > > >>> it > > >>>>>>> back > > >>>>>>>>> later if needed. > > >>>>>>>>> (4) Type Definition > > >>>>>>>>> I agree we can put the type length or precision into future > > >>> versions. > > >>>>>> As > > >>>>>>>>> for the grammar difference, currently, I am using the grammar > in > > >>>>>> Calcite > > >>>>>>>>> type DDL, but since we'll extend the parser in Flink, so we can > > >>>>>>> definitely > > >>>>>>>>> change if needed. > > >>>>>>>>> > > >>>>>>>>> Shuyi > > >>>>>>>>> > > >>>>>>>>> On Tue, Dec 4, 2018 at 10:48 PM Jark Wu <imj...@gmail.com> > > >> wrote: > > >>>>>>>>>> Hi Shaoxuan, > > >>>>>>>>>> > > >>>>>>>>>> Thanks for pointing that out. Yes, the source/sink tag on > create > > >>>>>> table > > >>>>>>> is > > >>>>>>>>>> the another major difference. > > >>>>>>>>>> > > >>>>>>>>>> Summarize the main differences again: > > >>>>>>>>>> > > >>>>>>>>>> *(1) watermark definition > > >>>>>>>>>> *(2) CREATE SOURCE/SINK TABLE or CREATE TABLE > > >>>>>>>>>> (3) View DDL with properties > > >>>>>>>>>> (4) Type Definition > > >>>>>>>>>> > > >>>>>>>>>> Best, > > >>>>>>>>>> Jark > > >>>>>>>>>> > > >>>>>>>>>> On Wed, 5 Dec 2018 at 14:08, Shaoxuan Wang < > wshaox...@gmail.com > > >>>>>>> wrote: > > >>>>>>>>>>> Hi Jark, > > >>>>>>>>>>> Thanks for the summary. Your plan for the 1st round > > >>> implementation > > >>>>>> of > > >>>>>>>>> DDL > > >>>>>>>>>>> looks good to me. > > >>>>>>>>>>> Have we reached the agreement on simplifying/unifying "create > > >>>>>>>>>> [source/sink] > > >>>>>>>>>>> table" to "create table"? "Watermark definition" and "create > > >>> table" > > >>>>>>> are > > >>>>>>>>>> the > > >>>>>>>>>>> major obstacles on the way to merge two design proposals > FMPOV. > > >>>>>>> @Shuyi, > > >>>>>>>>>> It > > >>>>>>>>>>> would be great if you can spend time and respond to these two > > >>> parts > > >>>>>>>>>> first. 
> > >>>>>>>>>>> Regards, > > >>>>>>>>>>> Shaoxuan > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> On Wed, Dec 5, 2018 at 12:20 PM Jark Wu <imj...@gmail.com> > > >>> wrote: > > >>>>>>>>>>>> Hi Shuyi, > > >>>>>>>>>>>> > > >>>>>>>>>>>> It seems that you have reviewed the DDL doc [1] that Lin > and I > > >>>>>>>>> drafted. > > >>>>>>>>>>>> This doc covers all the features running in Alibaba. > > >>>>>>>>>>>> But some of features might be not needed in the first > version > > >> of > > >>>>>>>>> Flink > > >>>>>>>>>>> SQL > > >>>>>>>>>>>> DDL. > > >>>>>>>>>>>> > > >>>>>>>>>>>> So my suggestion would be to focus on the MVP DDLs and reach > > >>>>>>>>> agreement > > >>>>>>>>>>> ASAP > > >>>>>>>>>>>> based on the DDL draft [1] and the DDL design [2] Shuyi > > >>> proposed. > > >>>>>>>>>>>> And we can discuss on the main differences one by one. > > >>>>>>>>>>>> > > >>>>>>>>>>>> The following is the MVP DDLs should be included in the > first > > >>>>>> version > > >>>>>>>>>> in > > >>>>>>>>>>> my > > >>>>>>>>>>>> opinion (feedbacks are welcome): > > >>>>>>>>>>>> (1) Table DDL: > > >>>>>>>>>>>> (1.1) Type definition > > >>>>>>>>>>>> (1.2) computed column definition > > >>>>>>>>>>>> (1.3) watermark definition > > >>>>>>>>>>>> (1.4) with properties > > >>>>>>>>>>>> (1.5) table constraint (primary key/unique) > > >>>>>>>>>>>> (1.6) column nullability (nice to have) > > >>>>>>>>>>>> (2) View DDL > > >>>>>>>>>>>> (3) Function DDL > > >>>>>>>>>>>> > > >>>>>>>>>>>> The main differences from two DDL docs (sth maybe missed, > > >>> welcome > > >>>>>> to > > >>>>>>>>>>> point > > >>>>>>>>>>>> out): > > >>>>>>>>>>>> *(1.3) watermark*: this is the main and the most important > > >>>>>>>>> difference, > > >>>>>>>>>> it > > >>>>>>>>>>>> would be great if @Timo Walther <twal...@apache.org> > @Fabian > > >>>>>> Hueske > > >>>>>>>>>>>> <fhue...@gmail.com> give some feedbacks. > > >>>>>>>>>>>> (1.1) Type definition: > > >>>>>>>>>>>> (a) Should VARCHAR carry a length, e.g. > VARCHAR(128) > > ? > > >>>>>>>>>>>> In most cases, the varchar length is not used > > >>> because > > >>>>>>> they > > >>>>>>>>>> are > > >>>>>>>>>>>> stored as String in Flink. But it can be used to optimize in > > >> the > > >>>>>>>>> future > > >>>>>>>>>>> if > > >>>>>>>>>>>> we know the column is a fixed length VARCHAR. > > >>>>>>>>>>>> So IMO, we can support VARCHAR with length in > > the > > >>>>>> future, > > >>>>>>>>>> and > > >>>>>>>>>>>> just VARCHAR in this version. > > >>>>>>>>>>>> (b) Should DECIMAL support custom scale and > > precision, > > >>>> e.g. > > >>>>>>>>>>>> DECIMAL(12, 5)? > > >>>>>>>>>>>> If we clearly know the scale and precision of > > the > > >>>>>>> Decimal, > > >>>>>>>>>> we > > >>>>>>>>>>>> can have some optimization on serialization/deserialization. > > >>> IMO, > > >>>>>> we > > >>>>>>>>>> can > > >>>>>>>>>>>> support just support DECIMAL in this version, > > >>>>>>>>>>>> which means DECIMAL(38, 18) as default. And > > >> support > > >>>>>>> custom > > >>>>>>>>>>> scale > > >>>>>>>>>>>> and precision in the future. > > >>>>>>>>>>>> (2) View DDL: Do we need WITH properties in View DDL > > >>> (proposed > > >>>> in > > >>>>>>>>>>> doc[2])? > > >>>>>>>>>>>> What are the properties on the view used for? 
> > >>>>>>>>>>>> > > >>>>>>>>>>>> > > >>>>>>>>>>>> The features could be supported and discussed in the future: > > >>>>>>>>>>>> (1) period definition on table > > >>>>>>>>>>>> (2) Type DDL > > >>>>>>>>>>>> (3) Index DDL > > >>>>>>>>>>>> (4) Library DDL > > >>>>>>>>>>>> (5) Drop statement > > >>>>>>>>>>>> > > >>>>>>>>>>>> [1] Flink DDL draft by Lin and Jark: > > >>>>>>>>>>>> > > >>>>>>>>>>>> > > >> > > > https://docs.google.com/document/d/1o16jC-AxnZoxMfHQptkKQkSC6ZDDBRhKg6gm8VGnY-k/edit# > > >>>>>>>>>>>> [2] Flink SQL DDL design by Shuyi: > > >>>>>>>>>>>> > > >>>>>>>>>>>> > > >> > > > https://docs.google.com/document/d/1TTP-GCC8wSsibJaSUyFZ_5NBAHYEB1FVmPpP7RgDGBA/edit# > > >>>>>>>>>>>> Cheers, > > >>>>>>>>>>>> Jark > > >>>>>>>>>>>> > > >>>>>>>>>>>> On Thu, 29 Nov 2018 at 16:13, Shaoxuan Wang < > > >>> wshaox...@gmail.com> > > >>>>>>>>>> wrote: > > >>>>>>>>>>>>> Sure Shuyu, > > >>>>>>>>>>>>> What I hope is that we can reach an agreement on DDL gramma > > >> as > > >>>>>> soon > > >>>>>>>>>> as > > >>>>>>>>>>>>> possible. There are a few differences between your proposal > > >> and > > >>>>>>>>> ours. > > >>>>>>>>>>>> Once > > >>>>>>>>>>>>> Lin and Jark propose our design, we can quickly discuss on > > >> the > > >>>>>>>>> those > > >>>>>>>>>>>>> differences, and see how far away towards a unified design. > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> WRT the external catalog, I think it is an orthogonal > topic, > > >> we > > >>>>>> can > > >>>>>>>>>>>> design > > >>>>>>>>>>>>> it in parallel. I believe @Xuefu, @Bowen are already > working > > >>> on. > > >>>>>> We > > >>>>>>>>>>>>> should/will definitely involve them to review the final > > >> design > > >>> of > > >>>>>>>>> DDL > > >>>>>>>>>>>>> implementation. I would suggest that we should give it a > > >> higher > > >>>>>>>>>>> priority > > >>>>>>>>>>>> on > > >>>>>>>>>>>>> the DDL implementation, as it is a crucial component for > the > > >>> user > > >>>>>>>>>>>>> experience of SQL_CLI. > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> Regards, > > >>>>>>>>>>>>> Shaoxuan > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> On Thu, Nov 29, 2018 at 6:56 AM Shuyi Chen < > > >> suez1...@gmail.com > > >>>>>>>>>> wrote: > > >>>>>>>>>>>>>> Thanks a lot, Shaoxuan, Jack and Lin. We should definitely > > >>>>>>>>>>> collaborate > > >>>>>>>>>>>>>> here, we have also our own DDL implementation running in > > >>>>>>>>> production > > >>>>>>>>>>> for > > >>>>>>>>>>>>>> almost 2 years at Uber. With the joint experience from > both > > >>>>>>>>>>> companies, > > >>>>>>>>>>>> we > > >>>>>>>>>>>>>> can definitely make the Flink SQL DDL better. > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>> As @shaoxuan suggest, Jark can come up with a doc that > talks > > >>>>>>>>> about > > >>>>>>>>>>> the > > >>>>>>>>>>>>>> current DDL design in Alibaba, and we can discuss and > merge > > >>> them > > >>>>>>>>>> into > > >>>>>>>>>>>>> one, > > >>>>>>>>>>>>>> make it as a FLIP, and plan the tasks for implementation. > > >>> Also, > > >>>>>>>>> we > > >>>>>>>>>>>> should > > >>>>>>>>>>>>>> take into account the new external catalog effort in the > > >>> design. > > >>>>>>>>>> What > > >>>>>>>>>>>> do > > >>>>>>>>>>>>>> you guys think? > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>> Shuyi > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>> On Wed, Nov 28, 2018 at 6:45 AM Jark Wu <imj...@gmail.com > > > > >>>>>>>>> wrote: > > >>>>>>>>>>>>>>> Hi Shaoxuan, > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> I think summarizing it into a google doc is a good idea. 
> We > > >>>>>>>>> will > > >>>>>>>>>>>>> prepare > > >>>>>>>>>>>>>> it > > >>>>>>>>>>>>>>> in the next few days. > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> Thanks, > > >>>>>>>>>>>>>>> Jark > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> Shaoxuan Wang <wshaox...@gmail.com> 于2018年11月28日周三 > > >> 下午9:17写道: > > >>>>>>>>>>>>>>>> Hi Lin and Jark, > > >>>>>>>>>>>>>>>> Thanks for sharing those details. Can you please > consider > > >>>>>>>>>>>> summarizing > > >>>>>>>>>>>>>>> your > > >>>>>>>>>>>>>>>> DDL design into a google doc. > > >>>>>>>>>>>>>>>> We can still continue the discussions on Shuyi's > proposal. > > >>>>>>>>> But > > >>>>>>>>>>>>> having a > > >>>>>>>>>>>>>>>> separate google doc will be easy for the DEV to > > >>>>>>>>>>>>>>> understand/comment/discuss > > >>>>>>>>>>>>>>>> on your proposed DDL implementation. > > >>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>> Regards, > > >>>>>>>>>>>>>>>> Shaoxuan > > >>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>> On Wed, Nov 28, 2018 at 7:39 PM Jark Wu < > imj...@gmail.com > > >>>>>>>>>>> wrote: > > >>>>>>>>>>>>>>>>> Hi Shuyi, > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> Thanks for bringing up this discussion and the awesome > > >>>>>>>>> work! > > >>>>>>>>>> I > > >>>>>>>>>>>> have > > >>>>>>>>>>>>>>> left > > >>>>>>>>>>>>>>>>> some comments in the doc. > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> I want to share something more about the watermark > > >>>>>>>>> definition > > >>>>>>>>>>>>> learned > > >>>>>>>>>>>>>>>> from > > >>>>>>>>>>>>>>>>> Alibaba. > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> 1. > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> Table should be able to accept multiple watermark > > >>>>>>>>>>> definition. > > >>>>>>>>>>>>>>>>> Because a table may have more than one rowtime > > >> field. > > >>>>>>>>> For > > >>>>>>>>>>>>> example, > > >>>>>>>>>>>>>>> one > > >>>>>>>>>>>>>>>>> rowtime field is from existing field but missing > in > > >>> some > > >>>>>>>>>>>>> records, > > >>>>>>>>>>>>>>>>> another > > >>>>>>>>>>>>>>>>> is the ingestion timestamp in Kafka but not very > > >>>>>>>>> accurate. > > >>>>>>>>>>> In > > >>>>>>>>>>>>> this > > >>>>>>>>>>>>>>>> case, > > >>>>>>>>>>>>>>>>> user may define two rowtime fields with > watermarks > > >> in > > >>>>>>>>> the > > >>>>>>>>>>>> Table > > >>>>>>>>>>>>>> and > > >>>>>>>>>>>>>>>>> choose > > >>>>>>>>>>>>>>>>> one in different situation. > > >>>>>>>>>>>>>>>>> 2. > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> Watermark stragety always work with rowtime field > > >>>>>>>>>> together. > > >>>>>>>>>>>>>>>>> Based on the two points metioned above, I think we > should > > >>>>>>>>>>> combine > > >>>>>>>>>>>>> the > > >>>>>>>>>>>>>>>>> watermark strategy and rowtime field selection (i.e. > > >> which > > >>>>>>>>>>>> existing > > >>>>>>>>>>>>>>> field > > >>>>>>>>>>>>>>>>> used to generate watermark) in one clause, so that we > can > > >>>>>>>>>>> define > > >>>>>>>>>>>>>>> multiple > > >>>>>>>>>>>>>>>>> watermarks in one Table. > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> Here I will share the watermark syntax used in Alibaba > > >>>>>>>>>> (simply > > >>>>>>>>>>>>>>> modified): > > >>>>>>>>>>>>>>>>> watermarkDefinition: > > >>>>>>>>>>>>>>>>> WATERMARK [watermarkName] FOR <rowtime_field> AS > > >>>>>>>>> wm_strategy > > >>>>>>>>>>>>>>>>> wm_strategy: > > >>>>>>>>>>>>>>>>> BOUNDED WITH OFFSET 'string' timeUnit > > >>>>>>>>>>>>>>>>> | > > >>>>>>>>>>>>>>>>> ASCENDING > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> The “WATERMARK” keyword starts a watermark definition. 
> > >> The > > >>>>>>>>>>> “FOR” > > >>>>>>>>>>>>>>> keyword > > >>>>>>>>>>>>>>>>> defines which existing field used to generate > watermark, > > >>>>>>>>> this > > >>>>>>>>>>>> field > > >>>>>>>>>>>>>>>> should > > >>>>>>>>>>>>>>>>> already exist in the schema (we can use computed-column > > >> to > > >>>>>>>>>>> derive > > >>>>>>>>>>>>>> from > > >>>>>>>>>>>>>>>>> other fields). The “AS” keyword defines watermark > > >> strategy, > > >>>>>>>>>>> such > > >>>>>>>>>>>> as > > >>>>>>>>>>>>>>>> BOUNDED > > >>>>>>>>>>>>>>>>> WITH OFFSET (covers almost all the requirements) and > > >>>>>>>>>> ASCENDING. > > >>>>>>>>>>>>>>>>> When the expected rowtime field does not exist in the > > >>>>>>>>> schema, > > >>>>>>>>>>> we > > >>>>>>>>>>>>> can > > >>>>>>>>>>>>>>> use > > >>>>>>>>>>>>>>>>> computed-column syntax to derive it from other existing > > >>>>>>>>>> fields > > >>>>>>>>>>>>> using > > >>>>>>>>>>>>>>>>> built-in functions or user defined functions. So the > > >>>>>>>>>>>>>> rowtime/watermark > > >>>>>>>>>>>>>>>>> definition doesn’t need to care about “field-change” > > >>>>>>>>> strategy > > >>>>>>>>>>>>>>>>> (replace/add/from-field). And the proctime field > > >> definition > > >>>>>>>>>> can > > >>>>>>>>>>>>> also > > >>>>>>>>>>>>>> be > > >>>>>>>>>>>>>>>>> defined using computed-column. Such as pt as PROCTIME() > > >>>>>>>>> which > > >>>>>>>>>>>>>> defines a > > >>>>>>>>>>>>>>>>> proctime field named “pt” in the schema. > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> Looking forward to working with you guys! > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> Best, > > >>>>>>>>>>>>>>>>> Jark Wu > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> Lin Li <lincoln.8...@gmail.com> 于2018年11月28日周三 > 下午6:33写道: > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> @Shuyi > > >>>>>>>>>>>>>>>>>> Thanks for the proposal! We have a simple DDL > > >>>>>>>>>> implementation > > >>>>>>>>>>>>>>> (extends > > >>>>>>>>>>>>>>>>>> Calcite's parser) which been running for almost two > > >> years > > >>>>>>>>>> on > > >>>>>>>>>>>>>>> production > > >>>>>>>>>>>>>>>>> and > > >>>>>>>>>>>>>>>>>> works well. > > >>>>>>>>>>>>>>>>>> I think the most valued things we'd learned is keeping > > >>>>>>>>>>>> simplicity > > >>>>>>>>>>>>>> and > > >>>>>>>>>>>>>>>>>> standard compliance. 
> > >>>>>>>>>>>>>>>>>> Here's the approximate grammar, FYI > > >>>>>>>>>>>>>>>>>> CREATE TABLE > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> CREATE TABLE tableName( > > >>>>>>>>>>>>>>>>>> columnDefinition [, columnDefinition]* > > >>>>>>>>>>>>>>>>>> [ computedColumnDefinition [, > > >>>>>>>>>>>> computedColumnDefinition]* > > >>>>>>>>>>>>> ] > > >>>>>>>>>>>>>>>>>> [ tableConstraint [, tableConstraint]* ] > > >>>>>>>>>>>>>>>>>> [ tableIndex [, tableIndex]* ] > > >>>>>>>>>>>>>>>>>> [ PERIOD FOR SYSTEM_TIME ] > > >>>>>>>>>>>>>>>>>> [ WATERMARK watermarkName FOR rowTimeColumn > > AS > > >>>>>>>>>>>>>>>>>> withOffset(rowTimeColumn, offset) ] ) [ WITH ( > > >>>>>>>>>>> tableOption > > >>>>>>>>>>>> [ > > >>>>>>>>>>>>> , > > >>>>>>>>>>>>>>>>>> tableOption]* ) ] [ ; ] > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> columnDefinition ::= > > >>>>>>>>>>>>>>>>>> columnName dataType [ NOT NULL ] > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> dataType ::= > > >>>>>>>>>>>>>>>>>> { > > >>>>>>>>>>>>>>>>>> [ VARCHAR ] > > >>>>>>>>>>>>>>>>>> | [ BOOLEAN ] > > >>>>>>>>>>>>>>>>>> | [ TINYINT ] > > >>>>>>>>>>>>>>>>>> | [ SMALLINT ] > > >>>>>>>>>>>>>>>>>> | [ INT ] > > >>>>>>>>>>>>>>>>>> | [ BIGINT ] > > >>>>>>>>>>>>>>>>>> | [ FLOAT ] > > >>>>>>>>>>>>>>>>>> | [ DECIMAL ] > > >>>>>>>>>>>>>>>>>> | [ DOUBLE ] > > >>>>>>>>>>>>>>>>>> | [ DATE ] > > >>>>>>>>>>>>>>>>>> | [ TIME ] > > >>>>>>>>>>>>>>>>>> | [ TIMESTAMP ] > > >>>>>>>>>>>>>>>>>> | [ VARBINARY ] > > >>>>>>>>>>>>>>>>>> } > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> computedColumnDefinition ::= > > >>>>>>>>>>>>>>>>>> columnName AS computedColumnExpression > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> tableConstraint ::= > > >>>>>>>>>>>>>>>>>> { PRIMARY KEY | UNIQUE } > > >>>>>>>>>>>>>>>>>> (columnName [, columnName]* ) > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> tableIndex ::= > > >>>>>>>>>>>>>>>>>> [ UNIQUE ] INDEX indexName > > >>>>>>>>>>>>>>>>>> (columnName [, columnName]* ) > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> rowTimeColumn ::= > > >>>>>>>>>>>>>>>>>> columnName > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> tableOption ::= > > >>>>>>>>>>>>>>>>>> property=value > > >>>>>>>>>>>>>>>>>> offset ::= > > >>>>>>>>>>>>>>>>>> positive integer (unit: ms) > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> CREATE VIEW > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> CREATE VIEW viewName > > >>>>>>>>>>>>>>>>>> [ > > >>>>>>>>>>>>>>>>>> ( columnName [, columnName]* ) > > >>>>>>>>>>>>>>>>>> ] > > >>>>>>>>>>>>>>>>>> AS queryStatement; > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> CREATE FUNCTION > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> CREATE FUNCTION functionName > > >>>>>>>>>>>>>>>>>> AS 'className'; > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> className ::= > > >>>>>>>>>>>>>>>>>> fully qualified name > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> Shuyi Chen <suez1...@gmail.com> 于2018年11月28日周三 > > >> 上午3:28写道: > > >>>>>>>>>>>>>>>>>>> Thanks a lot, Timo and Xuefu. Yes, I think we can > > >>>>>>>>>> finalize > > >>>>>>>>>>>> the > > >>>>>>>>>>>>>>> design > > >>>>>>>>>>>>>>>>> doc > > >>>>>>>>>>>>>>>>>>> first and start implementation w/o the unified > > >>>>>>>>> connector > > >>>>>>>>>>> API > > >>>>>>>>>>>>>> ready > > >>>>>>>>>>>>>>> by > > >>>>>>>>>>>>>>>>>>> skipping some featue. 
> > >>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>> Xuefu, I like the idea of making Flink specific > > >>>>>>>>>> properties > > >>>>>>>>>>>> into > > >>>>>>>>>>>>>>>> generic > > >>>>>>>>>>>>>>>>>>> key-value pairs, so that it will make integration > with > > >>>>>>>>>> Hive > > >>>>>>>>>>>> DDL > > >>>>>>>>>>>>>> (or > > >>>>>>>>>>>>>>>>>> others, > > >>>>>>>>>>>>>>>>>>> e.g. Beam DDL) easier. > > >>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>> I'll run a final pass over the design doc and > finalize > > >>>>>>>>>> the > > >>>>>>>>>>>>> design > > >>>>>>>>>>>>>>> in > > >>>>>>>>>>>>>>>>> the > > >>>>>>>>>>>>>>>>>>> next few days. And we can start creating tasks and > > >>>>>>>>>>>> collaborate > > >>>>>>>>>>>>> on > > >>>>>>>>>>>>>>> the > > >>>>>>>>>>>>>>>>>>> implementation. Thanks a lot for all the comments and > > >>>>>>>>>>> inputs. > > >>>>>>>>>>>>>>>>>>> Cheers! > > >>>>>>>>>>>>>>>>>>> Shuyi > > >>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>> On Tue, Nov 27, 2018 at 7:02 AM Zhang, Xuefu < > > >>>>>>>>>>>>>>>> xuef...@alibaba-inc.com> > > >>>>>>>>>>>>>>>>>>> wrote: > > >>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>> Yeah! I agree with Timo that DDL can actually > proceed > > >>>>>>>>>> w/o > > >>>>>>>>>>>>> being > > >>>>>>>>>>>>>>>>> blocked > > >>>>>>>>>>>>>>>>>>> by > > >>>>>>>>>>>>>>>>>>>> connector API. We can leave the unknown out while > > >>>>>>>>>>> defining > > >>>>>>>>>>>>> the > > >>>>>>>>>>>>>>>> basic > > >>>>>>>>>>>>>>>>>>> syntax. > > >>>>>>>>>>>>>>>>>>>> @Shuyi > > >>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>> As commented in the doc, I think we can probably > > >>>>>>>>> stick > > >>>>>>>>>>> with > > >>>>>>>>>>>>>>> simple > > >>>>>>>>>>>>>>>>>> syntax > > >>>>>>>>>>>>>>>>>>>> with general properties, without extending the > syntax > > >>>>>>>>>> too > > >>>>>>>>>>>>> much > > >>>>>>>>>>>>>>> that > > >>>>>>>>>>>>>>>>> it > > >>>>>>>>>>>>>>>>>>>> mimics the descriptor API. > > >>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>> Part of our effort on Flink-Hive integration is also > > >>>>>>>>> to > > >>>>>>>>>>>> make > > >>>>>>>>>>>>>> DDL > > >>>>>>>>>>>>>>>>> syntax > > >>>>>>>>>>>>>>>>>>>> compatible with Hive's. The one in the current > > >>>>>>>>> proposal > > >>>>>>>>>>>> seems > > >>>>>>>>>>>>>>>> making > > >>>>>>>>>>>>>>>>>> our > > >>>>>>>>>>>>>>>>>>>> effort more challenging. > > >>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>> We can help and collaborate. At this moment, I think > > >>>>>>>>> we > > >>>>>>>>>>> can > > >>>>>>>>>>>>>>>> finalize > > >>>>>>>>>>>>>>>>> on > > >>>>>>>>>>>>>>>>>>>> the proposal and then we can divide the tasks for > > >>>>>>>>>> better > > >>>>>>>>>>>>>>>>> collaboration. > > >>>>>>>>>>>>>>>>>>>> Please let me know if there are any questions or > > >>>>>>>>>>>>> suggestions. > > >>>>>>>>>>>>>>>>>>>> Thanks, > > >>>>>>>>>>>>>>>>>>>> Xuefu > > >>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>> > > >> ------------------------------------------------------------------ > > >>>>>>>>>>>>>>>>>>>> Sender:Timo Walther <twal...@apache.org> > > >>>>>>>>>>>>>>>>>>>> Sent at:2018 Nov 27 (Tue) 16:21 > > >>>>>>>>>>>>>>>>>>>> Recipient:dev <dev@flink.apache.org> > > >>>>>>>>>>>>>>>>>>>> Subject:Re: [DISCUSS] Flink SQL DDL Design > > >>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>> Thanks for offering your help here, Xuefu. It would > > >>>>>>>>> be > > >>>>>>>>>>>> great > > >>>>>>>>>>>>> to > > >>>>>>>>>>>>>>>> move > > >>>>>>>>>>>>>>>>>>>> these efforts forward. 
------------------------------------------------------------------
Sender: Timo Walther <twal...@apache.org>
Sent at: 2018 Nov 27 (Tue) 16:21
Recipient: dev <dev@flink.apache.org>
Subject: Re: [DISCUSS] Flink SQL DDL Design

Thanks for offering your help here, Xuefu. It would be great to move these efforts forward. I agree that the DDL is somewhat related to the unified connector API design, but we can also start with the basic functionality now and evolve the DDL during this release and the next ones. For example, we could identify an MVP DDL syntax that skips defining key constraints and maybe even time attributes (a sketch follows this message). This DDL could be used for batch use cases, ETL, and materializing SQL queries (no time operations like windows).

The unified connector API is high on our priority list for the 1.8 release. I will try to update the document by the middle of next week.

Regards,

Timo
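A minimal sketch of what such an MVP statement could look like, assuming only plain columns and key-value options survive the cut; the connector and format keys here are invented:

```
-- MVP-level DDL: no constraints, no time attributes --
-- sufficient for batch/ETL pipelines without windows
CREATE TABLE page_views (
  page  VARCHAR,
  views BIGINT
) WITH (
  connector.type='filesystem',
  connector.path='/tmp/page_views.csv',
  format.type='csv'
);
```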
On 27.11.18 at 08:08, Shuyi Chen wrote:

Thanks a lot, Xuefu. I was busy with some other work for the last two weeks, but we are definitely interested in moving this forward. I think once the unified connector API design [1] is done, we can finalize the DDL design as well and start creating concrete subtasks to collaborate on with the community.

Shuyi

[1] https://docs.google.com/document/d/1Yaxp1UJUFW-peGLt8EIidwKIZEWrrA-pznWLuvaH39Y/edit?usp=sharing

On Mon, Nov 26, 2018 at 7:01 PM, Zhang, Xuefu <xuef...@alibaba-inc.com> wrote:

Hi Shuyi,

I'm wondering if you folks still have the bandwidth to work on this. We have some dedicated resources and would like to move it forward. We can collaborate.

Thanks,

Xuefu

------------------------------------------------------------------
From: wenlong.lwl <wenlong88....@gmail.com>
Date: 2018-11-05 11:15:35
To: <dev@flink.apache.org>
Subject: Re: [DISCUSS] Flink SQL DDL Design

Hi Shuyi, thanks for the proposal.

I have two concerns about the table DDL:

1. How about removing the source/sink mark from the DDL? It is not necessary: the framework can determine whether the referenced table is a source or a sink from the context of the query using it. That makes it more convenient to define a table that serves as both a source and a sink (a sketch follows this message), and more convenient for the catalog to persist and manage the meta information.

2. How about keeping just one pure string map as the parameters of a table, like:

```
create table Kafka10SourceTable (
  intField INTEGER,
  stringField VARCHAR(128),
  longField BIGINT,
  rowTimeField TIMESTAMP
) with (
  connector.type = 'kafka',
  connector.property-version = '1',
  connector.version = '0.10',
  connector.properties.topic = 'test-kafka-topic',
  connector.properties.startup-mode = 'latest-offset',
  connector.properties.specific-offset = 'offset',
  format.type = 'json',
  format.properties.version = '1',
  format.derive-schema = 'true'
);
```

Because:

1. In TableFactory, what the user works with is a string map of properties, so defining parameters as a string map is the closest match to how the parameters are actually consumed.
2. The table descriptor can be extended by users, as is done for Kafka and JSON, which means the parameter keys of a connector or format can differ between implementations. We cannot restrict the keys to a fixed set, so we would need one map in the connector scope and another in the connector.properties scope. Why not just give users a single map and let them put parameters in whatever shape they like? That is also the simplest way to implement the DDL parser.

3. Whether a format clause can be defined or not depends on the implementation of the connector. Using a separate format clause in the DDL may create the misunderstanding that any connector can be combined with an arbitrary format, which may not actually work.
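To illustrate wenlong's first point (table, topic, and key names invented for the example): without a SOURCE/SINK marker, a single definition can serve both roles, and the planner infers the role from how each query references the table:

```
CREATE TABLE user_events (
  user_id BIGINT,
  msg     VARCHAR
) WITH (
  connector.type = 'kafka',
  connector.properties.topic = 'user-events'
);

-- the same table acts as a sink here ...
INSERT INTO user_events
SELECT user_id, msg FROM raw_events;

-- ... and as a source here
SELECT COUNT(*) FROM user_events;
```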
On Sun, 4 Nov 2018 at 18:25, Dominik Wosiński <wos...@gmail.com> wrote:

+1, thanks for the proposal.

I guess this is a long-awaited change. It can vastly increase the functionality of the SQL Client, as it will become possible to use complex extensions such as those provided by Apache Bahir [1].

Best Regards,
Dom.

[1] https://github.com/apache/bahir-flink

On Sat, Nov 3, 2018 at 17:17, Rong Rong <walter...@gmail.com> wrote:

+1. Thanks for putting the proposal together, Shuyi.

DDL has been brought up a couple of times previously [1,2]. Utilizing DDL will definitely be a great extension to the current Flink SQL, systematically supporting some of the previously requested features such as [3]. And it will also be beneficial to see the document closely aligned with the previous discussion on the unified SQL connector API [4].

I also left a few comments on the doc. Looking forward to the alignment with the other couple of efforts and to contributing to them!

Best,
Rong

[1] http://mail-archives.apache.org/mod_mbox/flink-dev/201805.mbox/%3CCAMZk55ZTJA7MkCK1Qu4gLPu1P9neqCfHZtTcgLfrFjfO4Xv5YQ%40mail.gmail.com%3E
[2] http://mail-archives.apache.org/mod_mbox/flink-dev/201810.mbox/%3CDC070534-0782-4AFD-8A85-8A82B384B8F7%40gmail.com%3E
[3] https://issues.apache.org/jira/browse/FLINK-8003
[4] http://mail-archives.apache.org/mod_mbox/flink-dev/201810.mbox/%3c6676cb66-6f31-23e1-eff5-2e9c19f88...@apache.org%3E

On Fri, Nov 2, 2018 at 10:22 AM, Bowen Li <bowenl...@gmail.com> wrote:

Thanks Shuyi!

I left some comments there. I think the designs of SQL DDL and the Flink-Hive integration / external catalog enhancements will work closely with each other. I hope we are well aligned on the directions of the two designs, and I look forward to working with you all on both!

Bowen

On Thu, Nov 1, 2018 at 10:57 PM, Shuyi Chen <suez1...@gmail.com> wrote:

Hi everyone,

SQL DDL support has been a long-time ask from the community. Currently, Flink SQL supports only DML (e.g. SELECT and INSERT statements). In its current form, Flink SQL users still need to define/create table sources and sinks programmatically in Java/Scala.
Also, in the SQL Client, without DDL support the current implementation does not allow dynamic creation of tables, types, or functions with SQL, which adds friction to adoption.

I drafted a design doc [1] with a few other community members that proposes the design and implementation for adding DDL support to Flink. The initial design considers DDL for tables, views, types, libraries, and functions. It would be great to get feedback on the design from the community, and to align it with the latest efforts on the unified SQL connector API [2] and the Flink-Hive integration [3].

Any feedback is highly appreciated.

Thanks
Shuyi Chen

[1] https://docs.google.com/document/d/1TTP-GCC8wSsibJaSUyFZ_5NBAHYEB1FVmPpP7RgDGBA/edit?usp=sharing
[2] https://docs.google.com/document/d/1Yaxp1UJUFW-peGLt8EIidwKIZEWrrA-pznWLuvaH39Y/edit?usp=sharing
[3] https://docs.google.com/document/d/1SkppRD_rE3uOKSN-LuZCqn4f7dz0zW5aa6T_hBZq5_o/edit?usp=sharing

--
"So you have to trust that the dots will somehow connect in your future."