Thanks for driving this Timo, +1 for voting ~

Best,
Danny Chan
在 2020年9月10日 +0800 PM3:47,Timo Walther <twal...@apache.org>,写道:
> Thanks everyone for this healthy discussion. I updated the FLIP with the
> outcome. I think the result is very powerful but also very easy to
> declare. Thanks for all the contributions.
>
> If there are no objections, I would continue with a voting.
>
> What do you think?
>
> Regards,
> Timo
>
>
> On 09.09.20 16:52, Timo Walther wrote:
> > "If virtual by default, when a user types "timestamp int" ==> persisted
> > column, then adds a "metadata" after that ==> virtual column, then adds
> > a "persisted" after that ==> persisted column."
> >
> > Thanks for this nice mental model explanation, Jark. This makes total
> > sense to me. Also making the the most common case as short at just
> > adding `METADATA` is a very good idea. Thanks, Danny!
> >
> > Let me update the FLIP again with all these ideas.
> >
> > Regards,
> > Timo
> >
> >
> > On 09.09.20 15:03, Jark Wu wrote:
> > > I'm also +1 to Danny's proposal: timestamp INT METADATA [FROM
> > > 'my-timestamp-field'] [VIRTUAL]
> > > Especially I like the shortcut: timestamp INT METADATA, this makes the
> > > most
> > > common case to be supported in the simplest way.
> > >
> > > I also think the default should be "PERSISTED", so VIRTUAL is optional
> > > when
> > > you are accessing a read-only metadata. Because:
> > > 1. The "timestamp INT METADATA" should be a normal column, because
> > > "METADATA" is just a modifier to indicate it is from metadata, a normal
> > > column should be persisted.
> > >      If virtual by default, when a user types "timestamp int" ==>
> > > persisted
> > > column, then adds a "metadata" after that ==> virtual column, then adds a
> > > "persisted" after that ==> persisted column.
> > >      I think this looks reversed several times and makes users confused.
> > > Physical fields are also prefixed with "fieldName TYPE", so "timestamp
> > > INT
> > > METADATA" is persisted is very straightforward.
> > > 2. From the collected user question [1], we can see that "timestamp"
> > > is the
> > > most common use case. "timestamp" is a read-write metadata. Persisted by
> > > default doesn't break the reading behavior.
> > >
> > > Best,
> > > Jark
> > >
> > > [1]: https://issues.apache.org/jira/browse/FLINK-15869
> > >
> > > On Wed, 9 Sep 2020 at 20:56, Leonard Xu <xbjt...@gmail.com> wrote:
> > >
> > > > Thanks @Dawid for the nice summary, I think you catch all opinions of
> > > > the
> > > > long discussion well.
> > > >
> > > > @Danny
> > > > “ timestamp INT METADATA [FROM 'my-timestamp-field'] [VIRTUAL]
> > > >   Note that the "FROM 'field name'" is only needed when the name
> > > > conflict
> > > >   with the declared table column name, when there are no conflicts,
> > > > we can
> > > > simplify it to
> > > >        timestamp INT METADATA"
> > > >
> > > > I really like the proposal, there is no confusion with computed
> > > > column any
> > > > more,  and it’s concise enough.
> > > >
> > > >
> > > > @Timo @Dawid
> > > > “We use `SYSTEM_TIME` for temporal tables. I think prefixing with SYSTEM
> > > > makes it clearer that it comes magically from the system.”
> > > > “As for the issue of shortening the SYSTEM_METADATA to METADATA. Here I
> > > > very much prefer the SYSTEM_ prefix.”
> > > >
> > > > I think `SYSTEM_TIME` is different with `SYSTEM_METADATA ` a lot,
> > > > First of all,  the word `TIME` has broad meanings but the word
> > > > `METADATA `
> > > > not,  `METADATA ` has specific meaning,
> > > > Secondly, `FOR SYSTEM_TIME AS OF` exists in SQL standard but
> > > > `SYSTEM_METADATA ` not.
> > > > Personally, I like more simplify way,sometimes  less is more.
> > > >
> > > >
> > > > Best,
> > > > Leonard
> > > >
> > > >
> > > >
> > > > >
> > > > > Timo Walther <twal...@apache.org> 于2020年9月9日周三 下午6:41写道:
> > > > >
> > > > > > Hi everyone,
> > > > > >
> > > > > > "key" and "value" in the properties are a special case because they
> > > > > > need
> > > > > > to configure a format. So key and value are more than just metadata.
> > > > > > Jark's example for setting a timestamp would work but as the FLIP
> > > > > > discusses, we have way more metadata fields like headers,
> > > > > > epoch-leader,
> > > > > > etc. Having a property for all of this metadata would mess up the 
> > > > > > WITH
> > > > > > section entirely. Furthermore, we also want to deal with metadata 
> > > > > > from
> > > > > > the formats. Solving this through properties as well would further
> > > > > > complicate the property design.
> > > > > >
> > > > > > Personally, I still like the computed column design more because it
> > > > > > allows to have full flexibility to compute the final column:
> > > > > >
> > > > > > timestamp AS adjustTimestamp(CAST(SYSTEM_METADATA("ts") AS
> > > > TIMESTAMP(3)))
> > > > > >
> > > > > > Instead of having a helper column and a real column in the table:
> > > > > >
> > > > > > helperTimestamp AS CAST(SYSTEM_METADATA("ts") AS TIMESTAMP(3))
> > > > > > realTimestamp AS adjustTimestamp(helperTimestamp)
> > > > > >
> > > > > > But I see that the discussion leans towards:
> > > > > >
> > > > > > timestamp INT SYSTEM_METADATA("ts")
> > > > > >
> > > > > > Which is fine with me. It is the shortest solution, because we don't
> > > > > > need additional CAST. We can discuss the syntax, so that confusion
> > > > > > with
> > > > > > computed columns can be avoided.
> > > > > >
> > > > > > timestamp INT USING SYSTEM_METADATA("ts")
> > > > > > timestamp INT FROM SYSTEM_METADATA("ts")
> > > > > > timestamp INT FROM SYSTEM_METADATA("ts") PERSISTED
> > > > > >
> > > > > > We use `SYSTEM_TIME` for temporal tables. I think prefixing with
> > > > > > SYSTEM
> > > > > > makes it clearer that it comes magically from the system.
> > > > > >
> > > > > > What do you think?
> > > > > >
> > > > > > Regards,
> > > > > > Timo
> > > > > >
> > > > > >
> > > > > >
> > > > > > On 09.09.20 11:41, Jark Wu wrote:
> > > > > > > Hi Danny,
> > > > > > >
> > > > > > > This is not Oracle and MySQL computed column syntax, because 
> > > > > > > there is
> > > > no
> > > > > > > "AS" after the type.
> > > > > > >
> > > > > > > Hi everyone,
> > > > > > >
> > > > > > > If we want to use "offset INT SYSTEM_METADATA("offset")", then I
> > > > > > > think
> > > > we
> > > > > > > must further discuss about "PERSISED" or "VIRTUAL" keyword for
> > > > query-sink
> > > > > > > schema problem.
> > > > > > > Personally, I think we can use a shorter keyword "METADATA" for
> > > > > > > "SYSTEM_METADATA". Because "SYSTEM_METADATA" sounds like a system
> > > > > > function
> > > > > > > and confuse users this looks like a computed column.
> > > > > > >
> > > > > > >
> > > > > > > Best,
> > > > > > > Jark
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On Wed, 9 Sep 2020 at 17:23, Danny Chan <danny0...@apache.org> 
> > > > > > > wrote:
> > > > > > >
> > > > > > > > "offset INT SYSTEM_METADATA("offset")"
> > > > > > > >
> > > > > > > > This is actually Oracle or MySQL style computed column syntax.
> > > > > > > >
> > > > > > > > "You are right that one could argue that "timestamp", "headers" 
> > > > > > > > are
> > > > > > > > something like "key" and "value""
> > > > > > > >
> > > > > > > > I have the same feeling, both key value and headers timestamp 
> > > > > > > > are
> > > > *real*
> > > > > > > > data
> > > > > > > > stored in the consumed record, they are not computed or 
> > > > > > > > generated.
> > > > > > > >
> > > > > > > > "Trying to solve everything via properties sounds rather like a 
> > > > > > > > hack
> > > > to
> > > > > > > > me"
> > > > > > > >
> > > > > > > > Things are not that hack if we can unify the routines or the
> > > > definitions
> > > > > > > > (all from the computed column way or all from the table 
> > > > > > > > options), i
> > > > also
> > > > > > > > think that it is a hacky that we mix in 2 kinds of syntax for
> > > > different
> > > > > > > > kinds of metadata (read-only and read-write). In this FLIP, we
> > > > > > > > declare
> > > > > > the
> > > > > > > > Kafka key fields with table options but SYSTEM_METADATA for 
> > > > > > > > other
> > > > > > metadata,
> > > > > > > > that is a hacky thing or something in-consistent.
> > > > > > > >
> > > > > > > > Kurt Young <ykt...@gmail.com> 于2020年9月9日周三 下午4:48写道:
> > > > > > > >
> > > > > > > > >   I would vote for `offset INT SYSTEM_METADATA("offset")`.
> > > > > > > > >
> > > > > > > > > I don't think we can stick with the SQL standard in DDL part
> > > > > > > > > forever,
> > > > > > > > > especially as there are more and more
> > > > > > > > > requirements coming from different connectors and external 
> > > > > > > > > systems.
> > > > > > > > >
> > > > > > > > > Best,
> > > > > > > > > Kurt
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Wed, Sep 9, 2020 at 4:40 PM Timo Walther 
> > > > > > > > > <twal...@apache.org>
> > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi Jark,
> > > > > > > > > >
> > > > > > > > > > now we are back at the original design proposed by Dawid :D
> > > > > > > > > > Yes, we
> > > > > > > > > > should be cautious about adding new syntax. But the length 
> > > > > > > > > > of this
> > > > > > > > > > discussion shows that we are looking for a good long-term
> > > > > > > > > > solution.
> > > > In
> > > > > > > > > > this case I would rather vote for a deep integration into 
> > > > > > > > > > the
> > > > syntax.
> > > > > > > > > >
> > > > > > > > > > Computed columns are also not SQL standard compliant. And 
> > > > > > > > > > our
> > > > > > > > > > DDL is
> > > > > > > > > > neither, so we have some degree of freedom here.
> > > > > > > > > >
> > > > > > > > > > Trying to solve everything via properties sounds rather 
> > > > > > > > > > like a
> > > > > > > > > > hack
> > > > to
> > > > > > > > > > me. You are right that one could argue that "timestamp", 
> > > > > > > > > > "headers"
> > > > are
> > > > > > > > > > something like "key" and "value". However, mixing
> > > > > > > > > >
> > > > > > > > > > `offset AS SYSTEM_METADATA("offset")`
> > > > > > > > > >
> > > > > > > > > > and
> > > > > > > > > >
> > > > > > > > > > `'timestamp.field' = 'ts'`
> > > > > > > > > >
> > > > > > > > > > looks more confusing to users that an explicit
> > > > > > > > > >
> > > > > > > > > > `offset AS CAST(SYSTEM_METADATA("offset") AS INT)`
> > > > > > > > > >
> > > > > > > > > > or
> > > > > > > > > >
> > > > > > > > > > `offset INT SYSTEM_METADATA("offset")`
> > > > > > > > > >
> > > > > > > > > > that is symetric for both source and sink.
> > > > > > > > > >
> > > > > > > > > > What do others think?
> > > > > > > > > >
> > > > > > > > > > Regards,
> > > > > > > > > > Timo
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On 09.09.20 10:09, Jark Wu wrote:
> > > > > > > > > > > Hi everyone,
> > > > > > > > > > >
> > > > > > > > > > > I think we have a conclusion that the writable metadata 
> > > > > > > > > > > shouldn't
> > > > be
> > > > > > > > > > > defined as a computed column, but a normal column.
> > > > > > > > > > >
> > > > > > > > > > > "timestamp STRING SYSTEM_METADATA('timestamp')" is one of 
> > > > > > > > > > > the
> > > > > > > > > approaches.
> > > > > > > > > > > However, it is not SQL standard compliant, we need to be 
> > > > > > > > > > > cautious
> > > > > > > > > enough
> > > > > > > > > > > when adding new syntax.
> > > > > > > > > > > Besides, we have to introduce the `PERSISTED` or `VIRTUAL`
> > > > > > > > > > > keyword
> > > > to
> > > > > > > > > > > resolve the query-sink schema problem if it is read-only
> > > > > > > > > > > metadata.
> > > > > > > > That
> > > > > > > > > > > adds more stuff to learn for users.
> > > > > > > > > > >
> > > > > > > > > > > >  From my point of view, the "timestamp", "headers" are 
> > > > > > > > > > > > something
> > > > like
> > > > > > > > > > "key"
> > > > > > > > > > > and "value" that stores with the real data. So why not 
> > > > > > > > > > > define the
> > > > > > > > > > > "timestamp" in the same way with "key" by using a
> > > > > > > > > > > "timestamp.field"
> > > > > > > > > > > connector option?
> > > > > > > > > > > On the other side, the read-only metadata, such as 
> > > > > > > > > > > "offset",
> > > > > > > > shouldn't
> > > > > > > > > be
> > > > > > > > > > > defined as a normal column. So why not use the existing 
> > > > > > > > > > > computed
> > > > > > > > column
> > > > > > > > > > > syntax for such metadata? Then we don't have the 
> > > > > > > > > > > query-sink
> > > > > > > > > > > schema
> > > > > > > > > > problem.
> > > > > > > > > > > So here is my proposal:
> > > > > > > > > > >
> > > > > > > > > > > CREATE TABLE kafka_table (
> > > > > > > > > > >     id BIGINT,
> > > > > > > > > > >     name STRING,
> > > > > > > > > > >     col1 STRING,
> > > > > > > > > > >     col2 STRING,
> > > > > > > > > > >     ts TIMESTAMP(3) WITH LOCAL TIME ZONE,    -- ts is a 
> > > > > > > > > > > normal
> > > > field,
> > > > > > > > so
> > > > > > > > > > can
> > > > > > > > > > > be read and written.
> > > > > > > > > > >     offset AS SYSTEM_METADATA("offset")
> > > > > > > > > > > ) WITH (
> > > > > > > > > > >     'connector' = 'kafka',
> > > > > > > > > > >     'topic' = 'test-topic',
> > > > > > > > > > >     'key.fields' = 'id, name',
> > > > > > > > > > >     'key.format' = 'csv',
> > > > > > > > > > >     'value.format' = 'avro',
> > > > > > > > > > >     'timestamp.field' = 'ts'    -- define the mapping of 
> > > > > > > > > > > Kafka
> > > > > > > > timestamp
> > > > > > > > > > > );
> > > > > > > > > > >
> > > > > > > > > > > INSERT INTO kafka_table
> > > > > > > > > > > SELECT id, name, col1, col2, rowtime FROM another_table;
> > > > > > > > > > >
> > > > > > > > > > > I think this can solve all the problems without 
> > > > > > > > > > > introducing
> > > > > > > > > > > any new
> > > > > > > > > > syntax.
> > > > > > > > > > > The only minor disadvantage is that we separate the 
> > > > > > > > > > > definition
> > > > > > > > > way/syntax
> > > > > > > > > > > of read-only metadata and read-write fields.
> > > > > > > > > > > However, I don't think this is a big problem.
> > > > > > > > > > >
> > > > > > > > > > > Best,
> > > > > > > > > > > Jark
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > On Wed, 9 Sep 2020 at 15:09, Timo Walther 
> > > > > > > > > > > <twal...@apache.org>
> > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hi Kurt,
> > > > > > > > > > > >
> > > > > > > > > > > > thanks for sharing your opinion. I'm totally up for not 
> > > > > > > > > > > > reusing
> > > > > > > > > computed
> > > > > > > > > > > > columns. I think Jark was a big supporter of this 
> > > > > > > > > > > > syntax, @Jark
> > > > are
> > > > > > > > > you
> > > > > > > > > > > > fine with this as well? The non-computed column 
> > > > > > > > > > > > approach was
> > > > > > > > > > > > only
> > > > a
> > > > > > > > > > > > "slightly rejected alternative".
> > > > > > > > > > > >
> > > > > > > > > > > > Furthermore, we would need to think about how such a 
> > > > > > > > > > > > new design
> > > > > > > > > > > > influences the LIKE clause though.
> > > > > > > > > > > >
> > > > > > > > > > > > However, we should still keep the `PERSISTED` keyword 
> > > > > > > > > > > > as it
> > > > > > > > influences
> > > > > > > > > > > > the query->sink schema. If you look at the list of 
> > > > > > > > > > > > metadata for
> > > > > > > > > existing
> > > > > > > > > > > > connectors and formats, we currently offer only two 
> > > > > > > > > > > > writable
> > > > > > > > metadata
> > > > > > > > > > > > fields. Otherwise, one would need to declare two tables
> > > > > > > > > > > > whenever a
> > > > > > > > > > > > metadata columns is read (one for the source, one for 
> > > > > > > > > > > > the sink).
> > > > > > > > This
> > > > > > > > > > > > can be quite inconvientient e.g. for just reading the 
> > > > > > > > > > > > topic.
> > > > > > > > > > > >
> > > > > > > > > > > > Regards,
> > > > > > > > > > > > Timo
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > On 09.09.20 08:52, Kurt Young wrote:
> > > > > > > > > > > > > I also share the concern that reusing the computed 
> > > > > > > > > > > > > column
> > > > > > > > > > > > > syntax
> > > > > > > > but
> > > > > > > > > > have
> > > > > > > > > > > > > different semantics
> > > > > > > > > > > > > would confuse users a lot.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Besides, I think metadata fields are conceptually not 
> > > > > > > > > > > > > the same
> > > > with
> > > > > > > > > > > > > computed columns. The metadata
> > > > > > > > > > > > > field is a connector specific thing and it only 
> > > > > > > > > > > > > contains the
> > > > > > > > > > information
> > > > > > > > > > > > > that where does the field come
> > > > > > > > > > > > > from (during source) or where does the field need to 
> > > > > > > > > > > > > write to
> > > > > > > > (during
> > > > > > > > > > > > > sink). It's more similar with normal
> > > > > > > > > > > > > fields, with assumption that all these fields need 
> > > > > > > > > > > > > going to the
> > > > > > > > data
> > > > > > > > > > > > part.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thus I'm more lean to the rejected alternative that 
> > > > > > > > > > > > > Timo
> > > > mentioned.
> > > > > > > > > > And I
> > > > > > > > > > > > > think we don't need the
> > > > > > > > > > > > > PERSISTED keyword, SYSTEM_METADATA should be enough.
> > > > > > > > > > > > >
> > > > > > > > > > > > > During implementation, the framework only needs to 
> > > > > > > > > > > > > pass such
> > > > > > > > <field,
> > > > > > > > > > > > > metadata field> information to the
> > > > > > > > > > > > > connector, and the logic of handling such fields 
> > > > > > > > > > > > > inside the
> > > > > > > > connector
> > > > > > > > > > > > > should be straightforward.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Regarding the downside Timo mentioned:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > The disadvantage is that users cannot call UDFs or 
> > > > > > > > > > > > > > parse
> > > > > > > > timestamps.
> > > > > > > > > > > > >
> > > > > > > > > > > > > I think this is fairly simple to solve. Since the 
> > > > > > > > > > > > > metadata
> > > > > > > > > > > > > field
> > > > > > > > > isn't
> > > > > > > > > > a
> > > > > > > > > > > > > computed column anymore, we can support
> > > > > > > > > > > > > referencing such fields in the computed column. For 
> > > > > > > > > > > > > example:
> > > > > > > > > > > > >
> > > > > > > > > > > > > CREATE TABLE kafka_table (
> > > > > > > > > > > > >         id BIGINT,
> > > > > > > > > > > > >         name STRING,
> > > > > > > > > > > > >         timestamp STRING 
> > > > > > > > > > > > > SYSTEM_METADATA("timestamp"),  //
> > > > > > > > > > > > > get the
> > > > > > > > > > > > timestamp
> > > > > > > > > > > > > field from metadata
> > > > > > > > > > > > >         ts AS to_timestamp(timestamp) // normal 
> > > > > > > > > > > > > computed
> > > > > > > > > > > > > column,
> > > > > > > > parse
> > > > > > > > > > the
> > > > > > > > > > > > > string to TIMESTAMP type by using the metadata field
> > > > > > > > > > > > > ) WITH (
> > > > > > > > > > > > >        ...
> > > > > > > > > > > > > )
> > > > > > > > > > > > >
> > > > > > > > > > > > > Best,
> > > > > > > > > > > > > Kurt
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Tue, Sep 8, 2020 at 11:57 PM Timo Walther
> > > > > > > > > > > > > <twal...@apache.org
> > > > >
> > > > > > > > > > wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Hi Leonard,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > the only alternative I see is that we introduce a 
> > > > > > > > > > > > > > concept that
> > > > is
> > > > > > > > > > > > > > completely different to computed columns. This is 
> > > > > > > > > > > > > > also
> > > > > > > > > > > > > > mentioned
> > > > > > > > in
> > > > > > > > > > the
> > > > > > > > > > > > > > rejected alternative section of the FLIP. Something 
> > > > > > > > > > > > > > like:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > CREATE TABLE kafka_table (
> > > > > > > > > > > > > >         id BIGINT,
> > > > > > > > > > > > > >         name STRING,
> > > > > > > > > > > > > >         timestamp INT SYSTEM_METADATA("timestamp") 
> > > > > > > > > > > > > > PERSISTED,
> > > > > > > > > > > > > >         headers MAP<STRING, BYTES> 
> > > > > > > > > > > > > > SYSTEM_METADATA("headers")
> > > > > > > > > PERSISTED
> > > > > > > > > > > > > > ) WITH (
> > > > > > > > > > > > > >        ...
> > > > > > > > > > > > > > )
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > This way we would avoid confusion at all and can 
> > > > > > > > > > > > > > easily map
> > > > > > > > columns
> > > > > > > > > to
> > > > > > > > > > > > > > metadata columns. The disadvantage is that users 
> > > > > > > > > > > > > > cannot call
> > > > UDFs
> > > > > > > > or
> > > > > > > > > > > > > > parse timestamps. This would need to be done in a 
> > > > > > > > > > > > > > real
> > > > > > > > > > > > > > computed
> > > > > > > > > > column.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > I'm happy about better alternatives.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Regards,
> > > > > > > > > > > > > > Timo
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On 08.09.20 15:37, Leonard Xu wrote:
> > > > > > > > > > > > > > > HI, Timo
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Thanks for driving this FLIP.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Sorry but I have a concern about Writing metadata 
> > > > > > > > > > > > > > > via
> > > > > > > > > > DynamicTableSink
> > > > > > > > > > > > > > section:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > CREATE TABLE kafka_table (
> > > > > > > > > > > > > > >       id BIGINT,
> > > > > > > > > > > > > > >       name STRING,
> > > > > > > > > > > > > > >       timestamp AS 
> > > > > > > > > > > > > > > CAST(SYSTEM_METADATA("timestamp") AS
> > > > > > > > > > > > > > > BIGINT)
> > > > > > > > > > > > PERSISTED,
> > > > > > > > > > > > > > >       headers AS CAST(SYSTEM_METADATA("headers") 
> > > > > > > > > > > > > > > AS
> > > > > > > > > > > > > > > MAP<STRING,
> > > > > > > > > > BYTES>)
> > > > > > > > > > > > > > PERSISTED
> > > > > > > > > > > > > > > ) WITH (
> > > > > > > > > > > > > > >       ...
> > > > > > > > > > > > > > > )
> > > > > > > > > > > > > > > An insert statement could look like:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > INSERT INTO kafka_table VALUES (
> > > > > > > > > > > > > > >       (1, "ABC", 1599133672, MAP('checksum',
> > > > > > > > computeChecksum(...)))
> > > > > > > > > > > > > > > )
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > The proposed INERT syntax does not make sense to 
> > > > > > > > > > > > > > > me,
> > > > > > > > > > > > > > > because it
> > > > > > > > > > > > contains
> > > > > > > > > > > > > > computed(generated) column.
> > > > > > > > > > > > > > > Both SQL server and Postgresql do not allow to 
> > > > > > > > > > > > > > > insert
> > > > > > > > > > > > > > > value to
> > > > > > > > > > computed
> > > > > > > > > > > > > > columns even they are persisted, this boke the 
> > > > > > > > > > > > > > generated
> > > > > > > > > > > > > > column
> > > > > > > > > > > > semantics
> > > > > > > > > > > > > > and may confuse user much.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > For SQL server computed column[1]:
> > > > > > > > > > > > > > > > column_name AS computed_column_expression [ 
> > > > > > > > > > > > > > > > PERSISTED [ NOT
> > > > > > > > NULL ]
> > > > > > > > > > > > ]...
> > > > > > > > > > > > > > > > NOTE: A computed column cannot be the target of 
> > > > > > > > > > > > > > > > an INSERT or
> > > > > > > > > UPDATE
> > > > > > > > > > > > > > statement.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > For Postgresql generated column[2]:
> > > > > > > > > > > > > > > >      height_in numeric GENERATED ALWAYS AS 
> > > > > > > > > > > > > > > > (height_cm /
> > > > > > > > > > > > > > > > 2.54)
> > > > > > > > > STORED
> > > > > > > > > > > > > > > > NOTE: A generated column cannot be written to 
> > > > > > > > > > > > > > > > directly. In
> > > > > > > > INSERT
> > > > > > > > > or
> > > > > > > > > > > > > > UPDATE commands, a value cannot be specified for a 
> > > > > > > > > > > > > > generated
> > > > > > > > column,
> > > > > > > > > > but
> > > > > > > > > > > > > > the keyword DEFAULT may be specified.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > It shouldn't be allowed to set/update value for 
> > > > > > > > > > > > > > > generated
> > > > column
> > > > > > > > > > after
> > > > > > > > > > > > > > lookup the SQL 2016:
> > > > > > > > > > > > > > > > <insert statement> ::=
> > > > > > > > > > > > > > > > INSERT INTO <insertion target> <insert columns 
> > > > > > > > > > > > > > > > and source>
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > If <contextually typed table value constructor> 
> > > > > > > > > > > > > > > > CTTVC is
> > > > > > > > > specified,
> > > > > > > > > > > > > > then every <contextually typed row
> > > > > > > > > > > > > > > > value constructor element> simply contained in 
> > > > > > > > > > > > > > > > CTTVC whose
> > > > > > > > > > > > positionally
> > > > > > > > > > > > > > corresponding <column name>
> > > > > > > > > > > > > > > > in <insert column list> references a column of 
> > > > > > > > > > > > > > > > which some
> > > > > > > > > underlying
> > > > > > > > > > > > > > column is a generated column shall
> > > > > > > > > > > > > > > > be a <default specification>.
> > > > > > > > > > > > > > > > A <default specification> specifies the default 
> > > > > > > > > > > > > > > > value of
> > > > > > > > > > > > > > > > some
> > > > > > > > > > > > > > associated item.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > [1]
> > > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > >
> > > > https://docs.microsoft.com/en-US/sql/t-sql/statements/alter-table-computed-column-definition-transact-sql?view=sql-server-ver15
> > > >
> > > > > > > > > > > > > > <
> > > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > >
> > > > https://docs.microsoft.com/en-US/sql/t-sql/statements/alter-table-computed-column-definition-transact-sql?view=sql-server-ver15
> > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > [2]
> > > > > > > > https://www.postgresql.org/docs/12/ddl-generated-columns.html
> > > > > > > > > <
> > > > > > > > > > > > > > https://www.postgresql.org/docs/12/ddl-generated-columns.html>
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > 在 2020年9月8日,17:31,Timo Walther 
> > > > > > > > > > > > > > > > <twal...@apache.org>
> > > > > > > > > > > > > > > > 写道:
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Hi Jark,
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > according to Flink's and Calcite's casting 
> > > > > > > > > > > > > > > > definition in
> > > > [1][2]
> > > > > > > > > > > > > > TIMESTAMP WITH LOCAL TIME ZONE should be castable 
> > > > > > > > > > > > > > from BIGINT.
> > > > If
> > > > > > > > > not,
> > > > > > > > > > > > we
> > > > > > > > > > > > > > will make it possible ;-)
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > I'm aware of 
> > > > > > > > > > > > > > > > DeserializationSchema.getProducedType but I
> > > > > > > > > > > > > > > > think
> > > > > > > > > that
> > > > > > > > > > > > > > this method is actually misplaced. The type should 
> > > > > > > > > > > > > > rather be
> > > > > > > > passed
> > > > > > > > > to
> > > > > > > > > > > > the
> > > > > > > > > > > > > > source itself.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > For our Kafka SQL source, we will also not use 
> > > > > > > > > > > > > > > > this method
> > > > > > > > because
> > > > > > > > > > the
> > > > > > > > > > > > > > Kafka source will add own metadata in addition to 
> > > > > > > > > > > > > > the
> > > > > > > > > > > > > > DeserializationSchema. So
> > > > > > > > > > > > > > DeserializationSchema.getProducedType
> > > > > > > > will
> > > > > > > > > > > > never
> > > > > > > > > > > > > > be read.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > For now I suggest to leave out the `DataType` 
> > > > > > > > > > > > > > > > from
> > > > > > > > > > > > > > DecodingFormat.applyReadableMetadata. Also because 
> > > > > > > > > > > > > > the
> > > > > > > > > > > > > > format's
> > > > > > > > > > physical
> > > > > > > > > > > > > > type is passed later in `createRuntimeDecoder`. If
> > > > > > > > > > > > > > necessary, it
> > > > > > > > can
> > > > > > > > > > be
> > > > > > > > > > > > > > computed manually by consumedType + metadata types. 
> > > > > > > > > > > > > > We will
> > > > > > > > provide
> > > > > > > > > a
> > > > > > > > > > > > > > metadata utility class for that.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Regards,
> > > > > > > > > > > > > > > > Timo
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > [1]
> > > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > >
> > > > https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java#L200
> > > >
> > > > > > > > > > > > > > > > [2]
> > > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > >
> > > > https://github.com/apache/calcite/blob/master/core/src/main/java/org/apache/calcite/sql/type/SqlTypeCoercionRule.java#L254
> > > >
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > On 08.09.20 10:52, Jark Wu wrote:
> > > > > > > > > > > > > > > > > Hi Timo,
> > > > > > > > > > > > > > > > > The updated CAST SYSTEM_METADATA behavior 
> > > > > > > > > > > > > > > > > sounds good to
> > > > > > > > > > > > > > > > > me.
> > > > I
> > > > > > > > > just
> > > > > > > > > > > > > > noticed
> > > > > > > > > > > > > > > > > that a BIGINT can't be converted to 
> > > > > > > > > > > > > > > > > "TIMESTAMP(3) WITH
> > > > > > > > > > > > > > > > > LOCAL
> > > > > > > > TIME
> > > > > > > > > > > > > > ZONE".
> > > > > > > > > > > > > > > > > So maybe we need to support this, or use 
> > > > > > > > > > > > > > > > > "TIMESTAMP(3) WITH
> > > > > > > > LOCAL
> > > > > > > > > > > > TIME
> > > > > > > > > > > > > > > > > ZONE" as the defined type of Kafka timestamp? 
> > > > > > > > > > > > > > > > > I think this
> > > > > > > > makes
> > > > > > > > > > > > sense,
> > > > > > > > > > > > > > > > > because it represents the milli-seconds since 
> > > > > > > > > > > > > > > > > epoch.
> > > > > > > > > > > > > > > > > Regarding "DeserializationSchema doesn't need 
> > > > > > > > > > > > > > > > > TypeInfo", I
> > > > > > > > don't
> > > > > > > > > > > > think
> > > > > > > > > > > > > > so.
> > > > > > > > > > > > > > > > > The DeserializationSchema implements 
> > > > > > > > > > > > > > > > > ResultTypeQueryable,
> > > > thus
> > > > > > > > > the
> > > > > > > > > > > > > > > > > implementation needs to return an output 
> > > > > > > > > > > > > > > > > TypeInfo.
> > > > > > > > > > > > > > > > > Besides, FlinkKafkaConsumer also
> > > > > > > > > > > > > > > > > calls DeserializationSchema.getProducedType 
> > > > > > > > > > > > > > > > > as the produced
> > > > > > > > type
> > > > > > > > > of
> > > > > > > > > > > > the
> > > > > > > > > > > > > > > > > source function [1].
> > > > > > > > > > > > > > > > > Best,
> > > > > > > > > > > > > > > > > Jark
> > > > > > > > > > > > > > > > > [1]:
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > >
> > > > https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L1066
> > > >
> > > > > > > > > > > > > > > > > On Tue, 8 Sep 2020 at 16:35, Timo Walther <
> > > > twal...@apache.org>
> > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > > > > Hi everyone,
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > I updated the FLIP again and hope that I 
> > > > > > > > > > > > > > > > > > could address the
> > > > > > > > > > mentioned
> > > > > > > > > > > > > > > > > > concerns.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > @Leonard: Thanks for the explanation. I 
> > > > > > > > > > > > > > > > > > wasn't aware that
> > > > > > > > ts_ms
> > > > > > > > > > and
> > > > > > > > > > > > > > > > > > source.ts_ms have different semantics. I 
> > > > > > > > > > > > > > > > > > updated the FLIP
> > > > and
> > > > > > > > > > expose
> > > > > > > > > > > > > > the
> > > > > > > > > > > > > > > > > > most commonly used properties separately. 
> > > > > > > > > > > > > > > > > > So frequently
> > > > > > > > > > > > > > > > > > used
> > > > > > > > > > > > > > properties
> > > > > > > > > > > > > > > > > > are not hidden in the MAP anymore:
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > debezium-json.ingestion-timestamp
> > > > > > > > > > > > > > > > > > debezium-json.source.timestamp
> > > > > > > > > > > > > > > > > > debezium-json.source.database
> > > > > > > > > > > > > > > > > > debezium-json.source.schema
> > > > > > > > > > > > > > > > > > debezium-json.source.table
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > However, since other properties depend on 
> > > > > > > > > > > > > > > > > > the used
> > > > > > > > > > connector/vendor,
> > > > > > > > > > > > > > the
> > > > > > > > > > > > > > > > > > remaining options are stored in:
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > debezium-json.source.properties
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > And accessed with:
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > CAST(SYSTEM_METADATA('debezium-json.source.properties')
> > > > > > > > > > > > > > > > > >  AS
> > > > > > > > > > > > MAP<STRING,
> > > > > > > > > > > > > > > > > > STRING>)['table']
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Otherwise it is not possible to figure out 
> > > > > > > > > > > > > > > > > > the value and
> > > > > > > > column
> > > > > > > > > > type
> > > > > > > > > > > > > > > > > > during validation.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > @Jark: You convinced me in relaxing the CAST
> > > > > > > > > > > > > > > > > > constraints. I
> > > > > > > > > added
> > > > > > > > > > a
> > > > > > > > > > > > > > > > > > dedicacated sub-section to the FLIP:
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > For making the use of SYSTEM_METADATA 
> > > > > > > > > > > > > > > > > > easier and avoid
> > > > nested
> > > > > > > > > > > > casting
> > > > > > > > > > > > > > we
> > > > > > > > > > > > > > > > > > allow explicit casting to a target data 
> > > > > > > > > > > > > > > > > > type:
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > rowtime AS 
> > > > > > > > > > > > > > > > > > CAST(SYSTEM_METADATA("timestamp") AS
> > > > > > > > > > > > > > > > > > TIMESTAMP(3)
> > > > > > > > > WITH
> > > > > > > > > > > > > > LOCAL
> > > > > > > > > > > > > > > > > > TIME ZONE)
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > A connector still produces and consumes the 
> > > > > > > > > > > > > > > > > > data type
> > > > returned
> > > > > > > > > by
> > > > > > > > > > > > > > > > > > `listMetadata()`. The planner will insert 
> > > > > > > > > > > > > > > > > > necessary
> > > > > > > > > > > > > > > > > > explicit
> > > > > > > > > > casts.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > In any case, the user must provide a CAST 
> > > > > > > > > > > > > > > > > > such that the
> > > > > > > > computed
> > > > > > > > > > > > > > column
> > > > > > > > > > > > > > > > > > receives a valid data type when 
> > > > > > > > > > > > > > > > > > constructing the table
> > > > schema.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > "I don't see a reason why
> > > > > > > > `DecodingFormat#applyReadableMetadata`
> > > > > > > > > > > > > > needs a
> > > > > > > > > > > > > > > > > > DataType argument."
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Correct he DeserializationSchema doesn't 
> > > > > > > > > > > > > > > > > > need TypeInfo, it
> > > > is
> > > > > > > > > > always
> > > > > > > > > > > > > > > > > > executed locally. It is the source that 
> > > > > > > > > > > > > > > > > > needs TypeInfo for
> > > > > > > > > > > > serializing
> > > > > > > > > > > > > > > > > > the record to the next operator. And that's 
> > > > > > > > > > > > > > > > > > this is
> > > > > > > > > > > > > > > > > > what we
> > > > > > > > > > provide.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > @Danny:
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > “SYSTEM_METADATA("offset")` returns the 
> > > > > > > > > > > > > > > > > > NULL type by
> > > > default”
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > We can also use some other means to 
> > > > > > > > > > > > > > > > > > represent an UNKNOWN
> > > > data
> > > > > > > > > > type.
> > > > > > > > > > > > In
> > > > > > > > > > > > > > > > > > the Flink type system, we use the NullType 
> > > > > > > > > > > > > > > > > > for it. The
> > > > > > > > important
> > > > > > > > > > > > part
> > > > > > > > > > > > > > is
> > > > > > > > > > > > > > > > > > that the final data type is known for the 
> > > > > > > > > > > > > > > > > > entire computed
> > > > > > > > > column.
> > > > > > > > > > > > As I
> > > > > > > > > > > > > > > > > > mentioned before, I would avoid the 
> > > > > > > > > > > > > > > > > > suggested option b)
> > > > > > > > > > > > > > > > > > that
> > > > > > > > > would
> > > > > > > > > > > > be
> > > > > > > > > > > > > > > > > > similar to your suggestion. The CAST should 
> > > > > > > > > > > > > > > > > > be enough and
> > > > > > > > allows
> > > > > > > > > > for
> > > > > > > > > > > > > > > > > > complex expressions in the computed column. 
> > > > > > > > > > > > > > > > > > Option b)
> > > > > > > > > > > > > > > > > > would
> > > > > > > > need
> > > > > > > > > > > > > > parser
> > > > > > > > > > > > > > > > > > changes.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Regards,
> > > > > > > > > > > > > > > > > > Timo
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > On 08.09.20 06:21, Leonard Xu wrote:
> > > > > > > > > > > > > > > > > > > Hi, Timo
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > Thanks for you explanation and update,  I 
> > > > > > > > > > > > > > > > > > > have only one
> > > > > > > > > question
> > > > > > > > > > > > for
> > > > > > > > > > > > > > > > > > the latest FLIP.
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > About the MAP<STRING, STRING> DataType of 
> > > > > > > > > > > > > > > > > > > key
> > > > > > > > > > > > > > 'debezium-json.source', if
> > > > > > > > > > > > > > > > > > user want to use the table name metadata, 
> > > > > > > > > > > > > > > > > > they need to
> > > > write:
> > > > > > > > > > > > > > > > > > > tableName STRING AS
> > > > > > > > CAST(SYSTEM_METADATA('debeuim-json.source')
> > > > > > > > > > AS
> > > > > > > > > > > > > > > > > > MAP<STRING, STRING>)['table']
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > the expression is a little complex for 
> > > > > > > > > > > > > > > > > > > user, Could we
> > > > > > > > > > > > > > > > > > > only
> > > > > > > > > > support
> > > > > > > > > > > > > > > > > > necessary metas with simple DataType as 
> > > > > > > > > > > > > > > > > > following?
> > > > > > > > > > > > > > > > > > > tableName STRING AS
> > > > > > > > > > > > > > CAST(SYSTEM_METADATA('debeuim-json.source.table') AS
> > > > > > > > > > > > > > > > > > STRING),
> > > > > > > > > > > > > > > > > > > transactionTime LONG AS
> > > > > > > > > > > > > > > > > > CAST(SYSTEM_METADATA('debeuim-json.source.ts_ms')
> > > > > > > > > > > > > > > > > >  AS
> > > > BIGINT),
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > In this way, we can simplify the 
> > > > > > > > > > > > > > > > > > > expression, the mainly
> > > > used
> > > > > > > > > > > > > > metadata in
> > > > > > > > > > > > > > > > > > changelog format may include
> > > > > > > > > > > > > > 'database','table','source.ts_ms','ts_ms' from
> > > > > > > > > > > > > > > > > > my side,
> > > > > > > > > > > > > > > > > > > maybe we could only support them at first 
> > > > > > > > > > > > > > > > > > > version.
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > Both Debezium and Canal have above four 
> > > > > > > > > > > > > > > > > > > metadata, and I‘m
> > > > > > > > > willing
> > > > > > > > > > > > to
> > > > > > > > > > > > > > > > > > take some subtasks in next development if 
> > > > > > > > > > > > > > > > > > necessary.
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > Debezium:
> > > > > > > > > > > > > > > > > > > {
> > > > > > > > > > > > > > > > > > >        "before": null,
> > > > > > > > > > > > > > > > > > >        "after": {  "id": 101,"name": 
> > > > > > > > > > > > > > > > > > > "scooter"},
> > > > > > > > > > > > > > > > > > >        "source": {
> > > > > > > > > > > > > > > > > > >          "db": "inventory",               
> > > > > > > > > > > > > > > > > > >    # 1.
> > > > > > > > > > > > > > > > > > > database
> > > > > > > > name
> > > > > > > > > > the
> > > > > > > > > > > > > > > > > > changelog belongs to.
> > > > > > > > > > > > > > > > > > >          "table": "products",             
> > > > > > > > > > > > > > > > > > >    # 2.
> > > > > > > > > > > > > > > > > > > table name
> > > > > > > > the
> > > > > > > > > > > > > > changelog
> > > > > > > > > > > > > > > > > > belongs to.
> > > > > > > > > > > > > > > > > > >          "ts_ms": 1589355504100,          
> > > > > > > > > > > > > > > > > > >    # 3.
> > > > > > > > > > > > > > > > > > > timestamp
> > > > > > of
> > > > > > > > > the
> > > > > > > > > > > > > > change
> > > > > > > > > > > > > > > > > > happened in database system, i.e.: 
> > > > > > > > > > > > > > > > > > transaction time in
> > > > > > > > database.
> > > > > > > > > > > > > > > > > > >          "connector": "mysql",
> > > > > > > > > > > > > > > > > > >          ….
> > > > > > > > > > > > > > > > > > >        },
> > > > > > > > > > > > > > > > > > >        "ts_ms": 1589355606100,            
> > > > > > > > > > > > > > > > > > >   # 4.
> > > > > > > > > > > > > > > > > > > timestamp
> > > > > > > > when
> > > > > > > > > > the
> > > > > > > > > > > > > > debezium
> > > > > > > > > > > > > > > > > > processed the changelog.
> > > > > > > > > > > > > > > > > > >        "op": "c",
> > > > > > > > > > > > > > > > > > >        "transaction": null
> > > > > > > > > > > > > > > > > > > }
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > Canal:
> > > > > > > > > > > > > > > > > > > {
> > > > > > > > > > > > > > > > > > >        "data": [{  "id": "102", "name": 
> > > > > > > > > > > > > > > > > > > "car battery" }],
> > > > > > > > > > > > > > > > > > >        "database": "inventory",      # 1. 
> > > > > > > > > > > > > > > > > > > database
> > > > > > > > > > > > > > > > > > > name the
> > > > > > > > > > changelog
> > > > > > > > > > > > > > > > > > belongs to.
> > > > > > > > > > > > > > > > > > >        "table": "products",          # 2. 
> > > > > > > > > > > > > > > > > > > table name the
> > > > > > > > > changelog
> > > > > > > > > > > > > > belongs
> > > > > > > > > > > > > > > > > > to.
> > > > > > > > > > > > > > > > > > >        "es": 1589374013000,          # 3. 
> > > > > > > > > > > > > > > > > > > execution
> > > > > > > > > > > > > > > > > > > time of
> > > > > > > > the
> > > > > > > > > > > > change
> > > > > > > > > > > > > > in
> > > > > > > > > > > > > > > > > > database system, i.e.: transaction time in 
> > > > > > > > > > > > > > > > > > database.
> > > > > > > > > > > > > > > > > > >        "ts": 1589374013680,          # 4. 
> > > > > > > > > > > > > > > > > > > timestamp
> > > > > > > > > > > > > > > > > > > when the
> > > > > > > > > > cannal
> > > > > > > > > > > > > > > > > > processed the changelog.
> > > > > > > > > > > > > > > > > > >        "isDdl": false,
> > > > > > > > > > > > > > > > > > >        "mysqlType": {},
> > > > > > > > > > > > > > > > > > >        ....
> > > > > > > > > > > > > > > > > > > }
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > Best
> > > > > > > > > > > > > > > > > > > Leonard
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > 在 2020年9月8日,11:57,Danny Chan
> > > > > > > > > > > > > > > > > > > > <yuzhao....@gmail.com> 写道:
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > Thanks Timo ~
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > The FLIP was already in pretty good 
> > > > > > > > > > > > > > > > > > > > shape, I have only 2
> > > > > > > > > > questions
> > > > > > > > > > > > > > here:
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > 1. “`CAST(SYSTEM_METADATA("offset") AS 
> > > > > > > > > > > > > > > > > > > > INT)` would be a
> > > > > > > > valid
> > > > > > > > > > > > > > read-only
> > > > > > > > > > > > > > > > > > computed column for Kafka and can be 
> > > > > > > > > > > > > > > > > > extracted by the
> > > > > > > > planner.”
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > What is the pros we follow the 
> > > > > > > > > > > > > > > > > > > > SQL-SERVER syntax here ?
> > > > > > > > > Usually
> > > > > > > > > > an
> > > > > > > > > > > > > > > > > > expression return type can be inferred 
> > > > > > > > > > > > > > > > > > automatically.
> > > > > > > > > > > > > > > > > > But I
> > > > > > > > > guess
> > > > > > > > > > > > > > > > > > SQL-SERVER does not have function like 
> > > > > > > > > > > > > > > > > > SYSTEM_METADATA
> > > > > > > > > > > > > > > > > > which
> > > > > > > > > > > > actually
> > > > > > > > > > > > > > does
> > > > > > > > > > > > > > > > > > not have a specific return type.
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > And why not use the Oracle or MySQL 
> > > > > > > > > > > > > > > > > > > > syntax there ?
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > column_name [datatype] [GENERATED 
> > > > > > > > > > > > > > > > > > > > ALWAYS] AS
> > > > > > > > > > > > > > > > > > > > (expression)
> > > > > > > > > > > > [VIRTUAL]
> > > > > > > > > > > > > > > > > > > > Which is more straight-forward.
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > 2. “SYSTEM_METADATA("offset")` returns 
> > > > > > > > > > > > > > > > > > > > the NULL type by
> > > > > > > > > default”
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > The default type should not be NULL 
> > > > > > > > > > > > > > > > > > > > because only NULL
> > > > > > > > literal
> > > > > > > > > > does
> > > > > > > > > > > > > > > > > > that. Usually we use ANY as the type if we 
> > > > > > > > > > > > > > > > > > do not know the
> > > > > > > > > > specific
> > > > > > > > > > > > > > type in
> > > > > > > > > > > > > > > > > > the SQL context. ANY means the physical 
> > > > > > > > > > > > > > > > > > value can be any
> > > > java
> > > > > > > > > > > > object.
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > [1]
> > > > > > > > > https://oracle-base.com/articles/11g/virtual-columns-11gr1
> > > > > > > > > > > > > > > > > > > > [2]
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > >
> > > > https://dev.mysql.com/doc/refman/5.7/en/create-table-generated-columns.html
> > > >
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > Best,
> > > > > > > > > > > > > > > > > > > > Danny Chan
> > > > > > > > > > > > > > > > > > > > 在 2020年9月4日 +0800 PM4:48,Timo Walther
> > > > > > > > > > > > > > > > > > > > <twal...@apache.org
> > > > > > > > > > ,写道:
> > > > > > > > > > > > > > > > > > > > > Hi everyone,
> > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > I completely reworked FLIP-107. It 
> > > > > > > > > > > > > > > > > > > > > now covers the full
> > > > > > > > story
> > > > > > > > > > how
> > > > > > > > > > > > to
> > > > > > > > > > > > > > > > > > read
> > > > > > > > > > > > > > > > > > > > > and write metadata from/to connectors 
> > > > > > > > > > > > > > > > > > > > > and formats. It
> > > > > > > > > considers
> > > > > > > > > > > > > > all of
> > > > > > > > > > > > > > > > > > > > > the latest FLIPs, namely FLIP-95, 
> > > > > > > > > > > > > > > > > > > > > FLIP-132 and
> > > > > > > > > > > > > > > > > > > > > FLIP-122.
> > > > It
> > > > > > > > > > > > > > introduces
> > > > > > > > > > > > > > > > > > > > > the concept of PERSISTED computed 
> > > > > > > > > > > > > > > > > > > > > columns and leaves
> > > > > > > > > > > > > > > > > > > > > out
> > > > > > > > > > > > > > partitioning
> > > > > > > > > > > > > > > > > > > > > for now.
> > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > Looking forward to your feedback.
> > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > Regards,
> > > > > > > > > > > > > > > > > > > > > Timo
> > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > On 04.03.20 09:45, Kurt Young wrote:
> > > > > > > > > > > > > > > > > > > > > > Sorry, forgot one question.
> > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > 4. Can we make the 
> > > > > > > > > > > > > > > > > > > > > > value.fields-include more
> > > > > > > > > > > > > > > > > > > > > > orthogonal?
> > > > > > > > > Like
> > > > > > > > > > > > one
> > > > > > > > > > > > > > can
> > > > > > > > > > > > > > > > > > > > > > specify it as "EXCEPT_KEY, 
> > > > > > > > > > > > > > > > > > > > > > EXCEPT_TIMESTAMP".
> > > > > > > > > > > > > > > > > > > > > > With current EXCEPT_KEY and 
> > > > > > > > > > > > > > > > > > > > > > EXCEPT_KEY_TIMESTAMP,
> > > > > > > > > > > > > > > > > > > > > > users
> > > > > > > > can
> > > > > > > > > > not
> > > > > > > > > > > > > > > > > > config to
> > > > > > > > > > > > > > > > > > > > > > just ignore timestamp but keep key.
> > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > Best,
> > > > > > > > > > > > > > > > > > > > > > Kurt
> > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > On Wed, Mar 4, 2020 at 4:42 PM Kurt 
> > > > > > > > > > > > > > > > > > > > > > Young <
> > > > > > > > ykt...@gmail.com
> > > > > > > > > >
> > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > Hi Dawid,
> > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > I have a couple of questions 
> > > > > > > > > > > > > > > > > > > > > > > around key fields,
> > > > actually
> > > > > > > > I
> > > > > > > > > > also
> > > > > > > > > > > > > > have
> > > > > > > > > > > > > > > > > > some
> > > > > > > > > > > > > > > > > > > > > > > other questions but want to be 
> > > > > > > > > > > > > > > > > > > > > > > focused on key fields
> > > > > > > > first.
> > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > 1. I don't fully understand the 
> > > > > > > > > > > > > > > > > > > > > > > usage of
> > > > > > > > > > > > > > > > > > > > > > > "key.fields".
> > > > Is
> > > > > > > > > > this
> > > > > > > > > > > > > > > > > > option only
> > > > > > > > > > > > > > > > > > > > > > > valid during write operation? 
> > > > > > > > > > > > > > > > > > > > > > > Because for
> > > > > > > > > > > > > > > > > > > > > > > reading, I can't imagine how such 
> > > > > > > > > > > > > > > > > > > > > > > options can be
> > > > > > > > applied. I
> > > > > > > > > > > > would
> > > > > > > > > > > > > > > > > > expect
> > > > > > > > > > > > > > > > > > > > > > > that there might be a 
> > > > > > > > > > > > > > > > > > > > > > > SYSTEM_METADATA("key")
> > > > > > > > > > > > > > > > > > > > > > > to read and assign the key to a 
> > > > > > > > > > > > > > > > > > > > > > > normal field?
> > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > 2. If "key.fields" is only valid 
> > > > > > > > > > > > > > > > > > > > > > > in write
> > > > > > > > > > > > > > > > > > > > > > > operation, I
> > > > > > > > want
> > > > > > > > > > to
> > > > > > > > > > > > > > > > > > propose we
> > > > > > > > > > > > > > > > > > > > > > > can simplify the options to not 
> > > > > > > > > > > > > > > > > > > > > > > introducing
> > > > > > > > key.format.type
> > > > > > > > > > and
> > > > > > > > > > > > > > > > > > > > > > > other related options. I think a 
> > > > > > > > > > > > > > > > > > > > > > > single "key.field"
> > > > (not
> > > > > > > > > > > > fields)
> > > > > > > > > > > > > > > > > > would be
> > > > > > > > > > > > > > > > > > > > > > > enough, users can use UDF to 
> > > > > > > > > > > > > > > > > > > > > > > calculate whatever key
> > > > they
> > > > > > > > > > > > > > > > > > > > > > > want before sink.
> > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > 3. Also I don't want to introduce 
> > > > > > > > > > > > > > > > > > > > > > > "value.format.type"
> > > > and
> > > > > > > > > > > > > > > > > > > > > > > "value.format.xxx" with the 
> > > > > > > > > > > > > > > > > > > > > > > "value" prefix. Not every
> > > > > > > > > > connector
> > > > > > > > > > > > > > has a
> > > > > > > > > > > > > > > > > > > > > > > concept
> > > > > > > > > > > > > > > > > > > > > > > of key and values. The old 
> > > > > > > > > > > > > > > > > > > > > > > parameter "format.type"
> > > > > > > > already
> > > > > > > > > > good
> > > > > > > > > > > > > > > > > > enough to
> > > > > > > > > > > > > > > > > > > > > > > use.
> > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > Best,
> > > > > > > > > > > > > > > > > > > > > > > Kurt
> > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > On Tue, Mar 3, 2020 at 10:40 PM 
> > > > > > > > > > > > > > > > > > > > > > > Jark Wu <
> > > > > > > > imj...@gmail.com>
> > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > Thanks Dawid,
> > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > I have two more questions.
> > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > SupportsMetadata
> > > > > > > > > > > > > > > > > > > > > > > > Introducing SupportsMetadata 
> > > > > > > > > > > > > > > > > > > > > > > > sounds good to me.
> > > > > > > > > > > > > > > > > > > > > > > > But I
> > > > > > > > have
> > > > > > > > > > > > some
> > > > > > > > > > > > > > > > > > questions
> > > > > > > > > > > > > > > > > > > > > > > > regarding to this interface.
> > > > > > > > > > > > > > > > > > > > > > > > 1) How do the source know what 
> > > > > > > > > > > > > > > > > > > > > > > > the expected return
> > > > type
> > > > > > > > of
> > > > > > > > > > > > each
> > > > > > > > > > > > > > > > > > metadata?
> > > > > > > > > > > > > > > > > > > > > > > > 2) Where to put the metadata 
> > > > > > > > > > > > > > > > > > > > > > > > fields? Append to the
> > > > > > > > > existing
> > > > > > > > > > > > > > physical
> > > > > > > > > > > > > > > > > > > > > > > > fields?
> > > > > > > > > > > > > > > > > > > > > > > > If yes, I would suggest to 
> > > > > > > > > > > > > > > > > > > > > > > > change the signature to
> > > > > > > > > > > > `TableSource
> > > > > > > > > > > > > > > > > > > > > > > > appendMetadataFields(String[] 
> > > > > > > > > > > > > > > > > > > > > > > > metadataNames,
> > > > DataType[]
> > > > > > > > > > > > > > > > > > metadataTypes)`
> > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > SYSTEM_METADATA("partition")
> > > > > > > > > > > > > > > > > > > > > > > > Can SYSTEM_METADATA() function 
> > > > > > > > > > > > > > > > > > > > > > > > be used nested in a
> > > > > > > > > computed
> > > > > > > > > > > > > > column
> > > > > > > > > > > > > > > > > > > > > > > > expression? If yes, how to 
> > > > > > > > > > > > > > > > > > > > > > > > specify the return
> > > > > > > > > > > > > > > > > > > > > > > > type of
> > > > > > > > > > > > > > > > > > SYSTEM_METADATA?
> > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > Best,
> > > > > > > > > > > > > > > > > > > > > > > > Jark
> > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > On Tue, 3 Mar 2020 at 17:06, 
> > > > > > > > > > > > > > > > > > > > > > > > Dawid Wysakowicz <
> > > > > > > > > > > > > > > > > > dwysakow...@apache.org>
> > > > > > > > > > > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > Hi,
> > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > 1. I thought a bit more on 
> > > > > > > > > > > > > > > > > > > > > > > > > how the source would
> > > > > > > > > > > > > > > > > > > > > > > > > emit
> > > > > > > > the
> > > > > > > > > > > > > > columns
> > > > > > > > > > > > > > > > > > and I
> > > > > > > > > > > > > > > > > > > > > > > > > now see its not exactly the 
> > > > > > > > > > > > > > > > > > > > > > > > > same as regular
> > > > > > > > > > > > > > > > > > > > > > > > > columns.
> > > > I
> > > > > > > > > see
> > > > > > > > > > a
> > > > > > > > > > > > > > need
> > > > > > > > > > > > > > > > > > to
> > > > > > > > > > > > > > > > > > > > > > > > > elaborate a bit more on that 
> > > > > > > > > > > > > > > > > > > > > > > > > in the FLIP as you
> > > > asked,
> > > > > > > > > > Jark.
> > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > I do agree mostly with Danny 
> > > > > > > > > > > > > > > > > > > > > > > > > on how we should do
> > > > that.
> > > > > > > > > One
> > > > > > > > > > > > > > > > > > additional
> > > > > > > > > > > > > > > > > > > > > > > > > things I would introduce is an
> > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > interface SupportsMetadata {
> > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > boolean 
> > > > > > > > > > > > > > > > > > > > > > > > > supportsMetadata(Set<String>
> > > > > > > > > > > > > > > > > > > > > > > > > metadataFields);
> > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > TableSource 
> > > > > > > > > > > > > > > > > > > > > > > > > generateMetadataFields(Set<String>
> > > > > > > > > > > > metadataFields);
> > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > }
> > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > This way the source would 
> > > > > > > > > > > > > > > > > > > > > > > > > have to declare/emit only
> > > > the
> > > > > > > > > > > > > > requested
> > > > > > > > > > > > > > > > > > > > > > > > > metadata fields. In order not 
> > > > > > > > > > > > > > > > > > > > > > > > > to clash with user
> > > > > > > > defined
> > > > > > > > > > > > > > fields.
> > > > > > > > > > > > > > > > > > When
> > > > > > > > > > > > > > > > > > > > > > > > > emitting the metadata field I 
> > > > > > > > > > > > > > > > > > > > > > > > > would prepend the
> > > > column
> > > > > > > > > name
> > > > > > > > > > > > > > with
> > > > > > > > > > > > > > > > > > > > > > > > > __system_{property_name}. 
> > > > > > > > > > > > > > > > > > > > > > > > > Therefore when requested
> > > > > > > > > > > > > > > > > > > > > > > > > SYSTEM_METADATA("partition") 
> > > > > > > > > > > > > > > > > > > > > > > > > the source would
> > > > > > > > > > > > > > > > > > > > > > > > > append
> > > > a
> > > > > > > > > > field
> > > > > > > > > > > > > > > > > > > > > > > > > __system_partition to the 
> > > > > > > > > > > > > > > > > > > > > > > > > schema. This would be
> > > > > > > > > > > > > > > > > > > > > > > > > never
> > > > > > > > > > visible
> > > > > > > > > > > > > > to
> > > > > > > > > > > > > > > > > > the
> > > > > > > > > > > > > > > > > > > > > > > > > user as it would be used only 
> > > > > > > > > > > > > > > > > > > > > > > > > for the subsequent
> > > > > > > > computed
> > > > > > > > > > > > > > columns.
> > > > > > > > > > > > > > > > > > If
> > > > > > > > > > > > > > > > > > > > > > > > > that makes sense to you, I 
> > > > > > > > > > > > > > > > > > > > > > > > > will update the FLIP
> > > > > > > > > > > > > > > > > > > > > > > > > with
> > > > > > > > this
> > > > > > > > > > > > > > > > > > description.
> > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > 2. CAST vs explicit type in 
> > > > > > > > > > > > > > > > > > > > > > > > > computed columns
> > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > Here I agree with Danny. It 
> > > > > > > > > > > > > > > > > > > > > > > > > is also the current
> > > > > > > > > > > > > > > > > > > > > > > > > state
> > > > > > > > of
> > > > > > > > > > the
> > > > > > > > > > > > > > > > > > proposal.
> > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > 3. Partitioning on computed 
> > > > > > > > > > > > > > > > > > > > > > > > > column vs function
> > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > Here I also agree with Danny. 
> > > > > > > > > > > > > > > > > > > > > > > > > I also think those
> > > > > > > > > > > > > > > > > > > > > > > > > are
> > > > > > > > > > > > > > orthogonal. I
> > > > > > > > > > > > > > > > > > would
> > > > > > > > > > > > > > > > > > > > > > > > > leave out the STORED computed 
> > > > > > > > > > > > > > > > > > > > > > > > > columns out of the
> > > > > > > > > > discussion.
> > > > > > > > > > > > I
> > > > > > > > > > > > > > > > > > don't see
> > > > > > > > > > > > > > > > > > > > > > > > > how do they relate to the 
> > > > > > > > > > > > > > > > > > > > > > > > > partitioning. I
> > > > > > > > > > > > > > > > > > > > > > > > > already put
> > > > > > > > > both
> > > > > > > > > > of
> > > > > > > > > > > > > > those
> > > > > > > > > > > > > > > > > > > > > > > > > cases in the document. We can 
> > > > > > > > > > > > > > > > > > > > > > > > > either partition on a
> > > > > > > > > > computed
> > > > > > > > > > > > > > > > > > column or
> > > > > > > > > > > > > > > > > > > > > > > > > use a udf in a partioned by 
> > > > > > > > > > > > > > > > > > > > > > > > > clause. I am fine with
> > > > > > > > > leaving
> > > > > > > > > > > > out
> > > > > > > > > > > > > > the
> > > > > > > > > > > > > > > > > > > > > > > > > partitioning by udf in the 
> > > > > > > > > > > > > > > > > > > > > > > > > first version if you
> > > > > > > > > > > > > > > > > > > > > > > > > still
> > > > > > > > > have
> > > > > > > > > > > > some
> > > > > > > > > > > > > > > > > > > > > > > > concerns.
> > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > As for your question Danny. 
> > > > > > > > > > > > > > > > > > > > > > > > > It depends which
> > > > > > > > partitioning
> > > > > > > > > > > > > > strategy
> > > > > > > > > > > > > > > > > > you
> > > > > > > > > > > > > > > > > > > > > > > > use.
> > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > For the HASH partitioning 
> > > > > > > > > > > > > > > > > > > > > > > > > strategy I thought it
> > > > > > > > > > > > > > > > > > > > > > > > > would
> > > > > > > > > work
> > > > > > > > > > as
> > > > > > > > > > > > > > you
> > > > > > > > > > > > > > > > > > > > > > > > > explained. It would be N = 
> > > > > > > > > > > > > > > > > > > > > > > > > MOD(expr, num). I am not
> > > > > > > > sure
> > > > > > > > > > > > > > though if
> > > > > > > > > > > > > > > > > > we
> > > > > > > > > > > > > > > > > > > > > > > > > should introduce the 
> > > > > > > > > > > > > > > > > > > > > > > > > PARTITIONS clause. Usually
> > > > > > > > > > > > > > > > > > > > > > > > > Flink
> > > > > > > > > does
> > > > > > > > > > > > not
> > > > > > > > > > > > > > own
> > > > > > > > > > > > > > > > > > the
> > > > > > > > > > > > > > > > > > > > > > > > > data and the partitions are 
> > > > > > > > > > > > > > > > > > > > > > > > > already an intrinsic
> > > > > > > > property
> > > > > > > > > > of
> > > > > > > > > > > > > > the
> > > > > > > > > > > > > > > > > > > > > > > > > underlying source e.g. for 
> > > > > > > > > > > > > > > > > > > > > > > > > kafka we do not create
> > > > > > > > topics,
> > > > > > > > > > but
> > > > > > > > > > > > > > we
> > > > > > > > > > > > > > > > > > just
> > > > > > > > > > > > > > > > > > > > > > > > > describe pre-existing 
> > > > > > > > > > > > > > > > > > > > > > > > > pre-partitioned topic.
> > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > 4. timestamp vs 
> > > > > > > > > > > > > > > > > > > > > > > > > timestamp.field vs
> > > > > > > > > > > > > > > > > > > > > > > > > connector.field vs
> > > > > > > > ...
> > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > I am fine with changing it to 
> > > > > > > > > > > > > > > > > > > > > > > > > timestamp.field to be
> > > > > > > > > > > > consistent
> > > > > > > > > > > > > > with
> > > > > > > > > > > > > > > > > > > > > > > > > other value.fields and 
> > > > > > > > > > > > > > > > > > > > > > > > > key.fields. Actually that
> > > > > > > > > > > > > > > > > > > > > > > > > was
> > > > > > > > also
> > > > > > > > > > my
> > > > > > > > > > > > > > > > > > initial
> > > > > > > > > > > > > > > > > > > > > > > > > proposal in a first draft I 
> > > > > > > > > > > > > > > > > > > > > > > > > prepared. I changed it
> > > > > > > > > > afterwards
> > > > > > > > > > > > > > to
> > > > > > > > > > > > > > > > > > shorten
> > > > > > > > > > > > > > > > > > > > > > > > > the key.
> > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > Best,
> > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > Dawid
> > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > On 03/03/2020 09:00, Danny 
> > > > > > > > > > > > > > > > > > > > > > > > > Chan wrote:
> > > > > > > > > > > > > > > > > > > > > > > > > > Thanks Dawid for bringing 
> > > > > > > > > > > > > > > > > > > > > > > > > > up this discussion, I
> > > > think
> > > > > > > > it
> > > > > > > > > > is
> > > > > > > > > > > > a
> > > > > > > > > > > > > > > > > > useful
> > > > > > > > > > > > > > > > > > > > > > > > > feature ~
> > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > About how the metadata 
> > > > > > > > > > > > > > > > > > > > > > > > > > outputs from source
> > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > I think it is completely 
> > > > > > > > > > > > > > > > > > > > > > > > > > orthogonal, computed
> > > > > > > > > > > > > > > > > > > > > > > > > > column
> > > > > > > > > push
> > > > > > > > > > > > > > down is
> > > > > > > > > > > > > > > > > > > > > > > > > another topic, this should 
> > > > > > > > > > > > > > > > > > > > > > > > > not be a blocker but a
> > > > > > > > > > promotion,
> > > > > > > > > > > > > > if we
> > > > > > > > > > > > > > > > > > do
> > > > > > > > > > > > > > > > > > > > > > > > not
> > > > > > > > > > > > > > > > > > > > > > > > > have any filters on the 
> > > > > > > > > > > > > > > > > > > > > > > > > computed column, there
> > > > > > > > > > > > > > > > > > > > > > > > > is no
> > > > > > > > need
> > > > > > > > > > to
> > > > > > > > > > > > > > do any
> > > > > > > > > > > > > > > > > > > > > > > > > pushings; the source node 
> > > > > > > > > > > > > > > > > > > > > > > > > just emit the complete
> > > > record
> > > > > > > > > > with
> > > > > > > > > > > > > > full
> > > > > > > > > > > > > > > > > > > > > > > > metadata
> > > > > > > > > > > > > > > > > > > > > > > > > with the declared physical 
> > > > > > > > > > > > > > > > > > > > > > > > > schema, then when
> > > > generating
> > > > > > > > > the
> > > > > > > > > > > > > > virtual
> > > > > > > > > > > > > > > > > > > > > > > > > columns, we would extract the 
> > > > > > > > > > > > > > > > > > > > > > > > > metadata info and
> > > > output
> > > > > > > > as
> > > > > > > > > > > > full
> > > > > > > > > > > > > > > > > > > > > > > > columns(with
> > > > > > > > > > > > > > > > > > > > > > > > > full schema).
> > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > About the type of metadata 
> > > > > > > > > > > > > > > > > > > > > > > > > > column
> > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > Personally i prefer 
> > > > > > > > > > > > > > > > > > > > > > > > > > explicit type instead of 
> > > > > > > > > > > > > > > > > > > > > > > > > > CAST,
> > > > > > > > they
> > > > > > > > > > are
> > > > > > > > > > > > > > > > > > symantic
> > > > > > > > > > > > > > > > > > > > > > > > > equivalent though, explict 
> > > > > > > > > > > > > > > > > > > > > > > > > type is more
> > > > > > > > straight-forward
> > > > > > > > > > and
> > > > > > > > > > > > > > we can
> > > > > > > > > > > > > > > > > > > > > > > > declare
> > > > > > > > > > > > > > > > > > > > > > > > > the nullable attribute there.
> > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > About option A: 
> > > > > > > > > > > > > > > > > > > > > > > > > > partitioning based on 
> > > > > > > > > > > > > > > > > > > > > > > > > > acomputed
> > > > column
> > > > > > > > > VS
> > > > > > > > > > > > > > option
> > > > > > > > > > > > > > > > > > B:
> > > > > > > > > > > > > > > > > > > > > > > > > partitioning with just a 
> > > > > > > > > > > > > > > > > > > > > > > > > function
> > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > >      From the FLIP, it 
> > > > > > > > > > > > > > > > > > > > > > > > > > seems that B's
> > > > > > > > > > > > > > > > > > > > > > > > > > partitioning is
> > > > > > > > > just
> > > > > > > > > > a
> > > > > > > > > > > > > > strategy
> > > > > > > > > > > > > > > > > > when
> > > > > > > > > > > > > > > > > > > > > > > > > writing data, the partiton 
> > > > > > > > > > > > > > > > > > > > > > > > > column is not
> > > > > > > > > > > > > > > > > > > > > > > > > included in
> > > > > > > > the
> > > > > > > > > > > > table
> > > > > > > > > > > > > > > > > > schema,
> > > > > > > > > > > > > > > > > > > > > > > > so
> > > > > > > > > > > > > > > > > > > > > > > > > it's just useless when 
> > > > > > > > > > > > > > > > > > > > > > > > > reading from that.
> > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > - Compared to A, we do not 
> > > > > > > > > > > > > > > > > > > > > > > > > > need to generate the
> > > > > > > > > partition
> > > > > > > > > > > > > > column
> > > > > > > > > > > > > > > > > > when
> > > > > > > > > > > > > > > > > > > > > > > > > selecting from the table(but 
> > > > > > > > > > > > > > > > > > > > > > > > > insert into)
> > > > > > > > > > > > > > > > > > > > > > > > > > - For A we can also mark 
> > > > > > > > > > > > > > > > > > > > > > > > > > the column as STORED when
> > > > we
> > > > > > > > > want
> > > > > > > > > > > > to
> > > > > > > > > > > > > > > > > > persist
> > > > > > > > > > > > > > > > > > > > > > > > > that
> > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > So in my opition they are 
> > > > > > > > > > > > > > > > > > > > > > > > > > orthogonal, we can
> > > > > > > > > > > > > > > > > > > > > > > > > > support
> > > > > > > > > > both, i
> > > > > > > > > > > > > > saw
> > > > > > > > > > > > > > > > > > that
> > > > > > > > > > > > > > > > > > > > > > > > > MySQL/Oracle[1][2] would 
> > > > > > > > > > > > > > > > > > > > > > > > > suggest to also define the
> > > > > > > > > > > > PARTITIONS
> > > > > > > > > > > > > > > > > > num, and
> > > > > > > > > > > > > > > > > > > > > > > > the
> > > > > > > > > > > > > > > > > > > > > > > > > partitions are managed under 
> > > > > > > > > > > > > > > > > > > > > > > > > a "tablenamespace",
> > > > > > > > > > > > > > > > > > > > > > > > > the
> > > > > > > > > > > > partition
> > > > > > > > > > > > > > in
> > > > > > > > > > > > > > > > > > which
> > > > > > > > > > > > > > > > > > > > > > > > the
> > > > > > > > > > > > > > > > > > > > > > > > > record is stored is partition 
> > > > > > > > > > > > > > > > > > > > > > > > > number N, where N =
> > > > > > > > > MOD(expr,
> > > > > > > > > > > > > > num),
> > > > > > > > > > > > > > > > > > for
> > > > > > > > > > > > > > > > > > > > > > > > your
> > > > > > > > > > > > > > > > > > > > > > > > > design, which partiton the 
> > > > > > > > > > > > > > > > > > > > > > > > > record would persist ?
> > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > [1]
> > > > > > > > > > > > > > > > > >
> > > > > > > > https://dev.mysql.com/doc/refman/5.7/en/partitioning-hash.html
> > > > > > > > > > > > > > > > > > > > > > > > > > [2]
> > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > >
> > > > https://docs.oracle.com/database/121/VLDBG/GUID-F023D3ED-262F-4B19-950A-D3C8F8CDB4F4.htm#VLDBG1270
> > > >
> > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > Best,
> > > > > > > > > > > > > > > > > > > > > > > > > > Danny Chan
> > > > > > > > > > > > > > > > > > > > > > > > > > 在 2020年3月2日 +0800 
> > > > > > > > > > > > > > > > > > > > > > > > > > PM6:16,Dawid Wysakowicz <
> > > > > > > > > > > > > > dwysakow...@apache.org
> > > > > > > > > > > > > > > > > > > > > > > > > ,写道:
> > > > > > > > > > > > > > > > > > > > > > > > > > > Hi Jark,
> > > > > > > > > > > > > > > > > > > > > > > > > > > Ad. 2 I added a section 
> > > > > > > > > > > > > > > > > > > > > > > > > > > to discuss relation to
> > > > > > > > FLIP-63
> > > > > > > > > > > > > > > > > > > > > > > > > > > Ad. 3 Yes, I also tried 
> > > > > > > > > > > > > > > > > > > > > > > > > > > to somewhat keep
> > > > > > > > > > > > > > > > > > > > > > > > > > > hierarchy
> > > > of
> > > > > > > > > > > > > > properties.
> > > > > > > > > > > > > > > > > > > > > > > > > Therefore you have the 
> > > > > > > > > > > > > > > > > > > > > > > > > key.format.type.
> > > > > > > > > > > > > > > > > > > > > > > > > > > I also considered exactly 
> > > > > > > > > > > > > > > > > > > > > > > > > > > what you are suggesting
> > > > > > > > > > > > (prefixing
> > > > > > > > > > > > > > with
> > > > > > > > > > > > > > > > > > > > > > > > > connector or kafka). I 
> > > > > > > > > > > > > > > > > > > > > > > > > should've put that into an
> > > > > > > > > > > > > > Option/Rejected
> > > > > > > > > > > > > > > > > > > > > > > > > alternatives.
> > > > > > > > > > > > > > > > > > > > > > > > > > > I agree timestamp, key.*, 
> > > > > > > > > > > > > > > > > > > > > > > > > > > value.* are connector
> > > > > > > > > > properties.
> > > > > > > > > > > > > > Why I
> > > > > > > > > > > > > > > > > > > > > > > > > wanted to suggest not adding 
> > > > > > > > > > > > > > > > > > > > > > > > > that prefix in the
> > > > > > > > > > > > > > > > > > > > > > > > > first
> > > > > > > > > > version
> > > > > > > > > > > > > > is
> > > > > > > > > > > > > > > > > > that
> > > > > > > > > > > > > > > > > > > > > > > > > actually all the properties 
> > > > > > > > > > > > > > > > > > > > > > > > > in the WITH section are
> > > > > > > > > > connector
> > > > > > > > > > > > > > > > > > > > > > > > properties.
> > > > > > > > > > > > > > > > > > > > > > > > > Even format is in the end a 
> > > > > > > > > > > > > > > > > > > > > > > > > connector property as
> > > > some
> > > > > > > > of
> > > > > > > > > > the
> > > > > > > > > > > > > > > > > > sources
> > > > > > > > > > > > > > > > > > > > > > > > might
> > > > > > > > > > > > > > > > > > > > > > > > > not have a format, imo. The 
> > > > > > > > > > > > > > > > > > > > > > > > > benefit of not
> > > > > > > > > > > > > > > > > > > > > > > > > adding the
> > > > > > > > > > prefix
> > > > > > > > > > > > is
> > > > > > > > > > > > > > > > > > that it
> > > > > > > > > > > > > > > > > > > > > > > > > makes the keys a bit shorter. 
> > > > > > > > > > > > > > > > > > > > > > > > > Imagine prefixing all
> > > > the
> > > > > > > > > > > > > > properties
> > > > > > > > > > > > > > > > > > with
> > > > > > > > > > > > > > > > > > > > > > > > > connector (or if we go with 
> > > > > > > > > > > > > > > > > > > > > > > > > FLINK-12557:
> > > > > > > > elasticsearch):
> > > > > > > > > > > > > > > > > > > > > > > > > > > elasticsearch.key.format.type:
> > > > > > > > > > > > > > > > > > > > > > > > > > >  csv
> > > > > > > > > > > > > > > > > > > > > > > > > > > elasticsearch.key.format.field:
> > > > > > > > > > > > > > > > > > > > > > > > > > >  ....
> > > > > > > > > > > > > > > > > > > > > > > > > > > elasticsearch.key.format.delimiter:
> > > > > > > > > > > > > > > > > > > > > > > > > > >  ....
> > > > > > > > > > > > > > > > > > > > > > > > > > > elasticsearch.key.format.*:
> > > > > > > > > > > > > > > > > > > > > > > > > > >  ....
> > > > > > > > > > > > > > > > > > > > > > > > > > > I am fine with doing it 
> > > > > > > > > > > > > > > > > > > > > > > > > > > though if this is a
> > > > preferred
> > > > > > > > > > > > > > approach
> > > > > > > > > > > > > > > > > > in the
> > > > > > > > > > > > > > > > > > > > > > > > > community.
> > > > > > > > > > > > > > > > > > > > > > > > > > > Ad in-line comments:
> > > > > > > > > > > > > > > > > > > > > > > > > > > I forgot to update the 
> > > > > > > > > > > > > > > > > > > > > > > > > > > `value.fields.include`
> > > > > > > > property.
> > > > > > > > > > It
> > > > > > > > > > > > > > > > > > should be
> > > > > > > > > > > > > > > > > > > > > > > > > value.fields-include. Which I 
> > > > > > > > > > > > > > > > > > > > > > > > > think you also
> > > > suggested
> > > > > > > > in
> > > > > > > > > > the
> > > > > > > > > > > > > > > > > > comment,
> > > > > > > > > > > > > > > > > > > > > > > > > right?
> > > > > > > > > > > > > > > > > > > > > > > > > > > As for the cast vs 
> > > > > > > > > > > > > > > > > > > > > > > > > > > declaring output type of
> > > > computed
> > > > > > > > > > > > column.
> > > > > > > > > > > > > > I
> > > > > > > > > > > > > > > > > > think
> > > > > > > > > > > > > > > > > > > > > > > > > it's better not to use CAST, 
> > > > > > > > > > > > > > > > > > > > > > > > > but declare a type
> > > > > > > > > > > > > > > > > > > > > > > > > of an
> > > > > > > > > > > > > > expression
> > > > > > > > > > > > > > > > > > and
> > > > > > > > > > > > > > > > > > > > > > > > later
> > > > > > > > > > > > > > > > > > > > > > > > > on infer the output type of 
> > > > > > > > > > > > > > > > > > > > > > > > > SYSTEM_METADATA. The
> > > > reason
> > > > > > > > > is
> > > > > > > > > > I
> > > > > > > > > > > > > > think
> > > > > > > > > > > > > > > > > > this
> > > > > > > > > > > > > > > > > > > > > > > > way
> > > > > > > > > > > > > > > > > > > > > > > > > it will be easier to 
> > > > > > > > > > > > > > > > > > > > > > > > > implement e.g. filter push
> > > > > > > > > > > > > > > > > > > > > > > > > downs
> > > > > > > > > when
> > > > > > > > > > > > > > working
> > > > > > > > > > > > > > > > > > with
> > > > > > > > > > > > > > > > > > > > > > > > the
> > > > > > > > > > > > > > > > > > > > > > > > > native types of the source, 
> > > > > > > > > > > > > > > > > > > > > > > > > e.g. in case of Kafka's
> > > > > > > > > > offset, i
> > > > > > > > > > > > > > > > > > think it's
> > > > > > > > > > > > > > > > > > > > > > > > > better to pushdown long 
> > > > > > > > > > > > > > > > > > > > > > > > > rather than string. This
> > > > could
> > > > > > > > > let
> > > > > > > > > > us
> > > > > > > > > > > > > > push
> > > > > > > > > > > > > > > > > > > > > > > > > expression like e.g. offset > 
> > > > > > > > > > > > > > > > > > > > > > > > > 12345 & offset <
> > > > > > > > > > > > > > > > > > > > > > > > > 59382.
> > > > > > > > > > > > > > Otherwise we
> > > > > > > > > > > > > > > > > > would
> > > > > > > > > > > > > > > > > > > > > > > > > have to push down 
> > > > > > > > > > > > > > > > > > > > > > > > > cast(offset, long) > 12345 &&
> > > > > > > > > > cast(offset,
> > > > > > > > > > > > > > long)
> > > > > > > > > > > > > > > > > > <
> > > > > > > > > > > > > > > > > > > > > > > > 59382.
> > > > > > > > > > > > > > > > > > > > > > > > > Moreover I think we need to 
> > > > > > > > > > > > > > > > > > > > > > > > > introduce the type for
> > > > > > > > > computed
> > > > > > > > > > > > > > columns
> > > > > > > > > > > > > > > > > > > > > > > > anyway
> > > > > > > > > > > > > > > > > > > > > > > > > to support functions that 
> > > > > > > > > > > > > > > > > > > > > > > > > infer output type
> > > > > > > > > > > > > > > > > > > > > > > > > based on
> > > > > > > > > > expected
> > > > > > > > > > > > > > > > > > return
> > > > > > > > > > > > > > > > > > > > > > > > type.
> > > > > > > > > > > > > > > > > > > > > > > > > > > As for the computed 
> > > > > > > > > > > > > > > > > > > > > > > > > > > column push down. Yes,
> > > > > > > > > > SYSTEM_METADATA
> > > > > > > > > > > > > > would
> > > > > > > > > > > > > > > > > > have
> > > > > > > > > > > > > > > > > > > > > > > > > to be pushed down to the 
> > > > > > > > > > > > > > > > > > > > > > > > > source. If it is not
> > > > possible
> > > > > > > > > the
> > > > > > > > > > > > > > planner
> > > > > > > > > > > > > > > > > > > > > > > > should
> > > > > > > > > > > > > > > > > > > > > > > > > fail. As far as I know 
> > > > > > > > > > > > > > > > > > > > > > > > > computed columns push down
> > > > will
> > > > > > > > be
> > > > > > > > > > > > part
> > > > > > > > > > > > > > of
> > > > > > > > > > > > > > > > > > source
> > > > > > > > > > > > > > > > > > > > > > > > > rework, won't it? ;)
> > > > > > > > > > > > > > > > > > > > > > > > > > > As for the persisted 
> > > > > > > > > > > > > > > > > > > > > > > > > > > computed column. I think
> > > > > > > > > > > > > > > > > > > > > > > > > > > it is
> > > > > > > > > > > > > > completely
> > > > > > > > > > > > > > > > > > > > > > > > > orthogonal. In my current 
> > > > > > > > > > > > > > > > > > > > > > > > > proposal you can also
> > > > > > > > partition
> > > > > > > > > > by
> > > > > > > > > > > > a
> > > > > > > > > > > > > > > > > > computed
> > > > > > > > > > > > > > > > > > > > > > > > > column. The difference 
> > > > > > > > > > > > > > > > > > > > > > > > > between using a udf in
> > > > > > > > partitioned
> > > > > > > > > > by
> > > > > > > > > > > > vs
> > > > > > > > > > > > > > > > > > > > > > > > partitioned
> > > > > > > > > > > > > > > > > > > > > > > > > by a computed column is that 
> > > > > > > > > > > > > > > > > > > > > > > > > when you partition
> > > > > > > > > > > > > > > > > > > > > > > > > by a
> > > > > > > > > > computed
> > > > > > > > > > > > > > > > > > column
> > > > > > > > > > > > > > > > > > > > > > > > this
> > > > > > > > > > > > > > > > > > > > > > > > > column must be also computed 
> > > > > > > > > > > > > > > > > > > > > > > > > when reading the
> > > > > > > > > > > > > > > > > > > > > > > > > table.
> > > > If
> > > > > > > > > you
> > > > > > > > > > > > > > use a
> > > > > > > > > > > > > > > > > > udf in
> > > > > > > > > > > > > > > > > > > > > > > > > the partitioned by, the 
> > > > > > > > > > > > > > > > > > > > > > > > > expression is computed only
> > > > > > > > when
> > > > > > > > > > > > > > inserting
> > > > > > > > > > > > > > > > > > into
> > > > > > > > > > > > > > > > > > > > > > > > the
> > > > > > > > > > > > > > > > > > > > > > > > > table.
> > > > > > > > > > > > > > > > > > > > > > > > > > > Hope this answers some of 
> > > > > > > > > > > > > > > > > > > > > > > > > > > your questions. Looking
> > > > > > > > > forward
> > > > > > > > > > > > for
> > > > > > > > > > > > > > > > > > further
> > > > > > > > > > > > > > > > > > > > > > > > > suggestions.
> > > > > > > > > > > > > > > > > > > > > > > > > > > Best,
> > > > > > > > > > > > > > > > > > > > > > > > > > > Dawid
> > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > On 02/03/2020 05:18, Jark 
> > > > > > > > > > > > > > > > > > > > > > > > > > > Wu wrote:
> > > > > > > > > > > > > > > > > > > > > > > > > > > > Hi,
> > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks Dawid for 
> > > > > > > > > > > > > > > > > > > > > > > > > > > > starting such a great
> > > > > > > > > > > > > > > > > > > > > > > > > > > > discussion.
> > > > > > > > > > Reaing
> > > > > > > > > > > > > > > > > > metadata
> > > > > > > > > > > > > > > > > > > > > > > > and
> > > > > > > > > > > > > > > > > > > > > > > > > > > > key-part information 
> > > > > > > > > > > > > > > > > > > > > > > > > > > > from source is an 
> > > > > > > > > > > > > > > > > > > > > > > > > > > > important
> > > > > > > > > feature
> > > > > > > > > > > > for
> > > > > > > > > > > > > > > > > > > > > > > > streaming
> > > > > > > > > > > > > > > > > > > > > > > > > > > > users.
> > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > In general, I agree 
> > > > > > > > > > > > > > > > > > > > > > > > > > > > with the proposal of the
> > > > > > > > > > > > > > > > > > > > > > > > > > > > FLIP.
> > > > > > > > > > > > > > > > > > > > > > > > > > > > I will leave my 
> > > > > > > > > > > > > > > > > > > > > > > > > > > > thoughts and comments 
> > > > > > > > > > > > > > > > > > > > > > > > > > > > here:
> > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > 1) +1 to use connector 
> > > > > > > > > > > > > > > > > > > > > > > > > > > > properties instead of
> > > > > > > > > introducing
> > > > > > > > > > > > > > HEADER
> > > > > > > > > > > > > > > > > > > > > > > > > keyword as
> > > > > > > > > > > > > > > > > > > > > > > > > > > > the reason you 
> > > > > > > > > > > > > > > > > > > > > > > > > > > > mentioned in the FLIP.
> > > > > > > > > > > > > > > > > > > > > > > > > > > > 2) we already 
> > > > > > > > > > > > > > > > > > > > > > > > > > > > introduced PARTITIONED 
> > > > > > > > > > > > > > > > > > > > > > > > > > > > BY in
> > > > FLIP-63.
> > > > > > > > > > Maybe
> > > > > > > > > > > > we
> > > > > > > > > > > > > > > > > > should
> > > > > > > > > > > > > > > > > > > > > > > > > add a
> > > > > > > > > > > > > > > > > > > > > > > > > > > > section to explain 
> > > > > > > > > > > > > > > > > > > > > > > > > > > > what's the relationship
> > > > > > > > > > > > > > > > > > > > > > > > > > > > between
> > > > > > > > > them.
> > > > > > > > > > > > > > > > > > > > > > > > > > > > Do their concepts 
> > > > > > > > > > > > > > > > > > > > > > > > > > > > conflict? Could INSERT
> > > > > > > > > > > > > > > > > > > > > > > > > > > > PARTITION
> > > > > > > > be
> > > > > > > > > > used
> > > > > > > > > > > > > > on
> > > > > > > > > > > > > > > > > > the
> > > > > > > > > > > > > > > > > > > > > > > > > > > > PARTITIONED table in 
> > > > > > > > > > > > > > > > > > > > > > > > > > > > this FLIP?
> > > > > > > > > > > > > > > > > > > > > > > > > > > > 3) Currently, 
> > > > > > > > > > > > > > > > > > > > > > > > > > > > properties are 
> > > > > > > > > > > > > > > > > > > > > > > > > > > > hierarchical in
> > > > > > > > > > > > > > > > > > > > > > > > > > > > Flink
> > > > > > > > > SQL.
> > > > > > > > > > > > > > Shall we
> > > > > > > > > > > > > > > > > > > > > > > > make
> > > > > > > > > > > > > > > > > > > > > > > > > the
> > > > > > > > > > > > > > > > > > > > > > > > > > > > new introduced 
> > > > > > > > > > > > > > > > > > > > > > > > > > > > properties more 
> > > > > > > > > > > > > > > > > > > > > > > > > > > > hierarchical?
> > > > > > > > > > > > > > > > > > > > > > > > > > > > For example, 
> > > > > > > > > > > > > > > > > > > > > > > > > > > > "timestamp" =>
> > > > > > > > > > > > > > > > > > > > > > > > > > > > "connector.timestamp"?
> > > > > > > > > > > > > > (actually, I
> > > > > > > > > > > > > > > > > > > > > > > > > prefer
> > > > > > > > > > > > > > > > > > > > > > > > > > > > "kafka.timestamp" which 
> > > > > > > > > > > > > > > > > > > > > > > > > > > > is another
> > > > > > > > > > > > > > > > > > > > > > > > > > > > improvement for
> > > > > > > > > > > > > > properties
> > > > > > > > > > > > > > > > > > > > > > > > > FLINK-12557)
> > > > > > > > > > > > > > > > > > > > > > > > > > > > A single "timestamp" in 
> > > > > > > > > > > > > > > > > > > > > > > > > > > > properties may mislead
> > > > users
> > > > > > > > > > that
> > > > > > > > > > > > > > the
> > > > > > > > > > > > > > > > > > > > > > > > field
> > > > > > > > > > > > > > > > > > > > > > > > > is
> > > > > > > > > > > > > > > > > > > > > > > > > > > > a rowtime attribute.
> > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > I also left some minor 
> > > > > > > > > > > > > > > > > > > > > > > > > > > > comments in the FLIP.
> > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > > > > > > > > > > > > > Jark
> > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > On Sun, 1 Mar 2020 at 
> > > > > > > > > > > > > > > > > > > > > > > > > > > > 22:30, Dawid Wysakowicz 
> > > > > > > > > > > > > > > > > > > > > > > > > > > > <
> > > > > > > > > > > > > > > > > > > > > > > > dwysakow...@apache.org>
> > > > > > > > > > > > > > > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > Hi,
> > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > I would like to 
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > propose an 
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > improvement that
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > would
> > > > > > > > > > enable
> > > > > > > > > > > > > > > > > > reading
> > > > > > > > > > > > > > > > > > > > > > > > table
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > columns from 
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > different parts of 
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > source records.
> > > > > > > > > Besides
> > > > > > > > > > > > the
> > > > > > > > > > > > > > > > > > main
> > > > > > > > > > > > > > > > > > > > > > > > > payload
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > majority (if not all 
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > of the sources) expose
> > > > > > > > > additional
> > > > > > > > > > > > > > > > > > > > > > > > information. It
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > can be simply a 
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > read-only metadata 
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > such as
> > > > offset,
> > > > > > > > > > > > > > ingestion
> > > > > > > > > > > > > > > > > > time
> > > > > > > > > > > > > > > > > > > > > > > > or a
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > read and write parts 
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > of the record that 
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > contain
> > > > > > > > data
> > > > > > > > > > but
> > > > > > > > > > > > > > > > > > > > > > > > additionally
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > serve different 
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > purposes 
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > (partitioning,
> > > > compaction
> > > > > > > > > > etc.),
> > > > > > > > > > > > > > e.g.
> > > > > > > > > > > > > > > > > > key
> > > > > > > > > > > > > > > > > > > > > > > > or
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > timestamp in Kafka.
> > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > We should make it 
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > possible to read and 
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > write
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > data
> > > > > > > > > from
> > > > > > > > > > > > all
> > > > > > > > > > > > > > of
> > > > > > > > > > > > > > > > > > those
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > locations. In this 
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > proposal I discuss 
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > reading
> > > > > > > > > > > > partitioning
> > > > > > > > > > > > > > > > > > data,
> > > > > > > > > > > > > > > > > > > > > > > > for
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > completeness this 
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > proposal discusses 
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > also the
> > > > > > > > > > > > partitioning
> > > > > > > > > > > > > > when
> > > > > > > > > > > > > > > > > > > > > > > > > writing
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > data out.
> > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > I am looking forward 
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > to your comments.
> > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > You can access the 
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > FLIP here:
> > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > >
> > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records?src=contextnavpagetreemode
> > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > Best,
> > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > Dawid
> > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > > >
> > > >
> > > >
> > >
> >
>

Reply via email to