Thanks for driving this, Timo. +1 for starting the vote.
Best,
Danny Chan

On Sep 10, 2020, 3:47 PM +0800, Timo Walther <twal...@apache.org> wrote:
Thanks everyone for this healthy discussion. I updated the FLIP with the outcome. I think the result is very powerful but also very easy to declare. Thanks for all the contributions.

If there are no objections, I would continue with a vote.

What do you think?

Regards,
Timo

On 09.09.20 16:52, Timo Walther wrote:
"If virtual by default, when a user types "timestamp int" ==> persisted column, then adds a "metadata" after that ==> virtual column, then adds a "persisted" after that ==> persisted column."

Thanks for this nice mental-model explanation, Jark. This makes total sense to me. Also, making the most common case as short as just adding `METADATA` is a very good idea. Thanks, Danny!

Let me update the FLIP again with all these ideas.

Regards,
Timo

On 09.09.20 15:03, Jark Wu wrote:
I'm also +1 to Danny's proposal:

    timestamp INT METADATA [FROM 'my-timestamp-field'] [VIRTUAL]

I especially like the shortcut "timestamp INT METADATA": it supports the most common case in the simplest way.

I also think the default should be "PERSISTED", so that VIRTUAL only needs to be added when you are accessing read-only metadata. Because:
1. "timestamp INT METADATA" should be a normal column: "METADATA" is just a modifier indicating that the value comes from metadata, and a normal column should be persisted.
   If columns were virtual by default, a user who types "timestamp int" gets a persisted column, then adds "metadata" and gets a virtual column, then adds "persisted" and gets a persisted column again.
   I think this flips back and forth several times and confuses users. Physical fields are also declared as "fieldName TYPE", so treating "timestamp INT METADATA" as persisted is very straightforward.
2.
From the collected user questions [1], we can see that "timestamp" is the most common use case. "timestamp" is read-write metadata, so persisting it by default doesn't break the reading behavior.

Best,
Jark

[1]: https://issues.apache.org/jira/browse/FLINK-15869

On Wed, 9 Sep 2020 at 20:56, Leonard Xu <xbjt...@gmail.com> wrote:
Thanks @Dawid for the nice summary; I think you captured all the opinions of this long discussion well.

@Danny
"timestamp INT METADATA [FROM 'my-timestamp-field'] [VIRTUAL]
Note that the "FROM 'field name'" is only needed when the name conflicts with the declared table column name. When there are no conflicts, we can simplify it to
timestamp INT METADATA"

I really like the proposal: there is no confusion with computed columns any more, and it's concise enough.

@Timo @Dawid
"We use `SYSTEM_TIME` for temporal tables. I think prefixing with SYSTEM makes it clearer that it comes magically from the system."
"As for the issue of shortening the SYSTEM_METADATA to METADATA. Here I very much prefer the SYSTEM_ prefix."

I think `SYSTEM_TIME` is quite different from `SYSTEM_METADATA`. First, the word `TIME` has broad meanings, whereas `METADATA` has a specific meaning. Second, `FOR SYSTEM_TIME AS OF` exists in the SQL standard, but `SYSTEM_METADATA` does not.
Personally, I prefer the simpler way; sometimes less is more.

Best,
Leonard

On Wed, Sep 9, 2020 at 6:41 PM, Timo Walther <twal...@apache.org> wrote:
Hi everyone,

"key" and "value" in the properties are a special case because they need to configure a format.
So key and value are more than just metadata. Jark's example for setting a timestamp would work, but as the FLIP discusses, we have many more metadata fields like headers, epoch-leader, etc. Having a property for all of this metadata would mess up the WITH section entirely. Furthermore, we also want to deal with metadata from the formats. Solving this through properties as well would further complicate the property design.

Personally, I still prefer the computed column design because it allows full flexibility in computing the final column:

    timestamp AS adjustTimestamp(CAST(SYSTEM_METADATA("ts") AS TIMESTAMP(3)))

instead of having a helper column and a real column in the table:

    helperTimestamp AS CAST(SYSTEM_METADATA("ts") AS TIMESTAMP(3))
    realTimestamp AS adjustTimestamp(helperTimestamp)

But I see that the discussion leans towards:

    timestamp INT SYSTEM_METADATA("ts")

which is fine with me. It is the shortest solution because we don't need an additional CAST. We can discuss the syntax so that confusion with computed columns can be avoided:

    timestamp INT USING SYSTEM_METADATA("ts")
    timestamp INT FROM SYSTEM_METADATA("ts")
    timestamp INT FROM SYSTEM_METADATA("ts") PERSISTED

We use `SYSTEM_TIME` for temporal tables. I think prefixing with SYSTEM makes it clearer that it comes magically from the system.

What do you think?
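To make the query-to-sink schema question concrete, here is a minimal Python model of the rule the thread eventually settled on (summarized at the top): physical and METADATA columns are persisted by default, VIRTUAL metadata columns and computed columns are left out of what an INSERT INTO must provide, and an omitted FROM means the column name doubles as the metadata key. `Column`, `sink_schema`, and the validation rule in `__post_init__` are hypothetical illustrations, not Flink planner code.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass(frozen=True)
class Column:
    """Hypothetical model of one column in a CREATE TABLE statement."""
    name: str
    data_type: str
    is_metadata: bool = False            # declared with METADATA
    metadata_key: Optional[str] = None   # value of FROM '...', if present
    is_virtual: bool = False             # declared with VIRTUAL
    is_computed: bool = False            # declared as `name AS expression`

    def __post_init__(self) -> None:
        # Assumption of this model: FROM and VIRTUAL only apply to METADATA columns.
        if not self.is_metadata and (self.metadata_key is not None or self.is_virtual):
            raise ValueError(f"{self.name}: FROM/VIRTUAL require METADATA")

    def effective_metadata_key(self) -> Optional[str]:
        # FROM may be omitted; the column name is then used as the metadata key.
        if not self.is_metadata:
            return None
        return self.metadata_key if self.metadata_key is not None else self.name

    def is_persisted(self) -> bool:
        # Persisted by default: only computed columns and VIRTUAL metadata
        # columns are excluded from the schema a query must write.
        return not (self.is_computed or self.is_virtual)


def sink_schema(columns: List[Column]) -> List[str]:
    """Column names a query must produce when writing into the table."""
    return [c.name for c in columns if c.is_persisted()]


# Roughly: id BIGINT, name STRING, timestamp INT METADATA,
#          offset INT METADATA VIRTUAL, ts AS to_timestamp(...)
table = [
    Column("id", "BIGINT"),
    Column("name", "STRING"),
    Column("timestamp", "INT", is_metadata=True),
    Column("offset", "INT", is_metadata=True, is_virtual=True),
    Column("ts", "TIMESTAMP(3)", is_computed=True),
]
print(sink_schema(table))  # ['id', 'name', 'timestamp']
```

With persisted-by-default, the same table can be read (all five columns) and written (three columns) without declaring a second table, which is the inconvenience Timo raises further down the thread.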
Regards,
Timo

On 09.09.20 11:41, Jark Wu wrote:
Hi Danny,

This is not the Oracle or MySQL computed column syntax, because there is no "AS" after the type.

Hi everyone,

If we want to use "offset INT SYSTEM_METADATA("offset")", then I think we must further discuss the "PERSISTED" or "VIRTUAL" keyword for the query-sink schema problem.
Personally, I think we can use the shorter keyword "METADATA" instead of "SYSTEM_METADATA", because "SYSTEM_METADATA" sounds like a system function and makes users think this is a computed column.

Best,
Jark

On Wed, 9 Sep 2020 at 17:23, Danny Chan <danny0...@apache.org> wrote:
"offset INT SYSTEM_METADATA("offset")"

This is actually Oracle or MySQL style computed column syntax.

"You are right that one could argue that "timestamp", "headers" are something like "key" and "value""

I have the same feeling: key, value, headers, and timestamp are all *real* data stored in the consumed record; they are not computed or generated.
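To illustrate the point that these values live in the consumed record itself, here is a toy Python model. The read-only/read-write split follows the thread (timestamp and headers are writable, offset is read-only), and `read_metadata` resolves (column name, metadata key) pairs in the spirit of Kurt's suggestion further down that the framework only passes such pairs to the connector. `Record`, `read_metadata`, and `check_writable` are hypothetical; this is not the Kafka client API or Flink connector code.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Tuple


@dataclass
class Record:
    """Toy stand-in for a consumed Kafka record (not the Kafka client API)."""
    key: bytes
    value: bytes
    timestamp: int                                   # epoch milliseconds
    headers: Dict[str, bytes] = field(default_factory=dict)
    offset: int = -1                                 # assigned by the broker


# Split taken from the thread: timestamp and headers can be written, offset cannot.
READABLE_METADATA = {"timestamp", "headers", "offset"}
WRITABLE_METADATA = {"timestamp", "headers"}


def read_metadata(record: Record, mapping: List[Tuple[str, str]]) -> Dict[str, Any]:
    """Resolve (column name, metadata key) pairs against one record."""
    row: Dict[str, Any] = {}
    for column, key in mapping:
        if key not in READABLE_METADATA:
            raise KeyError(f"unknown metadata key: {key}")
        row[column] = getattr(record, key)
    return row


def check_writable(keys: List[str]) -> None:
    """Reject a sink that tries to write metadata outside the writable set."""
    not_writable = sorted(set(keys) - WRITABLE_METADATA)
    if not_writable:
        raise ValueError(f"metadata cannot be written: {not_writable}")


rec = Record(b"k", b"v", 1_599_133_672_000, {"checksum": b"abc"}, offset=42)
print(read_metadata(rec, [("ts", "timestamp"), ("off", "offset")]))
# {'ts': 1599133672000, 'off': 42}
```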
"Trying to solve everything via properties sounds rather like a hack to me"

It is not that hacky if we can unify the routines or the definitions (all via computed columns or all via table options). I also think it is hacky to mix two kinds of syntax for different kinds of metadata (read-only and read-write). In this FLIP, we declare the Kafka key fields with table options but use SYSTEM_METADATA for other metadata, which is hacky or at least inconsistent.

On Wed, Sep 9, 2020 at 4:48 PM, Kurt Young <ykt...@gmail.com> wrote:
I would vote for `offset INT SYSTEM_METADATA("offset")`.

I don't think we can stick with the SQL standard in the DDL part forever, especially as more and more requirements come from different connectors and external systems.

Best,
Kurt

On Wed, Sep 9, 2020 at 4:40 PM Timo Walther <twal...@apache.org> wrote:
Hi Jark,

now we are back at the original design proposed by Dawid :D Yes, we should be cautious about adding new syntax. But the length of this discussion shows that we are looking for a good long-term solution. In this case, I would rather vote for a deep integration into the syntax.
Computed columns are also not SQL standard compliant, and neither is our DDL, so we have some degree of freedom here.

Trying to solve everything via properties sounds rather like a hack to me. You are right that one could argue that "timestamp" and "headers" are something like "key" and "value". However, mixing

    `offset AS SYSTEM_METADATA("offset")`

and

    `'timestamp.field' = 'ts'`

looks more confusing to users than an explicit

    `offset AS CAST(SYSTEM_METADATA("offset") AS INT)`

or

    `offset INT SYSTEM_METADATA("offset")`

which is symmetric for both source and sink.

What do others think?

Regards,
Timo

On 09.09.20 10:09, Jark Wu wrote:
Hi everyone,

I think we have concluded that writable metadata shouldn't be defined as a computed column but as a normal column.

"timestamp STRING SYSTEM_METADATA('timestamp')" is one of the approaches. However, it is not SQL standard compliant, and we need to be cautious when adding new syntax.
Besides, we would have to introduce the `PERSISTED` or `VIRTUAL` keyword to resolve the query-sink schema problem for read-only metadata. That adds more for users to learn.

From my point of view, "timestamp" and "headers" are something like "key" and "value": they are stored with the real data. So why not define "timestamp" the same way as "key", using a "timestamp.field" connector option?
On the other hand, read-only metadata such as "offset" shouldn't be defined as a normal column. So why not use the existing computed column syntax for such metadata? Then we don't have the query-sink schema problem.
So here is my proposal:

    CREATE TABLE kafka_table (
      id BIGINT,
      name STRING,
      col1 STRING,
      col2 STRING,
      ts TIMESTAMP(3) WITH LOCAL TIME ZONE, -- ts is a normal field, so it can be read and written.
      offset AS SYSTEM_METADATA("offset")
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'test-topic',
      'key.fields' = 'id, name',
      'key.format' = 'csv',
      'value.format' = 'avro',
      'timestamp.field' = 'ts' -- define the mapping of the Kafka timestamp
    );

    INSERT INTO kafka_table
    SELECT id, name, col1, col2, rowtime FROM another_table;

I think this can solve all the problems without introducing any new syntax. The only minor disadvantage is that read-only metadata and read-write fields are defined with different syntax. However, I don't think this is a big problem.

Best,
Jark

On Wed, 9 Sep 2020 at 15:09, Timo Walther <twal...@apache.org> wrote:
Hi Kurt,

thanks for sharing your opinion. I'm totally up for not reusing computed columns. I think Jark was a big supporter of this syntax; @Jark, are you fine with this as well? The non-computed column approach was only a "slightly rejected alternative".
Furthermore, we would need to think about how such a new design influences the LIKE clause.

However, we should still keep the `PERSISTED` keyword, as it influences the query->sink schema. If you look at the list of metadata for existing connectors and formats, we currently offer only two writable metadata fields. Otherwise, one would need to declare two tables whenever a metadata column is read (one for the source, one for the sink). This can be quite inconvenient, e.g., for just reading the topic.

Regards,
Timo

On 09.09.20 08:52, Kurt Young wrote:
I also share the concern that reusing the computed column syntax with different semantics would confuse users a lot.

Besides, I think metadata fields are conceptually not the same as computed columns.
The metadata field is a connector-specific thing: it only carries the information of where the field comes from (in a source) or where it needs to be written to (in a sink). It is more similar to normal fields, under the assumption that all these fields go to the data part.

Thus I lean more towards the rejected alternative that Timo mentioned. And I think we don't need the PERSISTED keyword; SYSTEM_METADATA should be enough.

During implementation, the framework only needs to pass such <field, metadata field> information to the connector, and the logic of handling such fields inside the connector should be straightforward.

Regarding the downside Timo mentioned:

> The disadvantage is that users cannot call UDFs or parse timestamps.

I think this is fairly simple to solve. Since the metadata field isn't a computed column anymore, we can support referencing such fields in a computed column.
For example:

    CREATE TABLE kafka_table (
      id BIGINT,
      name STRING,
      timestamp STRING SYSTEM_METADATA("timestamp"), -- get the timestamp field from metadata
      ts AS to_timestamp(timestamp) -- normal computed column: parses the metadata string into a TIMESTAMP
    ) WITH (
      ...
    )

Best,
Kurt

On Tue, Sep 8, 2020 at 11:57 PM Timo Walther <twal...@apache.org> wrote:
Hi Leonard,

the only alternative I see is that we introduce a concept that is completely different from computed columns. This is also mentioned in the rejected alternatives section of the FLIP.
Something like:

    CREATE TABLE kafka_table (
      id BIGINT,
      name STRING,
      timestamp INT SYSTEM_METADATA("timestamp") PERSISTED,
      headers MAP<STRING, BYTES> SYSTEM_METADATA("headers") PERSISTED
    ) WITH (
      ...
    )

This way we would avoid confusion entirely and could easily map columns to metadata columns. The disadvantage is that users cannot call UDFs or parse timestamps. This would need to be done in a real computed column.

I'm happy about better alternatives.

Regards,
Timo

On 08.09.20 15:37, Leonard Xu wrote:
Hi, Timo

Thanks for driving this FLIP.
Sorry, but I have a concern about the "Writing metadata via DynamicTableSink" section:

    CREATE TABLE kafka_table (
      id BIGINT,
      name STRING,
      timestamp AS CAST(SYSTEM_METADATA("timestamp") AS BIGINT) PERSISTED,
      headers AS CAST(SYSTEM_METADATA("headers") AS MAP<STRING, BYTES>) PERSISTED
    ) WITH (
      ...
    )

An insert statement could look like:

    INSERT INTO kafka_table VALUES (
      (1, "ABC", 1599133672, MAP('checksum', computeChecksum(...)))
    )

The proposed INSERT syntax does not make sense to me, because it writes values into computed (generated) columns. Both SQL Server and PostgreSQL disallow inserting values into computed columns, even persisted ones; this breaks the generated column semantics and may confuse users a lot.
For SQL Server computed columns [1]:
> column_name AS computed_column_expression [ PERSISTED [ NOT NULL ] ]...
> NOTE: A computed column cannot be the target of an INSERT or UPDATE statement.

For PostgreSQL generated columns [2]:
> height_in numeric GENERATED ALWAYS AS (height_cm / 2.54) STORED
> NOTE: A generated column cannot be written to directly. In INSERT or UPDATE commands, a value cannot be specified for a generated column, but the keyword DEFAULT may be specified.
Having looked up SQL:2016, setting or updating the value of a generated column shouldn't be allowed:
> <insert statement> ::=
> INSERT INTO <insertion target> <insert columns and source>
>
> If <contextually typed table value constructor> CTTVC is specified, then every <contextually typed row value constructor element> simply contained in CTTVC whose positionally corresponding <column name> in <insert column list> references a column of which some underlying column is a generated column shall be a <default specification>.
> A <default specification> specifies the default value of some associated item.
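The SQL:2016 rule quoted above can be read as a positional check: in a VALUES row, every value whose target column is generated must be DEFAULT. Here is a minimal Python sketch of that check applied to the FLIP's INSERT example; the `DEFAULT` sentinel and `check_insert_row` are hypothetical stand-ins, not part of any SQL engine.

```python
from typing import Sequence, Set

# Sentinel playing the role of the SQL keyword DEFAULT in a VALUES row.
DEFAULT = object()


def check_insert_row(columns: Sequence[str], generated: Set[str], row: Sequence[object]) -> None:
    """Positional check: a generated target column may only receive DEFAULT."""
    if len(columns) != len(row):
        raise ValueError(f"expected {len(columns)} values, got {len(row)}")
    for name, value in zip(columns, row):
        if name in generated and value is not DEFAULT:
            raise ValueError(f"'{name}' is a generated column; only DEFAULT may be specified")


cols = ["id", "name", "timestamp", "headers"]
gen = {"timestamp", "headers"}  # the computed columns in the FLIP example
check_insert_row(cols, gen, (1, "ABC", DEFAULT, DEFAULT))  # accepted
try:
    check_insert_row(cols, gen, (1, "ABC", 1599133672, {"checksum": b"..."}))
except ValueError as err:
    print(err)  # 'timestamp' is a generated column; only DEFAULT may be specified
```

Under this reading, the FLIP's INSERT example is rejected as soon as it reaches the first computed column, which is the inconsistency Leonard points out.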
[1] https://docs.microsoft.com/en-US/sql/t-sql/statements/alter-table-computed-column-definition-transact-sql?view=sql-server-ver15
[2] https://www.postgresql.org/docs/12/ddl-generated-columns.html

On Sep 8, 2020, at 17:31, Timo Walther <twal...@apache.org> wrote:
Hi Jark,

according to Flink's and Calcite's casting definitions [1][2], TIMESTAMP WITH LOCAL TIME ZONE should be castable from BIGINT. If not, we will make it possible ;-)

I'm aware of DeserializationSchema.getProducedType, but I think that this method is actually misplaced.
The type should rather be passed to the source itself.

For our Kafka SQL source, we will also not use this method, because the Kafka source will add its own metadata in addition to the DeserializationSchema. So DeserializationSchema.getProducedType will never be read.

For now, I suggest leaving out the `DataType` from DecodingFormat.applyReadableMetadata, also because the format's physical type is passed later in `createRuntimeDecoder`. If necessary, it can be computed manually from the consumed type plus the metadata types. We will provide a metadata utility class for that.
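A minimal Python sketch of "consumed type plus metadata types", assuming the requested metadata fields are appended after the physical fields in the order they were requested. Types are plain strings here, and `produced_row_type` and the `available` mapping are hypothetical; this is not the utility class mentioned above.

```python
from typing import List, Mapping, Sequence, Tuple

Field = Tuple[str, str]  # (field name, SQL type as a string)


def produced_row_type(physical: Sequence[Field],
                      requested_keys: Sequence[str],
                      available: Mapping[str, str]) -> List[Field]:
    """Physical fields first, then each requested metadata field in request order."""
    row = list(physical)
    for key in requested_keys:
        if key not in available:
            raise KeyError(f"metadata key not offered: {key}")
        row.append((key, available[key]))
    return row


physical = [("id", "BIGINT"), ("name", "STRING")]
# Hypothetical advertisement of metadata keys and their types.
available = {
    "timestamp": "TIMESTAMP(3) WITH LOCAL TIME ZONE",
    "headers": "MAP<STRING, BYTES>",
}
print(produced_row_type(physical, ["headers", "timestamp"], available))
```

Because the result is derivable from the physical type and the requested keys, the format does not strictly need a `DataType` argument in this model.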
Regards,
Timo

[1] https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java#L200
[2] https://github.com/apache/calcite/blob/master/core/src/main/java/org/apache/calcite/sql/type/SqlTypeCoercionRule.java#L254

On 08.09.20 10:52, Jark Wu wrote:
Hi Timo,

The updated CAST SYSTEM_METADATA behavior sounds good to me. I just noticed that a BIGINT can't be converted to "TIMESTAMP(3) WITH LOCAL TIME ZONE". So maybe we need to support this, or use "TIMESTAMP(3) WITH LOCAL TIME ZONE" as the declared type of the Kafka timestamp? I think this makes sense, because it represents the milliseconds since the epoch.
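The conversion under discussion can be sketched in Python, modelling a TIMESTAMP WITH LOCAL TIME ZONE value as an absolute instant (an aware `datetime`) that is only rendered in a session time zone when shown. The function names are hypothetical, and the session zone is simplified to a fixed UTC offset. For example, 1_599_133_672_000 ms is 2020-09-03 11:47:52 UTC.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def epoch_millis_to_instant(millis: int) -> datetime:
    """Interpret a BIGINT as milliseconds since the epoch, i.e. an absolute instant."""
    # timedelta arithmetic keeps millisecond precision exactly (no float rounding).
    return EPOCH + timedelta(milliseconds=millis)


def render_in_zone(instant: datetime, offset_hours: int) -> datetime:
    """Show the same instant in a session time zone given as a fixed UTC offset."""
    return instant.astimezone(timezone(timedelta(hours=offset_hours)))


ts = epoch_millis_to_instant(1_599_133_672_000)
print(ts.isoformat())                     # 2020-09-03T11:47:52+00:00
print(render_in_zone(ts, 8).isoformat())  # 2020-09-03T19:47:52+08:00
```

Both printed values denote the same instant; only the rendering changes, which is why the epoch-millis BIGINT maps naturally onto a local-time-zone timestamp.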
Regarding "DeserializationSchema doesn't need TypeInfo", I don't think so. DeserializationSchema implements ResultTypeQueryable, so the implementation needs to return an output TypeInfo. Besides, FlinkKafkaConsumer also calls DeserializationSchema.getProducedType to obtain the produced type of the source function [1].

Best,
Jark

[1]: https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L1066

On Tue, 8 Sep 2020 at 16:35, Timo Walther <twal...@apache.org> wrote:
Hi everyone,

I updated the FLIP again and hope that I could address the mentioned concerns.

@Leonard: Thanks for the explanation. I wasn't aware that ts_ms and source.ts_ms have different semantics. I updated the FLIP
I > > > > > > > > > > > > > > > > > > updated the FLIP > > > > and > > > > > > > > > > expose > > > > > > > > > > > > > > the > > > > > > > > > > > > > > > > > > most commonly used properties separately. > > > > > > > > > > > > > > > > > > So frequently > > > > > > > > > > > > > > > > > > used > > > > > > > > > > > > > > properties > > > > > > > > > > > > > > > > > > are not hidden in the MAP anymore: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > debezium-json.ingestion-timestamp > > > > > > > > > > > > > > > > > > debezium-json.source.timestamp > > > > > > > > > > > > > > > > > > debezium-json.source.database > > > > > > > > > > > > > > > > > > debezium-json.source.schema > > > > > > > > > > > > > > > > > > debezium-json.source.table > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > However, since other properties depend on > > > > > > > > > > > > > > > > > > the used > > > > > > > > > > connector/vendor, > > > > > > > > > > > > > > the > > > > > > > > > > > > > > > > > > remaining options are stored in: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > debezium-json.source.properties > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > And accessed with: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > CAST(SYSTEM_METADATA('debezium-json.source.properties') > > > > > > > > > > > > > > > > > > AS > > > > > > > > > > > > MAP<STRING, > > > > > > > > > > > > > > > > > > STRING>)['table'] > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Otherwise it is not possible to figure out > > > > > > > > > > > > > > > > > > the value and > > > > > > > > column > > > > > > > > > > type > > > > > > > > > > > > > > > > > > during validation. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > @Jark: You convinced me in relaxing the CAST > > > > > > > > > > > > > > > > > > constraints. 
I > > > > > > > > > added > > > > > > > > > > a > > > > > > > > > > > > > > > > > > dedicacated sub-section to the FLIP: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > For making the use of SYSTEM_METADATA > > > > > > > > > > > > > > > > > > easier and avoid > > > > nested > > > > > > > > > > > > casting > > > > > > > > > > > > > > we > > > > > > > > > > > > > > > > > > allow explicit casting to a target data > > > > > > > > > > > > > > > > > > type: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > rowtime AS > > > > > > > > > > > > > > > > > > CAST(SYSTEM_METADATA("timestamp") AS > > > > > > > > > > > > > > > > > > TIMESTAMP(3) > > > > > > > > > WITH > > > > > > > > > > > > > > LOCAL > > > > > > > > > > > > > > > > > > TIME ZONE) > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > A connector still produces and consumes the > > > > > > > > > > > > > > > > > > data type > > > > returned > > > > > > > > > by > > > > > > > > > > > > > > > > > > `listMetadata()`. The planner will insert > > > > > > > > > > > > > > > > > > necessary > > > > > > > > > > > > > > > > > > explicit > > > > > > > > > > casts. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > In any case, the user must provide a CAST > > > > > > > > > > > > > > > > > > such that the > > > > > > > > computed > > > > > > > > > > > > > > column > > > > > > > > > > > > > > > > > > receives a valid data type when > > > > > > > > > > > > > > > > > > constructing the table > > > > schema. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > "I don't see a reason why > > > > > > > > `DecodingFormat#applyReadableMetadata` > > > > > > > > > > > > > > needs a > > > > > > > > > > > > > > > > > > DataType argument." 
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Correct he DeserializationSchema doesn't > > > > > > > > > > > > > > > > > > need TypeInfo, it > > > > is > > > > > > > > > > always > > > > > > > > > > > > > > > > > > executed locally. It is the source that > > > > > > > > > > > > > > > > > > needs TypeInfo for > > > > > > > > > > > > serializing > > > > > > > > > > > > > > > > > > the record to the next operator. And that's > > > > > > > > > > > > > > > > > > this is > > > > > > > > > > > > > > > > > > what we > > > > > > > > > > provide. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > @Danny: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > “SYSTEM_METADATA("offset")` returns the > > > > > > > > > > > > > > > > > > NULL type by > > > > default” > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > We can also use some other means to > > > > > > > > > > > > > > > > > > represent an UNKNOWN > > > > data > > > > > > > > > > type. > > > > > > > > > > > > In > > > > > > > > > > > > > > > > > > the Flink type system, we use the NullType > > > > > > > > > > > > > > > > > > for it. The > > > > > > > > important > > > > > > > > > > > > part > > > > > > > > > > > > > > is > > > > > > > > > > > > > > > > > > that the final data type is known for the > > > > > > > > > > > > > > > > > > entire computed > > > > > > > > > column. > > > > > > > > > > > > As I > > > > > > > > > > > > > > > > > > mentioned before, I would avoid the > > > > > > > > > > > > > > > > > > suggested option b) > > > > > > > > > > > > > > > > > > that > > > > > > > > > would > > > > > > > > > > > > be > > > > > > > > > > > > > > > > > > similar to your suggestion. The CAST should > > > > > > > > > > > > > > > > > > be enough and > > > > > > > > allows > > > > > > > > > > for > > > > > > > > > > > > > > > > > > complex expressions in the computed column. 
Option b) would need parser changes.

Regards,
Timo

On 08.09.20 06:21, Leonard Xu wrote:
Hi, Timo

Thanks for your explanation and update, I have only one question about the latest FLIP.

About the MAP<STRING, STRING> DataType of key 'debezium-json.source': if users want to use the table name metadata, they need to write:

tableName STRING AS CAST(SYSTEM_METADATA('debezium-json.source') AS MAP<STRING, STRING>)['table']

The expression is a little complex for users. Could we only support the necessary metadata with simple data types, as follows?

tableName STRING AS CAST(SYSTEM_METADATA('debezium-json.source.table') AS STRING),
transactionTime BIGINT AS CAST(SYSTEM_METADATA('debezium-json.source.ts_ms') AS BIGINT),

In this way, we can simplify the expression. From my side, the most commonly used metadata in changelog formats may include 'database', 'table', 'source.ts_ms' and 'ts_ms'; maybe we could support only these in the first version.

Both Debezium and Canal have the above four metadata fields, and I'm willing to take some subtasks in the upcoming development if necessary.

Debezium:
{
  "before": null,
  "after": { "id": 101, "name": "scooter" },
  "source": {
    "db": "inventory",        # 1. database name the changelog belongs to.
    "table": "products",      # 2. table name the changelog belongs to.
    "ts_ms": 1589355504100,   # 3. time the change happened in the database system, i.e. transaction time in the database.
    "connector": "mysql",
    ....
  },
  "ts_ms": 1589355606100,     # 4. time when Debezium processed the changelog.
  "op": "c",
  "transaction": null
}

Canal:
{
  "data": [{ "id": "102", "name": "car battery" }],
  "database": "inventory",    # 1. database name the changelog belongs to.
  "table": "products",        # 2. table name the changelog belongs to.
  "es": 1589374013000,        # 3. execution time of the change in the database system, i.e. transaction time in the database.
  "ts": 1589374013680,        # 4. time when Canal processed the changelog.
  "isDdl": false,
  "mysqlType": {},
  ....
}

Best
Leonard

On 8 Sep 2020, at 11:57, Danny Chan <yuzhao....@gmail.com> wrote:
Thanks Timo ~

The FLIP was already in pretty good shape, I have only 2 questions here:

1. "`CAST(SYSTEM_METADATA("offset") AS INT)` would be a valid read-only computed column for Kafka and can be extracted by the planner."

What are the pros of following the SQL Server syntax here? Usually an expression's return type can be inferred automatically. But I guess SQL Server does not have a function like SYSTEM_METADATA, which actually does not have a specific return type.

And why not use the Oracle [1] or MySQL [2] syntax there?

column_name [datatype] [GENERATED ALWAYS] AS (expression) [VIRTUAL]

It is more straightforward.

2. "`SYSTEM_METADATA("offset")` returns the NULL type by default"

The default type should not be NULL because only the NULL literal has that type. Usually we use ANY as the type if we do not know the specific type in the SQL context. ANY means the physical value can be any Java object.

[1] https://oracle-base.com/articles/11g/virtual-columns-11gr1
[2] https://dev.mysql.com/doc/refman/5.7/en/create-table-generated-columns.html

Best,
Danny Chan

On 4 Sep 2020 at 4:48 PM +0800, Timo Walther <twal...@apache.org> wrote:
Hi everyone,

I completely reworked FLIP-107. It now covers the full story of how to read and write metadata from/to connectors and formats. It considers all of the latest FLIPs, namely FLIP-95, FLIP-132 and FLIP-122. It introduces the concept of PERSISTED computed columns and leaves out partitioning for now.

Looking forward to your feedback.

Regards,
Timo

On 04.03.20 09:45, Kurt Young wrote:
Sorry, forgot one question.

4. Can we make value.fields-include more orthogonal? E.g. one could specify it as "EXCEPT_KEY, EXCEPT_TIMESTAMP". With the current EXCEPT_KEY and EXCEPT_KEY_TIMESTAMP, users cannot configure it to just ignore the timestamp but keep the key.
Best,
Kurt

On Wed, Mar 4, 2020 at 4:42 PM Kurt Young <ykt...@gmail.com> wrote:
Hi Dawid,

I have a couple of questions around key fields. Actually I also have some other questions, but I want to focus on key fields first.

1. I don't fully understand the usage of "key.fields". Is this option only valid during write operations? Because for reading, I can't imagine how such options can be applied. I would expect that there might be a SYSTEM_METADATA("key") to read and assign the key to a normal field?

2. If "key.fields" is only valid in write operations, I want to propose that we simplify the options by not introducing key.format.type and other related options. I think a single "key.field" (not fields) would be enough; users can use a UDF to calculate whatever key they want before the sink.

3. Also, I don't want to introduce "value.format.type" and "value.format.xxx" with the "value" prefix. Not every connector has a concept of keys and values. The old parameter "format.type" is already good enough.

Best,
Kurt

On Tue, Mar 3, 2020 at 10:40 PM Jark Wu <imj...@gmail.com> wrote:
Thanks Dawid,

I have two more questions.

> SupportsMetadata
Introducing SupportsMetadata sounds good to me. But I have some questions regarding this interface.
1) How does the source know the expected return type of each metadata field?
2) Where do we put the metadata fields? Append them to the existing physical fields?
> > > > > > > > > > > > > > > > > > > > > > > > If yes, I would suggest to > > > > > > > > > > > > > > > > > > > > > > > > change the signature to > > > > > > > > > > > > `TableSource > > > > > > > > > > > > > > > > > > > > > > > > appendMetadataFields(String[] > > > > > > > > > > > > > > > > > > > > > > > > metadataNames, > > > > DataType[] > > > > > > > > > > > > > > > > > > metadataTypes)` > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > SYSTEM_METADATA("partition") > > > > > > > > > > > > > > > > > > > > > > > > Can SYSTEM_METADATA() function > > > > > > > > > > > > > > > > > > > > > > > > be used nested in a > > > > > > > > > computed > > > > > > > > > > > > > > column > > > > > > > > > > > > > > > > > > > > > > > > expression? If yes, how to > > > > > > > > > > > > > > > > > > > > > > > > specify the return > > > > > > > > > > > > > > > > > > > > > > > > type of > > > > > > > > > > > > > > > > > > SYSTEM_METADATA? > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Best, > > > > > > > > > > > > > > > > > > > > > > > > Jark > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Tue, 3 Mar 2020 at 17:06, > > > > > > > > > > > > > > > > > > > > > > > > Dawid Wysakowicz < > > > > > > > > > > > > > > > > > > dwysakow...@apache.org> > > > > > > > > > > > > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Hi, > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > 1. 
I thought a bit more on > > > > > > > > > > > > > > > > > > > > > > > > > how the source would > > > > > > > > > > > > > > > > > > > > > > > > > emit > > > > > > > > the > > > > > > > > > > > > > > columns > > > > > > > > > > > > > > > > > > and I > > > > > > > > > > > > > > > > > > > > > > > > > now see its not exactly the > > > > > > > > > > > > > > > > > > > > > > > > > same as regular > > > > > > > > > > > > > > > > > > > > > > > > > columns. > > > > I > > > > > > > > > see > > > > > > > > > > a > > > > > > > > > > > > > > need > > > > > > > > > > > > > > > > > > to > > > > > > > > > > > > > > > > > > > > > > > > > elaborate a bit more on that > > > > > > > > > > > > > > > > > > > > > > > > > in the FLIP as you > > > > asked, > > > > > > > > > > Jark. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > I do agree mostly with Danny > > > > > > > > > > > > > > > > > > > > > > > > > on how we should do > > > > that. > > > > > > > > > One > > > > > > > > > > > > > > > > > > additional > > > > > > > > > > > > > > > > > > > > > > > > > things I would introduce is an > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > interface SupportsMetadata { > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > boolean > > > > > > > > > > > > > > > > > > > > > > > > > supportsMetadata(Set<String> > > > > > > > > > > > > > > > > > > > > > > > > > metadataFields); > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > TableSource > > > > > > > > > > > > > > > > > > > > > > > > > generateMetadataFields(Set<String> > > > > > > > > > > > > metadataFields); > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > } > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > This way the source would > > > > > > > > > > > 
> > > > > > > > > > > > > > have to declare/emit only > > > > the > > > > > > > > > > > > > > requested > > > > > > > > > > > > > > > > > > > > > > > > > metadata fields. In order not > > > > > > > > > > > > > > > > > > > > > > > > > to clash with user > > > > > > > > defined > > > > > > > > > > > > > > fields. > > > > > > > > > > > > > > > > > > When > > > > > > > > > > > > > > > > > > > > > > > > > emitting the metadata field I > > > > > > > > > > > > > > > > > > > > > > > > > would prepend the > > > > column > > > > > > > > > name > > > > > > > > > > > > > > with > > > > > > > > > > > > > > > > > > > > > > > > > __system_{property_name}. > > > > > > > > > > > > > > > > > > > > > > > > > Therefore when requested > > > > > > > > > > > > > > > > > > > > > > > > > SYSTEM_METADATA("partition") > > > > > > > > > > > > > > > > > > > > > > > > > the source would > > > > > > > > > > > > > > > > > > > > > > > > > append > > > > a > > > > > > > > > > field > > > > > > > > > > > > > > > > > > > > > > > > > __system_partition to the > > > > > > > > > > > > > > > > > > > > > > > > > schema. This would be > > > > > > > > > > > > > > > > > > > > > > > > > never > > > > > > > > > > visible > > > > > > > > > > > > > > to > > > > > > > > > > > > > > > > > > the > > > > > > > > > > > > > > > > > > > > > > > > > user as it would be used only > > > > > > > > > > > > > > > > > > > > > > > > > for the subsequent > > > > > > > > computed > > > > > > > > > > > > > > columns. > > > > > > > > > > > > > > > > > > If > > > > > > > > > > > > > > > > > > > > > > > > > that makes sense to you, I > > > > > > > > > > > > > > > > > > > > > > > > > will update the FLIP > > > > > > > > > > > > > > > > > > > > > > > > > with > > > > > > > > this > > > > > > > > > > > > > > > > > > description. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > 2. 
CAST vs explicit type in > > > > > > > > > > > > > > > > > > > > > > > > > computed columns > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Here I agree with Danny. It > > > > > > > > > > > > > > > > > > > > > > > > > is also the current > > > > > > > > > > > > > > > > > > > > > > > > > state > > > > > > > > of > > > > > > > > > > the > > > > > > > > > > > > > > > > > > proposal. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > 3. Partitioning on computed > > > > > > > > > > > > > > > > > > > > > > > > > column vs function > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Here I also agree with Danny. > > > > > > > > > > > > > > > > > > > > > > > > > I also think those > > > > > > > > > > > > > > > > > > > > > > > > > are > > > > > > > > > > > > > > orthogonal. I > > > > > > > > > > > > > > > > > > would > > > > > > > > > > > > > > > > > > > > > > > > > leave out the STORED computed > > > > > > > > > > > > > > > > > > > > > > > > > columns out of the > > > > > > > > > > discussion. > > > > > > > > > > > > I > > > > > > > > > > > > > > > > > > don't see > > > > > > > > > > > > > > > > > > > > > > > > > how do they relate to the > > > > > > > > > > > > > > > > > > > > > > > > > partitioning. I > > > > > > > > > > > > > > > > > > > > > > > > > already put > > > > > > > > > both > > > > > > > > > > of > > > > > > > > > > > > > > those > > > > > > > > > > > > > > > > > > > > > > > > > cases in the document. We can > > > > > > > > > > > > > > > > > > > > > > > > > either partition on a > > > > > > > > > > computed > > > > > > > > > > > > > > > > > > column or > > > > > > > > > > > > > > > > > > > > > > > > > use a udf in a partioned by > > > > > > > > > > > > > > > > > > > > > > > > > clause. 
I am fine with > > > > > > > > > leaving > > > > > > > > > > > > out > > > > > > > > > > > > > > the > > > > > > > > > > > > > > > > > > > > > > > > > partitioning by udf in the > > > > > > > > > > > > > > > > > > > > > > > > > first version if you > > > > > > > > > > > > > > > > > > > > > > > > > still > > > > > > > > > have > > > > > > > > > > > > some > > > > > > > > > > > > > > > > > > > > > > > > concerns. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > As for your question Danny. > > > > > > > > > > > > > > > > > > > > > > > > > It depends which > > > > > > > > partitioning > > > > > > > > > > > > > > strategy > > > > > > > > > > > > > > > > > > you > > > > > > > > > > > > > > > > > > > > > > > > use. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > For the HASH partitioning > > > > > > > > > > > > > > > > > > > > > > > > > strategy I thought it > > > > > > > > > > > > > > > > > > > > > > > > > would > > > > > > > > > work > > > > > > > > > > as > > > > > > > > > > > > > > you > > > > > > > > > > > > > > > > > > > > > > > > > explained. It would be N = > > > > > > > > > > > > > > > > > > > > > > > > > MOD(expr, num). I am not > > > > > > > > sure > > > > > > > > > > > > > > though if > > > > > > > > > > > > > > > > > > we > > > > > > > > > > > > > > > > > > > > > > > > > should introduce the > > > > > > > > > > > > > > > > > > > > > > > > > PARTITIONS clause. Usually > > > > > > > > > > > > > > > > > > > > > > > > > Flink > > > > > > > > > does > > > > > > > > > > > > not > > > > > > > > > > > > > > own > > > > > > > > > > > > > > > > > > the > > > > > > > > > > > > > > > > > > > > > > > > > data and the partitions are > > > > > > > > > > > > > > > > > > > > > > > > > already an intrinsic > > > > > > > > property > > > > > > > > > > of > > > > > > > > > > > > > > the > > > > > > > > > > > > > > > > > > > > > > > > > underlying source e.g. 
for Kafka we do not create topics, we just describe a pre-existing, pre-partitioned topic.

4. timestamp vs timestamp.field vs connector.field vs ...

I am fine with changing it to timestamp.field to be consistent with the other value.fields and key.fields. Actually, that was also my initial proposal in a first draft I prepared. I changed it afterwards to shorten the key.
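For the HASH partitioning point (item 3) above, Danny's example and the MySQL convention he cites compute the target partition as N = MOD(expr, num). Below is a minimal Python sketch of that rule. It assumes the partitioning expression has already been evaluated to a non-negative integer. `hash_partition` is a hypothetical helper for illustration and is not a Flink API. As Dawid notes, for a source such as Kafka the partition count belongs to the existing topic and is not declared in the DDL.

```python
def hash_partition(expr_value: int, num_partitions: int) -> int:
    """Return N = MOD(expr, num), the partition a record is assigned to.

    Hypothetical helper: assumes expr_value is the already-evaluated,
    non-negative integer result of the partitioning expression.
    """
    if num_partitions <= 0:
        raise ValueError("num_partitions must be positive")
    if expr_value < 0:
        raise ValueError("this sketch only handles non-negative expression values")
    return expr_value % num_partitions


if __name__ == "__main__":
    # With 4 partitions, an expression value of 17 lands in partition 17 mod 4 = 1.
    print(hash_partition(17, 4))  # prints 1
```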
Best,
Dawid

On 03/03/2020 09:00, Danny Chan wrote:
Thanks Dawid for bringing up this discussion, I think it is a useful feature ~

About how the metadata is output from the source

I think it is completely orthogonal. Computed column push down is another topic; it should not be a blocker but an improvement. If we do not have any filters on the computed column, there is no need to push anything down: the source node just emits the complete
record with full metadata along with the declared physical schema. Then, when generating the virtual columns, we would extract the metadata info and output it as full columns (with the full schema).

About the type of metadata column

Personally I prefer an explicit type instead of CAST. They are semantically equivalent, but an explicit type is more straightforward, and we can declare the nullable attribute there.
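Danny's read path can be sketched in Python. The source emits the full record including its metadata, and the virtual columns are derived afterwards, so nothing needs to be pushed down when there are no filters. The classes and `to_row` are hypothetical stand-ins, not Flink's source interfaces. The `convert` callable plays the role of the explicitly declared column type, and `nullable` stands for the nullability attribute that an explicit type can carry.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List


@dataclass
class SourceRecord:
    """A record as emitted by a source: payload plus connector metadata."""
    payload: Dict[str, Any]
    metadata: Dict[str, Any] = field(default_factory=dict)


@dataclass
class MetadataColumn:
    """A virtual column read from record metadata (hypothetical model)."""
    name: str
    metadata_key: str
    convert: Callable[[Any], Any]  # stands in for the explicitly declared type
    nullable: bool = True


def to_row(record: SourceRecord,
           physical_columns: List[str],
           metadata_columns: List[MetadataColumn]) -> Dict[str, Any]:
    """Project physical columns from the payload, then derive metadata columns."""
    row = {name: record.payload.get(name) for name in physical_columns}
    for col in metadata_columns:
        raw = record.metadata.get(col.metadata_key)
        if raw is None:
            if not col.nullable:
                raise ValueError(
                    f"metadata '{col.metadata_key}' is missing for NOT NULL column '{col.name}'")
            row[col.name] = None
        else:
            row[col.name] = col.convert(raw)
    return row


if __name__ == "__main__":
    # The source emits all of its metadata; only the declared column is extracted.
    rec = SourceRecord(payload={"id": 1, "name": "a"},
                       metadata={"offset": 42, "timestamp": 1583200000000})
    cols = [MetadataColumn("kafka_offset", "offset", int, nullable=False)]
    print(to_row(rec, ["id", "name"], cols))
    # prints {'id': 1, 'name': 'a', 'kafka_offset': 42}
```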
About option A: partitioning based on a computed column VS option B: partitioning with just a function

From the FLIP, it seems that B's partitioning is just a strategy used when writing data. The partition column is not included in the table schema, so it is useless when reading from the table.
- Compared to A, we do not need to generate the partition column when selecting from the table (only when inserting into it)
- For A we can also mark the column as STORED when we want to persist it

So in my opinion they are orthogonal, and we can support both. I saw that MySQL/Oracle [1][2] suggest also defining the PARTITIONS num, and the partitions are managed under a "tablenamespace". The partition in which the record is stored is partition number N, where N = MOD(expr, num),
so for your design, which partition would the record be persisted in?

[1] https://dev.mysql.com/doc/refman/5.7/en/partitioning-hash.html
[2] https://docs.oracle.com/database/121/VLDBG/GUID-F023D3ED-262F-4B19-950A-D3C8F8CDB4F4.htm#VLDBG1270

Best,
Danny Chan
On 2 Mar 2020, 6:16 PM +0800, Dawid Wysakowicz <dwysakow...@apache.org> wrote:
Hi Jark,
Ad. 2 I added a section to discuss the relation to FLIP-63.
Ad. 3 Yes, I also tried to somewhat keep a hierarchy of properties. Therefore you have the key.format.type.
I also considered exactly what you are suggesting (prefixing with connector or kafka). I should have put that into the Options/Rejected alternatives section.
I agree that timestamp, key.* and value.* are connector properties. The reason I wanted to suggest not adding that prefix in the first version is that all the properties in the WITH section are actually connector properties. Even format is, in the end, a connector property, as some of the sources might not have a format, imo. The benefit of not adding the prefix is that it makes the keys a bit shorter.
Imagine prefixing all the properties with connector (or, if we go with FLINK-12557, elasticsearch):
elasticsearch.key.format.type: csv
elasticsearch.key.format.field: ....
elasticsearch.key.format.delimiter: ....
elasticsearch.key.format.*: ....
I am fine with doing it, though, if this is the preferred approach in the community.
Ad in-line comments:
I forgot to update the `value.fields.include` property. It should be value.fields-include, which I think you also suggested in the comment, right?
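Below is a small Python sketch of the trade-off between the flat hierarchy (key.format.type) and the connector-prefixed one (elasticsearch.key.format.type). With either layout, a connector can pick out all key.format.* options by prefix, and the connector prefix only makes the keys longer. The helper names and option values are hypothetical. This is not Flink's property-handling code.

```python
from typing import Dict


def add_prefix(props: Dict[str, str], prefix: str) -> Dict[str, str]:
    """Prefix every key, e.g. 'key.format.type' -> 'elasticsearch.key.format.type'."""
    return {f"{prefix}.{key}": value for key, value in props.items()}


def sub_properties(props: Dict[str, str], prefix: str) -> Dict[str, str]:
    """Return the options nested under `prefix.`, with that prefix stripped."""
    dotted = prefix + "."
    return {key[len(dotted):]: value
            for key, value in props.items() if key.startswith(dotted)}


if __name__ == "__main__":
    # Hypothetical option values, for illustration only.
    unprefixed = {
        "key.format.type": "csv",
        "key.format.delimiter": ";",
        "value.format.type": "json",
    }
    prefixed = add_prefix(unprefixed, "elasticsearch")
    print(sub_properties(unprefixed, "key.format"))
    # prints {'type': 'csv', 'delimiter': ';'}
    print(sub_properties(prefixed, "elasticsearch.key.format"))
    # prints the same dict: the connector prefix only lengthens the keys
```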
As for CAST vs declaring the output type of a computed column: I think it is better not to use CAST, but to declare the type of an expression and later infer the output type of SYSTEM_METADATA. The reason is that I think this way it will be easier to implement e.g. filter push downs when working with the native types of the source. For example, in the case of Kafka's offset, I think it is better to push down a long rather than a string. This could let us push down expressions like offset > 12345 && offset < 59382.
Otherwise we would have to push down cast(offset, long) > 12345 && cast(offset, long) < 59382. Moreover, I think we need to introduce the type for computed columns anyway to support functions that infer their output type based on the expected return type.
As for the computed column push down: yes, SYSTEM_METADATA would have to be pushed down to the source. If that is not possible, the planner should fail. As far as I know, computed column push down will be part of the source rework, won't it?
;)
As for the persisted computed column: I think it is completely orthogonal. In my current proposal you can also partition by a computed column. The difference between using a UDF in PARTITIONED BY and partitioning by a computed column is that when you partition by a computed column, this column must also be computed when reading the table. If you use a UDF in PARTITIONED BY, the expression is computed only when inserting into the table.
Hope this answers some of your questions.
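Dawid's push-down argument can be illustrated with a Python sketch of a source whose offsets are natively integers. A range predicate on the plain `offset` column can be handed to the source, and the source then reads only the matching range. A cast-wrapped predicate is not recognised, so every record has to be read and filtered afterwards. `OffsetRange`, `try_push_down` and `scan` are hypothetical. The rule that only predicates on native field names are pushable is an assumption made for illustration. This is not Flink's filter push-down interface.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass(frozen=True)
class OffsetRange:
    """Pushed-down predicate: lower < offset < upper, on the native long offset."""
    lower: int
    upper: int


def try_push_down(field: str, lower: int, upper: int) -> Optional[OffsetRange]:
    """Hypothetical rule: the source only understands predicates on its native `offset`."""
    return OffsetRange(lower, upper) if field == "offset" else None


def scan(log_end: int, pushed: Optional[OffsetRange]) -> Tuple[List[int], int]:
    """Read offsets [0, log_end); return (offsets emitted, records scanned)."""
    if pushed is None:
        # Nothing pushed down: scan everything and leave filtering to the planner.
        offsets = list(range(log_end))
        return offsets, len(offsets)
    # With a native range the source can start and stop reading at the bounds.
    start = max(pushed.lower + 1, 0)
    end = min(pushed.upper, log_end)
    offsets = list(range(start, end))
    return offsets, len(offsets)


if __name__ == "__main__":
    native = try_push_down("offset", 12345, 59382)
    casted = try_push_down("cast(offset, long)", 12345, 59382)  # not recognised
    _, scanned_native = scan(100_000, native)
    rows, scanned_cast = scan(100_000, casted)
    filtered = [o for o in rows if 12345 < o < 59382]  # filter applied after the scan
    print(scanned_native, scanned_cast, len(filtered))  # prints 47036 100000 47036
```

Both paths return the same rows. The difference is how much the source has to read, which is why keeping the native type in the declared column type matters for push-down.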
Looking forward to further suggestions.
Best,
Dawid

On 02/03/2020 05:18, Jark Wu wrote:
Hi,

Thanks Dawid for starting such a great discussion. Reading metadata and key-part information from the source is an important feature for streaming users.

In general, I agree with the proposal of the FLIP.
I will leave my thoughts and comments here:

1) +1 to using connector properties instead of introducing a HEADER keyword, for the reason you mentioned in the FLIP.
2) We already introduced PARTITIONED BY in FLIP-63. Maybe we should add a section to explain the relationship between them. Do their concepts conflict? Could INSERT PARTITION be used on the PARTITIONED table in this FLIP?
3) Currently, properties are hierarchical in Flink SQL. Shall we make the newly introduced properties more hierarchical? For example, "timestamp" => "connector.timestamp"? (Actually, I prefer "kafka.timestamp", which is another improvement for properties, FLINK-12557.) A single "timestamp" in the properties may mislead users into thinking that the field is a rowtime attribute.

I also left some minor comments in the FLIP.
Thanks,
Jark

On Sun, 1 Mar 2020 at 22:30, Dawid Wysakowicz <dwysakow...@apache.org> wrote:

Hi,

I would like to propose an improvement that would enable reading table columns from different parts of source records. Besides the main payload, the majority (if not all) of the sources expose additional information.
It can be simply read-only metadata such as offset or ingestion time, or it can be read and write parts of the record that contain data but additionally serve different purposes (partitioning, compaction, etc.), e.g. the key or timestamp in Kafka.

We should make it possible to read and write data from all of those locations.
In this proposal I discuss reading partitioning data. For completeness, the proposal also discusses partitioning when writing data out.

I am looking forward to your comments.

You can access the FLIP here:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records?src=contextnavpagetreemode

Best,
Dawid