Hi Timo, I have one minor suggestion. Maybe the default data type of `timestamp` could be `TIMESTAMP(3) WITH LOCAL TIME ZONE`: this is the type users want to use, so it avoids unnecessary casting. Besides, a BIGINT is currently cast to a timestamp as seconds, so the implicit cast may not work...
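The unit mismatch mentioned above can be illustrated in plain Java. This is only a sketch using `java.time`, not Flink's cast implementation; the class name `EpochUnits` and the sample value are made up for illustration. Kafka record timestamps are milliseconds since the epoch, so reading the same BIGINT as seconds lands far in the future:

```java
import java.time.Instant;

// Hypothetical helper, not part of Flink: contrasts two readings of one BIGINT.
final class EpochUnits {

    // Interprets the value as milliseconds since 1970-01-01T00:00:00Z.
    static Instant fromMillis(long value) {
        return Instant.ofEpochMilli(value);
    }

    // Interprets the same value as seconds since the epoch.
    static Instant fromSeconds(long value) {
        return Instant.ofEpochSecond(value);
    }

    public static void main(String[] args) {
        long kafkaTimestamp = 1599133672000L; // sample value in milliseconds
        System.out.println("as millis:  " + fromMillis(kafkaTimestamp));
        System.out.println("as seconds: " + fromSeconds(kafkaTimestamp));
    }
}
```

A millisecond-based type such as `TIMESTAMP(3) WITH LOCAL TIME ZONE` would sidestep this ambiguity, provided the cast from BIGINT treats the value as milliseconds.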
I don't have other objections. But maybe we should wait for @Kurt's opinion on the new syntax.

Best,
Jark

On Thu, 10 Sep 2020 at 16:21, Danny Chan <yuzhao....@gmail.com> wrote:

> Thanks for driving this Timo, +1 for voting ~
>
> Best,
> Danny Chan
> On 10 Sep 2020 at 3:47 PM +0800, Timo Walther <twal...@apache.org> wrote:
> > Thanks everyone for this healthy discussion. I updated the FLIP with the outcome. I think the result is very powerful but also very easy to declare. Thanks for all the contributions.
> >
> > If there are no objections, I would continue with a vote.
> >
> > What do you think?
> >
> > Regards,
> > Timo
> >
> > On 09.09.20 16:52, Timo Walther wrote:
> > > "If virtual by default, when a user types "timestamp int" ==> persisted column, then adds a "metadata" after that ==> virtual column, then adds a "persisted" after that ==> persisted column."
> > >
> > > Thanks for this nice mental model explanation, Jark. This makes total sense to me. Also making the most common case as short as just adding `METADATA` is a very good idea. Thanks, Danny!
> > >
> > > Let me update the FLIP again with all these ideas.
> > >
> > > Regards,
> > > Timo
> > >
> > > On 09.09.20 15:03, Jark Wu wrote:
> > > > I'm also +1 to Danny's proposal: timestamp INT METADATA [FROM 'my-timestamp-field'] [VIRTUAL]
> > > > I especially like the shortcut: timestamp INT METADATA, which supports the most common case in the simplest way.
> > > >
> > > > I also think the default should be "PERSISTED", so VIRTUAL is optional when you are accessing read-only metadata. Because:
> > > > 1. The "timestamp INT METADATA" should be a normal column, because "METADATA" is just a modifier to indicate it is from metadata, and a normal column should be persisted.
> > > > If virtual by default, when a user types "timestamp int" ==> persisted column, then adds a "metadata" after that ==> virtual column, then adds a "persisted" after that ==> persisted column.
> > > > I think this flips back and forth several times and confuses users.
> > > > Physical fields are also prefixed with "fieldName TYPE", so "timestamp INT METADATA" being persisted is very straightforward.
> > > > 2. From the collected user questions [1], we can see that "timestamp" is the most common use case. "timestamp" is read-write metadata. Persisted by default doesn't break the reading behavior.
> > > >
> > > > Best,
> > > > Jark
> > > >
> > > > [1]: https://issues.apache.org/jira/browse/FLINK-15869
> > > >
> > > > On Wed, 9 Sep 2020 at 20:56, Leonard Xu <xbjt...@gmail.com> wrote:
> > > >
> > > > > Thanks @Dawid for the nice summary, I think you captured all opinions of the long discussion well.
> > > > >
> > > > > @Danny
> > > > > “ timestamp INT METADATA [FROM 'my-timestamp-field'] [VIRTUAL]
> > > > > Note that the "FROM 'field name'" is only needed when the name conflicts with the declared table column name; when there are no conflicts, we can simplify it to
> > > > > timestamp INT METADATA"
> > > > >
> > > > > I really like the proposal, there is no confusion with computed columns any more, and it’s concise enough.
> > > > >
> > > > > @Timo @Dawid
> > > > > “We use `SYSTEM_TIME` for temporal tables. I think prefixing with SYSTEM makes it clearer that it comes magically from the system.”
> > > > > “As for the issue of shortening the SYSTEM_METADATA to METADATA.
> > > > > Here I very much prefer the SYSTEM_ prefix.”
> > > > >
> > > > > I think `SYSTEM_TIME` is quite different from `SYSTEM_METADATA`.
> > > > > First of all, the word `TIME` has broad meanings, but the word `METADATA` does not; it has a specific meaning.
> > > > > Secondly, `FOR SYSTEM_TIME AS OF` exists in the SQL standard, but `SYSTEM_METADATA` does not.
> > > > > Personally, I prefer the simpler way; sometimes less is more.
> > > > >
> > > > > Best,
> > > > > Leonard
> > > > >
> > > > > > On Wed, 9 Sep 2020 at 6:41 PM, Timo Walther <twal...@apache.org> wrote:
> > > > > >
> > > > > > > Hi everyone,
> > > > > > >
> > > > > > > "key" and "value" in the properties are a special case because they need to configure a format. So key and value are more than just metadata. Jark's example for setting a timestamp would work but as the FLIP discusses, we have way more metadata fields like headers, epoch-leader, etc. Having a property for all of this metadata would mess up the WITH section entirely. Furthermore, we also want to deal with metadata from the formats. Solving this through properties as well would further complicate the property design.
> > > > > > >
> > > > > > > Personally, I still like the computed column design more because it allows full flexibility to compute the final column:
> > > > > > >
> > > > > > > timestamp AS adjustTimestamp(CAST(SYSTEM_METADATA("ts") AS TIMESTAMP(3)))
> > > > > > >
> > > > > > > Instead of having a helper column and a real column in the table:
> > > > > > >
> > > > > > > helperTimestamp AS CAST(SYSTEM_METADATA("ts") AS TIMESTAMP(3))
> > > > > > > realTimestamp AS adjustTimestamp(helperTimestamp)
> > > > > > >
> > > > > > > But I see that the discussion leans towards:
> > > > > > >
> > > > > > > timestamp INT SYSTEM_METADATA("ts")
> > > > > > >
> > > > > > > Which is fine with me. It is the shortest solution, because we don't need an additional CAST. We can discuss the syntax, so that confusion with computed columns can be avoided.
> > > > > > >
> > > > > > > timestamp INT USING SYSTEM_METADATA("ts")
> > > > > > > timestamp INT FROM SYSTEM_METADATA("ts")
> > > > > > > timestamp INT FROM SYSTEM_METADATA("ts") PERSISTED
> > > > > > >
> > > > > > > We use `SYSTEM_TIME` for temporal tables. I think prefixing with SYSTEM makes it clearer that it comes magically from the system.
> > > > > > >
> > > > > > > What do you think?
> > > > > > >
> > > > > > > Regards,
> > > > > > > Timo
> > > > > > >
> > > > > > > On 09.09.20 11:41, Jark Wu wrote:
> > > > > > > > Hi Danny,
> > > > > > > >
> > > > > > > > This is not Oracle and MySQL computed column syntax, because there is no "AS" after the type.
> > > > > > > >
> > > > > > > > Hi everyone,
> > > > > > > >
> > > > > > > > If we want to use "offset INT SYSTEM_METADATA("offset")", then I think we must further discuss the "PERSISTED" or "VIRTUAL" keyword for the query-sink schema problem.
> > > > > > > > Personally, I think we can use a shorter keyword "METADATA" for "SYSTEM_METADATA", because "SYSTEM_METADATA" sounds like a system function and may make users think this is a computed column.
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Jark
> > > > > > > >
> > > > > > > > On Wed, 9 Sep 2020 at 17:23, Danny Chan <danny0...@apache.org> wrote:
> > > > > > > >
> > > > > > > > > "offset INT SYSTEM_METADATA("offset")"
> > > > > > > > >
> > > > > > > > > This is actually Oracle or MySQL style computed column syntax.
> > > > > > > > >
> > > > > > > > > "You are right that one could argue that "timestamp", "headers" are something like "key" and "value""
> > > > > > > > >
> > > > > > > > > I have the same feeling: key and value as well as headers and timestamp are all *real* data stored in the consumed record; they are not computed or generated.
> > > > > > > > >
> > > > > > > > > "Trying to solve everything via properties sounds rather like a hack to me"
> > > > > > > > >
> > > > > > > > > Things are not that hacky if we can unify the routines or the definitions (everything via computed columns or everything via table options). I also think it is hacky to mix two kinds of syntax for different kinds of metadata (read-only and read-write). In this FLIP, we declare the Kafka key fields with table options but use SYSTEM_METADATA for other metadata; that is hacky, or at least inconsistent.
> > > > > > > > >
> > > > > > > > > On Wed, 9 Sep 2020 at 4:48 PM, Kurt Young <ykt...@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > > > I would vote for `offset INT SYSTEM_METADATA("offset")`.
> > > > > > > > > >
> > > > > > > > > > I don't think we can stick with the SQL standard in DDL part forever, especially as there are more and more requirements coming from different connectors and external systems.
> > > > > > > > > >
> > > > > > > > > > Best,
> > > > > > > > > > Kurt
> > > > > > > > > >
> > > > > > > > > > On Wed, Sep 9, 2020 at 4:40 PM Timo Walther <twal...@apache.org> wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi Jark,
> > > > > > > > > > >
> > > > > > > > > > > now we are back at the original design proposed by Dawid :D Yes, we should be cautious about adding new syntax. But the length of this discussion shows that we are looking for a good long-term solution. In this case I would rather vote for a deep integration into the syntax.
> > > > > > > > > > >
> > > > > > > > > > > Computed columns are also not SQL standard compliant. And our DDL is neither, so we have some degree of freedom here.
> > > > > > > > > > >
> > > > > > > > > > > Trying to solve everything via properties sounds rather like a hack to me. You are right that one could argue that "timestamp", "headers" are something like "key" and "value".
> > > > > > > > > > > However, mixing
> > > > > > > > > > >
> > > > > > > > > > > `offset AS SYSTEM_METADATA("offset")`
> > > > > > > > > > >
> > > > > > > > > > > and
> > > > > > > > > > >
> > > > > > > > > > > `'timestamp.field' = 'ts'`
> > > > > > > > > > >
> > > > > > > > > > > looks more confusing to users than an explicit
> > > > > > > > > > >
> > > > > > > > > > > `offset AS CAST(SYSTEM_METADATA("offset") AS INT)`
> > > > > > > > > > >
> > > > > > > > > > > or
> > > > > > > > > > >
> > > > > > > > > > > `offset INT SYSTEM_METADATA("offset")`
> > > > > > > > > > >
> > > > > > > > > > > that is symmetric for both source and sink.
> > > > > > > > > > >
> > > > > > > > > > > What do others think?
> > > > > > > > > > >
> > > > > > > > > > > Regards,
> > > > > > > > > > > Timo
> > > > > > > > > > >
> > > > > > > > > > > On 09.09.20 10:09, Jark Wu wrote:
> > > > > > > > > > > > Hi everyone,
> > > > > > > > > > > >
> > > > > > > > > > > > I think we have a conclusion that the writable metadata shouldn't be defined as a computed column, but a normal column.
> > > > > > > > > > > >
> > > > > > > > > > > > "timestamp STRING SYSTEM_METADATA('timestamp')" is one of the approaches. However, it is not SQL standard compliant, so we need to be cautious when adding new syntax.
> > > > > > > > > > > > Besides, we have to introduce the `PERSISTED` or `VIRTUAL` keyword to resolve the query-sink schema problem if it is read-only metadata. That adds more stuff to learn for users.
> > > > > > > > > > > >
> > > > > > > > > > > > From my point of view, the "timestamp", "headers" are something like "key" and "value" that are stored with the real data.
> > > > > > > > > > > > So why not define the "timestamp" in the same way as "key" by using a "timestamp.field" connector option?
> > > > > > > > > > > > On the other hand, the read-only metadata, such as "offset", shouldn't be defined as a normal column. So why not use the existing computed column syntax for such metadata? Then we don't have the query-sink schema problem.
> > > > > > > > > > > > So here is my proposal:
> > > > > > > > > > > >
> > > > > > > > > > > > CREATE TABLE kafka_table (
> > > > > > > > > > > >   id BIGINT,
> > > > > > > > > > > >   name STRING,
> > > > > > > > > > > >   col1 STRING,
> > > > > > > > > > > >   col2 STRING,
> > > > > > > > > > > >   ts TIMESTAMP(3) WITH LOCAL TIME ZONE, -- ts is a normal field, so can be read and written.
> > > > > > > > > > > >   offset AS SYSTEM_METADATA("offset")
> > > > > > > > > > > > ) WITH (
> > > > > > > > > > > >   'connector' = 'kafka',
> > > > > > > > > > > >   'topic' = 'test-topic',
> > > > > > > > > > > >   'key.fields' = 'id, name',
> > > > > > > > > > > >   'key.format' = 'csv',
> > > > > > > > > > > >   'value.format' = 'avro',
> > > > > > > > > > > >   'timestamp.field' = 'ts' -- define the mapping of Kafka timestamp
> > > > > > > > > > > > );
> > > > > > > > > > > >
> > > > > > > > > > > > INSERT INTO kafka_table
> > > > > > > > > > > > SELECT id, name, col1, col2, rowtime FROM another_table;
> > > > > > > > > > > >
> > > > > > > > > > > > I think this can solve all the problems without introducing any new syntax.
> > > > > > > > > > > > The only minor disadvantage is that we separate the definition way/syntax of read-only metadata and read-write fields.
> > > > > > > > > > > > However, I don't think this is a big problem.
> > > > > > > > > > > >
> > > > > > > > > > > > Best,
> > > > > > > > > > > > Jark
> > > > > > > > > > > >
> > > > > > > > > > > > On Wed, 9 Sep 2020 at 15:09, Timo Walther <twal...@apache.org> wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Hi Kurt,
> > > > > > > > > > > > >
> > > > > > > > > > > > > thanks for sharing your opinion. I'm totally up for not reusing computed columns. I think Jark was a big supporter of this syntax, @Jark are you fine with this as well? The non-computed column approach was only a "slightly rejected alternative".
> > > > > > > > > > > > >
> > > > > > > > > > > > > Furthermore, we would need to think about how such a new design influences the LIKE clause though.
> > > > > > > > > > > > >
> > > > > > > > > > > > > However, we should still keep the `PERSISTED` keyword as it influences the query->sink schema. If you look at the list of metadata for existing connectors and formats, we currently offer only two writable metadata fields. Otherwise, one would need to declare two tables whenever a metadata column is read (one for the source, one for the sink). This can be quite inconvenient e.g. for just reading the topic.
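The query-to-sink schema point can be sketched in plain Java. The model below is an assumption for illustration only, not Flink's planner: `SinkSchemaSketch`, `Column`, and both schema methods are hypothetical names. Non-persisted (virtual) columns are visible when reading but are left out of the schema that an INSERT INTO query has to supply, so one table declaration can serve as both source and sink:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model for illustration; none of these names are Flink APIs.
final class SinkSchemaSketch {

    static final class Column {
        final String name;
        final boolean persisted; // false = virtual: readable but never written

        Column(String name, boolean persisted) {
            this.name = name;
            this.persisted = persisted;
        }
    }

    // Every declared column is visible when the table is read.
    static List<String> sourceSchema(List<Column> columns) {
        List<String> names = new ArrayList<>();
        for (Column c : columns) {
            names.add(c.name);
        }
        return names;
    }

    // Only persisted columns have to be supplied by an INSERT INTO query.
    static List<String> sinkSchema(List<Column> columns) {
        List<String> names = new ArrayList<>();
        for (Column c : columns) {
            if (c.persisted) {
                names.add(c.name);
            }
        }
        return names;
    }

    static List<Column> exampleTable() {
        return Arrays.asList(
                new Column("id", true),         // physical column
                new Column("name", true),       // physical column
                new Column("timestamp", true),  // writable metadata
                new Column("offset", false));   // read-only metadata
    }

    public static void main(String[] args) {
        System.out.println("source: " + sourceSchema(exampleTable()));
        System.out.println("sink:   " + sinkSchema(exampleTable()));
    }
}
```

Without such a flag, the read-only `offset` column would also be expected from the query, which is what forces a second table declaration for the sink.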
> > > > > > > > > > > > >
> > > > > > > > > > > > > Regards,
> > > > > > > > > > > > > Timo
> > > > > > > > > > > > >
> > > > > > > > > > > > > On 09.09.20 08:52, Kurt Young wrote:
> > > > > > > > > > > > > > I also share the concern that reusing the computed column syntax with different semantics would confuse users a lot.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Besides, I think metadata fields are conceptually not the same as computed columns. The metadata field is a connector-specific thing and it only carries information about where the field comes from (in a source) or where it needs to be written to (in a sink). It's more similar to normal fields, with the assumption that all these fields go to the data part.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Thus I lean more towards the rejected alternative that Timo mentioned. And I think we don't need the PERSISTED keyword, SYSTEM_METADATA should be enough.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > During implementation, the framework only needs to pass such <field, metadata field> information to the connector, and the logic of handling such fields inside the connector should be straightforward.
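A minimal sketch of the <field, metadata field> hand-off, written in plain Java under stated assumptions: `MetadataMappingSketch` and `readMetadataColumns` are hypothetical names, and record metadata is modelled as a simple map rather than any real connector API. The framework passes a column-to-metadata-key mapping, and the connector projects each record's metadata onto the declared columns:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch; it does not reflect Flink's connector interfaces.
final class MetadataMappingSketch {

    // Fills each declared column from the metadata key it is mapped to.
    // Metadata keys that no column asks for are ignored.
    static Map<String, Object> readMetadataColumns(
            Map<String, String> columnToMetadataKey,
            Map<String, Object> recordMetadata) {
        Map<String, Object> row = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : columnToMetadataKey.entrySet()) {
            row.put(entry.getKey(), recordMetadata.get(entry.getValue()));
        }
        return row;
    }

    public static void main(String[] args) {
        // Mapping derived from a declaration like: ts ... SYSTEM_METADATA("timestamp")
        Map<String, String> mapping = new LinkedHashMap<>();
        mapping.put("ts", "timestamp");

        // Metadata the connector sees for one record (sample values).
        Map<String, Object> metadata = new LinkedHashMap<>();
        metadata.put("timestamp", 1599133672000L);
        metadata.put("offset", 42L);

        System.out.println(readMetadataColumns(mapping, metadata));
    }
}
```

Writing would be the mirror image: the connector looks up the same mapping to decide which column value goes into which metadata field.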
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Regarding the downside Timo mentioned:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > The disadvantage is that users cannot call UDFs or parse timestamps.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > I think this is fairly simple to solve. Since the metadata field isn't a computed column anymore, we can support referencing such fields in the computed column. For example:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > CREATE TABLE kafka_table (
> > > > > > > > > > > > > >   id BIGINT,
> > > > > > > > > > > > > >   name STRING,
> > > > > > > > > > > > > >   timestamp STRING SYSTEM_METADATA("timestamp"), // get the timestamp field from metadata
> > > > > > > > > > > > > >   ts AS to_timestamp(timestamp) // normal computed column, parse the string to TIMESTAMP type by using the metadata field
> > > > > > > > > > > > > > ) WITH (
> > > > > > > > > > > > > >   ...
> > > > > > > > > > > > > > )
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Best,
> > > > > > > > > > > > > > Kurt
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Tue, Sep 8, 2020 at 11:57 PM Timo Walther <twal...@apache.org> wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Hi Leonard,
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > the only alternative I see is that we introduce a concept that is completely different to computed columns.
> > > > > > > > > > > > > > > This is also mentioned in the rejected alternative section of the FLIP. Something like:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > CREATE TABLE kafka_table (
> > > > > > > > > > > > > > >   id BIGINT,
> > > > > > > > > > > > > > >   name STRING,
> > > > > > > > > > > > > > >   timestamp INT SYSTEM_METADATA("timestamp") PERSISTED,
> > > > > > > > > > > > > > >   headers MAP<STRING, BYTES> SYSTEM_METADATA("headers") PERSISTED
> > > > > > > > > > > > > > > ) WITH (
> > > > > > > > > > > > > > >   ...
> > > > > > > > > > > > > > > )
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > This way we would avoid confusion at all and can easily map columns to metadata columns. The disadvantage is that users cannot call UDFs or parse timestamps. This would need to be done in a real computed column.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > I'm happy about better alternatives.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Regards,
> > > > > > > > > > > > > > > Timo
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > On 08.09.20 15:37, Leonard Xu wrote:
> > > > > > > > > > > > > > > > Hi, Timo
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Thanks for driving this FLIP.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Sorry, but I have a concern about the "Writing metadata via DynamicTableSink" section:
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > CREATE TABLE kafka_table (
> > > > > > > > > > > > > > > >   id BIGINT,
> > > > > > > > > > > > > > > >   name STRING,
> > > > > > > > > > > > > > > >   timestamp AS CAST(SYSTEM_METADATA("timestamp") AS BIGINT) PERSISTED,
> > > > > > > > > > > > > > > >   headers AS CAST(SYSTEM_METADATA("headers") AS MAP<STRING, BYTES>) PERSISTED
> > > > > > > > > > > > > > > > ) WITH (
> > > > > > > > > > > > > > > >   ...
> > > > > > > > > > > > > > > > )
> > > > > > > > > > > > > > > > An insert statement could look like:
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > INSERT INTO kafka_table VALUES (
> > > > > > > > > > > > > > > >   (1, "ABC", 1599133672, MAP('checksum', computeChecksum(...)))
> > > > > > > > > > > > > > > > )
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > The proposed INSERT syntax does not make sense to me, because it contains computed (generated) columns.
> > > > > > > > > > > > > > > > Neither SQL Server nor PostgreSQL allows inserting values into computed columns, even if they are persisted; this breaks the generated column semantics and may confuse users a lot.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > For SQL Server computed columns [1]:
> > > > > > > > > > > > > > > > > column_name AS computed_column_expression [ PERSISTED [ NOT NULL ] ]...
> > > > > > > > > > > > > > > > > NOTE: A computed column cannot be the target of an INSERT or UPDATE statement.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > For PostgreSQL generated columns [2]:
> > > > > > > > > > > > > > > > > height_in numeric GENERATED ALWAYS AS (height_cm / 2.54) STORED
> > > > > > > > > > > > > > > > > NOTE: A generated column cannot be written to directly. In INSERT or UPDATE commands, a value cannot be specified for a generated column, but the keyword DEFAULT may be specified.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > After looking up SQL:2016, setting or updating the value of a generated column shouldn't be allowed:
> > > > > > > > > > > > > > > > > <insert statement> ::=
> > > > > > > > > > > > > > > > > INSERT INTO <insertion target> <insert columns and source>
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > If <contextually typed table value constructor> CTTVC is specified, then every <contextually typed row value constructor element> simply contained in CTTVC whose positionally corresponding <column name> in <insert column list> references a column of which some underlying column is a generated column shall be a <default specification>.
> > > > > > > > > > > > > > > > > A <default specification> specifies the default value of some associated item.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > [1] https://docs.microsoft.com/en-US/sql/t-sql/statements/alter-table-computed-column-definition-transact-sql?view=sql-server-ver15
> > > > > > > > > > > > > > > > [2] https://www.postgresql.org/docs/12/ddl-generated-columns.html
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > On 8 Sep 2020, at 17:31, Timo Walther <twal...@apache.org> wrote:
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Hi Jark,
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > according to Flink's and Calcite's casting definition in [1][2] TIMESTAMP WITH LOCAL TIME ZONE should be castable from BIGINT. If not, we will make it possible ;-)
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > I'm aware of DeserializationSchema.getProducedType but I think that this method is actually misplaced. The type should rather be passed to the source itself.
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > For our Kafka SQL source, we will also not use this method because the Kafka source will add its own metadata in addition to the DeserializationSchema. So DeserializationSchema.getProducedType will never be read.
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > For now I suggest leaving out the `DataType` from DecodingFormat.applyReadableMetadata, also because the format's physical type is passed later in `createRuntimeDecoder`. If necessary, it can be computed manually from consumedType + metadata types. We will provide a metadata utility class for that.
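The "consumedType + metadata types" computation can be sketched as follows. This is a plain-Java illustration under stated assumptions, not the utility class announced above: `ProducedTypeSketch` and `producedFields` are hypothetical names, and data types are represented as strings instead of Flink `DataType` objects. The produced row is taken to be the physical fields followed by the requested metadata fields:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch; field types are plain strings, not Flink DataTypes.
final class ProducedTypeSketch {

    // Appends the metadata fields after the physical fields, keeping order.
    static Map<String, String> producedFields(
            Map<String, String> physicalFields,
            Map<String, String> metadataFields) {
        Map<String, String> produced = new LinkedHashMap<>(physicalFields);
        for (Map.Entry<String, String> entry : metadataFields.entrySet()) {
            if (produced.containsKey(entry.getKey())) {
                throw new IllegalArgumentException(
                        "Field name clash: " + entry.getKey());
            }
            produced.put(entry.getKey(), entry.getValue());
        }
        return produced;
    }

    public static void main(String[] args) {
        Map<String, String> physical = new LinkedHashMap<>();
        physical.put("id", "BIGINT");
        physical.put("name", "STRING");

        Map<String, String> metadata = new LinkedHashMap<>();
        metadata.put("timestamp", "BIGINT");

        System.out.println(producedFields(physical, metadata));
    }
}
```

Rejecting name clashes is a design choice of this sketch; how a real utility resolves them is not stated in the thread.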
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Regards,
> > > > > > > > > > > > > > > > > Timo
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > [1] https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java#L200
> > > > > > > > > > > > > > > > > [2] https://github.com/apache/calcite/blob/master/core/src/main/java/org/apache/calcite/sql/type/SqlTypeCoercionRule.java#L254
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > On 08.09.20 10:52, Jark Wu wrote:
> > > > > > > > > > > > > > > > > > Hi Timo,
> > > > > > > > > > > > > > > > > > The updated CAST SYSTEM_METADATA behavior sounds good to me. I just noticed that a BIGINT can't be converted to "TIMESTAMP(3) WITH LOCAL TIME ZONE".
> > > > > > > > > > > > > > > > > > So maybe we need to support this, or use "TIMESTAMP(3) WITH LOCAL TIME ZONE" as the defined type of Kafka timestamp? I think this makes sense, because it represents the milliseconds since epoch.
> > > > > > > > > > > > > > > > > > Regarding "DeserializationSchema doesn't need TypeInfo", I don't think so.
> > > > > > > > > > > > > > > > > > The DeserializationSchema implements ResultTypeQueryable, thus the implementation needs to return an output TypeInfo.
> > > > > > > > > > > > > > > > > > Besides, FlinkKafkaConsumer also calls DeserializationSchema.getProducedType as the produced type of the source function [1].
> > > > > > > > > > > > > > > > > > Best,
> > > > > > > > > > > > > > > > > > Jark
> > > > > > > > > > > > > > > > > > [1]: https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L1066
> > > > > > > > > > > > > > > > > > On Tue, 8 Sep 2020 at 16:35, Timo Walther <twal...@apache.org> wrote:
> > > > > > > > > > > > > > > > > > > Hi everyone,
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > I updated the FLIP again and hope that I could address the mentioned concerns.
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > @Leonard: Thanks for the explanation. I wasn't aware that ts_ms and source.ts_ms have different semantics.
I updated the FLIP and expose the most commonly used properties separately. So frequently used properties are not hidden in the MAP anymore:

debezium-json.ingestion-timestamp
debezium-json.source.timestamp
debezium-json.source.database
debezium-json.source.schema
debezium-json.source.table

However, since other properties depend on the used connector/vendor, the remaining options are stored in:

debezium-json.source.properties

And accessed with:

CAST(SYSTEM_METADATA('debezium-json.source.properties') AS MAP<STRING, STRING>)['table']

Otherwise it is not possible to figure out the value and column type during validation.

@Jark: You convinced me to relax the CAST constraints.
I added a dedicated sub-section to the FLIP:

To make the use of SYSTEM_METADATA easier and to avoid nested casting, we allow explicit casting to a target data type:

rowtime AS CAST(SYSTEM_METADATA("timestamp") AS TIMESTAMP(3) WITH LOCAL TIME ZONE)

A connector still produces and consumes the data type returned by `listMetadata()`. The planner will insert necessary explicit casts.

In any case, the user must provide a CAST such that the computed column receives a valid data type when constructing the table schema.

"I don't see a reason why `DecodingFormat#applyReadableMetadata` needs a DataType argument."

Correct, the DeserializationSchema doesn't need TypeInfo; it is always executed locally.
It is the source that needs TypeInfo for serializing the record to the next operator. And that is what we provide.

@Danny:

"`SYSTEM_METADATA("offset")` returns the NULL type by default"

We can also use some other means to represent an UNKNOWN data type. In the Flink type system, we use the NullType for it. The important part is that the final data type is known for the entire computed column. As I mentioned before, I would avoid the suggested option b), which would be similar to your suggestion. The CAST should be enough and allows for complex expressions in the computed column. Option b) would need parser changes.
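The separately exposed Debezium keys listed earlier in this message can be sketched as a flat lookup over a decoded record. This is a minimal Python illustration, not the Flink implementation: the function name is hypothetical, and the mapping from JSON fields (`ts_ms`, `source.db`, and so on) to metadata keys is an assumption based on the key names and the sample record quoted in this thread.

```python
import json


def extract_debezium_metadata(record: dict) -> dict:
    """Flatten a decoded Debezium JSON record into metadata keys.

    The key names come from the thread; which JSON field feeds which
    key is assumed here for illustration.
    """
    source = record.get("source") or {}
    return {
        "debezium-json.ingestion-timestamp": record.get("ts_ms"),
        "debezium-json.source.timestamp": source.get("ts_ms"),
        "debezium-json.source.database": source.get("db"),
        "debezium-json.source.schema": source.get("schema"),
        "debezium-json.source.table": source.get("table"),
        # Vendor-specific leftovers stay in a MAP<STRING, STRING>-like dict.
        "debezium-json.source.properties": {
            key: str(value) for key, value in source.items()
        },
    }


raw = """
{
  "before": null,
  "after": {"id": 101, "name": "scooter"},
  "source": {
    "db": "inventory",
    "table": "products",
    "ts_ms": 1589355504100,
    "connector": "mysql"
  },
  "ts_ms": 1589355606100,
  "op": "c",
  "transaction": null
}
"""

metadata = extract_debezium_metadata(json.loads(raw))
for key, value in metadata.items():
    print(key, "=", value)
```

With this shape, the common case is a direct key lookup, while the map entry keeps connector-specific fields reachable as strings, mirroring the `['table']` access on `debezium-json.source.properties`.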
Regards,
Timo

On 08.09.20 06:21, Leonard Xu wrote:

Hi, Timo

Thanks for your explanation and update. I have only one question about the latest FLIP.

About the MAP<STRING, STRING> DataType of key 'debezium-json.source': if users want to use the table name metadata, they need to write:

tableName STRING AS CAST(SYSTEM_METADATA('debezium-json.source') AS MAP<STRING, STRING>)['table']

The expression is a little complex for users. Could we only support the necessary metadata with simple DataTypes, as follows?
tableName STRING AS CAST(SYSTEM_METADATA('debezium-json.source.table') AS STRING),
transactionTime LONG AS CAST(SYSTEM_METADATA('debezium-json.source.ts_ms') AS BIGINT),

In this way, we can simplify the expression. From my side, the mainly used metadata in changelog formats may include 'database','table','source.ts_ms','ts_ms'; maybe we could support only them in the first version.

Both Debezium and Canal have the above four metadata fields, and I'm willing to take some subtasks in the next development if necessary.

Debezium:
{
  "before": null,
  "after": { "id": 101,"name": "scooter"},
  "source": {
    "db": "inventory",        # 1. database name the changelog belongs to.
    "table": "products",      # 2. table name the changelog belongs to.
    "ts_ms": 1589355504100,   # 3. timestamp of the change in the database system, i.e. transaction time in the database.
    "connector": "mysql",
    ….
  },
  "ts_ms": 1589355606100,     # 4. timestamp when Debezium processed the changelog.
  "op": "c",
  "transaction": null
}

Canal:
{
  "data": [{ "id": "102", "name": "car battery" }],
  "database": "inventory",    # 1. database name the changelog belongs to.
  "table": "products",        # 2. table name the changelog belongs to.
  "es": 1589374013000,        # 3. execution time of the change in the database system, i.e. transaction time in the database.
  "ts": 1589374013680,        # 4. timestamp when Canal processed the changelog.
  "isDdl": false,
  "mysqlType": {},
  ....
}

Best
Leonard

On 8 Sep 2020, at 11:57, Danny Chan <yuzhao....@gmail.com> wrote:

Thanks Timo ~

The FLIP was already in pretty good shape, I have only 2 questions here:

1. "`CAST(SYSTEM_METADATA("offset") AS INT)` would be a valid read-only computed column for Kafka and can be extracted by the planner."

What are the pros of following the SQL-SERVER syntax here? Usually an expression's return type can be inferred automatically. But I guess SQL-SERVER does not have a function like SYSTEM_METADATA, which actually does not have a specific return type.
And why not use the Oracle or MySQL syntax there?

column_name [datatype] [GENERATED ALWAYS] AS (expression) [VIRTUAL]

Which is more straightforward.

2. "`SYSTEM_METADATA("offset")` returns the NULL type by default"

The default type should not be NULL because only the NULL literal has that type. Usually we use ANY as the type if we do not know the specific type in the SQL context. ANY means the physical value can be any Java object.
[1] https://oracle-base.com/articles/11g/virtual-columns-11gr1
[2] https://dev.mysql.com/doc/refman/5.7/en/create-table-generated-columns.html

Best,
Danny Chan

On 4 Sep 2020 at 4:48 PM +0800, Timo Walther <twal...@apache.org> wrote:

Hi everyone,

I completely reworked FLIP-107. It now covers the full story of how to read and write metadata from/to connectors and formats. It considers all of the latest FLIPs, namely FLIP-95, FLIP-132 and FLIP-122. It introduces the concept of PERSISTED computed columns and leaves out partitioning for now.

Looking forward to your feedback.
Regards,
Timo

On 04.03.20 09:45, Kurt Young wrote:

Sorry, forgot one question.

4. Can we make value.fields-include more orthogonal? For example, one could specify it as "EXCEPT_KEY, EXCEPT_TIMESTAMP". With the current EXCEPT_KEY and EXCEPT_KEY_TIMESTAMP, users cannot configure it to ignore just the timestamp but keep the key.
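One way to picture the orthogonal variant asked for in point 4 is a set of independent exclusion flags that can be combined freely. The following Python sketch is purely illustrative: the `Exclude` enum, the parsing rules, and the helper names are hypothetical and not part of any Flink option; only the `EXCEPT_KEY` and `EXCEPT_TIMESTAMP` tokens come from the message above.

```python
from enum import Flag, auto


class Exclude(Flag):
    """Independent exclusions that can be combined with `|`."""
    NONE = 0
    KEY = auto()
    TIMESTAMP = auto()


def parse_fields_include(option: str) -> Exclude:
    """Parse e.g. "EXCEPT_KEY, EXCEPT_TIMESTAMP" into combined flags."""
    result = Exclude.NONE
    for token in option.split(","):
        token = token.strip()
        if not token or token == "ALL":
            continue  # nothing excluded by this token
        if not token.startswith("EXCEPT_"):
            raise ValueError(f"unsupported token: {token!r}")
        # Unknown names raise KeyError, which surfaces typos early.
        result |= Exclude[token[len("EXCEPT_"):]]
    return result


def value_fields(columns, key_columns, timestamp_columns, exclude):
    """Return the columns that end up in the value part of the record."""
    kept = []
    for column in columns:
        if Exclude.KEY in exclude and column in key_columns:
            continue
        if Exclude.TIMESTAMP in exclude and column in timestamp_columns:
            continue
        kept.append(column)
    return kept


columns = ["id", "name", "ts"]
only_ts_dropped = value_fields(
    columns, {"id"}, {"ts"}, parse_fields_include("EXCEPT_TIMESTAMP")
)
both_dropped = value_fields(
    columns, {"id"}, {"ts"}, parse_fields_include("EXCEPT_KEY, EXCEPT_TIMESTAMP")
)
print(only_ts_dropped)
print(both_dropped)
```

With independent flags, "ignore the timestamp but keep the key" is simply the single `EXCEPT_TIMESTAMP` token, which a combined `EXCEPT_KEY_TIMESTAMP` value cannot express.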
Best,
Kurt

On Wed, Mar 4, 2020 at 4:42 PM Kurt Young <ykt...@gmail.com> wrote:

Hi Dawid,

I have a couple of questions around key fields. Actually I also have some other questions, but I want to focus on key fields first.

1. I don't fully understand the usage of "key.fields". Is this option only valid during write operations? For reading, I can't imagine how such options can be applied. I would expect that there might be a SYSTEM_METADATA("key") to read and assign the key to a normal field?

2. If "key.fields" is only valid for write operations, I want to propose that we simplify the options by not introducing key.format.type and other related options. I think a single "key.field" (not fields) would be enough; users can use a UDF to calculate whatever key they want before the sink.

3. Also, I don't want to introduce "value.format.type" and "value.format.xxx" with the "value" prefix. Not every connector has a concept of keys and values. The old parameter "format.type" is already good enough to use.
Best,
Kurt

On Tue, Mar 3, 2020 at 10:40 PM Jark Wu <imj...@gmail.com> wrote:

Thanks Dawid,

I have two more questions.

> SupportsMetadata
Introducing SupportsMetadata sounds good to me. But I have some questions regarding this interface.
1) How does the source know the expected return type of each metadata field?
2) Where to put the metadata fields? Append to the existing physical fields?
If yes, I would suggest changing the signature to `TableSource appendMetadataFields(String[] metadataNames, DataType[] metadataTypes)`

> SYSTEM_METADATA("partition")
Can the SYSTEM_METADATA() function be nested in a computed column expression? If yes, how do we specify the return type of SYSTEM_METADATA?

Best,
Jark

On Tue, 3 Mar 2020 at 17:06, Dawid Wysakowicz <dwysakow...@apache.org> wrote:

Hi,

1. I thought a bit more about how the source would emit the columns and I now see it's not exactly the same as regular columns.
I see a need to elaborate a bit more on that in the FLIP as you asked, Jark.

I mostly agree with Danny on how we should do that. One additional thing I would introduce is an

interface SupportsMetadata {

    boolean supportsMetadata(Set<String> metadataFields);

    TableSource generateMetadataFields(Set<String> metadataFields);

}

This way the source would have to declare/emit only the requested metadata fields. In order not to clash with user-defined fields, when emitting a metadata field I would prefix the column name, giving __system_{property_name}.
Therefore, when SYSTEM_METADATA("partition") is requested, the source would append a field __system_partition to the schema. This would never be visible to the user, as it would be used only for the subsequent computed columns. If that makes sense to you, I will update the FLIP with this description.

2. CAST vs explicit type in computed columns

Here I agree with Danny. It is also the current state of the proposal.

3. Partitioning on computed column vs function

Here I also agree with Danny. I also think those are orthogonal. I would leave the STORED computed columns out of the discussion.
I don't see how they relate to the partitioning. I already put both of those cases in the document. We can either partition on a computed column or use a UDF in a PARTITIONED BY clause. I am fine with leaving out the partitioning by UDF in the first version if you still have some concerns.

As for your question, Danny: it depends on which partitioning strategy you use.

For the HASH partitioning strategy I thought it would work as you explained. It would be N = MOD(expr, num). I am not sure though if we should introduce the PARTITIONS clause.
Usually Flink does not own the data, and the partitions are already an intrinsic property of the underlying source. For example, for Kafka we do not create topics; we just describe a pre-existing, pre-partitioned topic.

4. timestamp vs timestamp.field vs connector.field vs ...

I am fine with changing it to timestamp.field to be consistent with the other value.fields and key.fields. Actually that was also my initial proposal in a first draft I prepared. I changed it afterwards to shorten the key.
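Returning to the HASH strategy in point 3, the formula N = MOD(expr, num) can be sketched in a few lines of Python. This is only an illustration of the arithmetic under stated assumptions: the function name and the sample keys are made up, and `expr` is assumed to already evaluate to an integer.

```python
def hash_partition(expr_value: int, num_partitions: int) -> int:
    """Return the target partition N = MOD(expr, num).

    Note: Python's % always yields a result in [0, num_partitions) for a
    positive divisor; a SQL MOD may return a negative value for negative
    input, so a real implementation would need to normalize that.
    """
    if num_partitions <= 0:
        raise ValueError("num_partitions must be positive")
    return expr_value % num_partitions


# Distribute a few sample keys over 4 partitions.
assignments = {key: hash_partition(key, 4) for key in (101, 102, 103, 104)}
print(assignments)
```

For a source such as Kafka, where the topic is already partitioned, this computation would describe how existing data was laid out rather than something Flink decides, which matches the concern about introducing a PARTITIONS clause.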
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Best, > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Dawid > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On 03/03/2020 09:00, > Danny Chan wrote: > > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks Dawid for > bringing up this discussion, I > > > > > think > > > > > > > > > it > > > > > > > > > > > is > > > > > > > > > > > > > a > > > > > > > > > > > > > > > > > > > useful > > > > > > > > > > > > > > > > > > > > > > > > > > feature ~ > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > About how the metadata > outputs from source > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > I think it is > completely orthogonal, computed > > > > > > > > > > > > > > > > > > > > > > > > > > > column > > > > > > > > > > push > > > > > > > > > > > > > > > down is > > > > > > > > > > > > > > > > > > > > > > > > > > another topic, this > should not be a blocker but a > > > > > > > > > > > promotion, > > > > > > > > > > > > > > > if we > > > > > > > > > > > > > > > > > > > do > > > > > > > > > > > > > > > > > > > > > > > > > not > > > > > > > > > > > > > > > > > > > > > > > > > > have any filters on the > computed column, there > > > > > > > > > > > > > > > > > > > > > > > > > > is no > > > > > > > > > need > > > > > > > > > > > to > > > > > > > > > > > > > > > do any > > > > > > > > > > > > > > > > > > > > > > > > > > pushings; the source > node just emit the complete > > > > > record > > > > > > > > > > > with > > > > > > > > > > > > > > > full > > > > > > > > > > > > > > > > > > > > > > > > > metadata > > > > > > > > > > > > > > > > > > > > > > > > > > with the declared > physical schema, then when > > > > > generating > > > > > > > > > > the > > 
> > virtual columns, we would extract the metadata info and output it as full columns (with the full schema).
> >
> > About the type of metadata column
> >
> > Personally I prefer an explicit type instead of CAST. They are semantically equivalent, but an explicit type is more straightforward and we can declare the nullable attribute there.
> >
> > About option A: partitioning based on a computed column VS option B: partitioning with just a function
> >
> > From the FLIP, it seems that B's partitioning is just a strategy when writing data; the partition column is not included in the table schema, so it is useless when reading from the table.
> > - Compared to A, we do not need to generate the partition column when selecting from the table (only when inserting into it)
> > - For A we can also mark the column as STORED when we want to persist it
> >
> > So in my opinion they are orthogonal and we can support both. I saw that MySQL/Oracle [1][2] suggest also defining the PARTITIONS num, and the partitions are managed under a "tablenamespace". The partition in which the record is stored is partition number N, where N = MOD(expr, num). For your design, in which partition would the record be persisted?
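The hash partitioning rule cited above, N = MOD(expr, num), can be sketched as follows. This is only an illustration in Python, not Flink or MySQL code: `hash_partition` and the sample values are hypothetical, and the sketch assumes a non-negative integer expression, since Python's `%` and SQL `MOD` can disagree for negative operands.

```python
def hash_partition(expr_value: int, num_partitions: int) -> int:
    """Return the partition number N = MOD(expr, num) for one record.

    Hypothetical helper for illustration; assumes expr_value >= 0.
    """
    if num_partitions <= 0:
        raise ValueError("num_partitions must be positive")
    if expr_value < 0:
        raise ValueError("this sketch only covers non-negative expressions")
    return expr_value % num_partitions


# Hypothetical values of the partitioning expression, spread over 4 partitions.
records = [2005, 2006, 2008]
assignments = {value: hash_partition(value, 4) for value in records}
print(assignments)
```

With a fixed partition count the target partition is fully determined by the expression value, which is the property the question above asks about.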
> >
> > [1] https://dev.mysql.com/doc/refman/5.7/en/partitioning-hash.html
> > [2] https://docs.oracle.com/database/121/VLDBG/GUID-F023D3ED-262F-4B19-950A-D3C8F8CDB4F4.htm#VLDBG1270
> >
> > Best,
> > Danny Chan
> >
> > On 2 March 2020 at 6:16 PM +0800, Dawid Wysakowicz <dwysakow...@apache.org> wrote:
> > > Hi Jark,
> > >
> > > Ad. 2 I added a section to discuss the relation to FLIP-63.
> > >
> > > Ad. 3 Yes, I also tried to somewhat keep the hierarchy of properties. Therefore you have the key.format.type.
> > >
> > > I also considered exactly what you are suggesting (prefixing with connector or kafka). I should've put that into the Options/Rejected Alternatives section.
> > >
> > > I agree timestamp, key.*, value.* are connector properties. The reason I wanted to suggest not adding that prefix in the first version is that actually all the properties in the WITH section are connector properties. Even format is in the end a connector property, as some of the sources might not have a format, imo. The benefit of not adding the prefix is that it makes the keys a bit shorter. Imagine prefixing all the properties with connector (or, if we go with FLINK-12557, elasticsearch):
> > >
> > > elasticsearch.key.format.type: csv
> > > elasticsearch.key.format.field: ....
> > > elasticsearch.key.format.delimiter: ....
> > > elasticsearch.key.format.*: ....
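To make the key-length trade-off concrete, here is a minimal Python sketch of what prefixing every property would do to the keys. It is not Flink code: `with_prefix` is a hypothetical helper, and the delimiter value `;` is a made-up placeholder (the thread only gives `csv` for `key.format.type`).

```python
from typing import Dict


def with_prefix(properties: Dict[str, str], prefix: str) -> Dict[str, str]:
    """Return a copy of the properties with `prefix.` prepended to every key."""
    return {f"{prefix}.{key}": value for key, value in properties.items()}


# Unprefixed keys as proposed for the first version; values are placeholders.
props = {
    "key.format.type": "csv",
    "key.format.delimiter": ";",
}

prefixed = with_prefix(props, "elasticsearch")
for key, value in prefixed.items():
    print(f"{key}: {value}")
```

Every key grows by the length of the prefix plus one dot, which is the verbosity cost weighed above against a more explicit hierarchy.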
> > >
> > > I am fine with doing it though, if this is the preferred approach in the community.
> > >
> > > Ad in-line comments:
> > >
> > > I forgot to update the `value.fields.include` property. It should be `value.fields-include`, which I think you also suggested in the comment, right?
> > >
> > > As for CAST vs declaring the output type of a computed column: I think it's better not to use CAST, but to declare the type of an expression and later on infer the output type of SYSTEM_METADATA. The reason is that I think this way it will be easier to implement e.g.
> > > filter push downs when working with the native types of the source. E.g. in the case of Kafka's offset, I think it's better to push down a long rather than a string. This would let us push down an expression like offset > 12345 && offset < 59382. Otherwise we would have to push down cast(offset, long) > 12345 && cast(offset, long) < 59382. Moreover, I think we need to introduce the type for computed columns anyway, to support functions that infer the output type based on the expected return type.
> > >
> > > As for the computed column push down: yes, SYSTEM_METADATA would have to be pushed down to the source.
> > > If it is not possible, the planner should fail. As far as I know, computed column push down will be part of the source rework, won't it? ;)
> > >
> > > As for the persisted computed column: I think it is completely orthogonal. In my current proposal you can also partition by a computed column. The difference between using a udf in PARTITIONED BY vs partitioning by a computed column is that when you partition by a computed column, this column must also be computed when reading the table.
> > > If you use a udf in PARTITIONED BY, the expression is computed only when inserting into the table.
> > >
> > > Hope this answers some of your questions. Looking forward to further suggestions.
> > >
> > > Best,
> > >
> > > Dawid
> > >
> > > On 02/03/2020 05:18, Jark Wu wrote:
> > > > Hi,
> > > >
> > > > Thanks Dawid for starting such a great discussion. Reading metadata and key-part information from the source is an important feature for streaming users.
> > > >
> > > > In general, I agree with the proposal of the FLIP.
> > > > I will leave my thoughts and comments here:
> > > >
> > > > 1) +1 to use connector properties instead of introducing a HEADER keyword, for the reason you mentioned in the FLIP.
> > > > 2) We already introduced PARTITIONED BY in FLIP-63. Maybe we should add a section to explain the relationship between them. Do their concepts conflict? Could INSERT PARTITION be used on the PARTITIONED table in this FLIP?
> > > > 3) Currently, properties are hierarchical in Flink SQL. Shall we make the newly introduced properties more hierarchical? For example, "timestamp" => "connector.timestamp"?
> > > > (Actually, I prefer "kafka.timestamp", which is another improvement for properties, see FLINK-12557.) A single "timestamp" in properties may mislead users into thinking that the field is a rowtime attribute.
> > > >
> > > > I also left some minor comments in the FLIP.
> > > >
> > > > Thanks,
> > > > Jark
> > > >
> > > > On Sun, 1 Mar 2020 at 22:30, Dawid Wysakowicz <dwysakow...@apache.org> wrote:
> > > > > Hi,
> > > > >
> > > > > I would like to propose an improvement that would enable reading
> > > > > table columns from different parts of source records. Besides the main payload, the majority of sources (if not all) expose additional information. It can be simply read-only metadata such as offset or ingestion time, or read-and-write parts of the record that contain data but additionally serve different purposes (partitioning, compaction etc.), e.g. key or timestamp in Kafka.
> > > > >
> > > > > We should make it possible to read and write data from all of those locations.
> > > > > In this proposal I discuss reading partitioning data; for completeness, it also discusses partitioning when writing data out.
> > > > >
> > > > > I am looking forward to your comments.
> > > > >
> > > > > You can access the FLIP here:
> > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records?src=contextnavpagetreemode
> > > > >
> > > > > Best,
> > > > >
> > > > > Dawid