Hi, Timo

Thanks for your explanation, it makes sense to me.

Best,
Leonard


>> Hi, Timo
>> Thanks for the update
>> I have a minor suggestion about the debezium metadata key,
>> Could we use the original  debezium key rather than import new key?
>> debezium-json.schema                            => debezium-json.schema
>> debezium-json.ingestion-timestamp  =>  debezium-json.ts_ms
>> debezium-json.source.database       =>  debezium-json.source.db
>> debezium-json.source.schema         =>  debezium-json.source.schema
>> debezium-json.source.table              =>  debezium-json.source.table
>> debezium-json.source.timestamp     =>  debezium-json.source.ts_ms
>> debezium-json.source.properties      =>  debezium-json.source MAP<STRING, 
>> STRING>
>>  User who familiar with debezium will understand the key easier,  and the 
>> key syntax is more json-path like. HDYT?
>> The other part looks really good to me.
>> Regards,
>> Leonard
>>> 在 2020年9月10日,18:26,Aljoscha Krettek <aljos...@apache.org> 写道:
>>> 
>>> I've only been watching this from the sidelines but that latest proposal 
>>> looks very good to me!
>>> 
>>> Aljoscha
>>> 
>>> On 10.09.20 12:20, Kurt Young wrote:
>>>> The new syntax looks good to me.
>>>> Best,
>>>> Kurt
>>>> On Thu, Sep 10, 2020 at 5:57 PM Jark Wu <imj...@gmail.com> wrote:
>>>>> Hi Timo,
>>>>> 
>>>>> I have one minor suggestion.
>>>>> Maybe the default data type of `timestamp`  can be `TIMESTAMP(3) WITH
>>>>> LOCAL TIME ZONE`, because this is the type that users want to use, this 
>>>>> can
>>>>> avoid unnecessary casting.
>>>>> Besides, currently, the bigint is casted to timestamp in seconds, so the
>>>>> implicit cast may not work...
>>>>> 
>>>>> I don't have other objections. But maybe we should wait for the
>>>>> opinion from @Kurt for the new syntax.
>>>>> 
>>>>> Best,
>>>>> Jark
>>>>> 
>>>>> 
>>>>> On Thu, 10 Sep 2020 at 16:21, Danny Chan <yuzhao....@gmail.com> wrote:
>>>>> 
>>>>>> Thanks for driving this Timo, +1 for voting ~
>>>>>> 
>>>>>> Best,
>>>>>> Danny Chan
>>>>>> 在 2020年9月10日 +0800 PM3:47,Timo Walther <twal...@apache.org>,写道:
>>>>>>> Thanks everyone for this healthy discussion. I updated the FLIP with the
>>>>>>> outcome. I think the result is very powerful but also very easy to
>>>>>>> declare. Thanks for all the contributions.
>>>>>>> 
>>>>>>> If there are no objections, I would continue with a voting.
>>>>>>> 
>>>>>>> What do you think?
>>>>>>> 
>>>>>>> Regards,
>>>>>>> Timo
>>>>>>> 
>>>>>>> 
>>>>>>> On 09.09.20 16:52, Timo Walther wrote:
>>>>>>>> "If virtual by default, when a user types "timestamp int" ==>
>>>>>> persisted
>>>>>>>> column, then adds a "metadata" after that ==> virtual column, then
>>>>>> adds
>>>>>>>> a "persisted" after that ==> persisted column."
>>>>>>>> 
>>>>>>>> Thanks for this nice mental model explanation, Jark. This makes total
>>>>>>>> sense to me. Also making the the most common case as short at just
>>>>>>>> adding `METADATA` is a very good idea. Thanks, Danny!
>>>>>>>> 
>>>>>>>> Let me update the FLIP again with all these ideas.
>>>>>>>> 
>>>>>>>> Regards,
>>>>>>>> Timo
>>>>>>>> 
>>>>>>>> 
>>>>>>>> On 09.09.20 15:03, Jark Wu wrote:
>>>>>>>>> I'm also +1 to Danny's proposal: timestamp INT METADATA [FROM
>>>>>>>>> 'my-timestamp-field'] [VIRTUAL]
>>>>>>>>> Especially I like the shortcut: timestamp INT METADATA, this makes
>>>>>> the
>>>>>>>>> most
>>>>>>>>> common case to be supported in the simplest way.
>>>>>>>>> 
>>>>>>>>> I also think the default should be "PERSISTED", so VIRTUAL is
>>>>>> optional
>>>>>>>>> when
>>>>>>>>> you are accessing a read-only metadata. Because:
>>>>>>>>> 1. The "timestamp INT METADATA" should be a normal column, because
>>>>>>>>> "METADATA" is just a modifier to indicate it is from metadata, a
>>>>>> normal
>>>>>>>>> column should be persisted.
>>>>>>>>>      If virtual by default, when a user types "timestamp int" ==>
>>>>>>>>> persisted
>>>>>>>>> column, then adds a "metadata" after that ==> virtual column, then
>>>>>> adds a
>>>>>>>>> "persisted" after that ==> persisted column.
>>>>>>>>>      I think this looks reversed several times and makes users
>>>>>> confused.
>>>>>>>>> Physical fields are also prefixed with "fieldName TYPE", so
>>>>>> "timestamp
>>>>>>>>> INT
>>>>>>>>> METADATA" is persisted is very straightforward.
>>>>>>>>> 2. From the collected user question [1], we can see that "timestamp"
>>>>>>>>> is the
>>>>>>>>> most common use case. "timestamp" is a read-write metadata.
>>>>>> Persisted by
>>>>>>>>> default doesn't break the reading behavior.
>>>>>>>>> 
>>>>>>>>> Best,
>>>>>>>>> Jark
>>>>>>>>> 
>>>>>>>>> [1]: https://issues.apache.org/jira/browse/FLINK-15869
>>>>>>>>> 
>>>>>>>>> On Wed, 9 Sep 2020 at 20:56, Leonard Xu <xbjt...@gmail.com> wrote:
>>>>>>>>> 
>>>>>>>>>> Thanks @Dawid for the nice summary, I think you catch all
>>>>>> opinions of
>>>>>>>>>> the
>>>>>>>>>> long discussion well.
>>>>>>>>>> 
>>>>>>>>>> @Danny
>>>>>>>>>> “ timestamp INT METADATA [FROM 'my-timestamp-field'] [VIRTUAL]
>>>>>>>>>>   Note that the "FROM 'field name'" is only needed when the name
>>>>>>>>>> conflict
>>>>>>>>>>   with the declared table column name, when there are no
>>>>>> conflicts,
>>>>>>>>>> we can
>>>>>>>>>> simplify it to
>>>>>>>>>>        timestamp INT METADATA"
>>>>>>>>>> 
>>>>>>>>>> I really like the proposal, there is no confusion with computed
>>>>>>>>>> column any
>>>>>>>>>> more,  and it’s concise enough.
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> @Timo @Dawid
>>>>>>>>>> “We use `SYSTEM_TIME` for temporal tables. I think prefixing with
>>>>>> SYSTEM
>>>>>>>>>> makes it clearer that it comes magically from the system.”
>>>>>>>>>> “As for the issue of shortening the SYSTEM_METADATA to METADATA.
>>>>>> Here I
>>>>>>>>>> very much prefer the SYSTEM_ prefix.”
>>>>>>>>>> 
>>>>>>>>>> I think `SYSTEM_TIME` is different with `SYSTEM_METADATA ` a lot,
>>>>>>>>>> First of all,  the word `TIME` has broad meanings but the word
>>>>>>>>>> `METADATA `
>>>>>>>>>> not,  `METADATA ` has specific meaning,
>>>>>>>>>> Secondly, `FOR SYSTEM_TIME AS OF` exists in SQL standard but
>>>>>>>>>> `SYSTEM_METADATA ` not.
>>>>>>>>>> Personally, I like more simplify way,sometimes  less is more.
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> Best,
>>>>>>>>>> Leonard
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> Timo Walther <twal...@apache.org> 于2020年9月9日周三 下午6:41写道:
>>>>>>>>>>> 
>>>>>>>>>>>> Hi everyone,
>>>>>>>>>>>> 
>>>>>>>>>>>> "key" and "value" in the properties are a special case
>>>>>> because they
>>>>>>>>>>>> need
>>>>>>>>>>>> to configure a format. So key and value are more than just
>>>>>> metadata.
>>>>>>>>>>>> Jark's example for setting a timestamp would work but as the
>>>>>> FLIP
>>>>>>>>>>>> discusses, we have way more metadata fields like headers,
>>>>>>>>>>>> epoch-leader,
>>>>>>>>>>>> etc. Having a property for all of this metadata would mess up
>>>>>> the WITH
>>>>>>>>>>>> section entirely. Furthermore, we also want to deal with
>>>>>> metadata from
>>>>>>>>>>>> the formats. Solving this through properties as well would
>>>>>> further
>>>>>>>>>>>> complicate the property design.
>>>>>>>>>>>> 
>>>>>>>>>>>> Personally, I still like the computed column design more
>>>>>> because it
>>>>>>>>>>>> allows to have full flexibility to compute the final column:
>>>>>>>>>>>> 
>>>>>>>>>>>> timestamp AS adjustTimestamp(CAST(SYSTEM_METADATA("ts") AS
>>>>>>>>>> TIMESTAMP(3)))
>>>>>>>>>>>> 
>>>>>>>>>>>> Instead of having a helper column and a real column in the
>>>>>> table:
>>>>>>>>>>>> 
>>>>>>>>>>>> helperTimestamp AS CAST(SYSTEM_METADATA("ts") AS TIMESTAMP(3))
>>>>>>>>>>>> realTimestamp AS adjustTimestamp(helperTimestamp)
>>>>>>>>>>>> 
>>>>>>>>>>>> But I see that the discussion leans towards:
>>>>>>>>>>>> 
>>>>>>>>>>>> timestamp INT SYSTEM_METADATA("ts")
>>>>>>>>>>>> 
>>>>>>>>>>>> Which is fine with me. It is the shortest solution, because
>>>>>> we don't
>>>>>>>>>>>> need additional CAST. We can discuss the syntax, so that
>>>>>> confusion
>>>>>>>>>>>> with
>>>>>>>>>>>> computed columns can be avoided.
>>>>>>>>>>>> 
>>>>>>>>>>>> timestamp INT USING SYSTEM_METADATA("ts")
>>>>>>>>>>>> timestamp INT FROM SYSTEM_METADATA("ts")
>>>>>>>>>>>> timestamp INT FROM SYSTEM_METADATA("ts") PERSISTED
>>>>>>>>>>>> 
>>>>>>>>>>>> We use `SYSTEM_TIME` for temporal tables. I think prefixing
>>>>>> with
>>>>>>>>>>>> SYSTEM
>>>>>>>>>>>> makes it clearer that it comes magically from the system.
>>>>>>>>>>>> 
>>>>>>>>>>>> What do you think?
>>>>>>>>>>>> 
>>>>>>>>>>>> Regards,
>>>>>>>>>>>> Timo
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> On 09.09.20 11:41, Jark Wu wrote:
>>>>>>>>>>>>> Hi Danny,
>>>>>>>>>>>>> 
>>>>>>>>>>>>> This is not Oracle and MySQL computed column syntax,
>>>>>> because there is
>>>>>>>>>> no
>>>>>>>>>>>>> "AS" after the type.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Hi everyone,
>>>>>>>>>>>>> 
>>>>>>>>>>>>> If we want to use "offset INT SYSTEM_METADATA("offset")",
>>>>>> then I
>>>>>>>>>>>>> think
>>>>>>>>>> we
>>>>>>>>>>>>> must further discuss about "PERSISED" or "VIRTUAL" keyword
>>>>>> for
>>>>>>>>>> query-sink
>>>>>>>>>>>>> schema problem.
>>>>>>>>>>>>> Personally, I think we can use a shorter keyword "METADATA"
>>>>>> for
>>>>>>>>>>>>> "SYSTEM_METADATA". Because "SYSTEM_METADATA" sounds like a
>>>>>> system
>>>>>>>>>>>> function
>>>>>>>>>>>>> and confuse users this looks like a computed column.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Best,
>>>>>>>>>>>>> Jark
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> On Wed, 9 Sep 2020 at 17:23, Danny Chan <
>>>>>> danny0...@apache.org> wrote:
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> "offset INT SYSTEM_METADATA("offset")"
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> This is actually Oracle or MySQL style computed column
>>>>>> syntax.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> "You are right that one could argue that "timestamp",
>>>>>> "headers" are
>>>>>>>>>>>>>> something like "key" and "value""
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> I have the same feeling, both key value and headers
>>>>>> timestamp are
>>>>>>>>>> *real*
>>>>>>>>>>>>>> data
>>>>>>>>>>>>>> stored in the consumed record, they are not computed or
>>>>>> generated.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> "Trying to solve everything via properties sounds rather
>>>>>> like a hack
>>>>>>>>>> to
>>>>>>>>>>>>>> me"
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Things are not that hack if we can unify the routines or
>>>>>> the
>>>>>>>>>> definitions
>>>>>>>>>>>>>> (all from the computed column way or all from the table
>>>>>> options), i
>>>>>>>>>> also
>>>>>>>>>>>>>> think that it is a hacky that we mix in 2 kinds of syntax
>>>>>> for
>>>>>>>>>> different
>>>>>>>>>>>>>> kinds of metadata (read-only and read-write). In this
>>>>>> FLIP, we
>>>>>>>>>>>>>> declare
>>>>>>>>>>>> the
>>>>>>>>>>>>>> Kafka key fields with table options but SYSTEM_METADATA
>>>>>> for other
>>>>>>>>>>>> metadata,
>>>>>>>>>>>>>> that is a hacky thing or something in-consistent.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Kurt Young <ykt...@gmail.com> 于2020年9月9日周三 下午4:48写道:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>   I would vote for `offset INT
>>>>>> SYSTEM_METADATA("offset")`.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> I don't think we can stick with the SQL standard in DDL
>>>>>> part
>>>>>>>>>>>>>>> forever,
>>>>>>>>>>>>>>> especially as there are more and more
>>>>>>>>>>>>>>> requirements coming from different connectors and
>>>>>> external systems.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>> Kurt
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> On Wed, Sep 9, 2020 at 4:40 PM Timo Walther <
>>>>>> twal...@apache.org>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Hi Jark,
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> now we are back at the original design proposed by
>>>>>> Dawid :D
>>>>>>>>>>>>>>>> Yes, we
>>>>>>>>>>>>>>>> should be cautious about adding new syntax. But the
>>>>>> length of this
>>>>>>>>>>>>>>>> discussion shows that we are looking for a good
>>>>>> long-term
>>>>>>>>>>>>>>>> solution.
>>>>>>>>>> In
>>>>>>>>>>>>>>>> this case I would rather vote for a deep integration
>>>>>> into the
>>>>>>>>>> syntax.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Computed columns are also not SQL standard compliant.
>>>>>> And our
>>>>>>>>>>>>>>>> DDL is
>>>>>>>>>>>>>>>> neither, so we have some degree of freedom here.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Trying to solve everything via properties sounds
>>>>>> rather like a
>>>>>>>>>>>>>>>> hack
>>>>>>>>>> to
>>>>>>>>>>>>>>>> me. You are right that one could argue that
>>>>>> "timestamp", "headers"
>>>>>>>>>> are
>>>>>>>>>>>>>>>> something like "key" and "value". However, mixing
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> `offset AS SYSTEM_METADATA("offset")`
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> `'timestamp.field' = 'ts'`
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> looks more confusing to users that an explicit
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> `offset AS CAST(SYSTEM_METADATA("offset") AS INT)`
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> or
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> `offset INT SYSTEM_METADATA("offset")`
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> that is symetric for both source and sink.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> What do others think?
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>> Timo
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> On 09.09.20 10:09, Jark Wu wrote:
>>>>>>>>>>>>>>>>> Hi everyone,
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> I think we have a conclusion that the writable
>>>>>> metadata shouldn't
>>>>>>>>>> be
>>>>>>>>>>>>>>>>> defined as a computed column, but a normal column.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> "timestamp STRING SYSTEM_METADATA('timestamp')" is
>>>>>> one of the
>>>>>>>>>>>>>>> approaches.
>>>>>>>>>>>>>>>>> However, it is not SQL standard compliant, we need
>>>>>> to be cautious
>>>>>>>>>>>>>>> enough
>>>>>>>>>>>>>>>>> when adding new syntax.
>>>>>>>>>>>>>>>>> Besides, we have to introduce the `PERSISTED` or
>>>>>> `VIRTUAL`
>>>>>>>>>>>>>>>>> keyword
>>>>>>>>>> to
>>>>>>>>>>>>>>>>> resolve the query-sink schema problem if it is
>>>>>> read-only
>>>>>>>>>>>>>>>>> metadata.
>>>>>>>>>>>>>> That
>>>>>>>>>>>>>>>>> adds more stuff to learn for users.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>  From my point of view, the "timestamp",
>>>>>> "headers" are something
>>>>>>>>>> like
>>>>>>>>>>>>>>>> "key"
>>>>>>>>>>>>>>>>> and "value" that stores with the real data. So why
>>>>>> not define the
>>>>>>>>>>>>>>>>> "timestamp" in the same way with "key" by using a
>>>>>>>>>>>>>>>>> "timestamp.field"
>>>>>>>>>>>>>>>>> connector option?
>>>>>>>>>>>>>>>>> On the other side, the read-only metadata, such as
>>>>>> "offset",
>>>>>>>>>>>>>> shouldn't
>>>>>>>>>>>>>>> be
>>>>>>>>>>>>>>>>> defined as a normal column. So why not use the
>>>>>> existing computed
>>>>>>>>>>>>>> column
>>>>>>>>>>>>>>>>> syntax for such metadata? Then we don't have the
>>>>>> query-sink
>>>>>>>>>>>>>>>>> schema
>>>>>>>>>>>>>>>> problem.
>>>>>>>>>>>>>>>>> So here is my proposal:
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> CREATE TABLE kafka_table (
>>>>>>>>>>>>>>>>>     id BIGINT,
>>>>>>>>>>>>>>>>>     name STRING,
>>>>>>>>>>>>>>>>>     col1 STRING,
>>>>>>>>>>>>>>>>>     col2 STRING,
>>>>>>>>>>>>>>>>>     ts TIMESTAMP(3) WITH LOCAL TIME ZONE,    -- ts
>>>>>> is a normal
>>>>>>>>>> field,
>>>>>>>>>>>>>> so
>>>>>>>>>>>>>>>> can
>>>>>>>>>>>>>>>>> be read and written.
>>>>>>>>>>>>>>>>>     offset AS SYSTEM_METADATA("offset")
>>>>>>>>>>>>>>>>> ) WITH (
>>>>>>>>>>>>>>>>>     'connector' = 'kafka',
>>>>>>>>>>>>>>>>>     'topic' = 'test-topic',
>>>>>>>>>>>>>>>>>     'key.fields' = 'id, name',
>>>>>>>>>>>>>>>>>     'key.format' = 'csv',
>>>>>>>>>>>>>>>>>     'value.format' = 'avro',
>>>>>>>>>>>>>>>>>     'timestamp.field' = 'ts'    -- define the
>>>>>> mapping of Kafka
>>>>>>>>>>>>>> timestamp
>>>>>>>>>>>>>>>>> );
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> INSERT INTO kafka_table
>>>>>>>>>>>>>>>>> SELECT id, name, col1, col2, rowtime FROM
>>>>>> another_table;
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> I think this can solve all the problems without
>>>>>> introducing
>>>>>>>>>>>>>>>>> any new
>>>>>>>>>>>>>>>> syntax.
>>>>>>>>>>>>>>>>> The only minor disadvantage is that we separate the
>>>>>> definition
>>>>>>>>>>>>>>> way/syntax
>>>>>>>>>>>>>>>>> of read-only metadata and read-write fields.
>>>>>>>>>>>>>>>>> However, I don't think this is a big problem.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>> Jark
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> On Wed, 9 Sep 2020 at 15:09, Timo Walther <
>>>>>> twal...@apache.org>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Hi Kurt,
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> thanks for sharing your opinion. I'm totally up
>>>>>> for not reusing
>>>>>>>>>>>>>>> computed
>>>>>>>>>>>>>>>>>> columns. I think Jark was a big supporter of this
>>>>>> syntax, @Jark
>>>>>>>>>> are
>>>>>>>>>>>>>>> you
>>>>>>>>>>>>>>>>>> fine with this as well? The non-computed column
>>>>>> approach was
>>>>>>>>>>>>>>>>>> only
>>>>>>>>>> a
>>>>>>>>>>>>>>>>>> "slightly rejected alternative".
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Furthermore, we would need to think about how
>>>>>> such a new design
>>>>>>>>>>>>>>>>>> influences the LIKE clause though.
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> However, we should still keep the `PERSISTED`
>>>>>> keyword as it
>>>>>>>>>>>>>> influences
>>>>>>>>>>>>>>>>>> the query->sink schema. If you look at the list
>>>>>> of metadata for
>>>>>>>>>>>>>>> existing
>>>>>>>>>>>>>>>>>> connectors and formats, we currently offer only
>>>>>> two writable
>>>>>>>>>>>>>> metadata
>>>>>>>>>>>>>>>>>> fields. Otherwise, one would need to declare two
>>>>>> tables
>>>>>>>>>>>>>>>>>> whenever a
>>>>>>>>>>>>>>>>>> metadata columns is read (one for the source, one
>>>>>> for the sink).
>>>>>>>>>>>>>> This
>>>>>>>>>>>>>>>>>> can be quite inconvientient e.g. for just reading
>>>>>> the topic.
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>> Timo
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> On 09.09.20 08:52, Kurt Young wrote:
>>>>>>>>>>>>>>>>>>> I also share the concern that reusing the
>>>>>> computed column
>>>>>>>>>>>>>>>>>>> syntax
>>>>>>>>>>>>>> but
>>>>>>>>>>>>>>>> have
>>>>>>>>>>>>>>>>>>> different semantics
>>>>>>>>>>>>>>>>>>> would confuse users a lot.
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Besides, I think metadata fields are
>>>>>> conceptually not the same
>>>>>>>>>> with
>>>>>>>>>>>>>>>>>>> computed columns. The metadata
>>>>>>>>>>>>>>>>>>> field is a connector specific thing and it only
>>>>>> contains the
>>>>>>>>>>>>>>>> information
>>>>>>>>>>>>>>>>>>> that where does the field come
>>>>>>>>>>>>>>>>>>> from (during source) or where does the field
>>>>>> need to write to
>>>>>>>>>>>>>> (during
>>>>>>>>>>>>>>>>>>> sink). It's more similar with normal
>>>>>>>>>>>>>>>>>>> fields, with assumption that all these fields
>>>>>> need going to the
>>>>>>>>>>>>>> data
>>>>>>>>>>>>>>>>>> part.
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Thus I'm more lean to the rejected alternative
>>>>>> that Timo
>>>>>>>>>> mentioned.
>>>>>>>>>>>>>>>> And I
>>>>>>>>>>>>>>>>>>> think we don't need the
>>>>>>>>>>>>>>>>>>> PERSISTED keyword, SYSTEM_METADATA should be
>>>>>> enough.
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> During implementation, the framework only needs
>>>>>> to pass such
>>>>>>>>>>>>>> <field,
>>>>>>>>>>>>>>>>>>> metadata field> information to the
>>>>>>>>>>>>>>>>>>> connector, and the logic of handling such
>>>>>> fields inside the
>>>>>>>>>>>>>> connector
>>>>>>>>>>>>>>>>>>> should be straightforward.
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Regarding the downside Timo mentioned:
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> The disadvantage is that users cannot call
>>>>>> UDFs or parse
>>>>>>>>>>>>>> timestamps.
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> I think this is fairly simple to solve. Since
>>>>>> the metadata
>>>>>>>>>>>>>>>>>>> field
>>>>>>>>>>>>>>> isn't
>>>>>>>>>>>>>>>> a
>>>>>>>>>>>>>>>>>>> computed column anymore, we can support
>>>>>>>>>>>>>>>>>>> referencing such fields in the computed column.
>>>>>> For example:
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> CREATE TABLE kafka_table (
>>>>>>>>>>>>>>>>>>>         id BIGINT,
>>>>>>>>>>>>>>>>>>>         name STRING,
>>>>>>>>>>>>>>>>>>>         timestamp STRING
>>>>>> SYSTEM_METADATA("timestamp"),  //
>>>>>>>>>>>>>>>>>>> get the
>>>>>>>>>>>>>>>>>> timestamp
>>>>>>>>>>>>>>>>>>> field from metadata
>>>>>>>>>>>>>>>>>>>         ts AS to_timestamp(timestamp) // normal
>>>>>> computed
>>>>>>>>>>>>>>>>>>> column,
>>>>>>>>>>>>>> parse
>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>> string to TIMESTAMP type by using the metadata
>>>>>> field
>>>>>>>>>>>>>>>>>>> ) WITH (
>>>>>>>>>>>>>>>>>>>        ...
>>>>>>>>>>>>>>>>>>> )
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>> Kurt
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> On Tue, Sep 8, 2020 at 11:57 PM Timo Walther
>>>>>>>>>>>>>>>>>>> <twal...@apache.org
>>>>>>>>>>> 
>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> Hi Leonard,
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> the only alternative I see is that we
>>>>>> introduce a concept that
>>>>>>>>>> is
>>>>>>>>>>>>>>>>>>>> completely different to computed columns.
>>>>>> This is also
>>>>>>>>>>>>>>>>>>>> mentioned
>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>> rejected alternative section of the FLIP.
>>>>>> Something like:
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> CREATE TABLE kafka_table (
>>>>>>>>>>>>>>>>>>>>         id BIGINT,
>>>>>>>>>>>>>>>>>>>>         name STRING,
>>>>>>>>>>>>>>>>>>>>         timestamp INT
>>>>>> SYSTEM_METADATA("timestamp") PERSISTED,
>>>>>>>>>>>>>>>>>>>>         headers MAP<STRING, BYTES>
>>>>>> SYSTEM_METADATA("headers")
>>>>>>>>>>>>>>> PERSISTED
>>>>>>>>>>>>>>>>>>>> ) WITH (
>>>>>>>>>>>>>>>>>>>>        ...
>>>>>>>>>>>>>>>>>>>> )
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> This way we would avoid confusion at all and
>>>>>> can easily map
>>>>>>>>>>>>>> columns
>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>> metadata columns. The disadvantage is that
>>>>>> users cannot call
>>>>>>>>>> UDFs
>>>>>>>>>>>>>> or
>>>>>>>>>>>>>>>>>>>> parse timestamps. This would need to be done
>>>>>> in a real
>>>>>>>>>>>>>>>>>>>> computed
>>>>>>>>>>>>>>>> column.
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> I'm happy about better alternatives.
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>>> Timo
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> On 08.09.20 15:37, Leonard Xu wrote:
>>>>>>>>>>>>>>>>>>>>> HI, Timo
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> Thanks for driving this FLIP.
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> Sorry but I have a concern about Writing
>>>>>> metadata via
>>>>>>>>>>>>>>>> DynamicTableSink
>>>>>>>>>>>>>>>>>>>> section:
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> CREATE TABLE kafka_table (
>>>>>>>>>>>>>>>>>>>>>       id BIGINT,
>>>>>>>>>>>>>>>>>>>>>       name STRING,
>>>>>>>>>>>>>>>>>>>>>       timestamp AS
>>>>>> CAST(SYSTEM_METADATA("timestamp") AS
>>>>>>>>>>>>>>>>>>>>> BIGINT)
>>>>>>>>>>>>>>>>>> PERSISTED,
>>>>>>>>>>>>>>>>>>>>>       headers AS
>>>>>> CAST(SYSTEM_METADATA("headers") AS
>>>>>>>>>>>>>>>>>>>>> MAP<STRING,
>>>>>>>>>>>>>>>> BYTES>)
>>>>>>>>>>>>>>>>>>>> PERSISTED
>>>>>>>>>>>>>>>>>>>>> ) WITH (
>>>>>>>>>>>>>>>>>>>>>       ...
>>>>>>>>>>>>>>>>>>>>> )
>>>>>>>>>>>>>>>>>>>>> An insert statement could look like:
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> INSERT INTO kafka_table VALUES (
>>>>>>>>>>>>>>>>>>>>>       (1, "ABC", 1599133672, MAP('checksum',
>>>>>>>>>>>>>> computeChecksum(...)))
>>>>>>>>>>>>>>>>>>>>> )
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> The proposed INERT syntax does not make
>>>>>> sense to me,
>>>>>>>>>>>>>>>>>>>>> because it
>>>>>>>>>>>>>>>>>> contains
>>>>>>>>>>>>>>>>>>>> computed(generated) column.
>>>>>>>>>>>>>>>>>>>>> Both SQL server and Postgresql do not allow
>>>>>> to insert
>>>>>>>>>>>>>>>>>>>>> value to
>>>>>>>>>>>>>>>> computed
>>>>>>>>>>>>>>>>>>>> columns even they are persisted, this boke
>>>>>> the generated
>>>>>>>>>>>>>>>>>>>> column
>>>>>>>>>>>>>>>>>> semantics
>>>>>>>>>>>>>>>>>>>> and may confuse user much.
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> For SQL server computed column[1]:
>>>>>>>>>>>>>>>>>>>>>> column_name AS computed_column_expression
>>>>>> [ PERSISTED [ NOT
>>>>>>>>>>>>>> NULL ]
>>>>>>>>>>>>>>>>>> ]...
>>>>>>>>>>>>>>>>>>>>>> NOTE: A computed column cannot be the
>>>>>> target of an INSERT or
>>>>>>>>>>>>>>> UPDATE
>>>>>>>>>>>>>>>>>>>> statement.
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> For Postgresql generated column[2]:
>>>>>>>>>>>>>>>>>>>>>>      height_in numeric GENERATED ALWAYS
>>>>>> AS (height_cm /
>>>>>>>>>>>>>>>>>>>>>> 2.54)
>>>>>>>>>>>>>>> STORED
>>>>>>>>>>>>>>>>>>>>>> NOTE: A generated column cannot be
>>>>>> written to directly. In
>>>>>>>>>>>>>> INSERT
>>>>>>>>>>>>>>> or
>>>>>>>>>>>>>>>>>>>> UPDATE commands, a value cannot be specified
>>>>>> for a generated
>>>>>>>>>>>>>> column,
>>>>>>>>>>>>>>>> but
>>>>>>>>>>>>>>>>>>>> the keyword DEFAULT may be specified.
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> It shouldn't be allowed to set/update value
>>>>>> for generated
>>>>>>>>>> column
>>>>>>>>>>>>>>>> after
>>>>>>>>>>>>>>>>>>>> lookup the SQL 2016:
>>>>>>>>>>>>>>>>>>>>>> <insert statement> ::=
>>>>>>>>>>>>>>>>>>>>>> INSERT INTO <insertion target> <insert
>>>>>> columns and source>
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> If <contextually typed table value
>>>>>> constructor> CTTVC is
>>>>>>>>>>>>>>> specified,
>>>>>>>>>>>>>>>>>>>> then every <contextually typed row
>>>>>>>>>>>>>>>>>>>>>> value constructor element> simply
>>>>>> contained in CTTVC whose
>>>>>>>>>>>>>>>>>> positionally
>>>>>>>>>>>>>>>>>>>> corresponding <column name>
>>>>>>>>>>>>>>>>>>>>>> in <insert column list> references a
>>>>>> column of which some
>>>>>>>>>>>>>>> underlying
>>>>>>>>>>>>>>>>>>>> column is a generated column shall
>>>>>>>>>>>>>>>>>>>>>> be a <default specification>.
>>>>>>>>>>>>>>>>>>>>>> A <default specification> specifies the
>>>>>> default value of
>>>>>>>>>>>>>>>>>>>>>> some
>>>>>>>>>>>>>>>>>>>> associated item.
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> [1]
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>> https://docs.microsoft.com/en-US/sql/t-sql/statements/alter-table-computed-column-definition-transact-sql?view=sql-server-ver15
>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> <
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>> https://docs.microsoft.com/en-US/sql/t-sql/statements/alter-table-computed-column-definition-transact-sql?view=sql-server-ver15
>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> [2]
>>>>>>>>>>>>>> 
>>>>>> https://www.postgresql.org/docs/12/ddl-generated-columns.html
>>>>>>>>>>>>>>> <
>>>>>>>>>>>>>>>>>>>> 
>>>>>> https://www.postgresql.org/docs/12/ddl-generated-columns.html>
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> 在 2020年9月8日,17:31,Timo Walther <
>>>>>> twal...@apache.org>
>>>>>>>>>>>>>>>>>>>>>> 写道:
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> Hi Jark,
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> according to Flink's and Calcite's
>>>>>> casting definition in
>>>>>>>>>> [1][2]
>>>>>>>>>>>>>>>>>>>> TIMESTAMP WITH LOCAL TIME ZONE should be
>>>>>> castable from BIGINT.
>>>>>>>>>> If
>>>>>>>>>>>>>>> not,
>>>>>>>>>>>>>>>>>> we
>>>>>>>>>>>>>>>>>>>> will make it possible ;-)
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> I'm aware of
>>>>>> DeserializationSchema.getProducedType but I
>>>>>>>>>>>>>>>>>>>>>> think
>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>>>> this method is actually misplaced. The type
>>>>>> should rather be
>>>>>>>>>>>>>> passed
>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>> source itself.
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> For our Kafka SQL source, we will also
>>>>>> not use this method
>>>>>>>>>>>>>> because
>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>> Kafka source will add own metadata in
>>>>>> addition to the
>>>>>>>>>>>>>>>>>>>> DeserializationSchema. So
>>>>>>>>>>>>>>>>>>>> DeserializationSchema.getProducedType
>>>>>>>>>>>>>> will
>>>>>>>>>>>>>>>>>> never
>>>>>>>>>>>>>>>>>>>> be read.
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> For now I suggest to leave out the
>>>>>> `DataType` from
>>>>>>>>>>>>>>>>>>>> DecodingFormat.applyReadableMetadata. Also
>>>>>> because the
>>>>>>>>>>>>>>>>>>>> format's
>>>>>>>>>>>>>>>> physical
>>>>>>>>>>>>>>>>>>>> type is passed later in
>>>>>> `createRuntimeDecoder`. If
>>>>>>>>>>>>>>>>>>>> necessary, it
>>>>>>>>>>>>>> can
>>>>>>>>>>>>>>>> be
>>>>>>>>>>>>>>>>>>>> computed manually by consumedType + metadata
>>>>>> types. We will
>>>>>>>>>>>>>> provide
>>>>>>>>>>>>>>> a
>>>>>>>>>>>>>>>>>>>> metadata utility class for that.
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>>>>> Timo
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> [1]
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>> https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java#L200
>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> [2]
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>> https://github.com/apache/calcite/blob/master/core/src/main/java/org/apache/calcite/sql/type/SqlTypeCoercionRule.java#L254
>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> On 08.09.20 10:52, Jark Wu wrote:
>>>>>>>>>>>>>>>>>>>>>>> Hi Timo,
>>>>>>>>>>>>>>>>>>>>>>> The updated CAST SYSTEM_METADATA
>>>>>> behavior sounds good to
>>>>>>>>>>>>>>>>>>>>>>> me.
>>>>>>>>>> I
>>>>>>>>>>>>>>> just
>>>>>>>>>>>>>>>>>>>> noticed
>>>>>>>>>>>>>>>>>>>>>>> that a BIGINT can't be converted to
>>>>>> "TIMESTAMP(3) WITH
>>>>>>>>>>>>>>>>>>>>>>> LOCAL
>>>>>>>>>>>>>> TIME
>>>>>>>>>>>>>>>>>>>> ZONE".
>>>>>>>>>>>>>>>>>>>>>>> So maybe we need to support this, or
>>>>>> use "TIMESTAMP(3) WITH
>>>>>>>>>>>>>> LOCAL
>>>>>>>>>>>>>>>>>> TIME
>>>>>>>>>>>>>>>>>>>>>>> ZONE" as the defined type of Kafka
>>>>>> timestamp? I think this
>>>>>>>>>>>>>> makes
>>>>>>>>>>>>>>>>>> sense,
>>>>>>>>>>>>>>>>>>>>>>> because it represents the milli-seconds
>>>>>> since epoch.
>>>>>>>>>>>>>>>>>>>>>>> Regarding "DeserializationSchema
>>>>>> doesn't need TypeInfo", I
>>>>>>>>>>>>>> don't
>>>>>>>>>>>>>>>>>> think
>>>>>>>>>>>>>>>>>>>> so.
>>>>>>>>>>>>>>>>>>>>>>> The DeserializationSchema implements
>>>>>> ResultTypeQueryable,
>>>>>>>>>> thus
>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>> implementation needs to return an
>>>>>> output TypeInfo.
>>>>>>>>>>>>>>>>>>>>>>> Besides, FlinkKafkaConsumer also
>>>>>>>>>>>>>>>>>>>>>>> calls
>>>>>> DeserializationSchema.getProducedType as the produced
>>>>>>>>>>>>>> type
>>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>> source function [1].
>>>>>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>>>>>> Jark
>>>>>>>>>>>>>>>>>>>>>>> [1]:
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L1066
>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> On Tue, 8 Sep 2020 at 16:35, Timo
>>>>>> Walther <
>>>>>>>>>> twal...@apache.org>
>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>> Hi everyone,
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> I updated the FLIP again and hope
>>>>>> that I could address the
>>>>>>>>>>>>>>>> mentioned
>>>>>>>>>>>>>>>>>>>>>>>> concerns.
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> @Leonard: Thanks for the explanation.
>>>>>> I wasn't aware that
>>>>>>>>>>>>>> ts_ms
>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>>>>>> source.ts_ms have different
>>>>>> semantics. I updated the FLIP
>>>>>>>>>> and
>>>>>>>>>>>>>>>> expose
>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>> most commonly used properties
>>>>>> separately. So frequently
>>>>>>>>>>>>>>>>>>>>>>>> used
>>>>>>>>>>>>>>>>>>>> properties
>>>>>>>>>>>>>>>>>>>>>>>> are not hidden in the MAP anymore:
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> debezium-json.ingestion-timestamp
>>>>>>>>>>>>>>>>>>>>>>>> debezium-json.source.timestamp
>>>>>>>>>>>>>>>>>>>>>>>> debezium-json.source.database
>>>>>>>>>>>>>>>>>>>>>>>> debezium-json.source.schema
>>>>>>>>>>>>>>>>>>>>>>>> debezium-json.source.table
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> However, since other properties
>>>>>> depend on the used
>>>>>>>>>>>>>>>> connector/vendor,
>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>> remaining options are stored in:
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> debezium-json.source.properties
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> And accessed with:
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>> CAST(SYSTEM_METADATA('debezium-json.source.properties') AS
>>>>>>>>>>>>>>>>>> MAP<STRING,
>>>>>>>>>>>>>>>>>>>>>>>> STRING>)['table']
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> Otherwise it is not possible to
>>>>>> figure out the value and
>>>>>>>>>>>>>> column
>>>>>>>>>>>>>>>> type
>>>>>>>>>>>>>>>>>>>>>>>> during validation.
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> @Jark: You convinced me in relaxing
>>>>>> the CAST
>>>>>>>>>>>>>>>>>>>>>>>> constraints. I
>>>>>>>>>>>>>>> added
>>>>>>>>>>>>>>>> a
>>>>>>>>>>>>>>>>>>>>>>>> dedicacated sub-section to the FLIP:
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> For making the use of SYSTEM_METADATA
>>>>>> easier and avoid
>>>>>>>>>> nested
>>>>>>>>>>>>>>>>>> casting
>>>>>>>>>>>>>>>>>>>> we
>>>>>>>>>>>>>>>>>>>>>>>> allow explicit casting to a target
>>>>>> data type:
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> rowtime AS
>>>>>> CAST(SYSTEM_METADATA("timestamp") AS
>>>>>>>>>>>>>>>>>>>>>>>> TIMESTAMP(3)
>>>>>>>>>>>>>>> WITH
>>>>>>>>>>>>>>>>>>>> LOCAL
>>>>>>>>>>>>>>>>>>>>>>>> TIME ZONE)
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> A connector still produces and
>>>>>> consumes the data type
>>>>>>>>>> returned
>>>>>>>>>>>>>>> by
>>>>>>>>>>>>>>>>>>>>>>>> `listMetadata()`. The planner will
>>>>>> insert necessary
>>>>>>>>>>>>>>>>>>>>>>>> explicit
>>>>>>>>>>>>>>>> casts.
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> In any case, the user must provide a
>>>>>> CAST such that the
>>>>>>>>>>>>>> computed
>>>>>>>>>>>>>>>>>>>> column
>>>>>>>>>>>>>>>>>>>>>>>> receives a valid data type when
>>>>>> constructing the table
>>>>>>>>>> schema.
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> "I don't see a reason why
>>>>>>>>>>>>>> `DecodingFormat#applyReadableMetadata`
>>>>>>>>>>>>>>>>>>>> needs a
>>>>>>>>>>>>>>>>>>>>>>>> DataType argument."
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> Correct he DeserializationSchema
>>>>>> doesn't need TypeInfo, it
>>>>>>>>>> is
>>>>>>>>>>>>>>>> always
>>>>>>>>>>>>>>>>>>>>>>>> executed locally. It is the source
>>>>>> that needs TypeInfo for
>>>>>>>>>>>>>>>>>> serializing
>>>>>>>>>>>>>>>>>>>>>>>> the record to the next operator. And
>>>>>> that's this is
>>>>>>>>>>>>>>>>>>>>>>>> what we
>>>>>>>>>>>>>>>> provide.
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> @Danny:
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> “SYSTEM_METADATA("offset")` returns
>>>>>> the NULL type by
>>>>>>>>>> default”
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> We can also use some other means to
>>>>>> represent an UNKNOWN
>>>>>>>>>> data
>>>>>>>>>>>>>>>> type.
>>>>>>>>>>>>>>>>>> In
>>>>>>>>>>>>>>>>>>>>>>>> the Flink type system, we use the
>>>>>> NullType for it. The
>>>>>>>>>>>>>> important
>>>>>>>>>>>>>>>>>> part
>>>>>>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>>>>>>>> that the final data type is known for
>>>>>> the entire computed
>>>>>>>>>>>>>>> column.
>>>>>>>>>>>>>>>>>> As I
>>>>>>>>>>>>>>>>>>>>>>>> mentioned before, I would avoid the
>>>>>> suggested option b)
>>>>>>>>>>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>> would
>>>>>>>>>>>>>>>>>> be
>>>>>>>>>>>>>>>>>>>>>>>> similar to your suggestion. The CAST
>>>>>> should be enough and
>>>>>>>>>>>>>> allows
>>>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>>>>>>>>> complex expressions in the computed
>>>>>> column. Option b)
>>>>>>>>>>>>>>>>>>>>>>>> would
>>>>>>>>>>>>>> need
>>>>>>>>>>>>>>>>>>>> parser
>>>>>>>>>>>>>>>>>>>>>>>> changes.
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>>>>>>> Timo
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> On 08.09.20 06:21, Leonard Xu wrote:
>>>>>>>>>>>>>>>>>>>>>>>>> Hi, Timo
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for you explanation and
>>>>>> update,  I have only one
>>>>>>>>>>>>>>> question
>>>>>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>>>>>>>>> the latest FLIP.
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> About the MAP<STRING, STRING>
>>>>>> DataType of key
>>>>>>>>>>>>>>>>>>>> 'debezium-json.source', if
>>>>>>>>>>>>>>>>>>>>>>>> user want to use the table name
>>>>>> metadata, they need to
>>>>>>>>>> write:
>>>>>>>>>>>>>>>>>>>>>>>>> tableName STRING AS
>>>>>>>>>>>>>> CAST(SYSTEM_METADATA('debeuim-json.source')
>>>>>>>>>>>>>>>> AS
>>>>>>>>>>>>>>>>>>>>>>>> MAP<STRING, STRING>)['table']
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> the expression is a little complex
>>>>>> for user, Could we
>>>>>>>>>>>>>>>>>>>>>>>>> only
>>>>>>>>>>>>>>>> support
>>>>>>>>>>>>>>>>>>>>>>>> necessary metas with simple DataType
>>>>>> as following?
>>>>>>>>>>>>>>>>>>>>>>>>> tableName STRING AS
>>>>>>>>>>>>>>>>>>>> 
>>>>>> CAST(SYSTEM_METADATA('debeuim-json.source.table') AS
>>>>>>>>>>>>>>>>>>>>>>>> STRING),
>>>>>>>>>>>>>>>>>>>>>>>>> transactionTime LONG AS
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>> CAST(SYSTEM_METADATA('debeuim-json.source.ts_ms') AS
>>>>>>>>>> BIGINT),
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> In this way, we can simplify the
>>>>>> expression, the mainly
>>>>>>>>>> used
>>>>>>>>>>>>>>>>>>>> metadata in
>>>>>>>>>>>>>>>>>>>>>>>> changelog format may include
>>>>>>>>>>>>>>>>>>>> 'database','table','source.ts_ms','ts_ms' from
>>>>>>>>>>>>>>>>>>>>>>>> my side,
>>>>>>>>>>>>>>>>>>>>>>>>> maybe we could only support them at
>>>>>> first version.
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> Both Debezium and Canal have above
>>>>>> four metadata, and I‘m
>>>>>>>>>>>>>>> willing
>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>>>>> take some subtasks in next
>>>>>> development if necessary.
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> Debezium:
>>>>>>>>>>>>>>>>>>>>>>>>> {
>>>>>>>>>>>>>>>>>>>>>>>>>        "before": null,
>>>>>>>>>>>>>>>>>>>>>>>>>        "after": {  "id":
>>>>>> 101,"name": "scooter"},
>>>>>>>>>>>>>>>>>>>>>>>>>        "source": {
>>>>>>>>>>>>>>>>>>>>>>>>>          "db":
>>>>>> "inventory",                  # 1.
>>>>>>>>>>>>>>>>>>>>>>>>> database
>>>>>>>>>>>>>> name
>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>> changelog belongs to.
>>>>>>>>>>>>>>>>>>>>>>>>>          "table":
>>>>>> "products",                # 2.
>>>>>>>>>>>>>>>>>>>>>>>>> table name
>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>> changelog
>>>>>>>>>>>>>>>>>>>>>>>> belongs to.
>>>>>>>>>>>>>>>>>>>>>>>>>          "ts_ms":
>>>>>> 1589355504100,             # 3.
>>>>>>>>>>>>>>>>>>>>>>>>> timestamp
>>>>>>>>>>>> of
>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>> change
>>>>>>>>>>>>>>>>>>>>>>>> happened in database system, i.e.:
>>>>>> transaction time in
>>>>>>>>>>>>>> database.
>>>>>>>>>>>>>>>>>>>>>>>>>          "connector": "mysql",
>>>>>>>>>>>>>>>>>>>>>>>>>          ….
>>>>>>>>>>>>>>>>>>>>>>>>>        },
>>>>>>>>>>>>>>>>>>>>>>>>>        "ts_ms":
>>>>>> 1589355606100,              # 4.
>>>>>>>>>>>>>>>>>>>>>>>>> timestamp
>>>>>>>>>>>>>> when
>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>> debezium
>>>>>>>>>>>>>>>>>>>>>>>> processed the changelog.
>>>>>>>>>>>>>>>>>>>>>>>>>        "op": "c",
>>>>>>>>>>>>>>>>>>>>>>>>>        "transaction": null
>>>>>>>>>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> Canal:
>>>>>>>>>>>>>>>>>>>>>>>>> {
>>>>>>>>>>>>>>>>>>>>>>>>>        "data": [{  "id": "102",
>>>>>> "name": "car battery" }],
>>>>>>>>>>>>>>>>>>>>>>>>>        "database":
>>>>>> "inventory",      # 1. database
>>>>>>>>>>>>>>>>>>>>>>>>> name the
>>>>>>>>>>>>>>>> changelog
>>>>>>>>>>>>>>>>>>>>>>>> belongs to.
>>>>>>>>>>>>>>>>>>>>>>>>>        "table":
>>>>>> "products",          # 2. table name the
>>>>>>>>>>>>>>> changelog
>>>>>>>>>>>>>>>>>>>> belongs
>>>>>>>>>>>>>>>>>>>>>>>> to.
>>>>>>>>>>>>>>>>>>>>>>>>>        "es":
>>>>>> 1589374013000,          # 3. execution
>>>>>>>>>>>>>>>>>>>>>>>>> time of
>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>> change
>>>>>>>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>>>>>>>>>>> database system, i.e.: transaction
>>>>>> time in database.
>>>>>>>>>>>>>>>>>>>>>>>>>        "ts":
>>>>>> 1589374013680,          # 4. timestamp
>>>>>>>>>>>>>>>>>>>>>>>>> when the
>>>>>>>>>>>>>>>> cannal
>>>>>>>>>>>>>>>>>>>>>>>> processed the changelog.
>>>>>>>>>>>>>>>>>>>>>>>>>        "isDdl": false,
>>>>>>>>>>>>>>>>>>>>>>>>>        "mysqlType": {},
>>>>>>>>>>>>>>>>>>>>>>>>>        ....
>>>>>>>>>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> Best
>>>>>>>>>>>>>>>>>>>>>>>>> Leonard
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> 在 2020年9月8日,11:57,Danny Chan
>>>>>>>>>>>>>>>>>>>>>>>>>> <yuzhao....@gmail.com> 写道:
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks Timo ~
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> The FLIP was already in pretty
>>>>>> good shape, I have only 2
>>>>>>>>>>>>>>>> questions
>>>>>>>>>>>>>>>>>>>> here:
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> 1.
>>>>>> “`CAST(SYSTEM_METADATA("offset") AS INT)` would be a
>>>>>>>>>>>>>> valid
>>>>>>>>>>>>>>>>>>>> read-only
>>>>>>>>>>>>>>>>>>>>>>>> computed column for Kafka and can be
>>>>>> extracted by the
>>>>>>>>>>>>>> planner.”
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> What is the pros we follow the
>>>>>> SQL-SERVER syntax here ?
>>>>>>>>>>>>>>> Usually
>>>>>>>>>>>>>>>> an
>>>>>>>>>>>>>>>>>>>>>>>> expression return type can be
>>>>>> inferred automatically.
>>>>>>>>>>>>>>>>>>>>>>>> But I
>>>>>>>>>>>>>>> guess
>>>>>>>>>>>>>>>>>>>>>>>> SQL-SERVER does not have function
>>>>>> like SYSTEM_METADATA
>>>>>>>>>>>>>>>>>>>>>>>> which
>>>>>>>>>>>>>>>>>> actually
>>>>>>>>>>>>>>>>>>>> does
>>>>>>>>>>>>>>>>>>>>>>>> not have a specific return type.
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> And why not use the Oracle or
>>>>>> MySQL syntax there ?
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> column_name [datatype] [GENERATED
>>>>>> ALWAYS] AS
>>>>>>>>>>>>>>>>>>>>>>>>>> (expression)
>>>>>>>>>>>>>>>>>> [VIRTUAL]
>>>>>>>>>>>>>>>>>>>>>>>>>> Which is more straight-forward.
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> 2. “SYSTEM_METADATA("offset")`
>>>>>> returns the NULL type by
>>>>>>>>>>>>>>> default”
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> The default type should not be
>>>>>> NULL because only NULL
>>>>>>>>>>>>>> literal
>>>>>>>>>>>>>>>> does
>>>>>>>>>>>>>>>>>>>>>>>> that. Usually we use ANY as the type
>>>>>> if we do not know the
>>>>>>>>>>>>>>>> specific
>>>>>>>>>>>>>>>>>>>> type in
>>>>>>>>>>>>>>>>>>>>>>>> the SQL context. ANY means the
>>>>>> physical value can be any
>>>>>>>>>> java
>>>>>>>>>>>>>>>>>> object.
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> [1]
>>>>>>>>>>>>>>> 
>>>>>> https://oracle-base.com/articles/11g/virtual-columns-11gr1
>>>>>>>>>>>>>>>>>>>>>>>>>> [2]
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>> https://dev.mysql.com/doc/refman/5.7/en/create-table-generated-columns.html
>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>>>>>>>>> Danny Chan
>>>>>>>>>>>>>>>>>>>>>>>>>> 在 2020年9月4日 +0800 PM4:48,Timo
>>>>>> Walther
>>>>>>>>>>>>>>>>>>>>>>>>>> <twal...@apache.org
>>>>>>>>>>>>>>>> ,写道:
>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi everyone,
>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>> I completely reworked FLIP-107.
>>>>>> It now covers the full
>>>>>>>>>>>>>> story
>>>>>>>>>>>>>>>> how
>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>>>>> read
>>>>>>>>>>>>>>>>>>>>>>>>>>> and write metadata from/to
>>>>>> connectors and formats. It
>>>>>>>>>>>>>>> considers
>>>>>>>>>>>>>>>>>>>> all of
>>>>>>>>>>>>>>>>>>>>>>>>>>> the latest FLIPs, namely
>>>>>> FLIP-95, FLIP-132 and
>>>>>>>>>>>>>>>>>>>>>>>>>>> FLIP-122.
>>>>>>>>>> It
>>>>>>>>>>>>>>>>>>>> introduces
>>>>>>>>>>>>>>>>>>>>>>>>>>> the concept of PERSISTED
>>>>>> computed columns and leaves
>>>>>>>>>>>>>>>>>>>>>>>>>>> out
>>>>>>>>>>>>>>>>>>>> partitioning
>>>>>>>>>>>>>>>>>>>>>>>>>>> for now.
>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>> Looking forward to your
>>>>>> feedback.
>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>>>>>>>>>> Timo
>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>> On 04.03.20 09:45, Kurt Young
>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Sorry, forgot one question.
>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>> 4. Can we make the
>>>>>> value.fields-include more
>>>>>>>>>>>>>>>>>>>>>>>>>>>> orthogonal?
>>>>>>>>>>>>>>> Like
>>>>>>>>>>>>>>>>>> one
>>>>>>>>>>>>>>>>>>>> can
>>>>>>>>>>>>>>>>>>>>>>>>>>>> specify it as "EXCEPT_KEY,
>>>>>> EXCEPT_TIMESTAMP".
>>>>>>>>>>>>>>>>>>>>>>>>>>>> With current EXCEPT_KEY and
>>>>>> EXCEPT_KEY_TIMESTAMP,
>>>>>>>>>>>>>>>>>>>>>>>>>>>> users
>>>>>>>>>>>>>> can
>>>>>>>>>>>>>>>> not
>>>>>>>>>>>>>>>>>>>>>>>> config to
>>>>>>>>>>>>>>>>>>>>>>>>>>>> just ignore timestamp but
>>>>>> keep key.
>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Kurt
>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Wed, Mar 4, 2020 at 4:42
>>>>>> PM Kurt Young <
>>>>>>>>>>>>>> ykt...@gmail.com
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Dawid,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I have a couple of
>>>>>> questions around key fields,
>>>>>>>>>> actually
>>>>>>>>>>>>>> I
>>>>>>>>>>>>>>>> also
>>>>>>>>>>>>>>>>>>>> have
>>>>>>>>>>>>>>>>>>>>>>>> some
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> other questions but want to
>>>>>> be focused on key fields
>>>>>>>>>>>>>> first.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 1. I don't fully understand
>>>>>> the usage of
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> "key.fields".
>>>>>>>>>> Is
>>>>>>>>>>>>>>>> this
>>>>>>>>>>>>>>>>>>>>>>>> option only
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> valid during write
>>>>>> operation? Because for
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> reading, I can't imagine
>>>>>> how such options can be
>>>>>>>>>>>>>> applied. I
>>>>>>>>>>>>>>>>>> would
>>>>>>>>>>>>>>>>>>>>>>>> expect
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> that there might be a
>>>>>> SYSTEM_METADATA("key")
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> to read and assign the key
>>>>>> to a normal field?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 2. If "key.fields" is only
>>>>>> valid in write
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> operation, I
>>>>>>>>>>>>>> want
>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>>>>> propose we
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> can simplify the options to
>>>>>> not introducing
>>>>>>>>>>>>>> key.format.type
>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> other related options. I
>>>>>> think a single "key.field"
>>>>>>>>>> (not
>>>>>>>>>>>>>>>>>> fields)
>>>>>>>>>>>>>>>>>>>>>>>> would be
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> enough, users can use UDF
>>>>>> to calculate whatever key
>>>>>>>>>> they
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> want before sink.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 3. Also I don't want to
>>>>>> introduce "value.format.type"
>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> "value.format.xxx" with the
>>>>>> "value" prefix. Not every
>>>>>>>>>>>>>>>> connector
>>>>>>>>>>>>>>>>>>>> has a
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> concept
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> of key and values. The old
>>>>>> parameter "format.type"
>>>>>>>>>>>>>> already
>>>>>>>>>>>>>>>> good
>>>>>>>>>>>>>>>>>>>>>>>> enough to
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> use.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Kurt
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Tue, Mar 3, 2020 at
>>>>>> 10:40 PM Jark Wu <
>>>>>>>>>>>>>> imj...@gmail.com>
>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks Dawid,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I have two more questions.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> SupportsMetadata
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Introducing
>>>>>> SupportsMetadata sounds good to me.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> But I
>>>>>>>>>>>>>> have
>>>>>>>>>>>>>>>>>> some
>>>>>>>>>>>>>>>>>>>>>>>> questions
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> regarding to this
>>>>>> interface.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 1) How do the source know
>>>>>> what the expected return
>>>>>>>>>> type
>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>>> each
>>>>>>>>>>>>>>>>>>>>>>>> metadata?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 2) Where to put the
>>>>>> metadata fields? Append to the
>>>>>>>>>>>>>>> existing
>>>>>>>>>>>>>>>>>>>> physical
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> fields?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> If yes, I would suggest
>>>>>> to change the signature to
>>>>>>>>>>>>>>>>>> `TableSource
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>> appendMetadataFields(String[] metadataNames,
>>>>>>>>>> DataType[]
>>>>>>>>>>>>>>>>>>>>>>>> metadataTypes)`
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>> SYSTEM_METADATA("partition")
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Can SYSTEM_METADATA()
>>>>>> function be used nested in a
>>>>>>>>>>>>>>> computed
>>>>>>>>>>>>>>>>>>>> column
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> expression? If yes, how
>>>>>> to specify the return
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> type of
>>>>>>>>>>>>>>>>>>>>>>>> SYSTEM_METADATA?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Jark
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Tue, 3 Mar 2020 at
>>>>>> 17:06, Dawid Wysakowicz <
>>>>>>>>>>>>>>>>>>>>>>>> dwysakow...@apache.org>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 1. I thought a bit more
>>>>>> on how the source would
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> emit
>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>> columns
>>>>>>>>>>>>>>>>>>>>>>>> and I
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> now see its not exactly
>>>>>> the same as regular
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> columns.
>>>>>>>>>> I
>>>>>>>>>>>>>>> see
>>>>>>>>>>>>>>>> a
>>>>>>>>>>>>>>>>>>>> need
>>>>>>>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> elaborate a bit more on
>>>>>> that in the FLIP as you
>>>>>>>>>> asked,
>>>>>>>>>>>>>>>> Jark.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I do agree mostly with
>>>>>> Danny on how we should do
>>>>>>>>>> that.
>>>>>>>>>>>>>>> One
>>>>>>>>>>>>>>>>>>>>>>>> additional
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> things I would
>>>>>> introduce is an
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> interface
>>>>>> SupportsMetadata {
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> boolean
>>>>>> supportsMetadata(Set<String>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> metadataFields);
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> TableSource
>>>>>> generateMetadataFields(Set<String>
>>>>>>>>>>>>>>>>>> metadataFields);
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> This way the source
>>>>>> would have to declare/emit only
>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>> requested
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> metadata fields. In
>>>>>> order not to clash with user
>>>>>>>>>>>>>> defined
>>>>>>>>>>>>>>>>>>>> fields.
>>>>>>>>>>>>>>>>>>>>>>>> When
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> emitting the metadata
>>>>>> field I would prepend the
>>>>>>>>>> column
>>>>>>>>>>>>>>> name
>>>>>>>>>>>>>>>>>>>> with
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>> __system_{property_name}. Therefore when requested
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>> SYSTEM_METADATA("partition") the source would
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> append
>>>>>>>>>> a
>>>>>>>>>>>>>>>> field
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> __system_partition to
>>>>>> the schema. This would be
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> never
>>>>>>>>>>>>>>>> visible
>>>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> user as it would be
>>>>>> used only for the subsequent
>>>>>>>>>>>>>> computed
>>>>>>>>>>>>>>>>>>>> columns.
>>>>>>>>>>>>>>>>>>>>>>>> If
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> that makes sense to
>>>>>> you, I will update the FLIP
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> with
>>>>>>>>>>>>>> this
>>>>>>>>>>>>>>>>>>>>>>>> description.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 2. CAST vs explicit
>>>>>> type in computed columns
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Here I agree with
>>>>>> Danny. It is also the current
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> state
>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>> proposal.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 3. Partitioning on
>>>>>> computed column vs function
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Here I also agree with
>>>>>> Danny. I also think those
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> are
>>>>>>>>>>>>>>>>>>>> orthogonal. I
>>>>>>>>>>>>>>>>>>>>>>>> would
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> leave out the STORED
>>>>>> computed columns out of the
>>>>>>>>>>>>>>>> discussion.
>>>>>>>>>>>>>>>>>> I
>>>>>>>>>>>>>>>>>>>>>>>> don't see
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> how do they relate to
>>>>>> the partitioning. I
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> already put
>>>>>>>>>>>>>>> both
>>>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>>>>> those
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> cases in the document.
>>>>>> We can either partition on a
>>>>>>>>>>>>>>>> computed
>>>>>>>>>>>>>>>>>>>>>>>> column or
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> use a udf in a
>>>>>> partioned by clause. I am fine with
>>>>>>>>>>>>>>> leaving
>>>>>>>>>>>>>>>>>> out
>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> partitioning by udf in
>>>>>> the first version if you
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> still
>>>>>>>>>>>>>>> have
>>>>>>>>>>>>>>>>>> some
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> concerns.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> As for your question
>>>>>> Danny. It depends which
>>>>>>>>>>>>>> partitioning
>>>>>>>>>>>>>>>>>>>> strategy
>>>>>>>>>>>>>>>>>>>>>>>> you
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> use.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> For the HASH
>>>>>> partitioning strategy I thought it
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> would
>>>>>>>>>>>>>>> work
>>>>>>>>>>>>>>>> as
>>>>>>>>>>>>>>>>>>>> you
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> explained. It would be
>>>>>> N = MOD(expr, num). I am not
>>>>>>>>>>>>>> sure
>>>>>>>>>>>>>>>>>>>> though if
>>>>>>>>>>>>>>>>>>>>>>>> we
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> should introduce the
>>>>>> PARTITIONS clause. Usually
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Flink
>>>>>>>>>>>>>>> does
>>>>>>>>>>>>>>>>>> not
>>>>>>>>>>>>>>>>>>>> own
>>>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> data and the partitions
>>>>>> are already an intrinsic
>>>>>>>>>>>>>> property
>>>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> underlying source e.g.
>>>>>> for kafka we do not create
>>>>>>>>>>>>>> topics,
>>>>>>>>>>>>>>>> but
>>>>>>>>>>>>>>>>>>>> we
>>>>>>>>>>>>>>>>>>>>>>>> just
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> describe pre-existing
>>>>>> pre-partitioned topic.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 4. timestamp vs
>>>>>> timestamp.field vs
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> connector.field vs
>>>>>>>>>>>>>> ...
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I am fine with changing
>>>>>> it to timestamp.field to be
>>>>>>>>>>>>>>>>>> consistent
>>>>>>>>>>>>>>>>>>>> with
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> other value.fields and
>>>>>> key.fields. Actually that
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> was
>>>>>>>>>>>>>> also
>>>>>>>>>>>>>>>> my
>>>>>>>>>>>>>>>>>>>>>>>> initial
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> proposal in a first
>>>>>> draft I prepared. I changed it
>>>>>>>>>>>>>>>> afterwards
>>>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>>>>> shorten
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the key.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Dawid
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On 03/03/2020 09:00,
>>>>>> Danny Chan wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks Dawid for
>>>>>> bringing up this discussion, I
>>>>>>>>>> think
>>>>>>>>>>>>>> it
>>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>> a
>>>>>>>>>>>>>>>>>>>>>>>> useful
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> feature ~
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> About how the
>>>>>> metadata outputs from source
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I think it is
>>>>>> completely orthogonal, computed
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> column
>>>>>>>>>>>>>>> push
>>>>>>>>>>>>>>>>>>>> down is
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> another topic, this
>>>>>> should not be a blocker but a
>>>>>>>>>>>>>>>> promotion,
>>>>>>>>>>>>>>>>>>>> if we
>>>>>>>>>>>>>>>>>>>>>>>> do
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> not
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> have any filters on the
>>>>>> computed column, there
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> is no
>>>>>>>>>>>>>> need
>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>> do any
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> pushings; the source
>>>>>> node just emit the complete
>>>>>>>>>> record
>>>>>>>>>>>>>>>> with
>>>>>>>>>>>>>>>>>>>> full
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> metadata
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> with the declared
>>>>>> physical schema, then when
>>>>>>>>>> generating
>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>> virtual
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> columns, we would
>>>>>> extract the metadata info and
>>>>>>>>>> output
>>>>>>>>>>>>>> as
>>>>>>>>>>>>>>>>>> full
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> columns(with
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> full schema).
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> About the type of
>>>>>> metadata column
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Personally i prefer
>>>>>> explicit type instead of CAST,
>>>>>>>>>>>>>> they
>>>>>>>>>>>>>>>> are
>>>>>>>>>>>>>>>>>>>>>>>> symantic
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> equivalent though,
>>>>>> explict type is more
>>>>>>>>>>>>>> straight-forward
>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>> we can
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> declare
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the nullable attribute
>>>>>> there.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> About option A:
>>>>>> partitioning based on acomputed
>>>>>>>>>> column
>>>>>>>>>>>>>>> VS
>>>>>>>>>>>>>>>>>>>> option
>>>>>>>>>>>>>>>>>>>>>>>> B:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> partitioning with just
>>>>>> a function
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>      From the FLIP,
>>>>>> it seems that B's
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> partitioning is
>>>>>>>>>>>>>>> just
>>>>>>>>>>>>>>>> a
>>>>>>>>>>>>>>>>>>>> strategy
>>>>>>>>>>>>>>>>>>>>>>>> when
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> writing data, the
>>>>>> partiton column is not
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> included in
>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>> table
>>>>>>>>>>>>>>>>>>>>>>>> schema,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> so
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> it's just useless when
>>>>>> reading from that.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> - Compared to A, we
>>>>>> do not need to generate the
>>>>>>>>>>>>>>> partition
>>>>>>>>>>>>>>>>>>>> column
>>>>>>>>>>>>>>>>>>>>>>>> when
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> selecting from the
>>>>>> table(but insert into)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> - For A we can also
>>>>>> mark the column as STORED when
>>>>>>>>>> we
>>>>>>>>>>>>>>> want
>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>>>>> persist
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> So in my opition they
>>>>>> are orthogonal, we can
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> support
>>>>>>>>>>>>>>>> both, i
>>>>>>>>>>>>>>>>>>>> saw
>>>>>>>>>>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> MySQL/Oracle[1][2]
>>>>>> would suggest to also define the
>>>>>>>>>>>>>>>>>> PARTITIONS
>>>>>>>>>>>>>>>>>>>>>>>> num, and
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> partitions are managed
>>>>>> under a "tablenamespace",
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>> partition
>>>>>>>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>>>>>>>>>>> which
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> record is stored is
>>>>>> partition number N, where N =
>>>>>>>>>>>>>>> MOD(expr,
>>>>>>>>>>>>>>>>>>>> num),
>>>>>>>>>>>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> your
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> design, which partiton
>>>>>> the record would persist ?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> [1]
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>> https://dev.mysql.com/doc/refman/5.7/en/partitioning-hash.html
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> [2]
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>> https://docs.oracle.com/database/121/VLDBG/GUID-F023D3ED-262F-4B19-950A-D3C8F8CDB4F4.htm#VLDBG1270
>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Danny Chan
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 在 2020年3月2日 +0800
>>>>>> PM6:16,Dawid Wysakowicz <
>>>>>>>>>>>>>>>>>>>> dwysakow...@apache.org
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> ,写道:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Jark,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Ad. 2 I added a
>>>>>> section to discuss relation to
>>>>>>>>>>>>>> FLIP-63
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Ad. 3 Yes, I also
>>>>>> tried to somewhat keep
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> hierarchy
>>>>>>>>>> of
>>>>>>>>>>>>>>>>>>>> properties.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Therefore you have the
>>>>>> key.format.type.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I also considered
>>>>>> exactly what you are suggesting
>>>>>>>>>>>>>>>>>> (prefixing
>>>>>>>>>>>>>>>>>>>> with
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> connector or kafka). I
>>>>>> should've put that into an
>>>>>>>>>>>>>>>>>>>> Option/Rejected
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> alternatives.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I agree timestamp,
>>>>>> key.*, value.* are connector
>>>>>>>>>>>>>>>> properties.
>>>>>>>>>>>>>>>>>>>> Why I
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> wanted to suggest not
>>>>>> adding that prefix in the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> first
>>>>>>>>>>>>>>>> version
>>>>>>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> actually all the
>>>>>> properties in the WITH section are
>>>>>>>>>>>>>>>> connector
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> properties.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Even format is in the
>>>>>> end a connector property as
>>>>>>>>>> some
>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>> sources
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> might
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> not have a format, imo.
>>>>>> The benefit of not
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> adding the
>>>>>>>>>>>>>>>> prefix
>>>>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>>>>>>>> that it
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> makes the keys a bit
>>>>>> shorter. Imagine prefixing all
>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>> properties
>>>>>>>>>>>>>>>>>>>>>>>> with
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> connector (or if we go
>>>>>> with FLINK-12557:
>>>>>>>>>>>>>> elasticsearch):
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>> elasticsearch.key.format.type: csv
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>> elasticsearch.key.format.field: ....
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>> elasticsearch.key.format.delimiter: ....
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>> elasticsearch.key.format.*: ....
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I am fine with
>>>>>> doing it though if this is a
>>>>>>>>>> preferred
>>>>>>>>>>>>>>>>>>>> approach
>>>>>>>>>>>>>>>>>>>>>>>> in the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> community.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Ad in-line comments:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I forgot to update
>>>>>> the `value.fields.include`
>>>>>>>>>>>>>> property.
>>>>>>>>>>>>>>>> It
>>>>>>>>>>>>>>>>>>>>>>>> should be
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> value.fields-include.
>>>>>> Which I think you also
>>>>>>>>>> suggested
>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>> comment,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> right?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> As for the cast vs
>>>>>> declaring output type of
>>>>>>>>>> computed
>>>>>>>>>>>>>>>>>> column.
>>>>>>>>>>>>>>>>>>>> I
>>>>>>>>>>>>>>>>>>>>>>>> think
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> it's better not to use
>>>>>> CAST, but declare a type
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> of an
>>>>>>>>>>>>>>>>>>>> expression
>>>>>>>>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> later
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> on infer the output
>>>>>> type of SYSTEM_METADATA. The
>>>>>>>>>> reason
>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>> I
>>>>>>>>>>>>>>>>>>>> think
>>>>>>>>>>>>>>>>>>>>>>>> this
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> way
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> it will be easier to
>>>>>> implement e.g. filter push
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> downs
>>>>>>>>>>>>>>> when
>>>>>>>>>>>>>>>>>>>> working
>>>>>>>>>>>>>>>>>>>>>>>> with
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> native types of the
>>>>>> source, e.g. in case of Kafka's
>>>>>>>>>>>>>>>> offset, i
>>>>>>>>>>>>>>>>>>>>>>>> think it's
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> better to pushdown long
>>>>>> rather than string. This
>>>>>>>>>> could
>>>>>>>>>>>>>>> let
>>>>>>>>>>>>>>>> us
>>>>>>>>>>>>>>>>>>>> push
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> expression like e.g.
>>>>>> offset > 12345 & offset <
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 59382.
>>>>>>>>>>>>>>>>>>>> Otherwise we
>>>>>>>>>>>>>>>>>>>>>>>> would
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> have to push down
>>>>>> cast(offset, long) > 12345 &&
>>>>>>>>>>>>>>>> cast(offset,
>>>>>>>>>>>>>>>>>>>> long)
>>>>>>>>>>>>>>>>>>>>>>>> <
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 59382.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Moreover I think we
>>>>>> need to introduce the type for
>>>>>>>>>>>>>>> computed
>>>>>>>>>>>>>>>>>>>> columns
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> anyway
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> to support functions
>>>>>> that infer output type
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> based on
>>>>>>>>>>>>>>>> expected
>>>>>>>>>>>>>>>>>>>>>>>> return
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> type.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> As for the computed
>>>>>> column push down. Yes,
>>>>>>>>>>>>>>>> SYSTEM_METADATA
>>>>>>>>>>>>>>>>>>>> would
>>>>>>>>>>>>>>>>>>>>>>>> have
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> to be pushed down to
>>>>>> the source. If it is not
>>>>>>>>>> possible
>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>> planner
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> should
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> fail. As far as I know
>>>>>> computed columns push down
>>>>>>>>>> will
>>>>>>>>>>>>>> be
>>>>>>>>>>>>>>>>>> part
>>>>>>>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>>>>>>>>> source
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> rework, won't it? ;)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> As for the
>>>>>> persisted computed column. I think
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> it is
>>>>>>>>>>>>>>>>>>>> completely
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> orthogonal. In my
>>>>>> current proposal you can also
>>>>>>>>>>>>>> partition
>>>>>>>>>>>>>>>> by
>>>>>>>>>>>>>>>>>> a
>>>>>>>>>>>>>>>>>>>>>>>> computed
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> column. The difference
>>>>>> between using a udf in
>>>>>>>>>>>>>> partitioned
>>>>>>>>>>>>>>>> by
>>>>>>>>>>>>>>>>>> vs
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> partitioned
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> by a computed column is
>>>>>> that when you partition
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> by a
>>>>>>>>>>>>>>>> computed
>>>>>>>>>>>>>>>>>>>>>>>> column
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> this
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> column must be also
>>>>>> computed when reading the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> table.
>>>>>>>>>> If
>>>>>>>>>>>>>>> you
>>>>>>>>>>>>>>>>>>>> use a
>>>>>>>>>>>>>>>>>>>>>>>> udf in
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the partitioned by, the
>>>>>> expression is computed only
>>>>>>>>>>>>>> when
>>>>>>>>>>>>>>>>>>>> inserting
>>>>>>>>>>>>>>>>>>>>>>>> into
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> table.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hope this answers
>>>>>> some of your questions. Looking
>>>>>>>>>>>>>>> forward
>>>>>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>>>>>>>>> further
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> suggestions.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Dawid
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On 02/03/2020
>>>>>> 05:18, Jark Wu wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks Dawid for
>>>>>> starting such a great
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> discussion.
>>>>>>>>>>>>>>>> Reaing
>>>>>>>>>>>>>>>>>>>>>>>> metadata
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> key-part
>>>>>> information from source is an important
>>>>>>>>>>>>>>> feature
>>>>>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> streaming
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> users.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> In general, I
>>>>>> agree with the proposal of the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> FLIP.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I will leave my
>>>>>> thoughts and comments here:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 1) +1 to use
>>>>>> connector properties instead of
>>>>>>>>>>>>>>> introducing
>>>>>>>>>>>>>>>>>>>> HEADER
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> keyword as
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the reason you
>>>>>> mentioned in the FLIP.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 2) we already
>>>>>> introduced PARTITIONED BY in
>>>>>>>>>> FLIP-63.
>>>>>>>>>>>>>>>> Maybe
>>>>>>>>>>>>>>>>>> we
>>>>>>>>>>>>>>>>>>>>>>>> should
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> add a
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> section to
>>>>>> explain what's the relationship
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> between
>>>>>>>>>>>>>>> them.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Do their concepts
>>>>>> conflict? Could INSERT
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> PARTITION
>>>>>>>>>>>>>> be
>>>>>>>>>>>>>>>> used
>>>>>>>>>>>>>>>>>>>> on
>>>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> PARTITIONED table
>>>>>> in this FLIP?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 3) Currently,
>>>>>> properties are hierarchical in
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Flink
>>>>>>>>>>>>>>> SQL.
>>>>>>>>>>>>>>>>>>>> Shall we
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> make
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> new introduced
>>>>>> properties more hierarchical?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> For example,
>>>>>> "timestamp" =>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>> "connector.timestamp"?
>>>>>>>>>>>>>>>>>>>> (actually, I
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> prefer
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> "kafka.timestamp"
>>>>>> which is another
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> improvement for
>>>>>>>>>>>>>>>>>>>> properties
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> FLINK-12557)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> A single
>>>>>> "timestamp" in properties may mislead
>>>>>>>>>> users
>>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> field
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> a rowtime
>>>>>> attribute.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I also left some
>>>>>> minor comments in the FLIP.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Jark
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Sun, 1 Mar
>>>>>> 2020 at 22:30, Dawid Wysakowicz <
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> dwysakow...@apache.org>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I would like to
>>>>>> propose an improvement that
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> would
>>>>>>>>>>>>>>>> enable
>>>>>>>>>>>>>>>>>>>>>>>> reading
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> table
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> columns from
>>>>>> different parts of source records.
>>>>>>>>>>>>>>> Besides
>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>> main
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> payload
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> majority (if
>>>>>> not all of the sources) expose
>>>>>>>>>>>>>>> additional
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> information. It
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> can be simply a
>>>>>> read-only metadata such as
>>>>>>>>>> offset,
>>>>>>>>>>>>>>>>>>>> ingestion
>>>>>>>>>>>>>>>>>>>>>>>> time
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> or a
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> read and write
>>>>>> parts of the record that contain
>>>>>>>>>>>>>> data
>>>>>>>>>>>>>>>> but
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> additionally
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> serve different
>>>>>> purposes (partitioning,
>>>>>>>>>> compaction
>>>>>>>>>>>>>>>> etc.),
>>>>>>>>>>>>>>>>>>>> e.g.
>>>>>>>>>>>>>>>>>>>>>>>> key
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> or
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> timestamp in
>>>>>> Kafka.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> We should make
>>>>>> it possible to read and write
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> data
>>>>>>>>>>>>>>> from
>>>>>>>>>>>>>>>>>> all
>>>>>>>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>>>>>>>>> those
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> locations. In
>>>>>> this proposal I discuss reading
>>>>>>>>>>>>>>>>>> partitioning
>>>>>>>>>>>>>>>>>>>>>>>> data,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> completeness
>>>>>> this proposal discusses also the
>>>>>>>>>>>>>>>>>> partitioning
>>>>>>>>>>>>>>>>>>>> when
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> writing
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> data out.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I am looking
>>>>>> forward to your comments.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> You can access
>>>>>> the FLIP here:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records?src=contextnavpagetreemode
>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Dawid
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>> 
> 

Reply via email to