Hi, Timo

Thanks for your explanation, it makes sense to me.
Best,
Leonard

>> Hi, Timo
>>
>> Thanks for the update. I have a minor suggestion about the debezium metadata keys: could we use the original debezium keys rather than introduce new ones?
>>
>> debezium-json.schema => debezium-json.schema
>> debezium-json.ingestion-timestamp => debezium-json.ts_ms
>> debezium-json.source.database => debezium-json.source.db
>> debezium-json.source.schema => debezium-json.source.schema
>> debezium-json.source.table => debezium-json.source.table
>> debezium-json.source.timestamp => debezium-json.source.ts_ms
>> debezium-json.source.properties => debezium-json.source MAP<STRING, STRING>
>>
>> Users who are familiar with debezium will understand the keys more easily, and the key syntax is more JSON-path-like. What do you think?
>>
>> The other part looks really good to me.
>>
>> Regards,
>> Leonard
>>
>>> On Sep 10, 2020, at 18:26, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>
>>> I've only been watching this from the sidelines but that latest proposal looks very good to me!
>>>
>>> Aljoscha
>>>
>>> On 10.09.20 12:20, Kurt Young wrote:
>>>> The new syntax looks good to me.
>>>>
>>>> Best,
>>>> Kurt
>>>>
>>>> On Thu, Sep 10, 2020 at 5:57 PM Jark Wu <imj...@gmail.com> wrote:
>>>>> Hi Timo,
>>>>>
>>>>> I have one minor suggestion: maybe the default data type of `timestamp` can be `TIMESTAMP(3) WITH LOCAL TIME ZONE`, because this is the type that users want to use; this can avoid unnecessary casting.
>>>>> Besides, currently the BIGINT is cast to a timestamp in seconds, so the implicit cast may not work...
>>>>>
>>>>> I don't have other objections, but maybe we should wait for the opinion from @Kurt on the new syntax.
>>>>>
>>>>> Best,
>>>>> Jark
>>>>>
>>>>> On Thu, 10 Sep 2020 at 16:21, Danny Chan <yuzhao....@gmail.com> wrote:
>>>>>
>>>>>> Thanks for driving this Timo, +1 for voting ~
>>>>>>
>>>>>> Best,
>>>>>> Danny Chan
>>>>>> On Thu, Sep 10, 2020 at 3:47 PM +0800, Timo Walther <twal...@apache.org> wrote:
>>>>>>> Thanks everyone for this healthy discussion.
>>>>>>> I updated the FLIP with the outcome. I think the result is very powerful but also very easy to declare. Thanks for all the contributions.
>>>>>>>
>>>>>>> If there are no objections, I would continue with a voting.
>>>>>>>
>>>>>>> What do you think?
>>>>>>>
>>>>>>> Regards,
>>>>>>> Timo
>>>>>>>
>>>>>>> On 09.09.20 16:52, Timo Walther wrote:
>>>>>>>> "If virtual by default, when a user types "timestamp int" ==> persisted column, then adds a "metadata" after that ==> virtual column, then adds a "persisted" after that ==> persisted column."
>>>>>>>>
>>>>>>>> Thanks for this nice mental-model explanation, Jark. This makes total sense to me. Also, making the most common case as short as just adding `METADATA` is a very good idea. Thanks, Danny!
>>>>>>>>
>>>>>>>> Let me update the FLIP again with all these ideas.
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Timo
>>>>>>>>
>>>>>>>> On 09.09.20 15:03, Jark Wu wrote:
>>>>>>>>> I'm also +1 to Danny's proposal: timestamp INT METADATA [FROM 'my-timestamp-field'] [VIRTUAL]
>>>>>>>>> Especially, I like the shortcut timestamp INT METADATA; this makes the most common case supported in the simplest way.
>>>>>>>>>
>>>>>>>>> I also think the default should be "PERSISTED", so VIRTUAL is optional when you are accessing read-only metadata. Because:
>>>>>>>>> 1. "timestamp INT METADATA" should be a normal column, because "METADATA" is just a modifier to indicate that it comes from metadata, and a normal column should be persisted.
>>>>>>>>> If virtual by default, when a user types "timestamp int" ==> persisted column, then adds a "metadata" after that ==> virtual column, then adds a "persisted" after that ==> persisted column.
>>>>>>>>> I think this reverses several times and makes users confused.
>>>>>>>>> Physical fields are also declared as "fieldName TYPE", so that "timestamp INT METADATA" is persisted is very straightforward.
>>>>>>>>> 2. From the collected user questions [1], we can see that "timestamp" is the most common use case. "timestamp" is read-write metadata, and persisted-by-default doesn't break the reading behavior.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Jark
>>>>>>>>>
>>>>>>>>> [1]: https://issues.apache.org/jira/browse/FLINK-15869
>>>>>>>>>
>>>>>>>>> On Wed, 9 Sep 2020 at 20:56, Leonard Xu <xbjt...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Thanks @Dawid for the nice summary, I think you caught all opinions of the long discussion well.
>>>>>>>>>>
>>>>>>>>>> @Danny
>>>>>>>>>> "timestamp INT METADATA [FROM 'my-timestamp-field'] [VIRTUAL]
>>>>>>>>>> Note that the "FROM 'field name'" is only needed when the name conflicts with a declared table column name; when there are no conflicts, we can simplify it to
>>>>>>>>>> timestamp INT METADATA"
>>>>>>>>>>
>>>>>>>>>> I really like the proposal; there is no confusion with computed columns any more, and it's concise enough.
>>>>>>>>>>
>>>>>>>>>> @Timo @Dawid
>>>>>>>>>> "We use `SYSTEM_TIME` for temporal tables. I think prefixing with SYSTEM makes it clearer that it comes magically from the system."
>>>>>>>>>> "As for the issue of shortening SYSTEM_METADATA to METADATA: here I very much prefer the SYSTEM_ prefix."
>>>>>>>>>>
>>>>>>>>>> I think `SYSTEM_TIME` differs a lot from `SYSTEM_METADATA`.
>>>>>>>>>> First of all, the word `TIME` has broad meanings, but `METADATA` does not; `METADATA` has a specific meaning.
>>>>>>>>>> Secondly, `FOR SYSTEM_TIME AS OF` exists in the SQL standard, but `SYSTEM_METADATA` does not.
>>>>>>>>>> Personally, I prefer the simpler way; sometimes less is more.
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Leonard
>>>>>>>>>>
>>>>>>>>>>> Timo Walther <twal...@apache.org> wrote on Wed, Sep 9, 2020 at 6:41 PM:
>>>>>>>>>>>
>>>>>>>>>>>> Hi everyone,
>>>>>>>>>>>>
>>>>>>>>>>>> "key" and "value" in the properties are a special case because they need to configure a format. So key and value are more than just metadata. Jark's example for setting a timestamp would work, but as the FLIP discusses, we have many more metadata fields like headers, epoch-leader, etc. Having a property for all of this metadata would mess up the WITH section entirely. Furthermore, we also want to deal with metadata from the formats. Solving this through properties as well would further complicate the property design.
>>>>>>>>>>>>
>>>>>>>>>>>> Personally, I still like the computed column design more because it allows full flexibility to compute the final column:
>>>>>>>>>>>>
>>>>>>>>>>>> timestamp AS adjustTimestamp(CAST(SYSTEM_METADATA("ts") AS TIMESTAMP(3)))
>>>>>>>>>>>>
>>>>>>>>>>>> instead of having a helper column and a real column in the table:
>>>>>>>>>>>>
>>>>>>>>>>>> helperTimestamp AS CAST(SYSTEM_METADATA("ts") AS TIMESTAMP(3))
>>>>>>>>>>>> realTimestamp AS adjustTimestamp(helperTimestamp)
>>>>>>>>>>>>
>>>>>>>>>>>> But I see that the discussion leans towards:
>>>>>>>>>>>>
>>>>>>>>>>>> timestamp INT SYSTEM_METADATA("ts")
>>>>>>>>>>>>
>>>>>>>>>>>> which is fine with me. It is the shortest solution, because we don't need an additional CAST. We can discuss the syntax, so that confusion with computed columns can be avoided.
>>>>>>>>>>>>
>>>>>>>>>>>> timestamp INT USING SYSTEM_METADATA("ts")
>>>>>>>>>>>> timestamp INT FROM SYSTEM_METADATA("ts")
>>>>>>>>>>>> timestamp INT FROM SYSTEM_METADATA("ts") PERSISTED
>>>>>>>>>>>>
>>>>>>>>>>>> We use `SYSTEM_TIME` for temporal tables. I think prefixing with SYSTEM makes it clearer that it comes magically from the system.
>>>>>>>>>>>>
>>>>>>>>>>>> What do you think?
>>>>>>>>>>>>
>>>>>>>>>>>> Regards,
>>>>>>>>>>>> Timo
>>>>>>>>>>>>
>>>>>>>>>>>> On 09.09.20 11:41, Jark Wu wrote:
>>>>>>>>>>>>> Hi Danny,
>>>>>>>>>>>>>
>>>>>>>>>>>>> This is not Oracle and MySQL computed column syntax, because there is no "AS" after the type.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Hi everyone,
>>>>>>>>>>>>>
>>>>>>>>>>>>> If we want to use "offset INT SYSTEM_METADATA("offset")", then I think we must further discuss the "PERSISTED" or "VIRTUAL" keyword for the query-sink schema problem.
>>>>>>>>>>>>> Personally, I think we can use the shorter keyword "METADATA" for "SYSTEM_METADATA", because "SYSTEM_METADATA" sounds like a system function and may confuse users into thinking this is a computed column.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best,
>>>>>>>>>>>>> Jark
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, 9 Sep 2020 at 17:23, Danny Chan <danny0...@apache.org> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> "offset INT SYSTEM_METADATA("offset")"
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> This is actually Oracle or MySQL style computed column syntax.
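For readers comparing these two positions: MySQL's generated-column form puts the expression after `AS` (and marks it `STORED` or `VIRTUAL`), whereas the syntax under discussion attaches the metadata call directly after the declared type with no `AS`. A minimal side-by-side sketch (the table and column names are illustrative, not taken from the FLIP):

```sql
-- MySQL generated column: the expression follows AS, optionally STORED
CREATE TABLE prices (
  price_usd   DOUBLE,
  price_cents INT AS (price_usd * 100) STORED
);

-- Proposed Flink DDL: no AS, the metadata source follows the declared type
CREATE TABLE kafka_table (
  id       BIGINT,
  `offset` INT SYSTEM_METADATA("offset")
) WITH (
  'connector' = 'kafka'
);
```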
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> "You are right that one could argue that "timestamp", "headers" are something like "key" and "value""
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I have the same feeling; both key/value and headers/timestamp are *real* data stored in the consumed record, they are not computed or generated.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> "Trying to solve everything via properties sounds rather like a hack to me"
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Things are not that hacky if we can unify the routines or the definitions (all from the computed column way, or all from the table options). I also think it is hacky that we mix in two kinds of syntax for different kinds of metadata (read-only and read-write). In this FLIP, we declare the Kafka key fields with table options but use SYSTEM_METADATA for other metadata; that is hacky, or at least inconsistent.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Kurt Young <ykt...@gmail.com> wrote on Wed, Sep 9, 2020 at 4:48 PM:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I would vote for `offset INT SYSTEM_METADATA("offset")`.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I don't think we can stick with the SQL standard in the DDL part forever, especially as there are more and more requirements coming from different connectors and external systems.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>> Kurt
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Wed, Sep 9, 2020 at 4:40 PM Timo Walther <twal...@apache.org> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi Jark,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> now we are back at the original design proposed by Dawid :D Yes, we should be cautious about adding new syntax. But the length of this discussion shows that we are looking for a good long-term solution. In this case I would rather vote for a deep integration into the syntax.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Computed columns are also not SQL standard compliant. And our DDL is neither, so we have some degree of freedom here.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Trying to solve everything via properties sounds rather like a hack to me. You are right that one could argue that "timestamp", "headers" are something like "key" and "value". However, mixing
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> `offset AS SYSTEM_METADATA("offset")`
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> `'timestamp.field' = 'ts'`
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> looks more confusing to users than an explicit
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> `offset AS CAST(SYSTEM_METADATA("offset") AS INT)`
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> or
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> `offset INT SYSTEM_METADATA("offset")`
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> that is symmetric for both source and sink.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> What do others think?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>> Timo
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On 09.09.20 10:09, Jark Wu wrote:
>>>>>>>>>>>>>>>>> Hi everyone,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I think we have a conclusion that writable metadata shouldn't be defined as a computed column, but as a normal column.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> "timestamp STRING SYSTEM_METADATA('timestamp')" is one of the approaches. However, it is not SQL standard compliant; we need to be cautious enough when adding new syntax.
>>>>>>>>>>>>>>>>> Besides, we have to introduce the `PERSISTED` or `VIRTUAL` keyword to resolve the query-sink schema problem if it is read-only metadata. That adds more stuff for users to learn.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> From my point of view, "timestamp" and "headers" are something like "key" and "value" that are stored with the real data. So why not define "timestamp" in the same way as "key", by using a "timestamp.field" connector option?
>>>>>>>>>>>>>>>>> On the other side, read-only metadata, such as "offset", shouldn't be defined as a normal column. So why not use the existing computed column syntax for such metadata? Then we don't have the query-sink schema problem.
>>>>>>>>>>>>>>>>> So here is my proposal:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> CREATE TABLE kafka_table (
>>>>>>>>>>>>>>>>>   id BIGINT,
>>>>>>>>>>>>>>>>>   name STRING,
>>>>>>>>>>>>>>>>>   col1 STRING,
>>>>>>>>>>>>>>>>>   col2 STRING,
>>>>>>>>>>>>>>>>>   ts TIMESTAMP(3) WITH LOCAL TIME ZONE,  -- ts is a normal field, so it can be read and written
>>>>>>>>>>>>>>>>>   offset AS SYSTEM_METADATA("offset")
>>>>>>>>>>>>>>>>> ) WITH (
>>>>>>>>>>>>>>>>>   'connector' = 'kafka',
>>>>>>>>>>>>>>>>>   'topic' = 'test-topic',
>>>>>>>>>>>>>>>>>   'key.fields' = 'id, name',
>>>>>>>>>>>>>>>>>   'key.format' = 'csv',
>>>>>>>>>>>>>>>>>   'value.format' = 'avro',
>>>>>>>>>>>>>>>>>   'timestamp.field' = 'ts'  -- define the mapping of the Kafka timestamp
>>>>>>>>>>>>>>>>> );
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> INSERT INTO kafka_table
>>>>>>>>>>>>>>>>> SELECT id, name, col1, col2, rowtime FROM another_table;
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I think this can solve all the problems without introducing any new syntax. The only minor disadvantage is that we separate the definition syntax of read-only metadata and read-write fields. However, I don't think this is a big problem.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>> Jark
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Wed, 9 Sep 2020 at 15:09, Timo Walther <twal...@apache.org> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Hi Kurt,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> thanks for sharing your opinion. I'm totally up for not reusing computed columns. I think Jark was a big supporter of this syntax, @Jark are you fine with this as well?
>>>>>>>>>>>>>>>>>> The non-computed column approach was only a "slightly rejected alternative".
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Furthermore, we would need to think about how such a new design influences the LIKE clause though.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> However, we should still keep the `PERSISTED` keyword as it influences the query->sink schema. If you look at the list of metadata for existing connectors and formats, we currently offer only two writable metadata fields. Otherwise, one would need to declare two tables whenever a metadata column is read (one for the source, one for the sink). This can be quite inconvenient, e.g. for just reading the topic.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>> Timo
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On 09.09.20 08:52, Kurt Young wrote:
>>>>>>>>>>>>>>>>>>> I also share the concern that reusing the computed column syntax but with different semantics would confuse users a lot.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Besides, I think metadata fields are conceptually not the same as computed columns. The metadata field is a connector-specific thing, and it only contains the information of where the field comes from (during source) or where the field needs to be written to (during sink).
>>>>>>>>>>>>>>>>>>> It's more similar to normal fields, with the assumption that all these fields go to the data part.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thus I lean more towards the rejected alternative that Timo mentioned. And I think we don't need the PERSISTED keyword; SYSTEM_METADATA should be enough.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> During implementation, the framework only needs to pass such <field, metadata field> information to the connector, and the logic of handling such fields inside the connector should be straightforward.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Regarding the downside Timo mentioned:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> The disadvantage is that users cannot call UDFs or parse timestamps.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I think this is fairly simple to solve. Since the metadata field isn't a computed column anymore, we can support referencing such fields in computed columns. For example:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> CREATE TABLE kafka_table (
>>>>>>>>>>>>>>>>>>>   id BIGINT,
>>>>>>>>>>>>>>>>>>>   name STRING,
>>>>>>>>>>>>>>>>>>>   timestamp STRING SYSTEM_METADATA("timestamp"),  -- get the timestamp field from metadata
>>>>>>>>>>>>>>>>>>>   ts AS to_timestamp(timestamp)  -- normal computed column, parse the string to TIMESTAMP type by using the metadata field
>>>>>>>>>>>>>>>>>>> ) WITH (
>>>>>>>>>>>>>>>>>>>   ...
>>>>>>>>>>>>>>>>>>> )
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>> Kurt
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Tue, Sep 8, 2020 at 11:57 PM Timo Walther <twal...@apache.org> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Hi Leonard,
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> the only alternative I see is that we introduce a concept that is completely different to computed columns. This is also mentioned in the rejected alternatives section of the FLIP. Something like:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> CREATE TABLE kafka_table (
>>>>>>>>>>>>>>>>>>>>   id BIGINT,
>>>>>>>>>>>>>>>>>>>>   name STRING,
>>>>>>>>>>>>>>>>>>>>   timestamp INT SYSTEM_METADATA("timestamp") PERSISTED,
>>>>>>>>>>>>>>>>>>>>   headers MAP<STRING, BYTES> SYSTEM_METADATA("headers") PERSISTED
>>>>>>>>>>>>>>>>>>>> ) WITH (
>>>>>>>>>>>>>>>>>>>>   ...
>>>>>>>>>>>>>>>>>>>> )
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> This way we would avoid confusion entirely and can easily map columns to metadata columns. The disadvantage is that users cannot call UDFs or parse timestamps. This would need to be done in a real computed column.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I'm happy about better alternatives.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>>> Timo
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On 08.09.20 15:37, Leonard Xu wrote:
>>>>>>>>>>>>>>>>>>>>> Hi, Timo
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Thanks for driving this FLIP.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Sorry, but I have a concern about the "Writing metadata via DynamicTableSink" section:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> CREATE TABLE kafka_table (
>>>>>>>>>>>>>>>>>>>>>   id BIGINT,
>>>>>>>>>>>>>>>>>>>>>   name STRING,
>>>>>>>>>>>>>>>>>>>>>   timestamp AS CAST(SYSTEM_METADATA("timestamp") AS BIGINT) PERSISTED,
>>>>>>>>>>>>>>>>>>>>>   headers AS CAST(SYSTEM_METADATA("headers") AS MAP<STRING, BYTES>) PERSISTED
>>>>>>>>>>>>>>>>>>>>> ) WITH (
>>>>>>>>>>>>>>>>>>>>>   ...
>>>>>>>>>>>>>>>>>>>>> )
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> An insert statement could look like:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> INSERT INTO kafka_table VALUES (
>>>>>>>>>>>>>>>>>>>>>   (1, "ABC", 1599133672, MAP('checksum', computeChecksum(...)))
>>>>>>>>>>>>>>>>>>>>> )
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> The proposed INSERT syntax does not make sense to me, because it contains computed (generated) columns.
>>>>>>>>>>>>>>>>>>>>> Both SQL Server and PostgreSQL do not allow inserting values into computed columns even when they are persisted; this breaks the generated-column semantics and may confuse users a lot.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> For the SQL Server computed column [1]:
>>>>>>>>>>>>>>>>>>>>>> column_name AS computed_column_expression [ PERSISTED [ NOT NULL ] ]...
>>>>>>>>>>>>>>>>>>>>>> NOTE: A computed column cannot be the target of an INSERT or UPDATE statement.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> For the PostgreSQL generated column [2]:
>>>>>>>>>>>>>>>>>>>>>> height_in numeric GENERATED ALWAYS AS (height_cm / 2.54) STORED
>>>>>>>>>>>>>>>>>>>>>> NOTE: A generated column cannot be written to directly. In INSERT or UPDATE commands, a value cannot be specified for a generated column, but the keyword DEFAULT may be specified.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> It shouldn't be allowed to set/update a value for a generated column after looking up SQL:2016:
>>>>>>>>>>>>>>>>>>>>>> <insert statement> ::=
>>>>>>>>>>>>>>>>>>>>>> INSERT INTO <insertion target> <insert columns and source>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> If <contextually typed table value constructor> CTTVC is specified, then every <contextually typed row value constructor element> simply contained in CTTVC whose positionally corresponding <column name> in <insert column list> references a column of which some underlying column is a generated column shall be a <default specification>.
>>>>>>>>>>>>>>>>>>>>>> A <default specification> specifies the default value of some associated item.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> [1] https://docs.microsoft.com/en-US/sql/t-sql/statements/alter-table-computed-column-definition-transact-sql?view=sql-server-ver15
>>>>>>>>>>>>>>>>>>>>> [2] https://www.postgresql.org/docs/12/ddl-generated-columns.html
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> On Sep 8, 2020, at 17:31, Timo Walther <twal...@apache.org> wrote:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Hi Jark,
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> according to Flink's and Calcite's casting definitions in [1][2], TIMESTAMP WITH LOCAL TIME ZONE should be castable from BIGINT. If not, we will make it possible ;-)
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> I'm aware of DeserializationSchema.getProducedType, but I think that this method is actually misplaced. The type should rather be passed to the source itself.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> For our Kafka SQL source, we will also not use this method because the Kafka source will add its own metadata in addition to the DeserializationSchema.
>>>>>>>>>>>>>>>>>>>>>> So DeserializationSchema.getProducedType will never be read.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> For now, I suggest leaving out the `DataType` from DecodingFormat.applyReadableMetadata, also because the format's physical type is passed later in `createRuntimeDecoder`. If necessary, it can be computed manually from the consumed type plus the metadata types. We will provide a metadata utility class for that.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>>>>> Timo
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> [1] https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java#L200
>>>>>>>>>>>>>>>>>>>>>> [2] https://github.com/apache/calcite/blob/master/core/src/main/java/org/apache/calcite/sql/type/SqlTypeCoercionRule.java#L254
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> On 08.09.20 10:52, Jark Wu wrote:
>>>>>>>>>>>>>>>>>>>>>>> Hi Timo,
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> The updated CAST SYSTEM_METADATA behavior sounds good to me. I just noticed that a BIGINT can't be converted to "TIMESTAMP(3) WITH LOCAL TIME ZONE".
>>>>>>>>>>>>>>>>>>>>>>> So maybe we need to support this, or use "TIMESTAMP(3) WITH LOCAL TIME ZONE" as the defined type of the Kafka timestamp? I think this makes sense, because it represents the milliseconds since epoch.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Regarding "DeserializationSchema doesn't need TypeInfo", I don't think so. The DeserializationSchema implements ResultTypeQueryable, thus the implementation needs to return an output TypeInfo. Besides, FlinkKafkaConsumer also calls DeserializationSchema.getProducedType as the produced type of the source function [1].
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>>>>>> Jark
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> [1]: https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L1066
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> On Tue, 8 Sep 2020 at 16:35, Timo Walther <twal...@apache.org> wrote:
>>>>>>>>>>>>>>>>>>>>>>>> Hi everyone,
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> I updated the FLIP again and hope that I could address the mentioned concerns.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> @Leonard: Thanks for the explanation. I wasn't aware that ts_ms and source.ts_ms have different semantics.
>>>>>>>>>>>>>>>>>>>>>>>> I updated the FLIP and exposed the most commonly used properties separately, so frequently used properties are not hidden in the MAP anymore:
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> debezium-json.ingestion-timestamp
>>>>>>>>>>>>>>>>>>>>>>>> debezium-json.source.timestamp
>>>>>>>>>>>>>>>>>>>>>>>> debezium-json.source.database
>>>>>>>>>>>>>>>>>>>>>>>> debezium-json.source.schema
>>>>>>>>>>>>>>>>>>>>>>>> debezium-json.source.table
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> However, since other properties depend on the used connector/vendor, the remaining options are stored in:
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> debezium-json.source.properties
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> and accessed with:
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> CAST(SYSTEM_METADATA('debezium-json.source.properties') AS MAP<STRING, STRING>)['table']
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Otherwise it is not possible to figure out the value and column type during validation.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> @Jark: You convinced me in relaxing the CAST constraints.
I added a dedicated sub-section to the FLIP:

For making the use of SYSTEM_METADATA easier and to avoid nested casting, we allow explicit casting to a target data type:

rowtime AS CAST(SYSTEM_METADATA("timestamp") AS TIMESTAMP(3) WITH LOCAL TIME ZONE)

A connector still produces and consumes the data type returned by `listMetadata()`. The planner will insert necessary explicit casts.

In any case, the user must provide a CAST such that the computed column receives a valid data type when constructing the table schema.

"I don't see a reason why `DecodingFormat#applyReadableMetadata` needs a DataType argument."

Correct, the DeserializationSchema doesn't need TypeInfo; it is always executed locally. It is the source that needs TypeInfo for serializing the record to the next operator, and that is what we provide.
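A sketch (plain Python, not Flink) of the semantics behind casting the Kafka timestamp to TIMESTAMP(3) WITH LOCAL TIME ZONE: the metadata value is milliseconds since epoch, and the cast interprets it as an absolute instant with millisecond precision. The sample value is the ts_ms from the Debezium example quoted later in this thread.

```python
# Interpret a milliseconds-since-epoch Kafka record timestamp as an instant,
# analogous to CAST(... AS TIMESTAMP(3) WITH LOCAL TIME ZONE).
from datetime import datetime, timezone

kafka_timestamp_ms = 1589355606100  # ts_ms from the Debezium example below

# TIMESTAMP(3) keeps millisecond precision: split seconds and milliseconds.
sec, ms = divmod(kafka_timestamp_ms, 1000)
instant = datetime.fromtimestamp(sec, tz=timezone.utc).replace(microsecond=ms * 1000)
print(instant.isoformat())
```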
@Danny:

"`SYSTEM_METADATA("offset")` returns the NULL type by default"

We can also use some other means to represent an UNKNOWN data type. In the Flink type system, we use the NullType for it. The important part is that the final data type is known for the entire computed column. As I mentioned before, I would avoid the suggested option b) that would be similar to your suggestion. The CAST should be enough and allows for complex expressions in the computed column. Option b) would need parser changes.

Regards,
Timo

On 08.09.20 06:21, Leonard Xu wrote:

Hi, Timo

Thanks for your explanation and update. I have only one question for the latest FLIP.
About the MAP<STRING, STRING> DataType of the key 'debezium-json.source': if users want to use the table name metadata, they need to write:

tableName STRING AS CAST(SYSTEM_METADATA('debezium-json.source') AS MAP<STRING, STRING>)['table']

The expression is a little complex for users. Could we only support the necessary metadata with simple DataTypes, as follows?

tableName STRING AS CAST(SYSTEM_METADATA('debezium-json.source.table') AS STRING),
transactionTime LONG AS CAST(SYSTEM_METADATA('debezium-json.source.ts_ms') AS BIGINT),

In this way we can simplify the expression. The mainly used metadata in changelog formats may include 'database', 'table', 'source.ts_ms' and 'ts_ms' from my side; maybe we could only support them in a first version.

Both Debezium and Canal have the above four metadata fields, and I'm willing to take some subtasks in the next development if necessary.

Debezium:
{
  "before": null,
  "after": { "id": 101, "name": "scooter" },
  "source": {
    "db": "inventory",        # 1. database name the changelog belongs to.
    "table": "products",      # 2. table name the changelog belongs to.
    "ts_ms": 1589355504100,   # 3. timestamp of the change happened in the database system, i.e.: transaction time in the database.
    "connector": "mysql",
    ....
  },
  "ts_ms": 1589355606100,     # 4. timestamp when Debezium processed the changelog.
  "op": "c",
  "transaction": null
}

Canal:
{
  "data": [{ "id": "102", "name": "car battery" }],
  "database": "inventory",    # 1. database name the changelog belongs to.
  "table": "products",        # 2. table name the changelog belongs to.
  "es": 1589374013000,        # 3. execution time of the change in the database system, i.e.: transaction time in the database.
  "ts": 1589374013680,        # 4. timestamp when Canal processed the changelog.
  "isDdl": false,
  "mysqlType": {},
  ....
}

Best
Leonard

On 8 Sep 2020, at 11:57, Danny Chan <yuzhao....@gmail.com> wrote:

Thanks Timo ~

The FLIP was already in pretty good shape. I have only 2 questions here:

1. "`CAST(SYSTEM_METADATA("offset") AS INT)` would be a valid read-only computed column for Kafka and can be extracted by the planner."

What are the pros of following the SQL Server syntax here? Usually an expression's return type can be inferred automatically. But I guess SQL Server does not have a function like SYSTEM_METADATA, which actually does not have a specific return type.

And why not use the Oracle or MySQL syntax there?

column_name [datatype] [GENERATED ALWAYS] AS (expression) [VIRTUAL]

Which is more straight-forward.

2. "`SYSTEM_METADATA("offset")` returns the NULL type by default"

The default type should not be NULL because only the NULL literal does that. Usually we use ANY as the type if we do not know the specific type in the SQL context. ANY means the physical value can be any Java object.

[1] https://oracle-base.com/articles/11g/virtual-columns-11gr1
[2] https://dev.mysql.com/doc/refman/5.7/en/create-table-generated-columns.html

Best,
Danny Chan

On 4 Sep 2020 at 4:48 PM +0800, Timo Walther <twal...@apache.org> wrote:

Hi everyone,

I completely reworked FLIP-107. It now covers the full story of how to read and write metadata from/to connectors and formats. It considers all of the latest FLIPs, namely FLIP-95, FLIP-132 and FLIP-122. It introduces the concept of PERSISTED computed columns and leaves out partitioning for now.
Looking forward to your feedback.

Regards,
Timo

On 04.03.20 09:45, Kurt Young wrote:

Sorry, forgot one question.

4. Can we make the value.fields-include more orthogonal? Like, one could specify it as "EXCEPT_KEY, EXCEPT_TIMESTAMP". With the current EXCEPT_KEY and EXCEPT_KEY_TIMESTAMP, users cannot configure it to just ignore the timestamp but keep the key.

Best,
Kurt

On Wed, Mar 4, 2020 at 4:42 PM Kurt Young <ykt...@gmail.com> wrote:

Hi Dawid,

I have a couple of questions around key fields. Actually I also have some other questions but want to be focused on key fields first.

1. I don't fully understand the usage of "key.fields". Is this option only valid during the write operation?
Because for reading, I can't imagine how such options could be applied. I would expect that there might be a SYSTEM_METADATA("key") to read and assign the key to a normal field?

2. If "key.fields" is only valid in the write operation, I want to propose that we simplify the options by not introducing key.format.type and other related options. I think a single "key.field" (not fields) would be enough; users can use a UDF to calculate whatever key they want before the sink.

3. Also, I don't want to introduce "value.format.type" and "value.format.xxx" with the "value" prefix. Not every connector has a concept of keys and values. The old parameter "format.type" is already good enough to use.
Best,
Kurt

On Tue, Mar 3, 2020 at 10:40 PM Jark Wu <imj...@gmail.com> wrote:

Thanks Dawid,

I have two more questions.

> SupportsMetadata

Introducing SupportsMetadata sounds good to me. But I have some questions regarding this interface.
1) How does the source know the expected return type of each metadata field?
2) Where do we put the metadata fields? Appended to the existing physical fields? If yes, I would suggest changing the signature to `TableSource appendMetadataFields(String[] metadataNames, DataType[] metadataTypes)`

> SYSTEM_METADATA("partition")

Can the SYSTEM_METADATA() function be used nested in a computed column expression? If yes, how to specify the return type of SYSTEM_METADATA?
Best,
Jark

On Tue, 3 Mar 2020 at 17:06, Dawid Wysakowicz <dwysakow...@apache.org> wrote:

Hi,

1. I thought a bit more on how the source would emit the columns and I now see it's not exactly the same as regular columns. I see a need to elaborate a bit more on that in the FLIP as you asked, Jark.

I do agree mostly with Danny on how we should do that. One additional thing I would introduce is an

interface SupportsMetadata {

  boolean supportsMetadata(Set<String> metadataFields);

  TableSource generateMetadataFields(Set<String> metadataFields);

}

This way the source would have to declare/emit only the requested metadata fields.
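A hedged Python mock of the contract the interface above sketches (the real proposal is a Java interface; the class name and supported-field set here are invented for illustration): the planner first checks support, then asks the source for a version of itself restricted to the requested metadata fields.

```python
# Mock of the proposed SupportsMetadata contract. Illustrative only.
class MockKafkaSource:
    SUPPORTED = {"offset", "partition", "timestamp"}

    def __init__(self, metadata_fields=frozenset()):
        self.metadata_fields = set(metadata_fields)

    def supports_metadata(self, metadata_fields):
        # boolean supportsMetadata(Set<String> metadataFields)
        return set(metadata_fields) <= self.SUPPORTED

    def generate_metadata_fields(self, metadata_fields):
        # TableSource generateMetadataFields(Set<String> metadataFields):
        # returns a new source that emits only the requested metadata fields.
        return MockKafkaSource(metadata_fields)

source = MockKafkaSource()
requested = {"offset", "partition"}
assert source.supports_metadata(requested)
specialized = source.generate_metadata_fields(requested)
print(sorted(specialized.metadata_fields))
```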
In order not to clash with user-defined fields, when emitting a metadata field I would prepend the column name with __system_{property_name}. Therefore, when SYSTEM_METADATA("partition") is requested, the source would append a field __system_partition to the schema. This would never be visible to the user as it would be used only for the subsequent computed columns. If that makes sense to you, I will update the FLIP with this description.

2. CAST vs explicit type in computed columns

Here I agree with Danny. It is also the current state of the proposal.

3. Partitioning on computed column vs function

Here I also agree with Danny. I also think those are orthogonal. I would leave the STORED computed columns out of the discussion.
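The __system_{property_name} naming scheme above can be sketched in a few lines of Python (names illustrative): the reserved prefix keeps internally appended metadata fields distinct from anything the user might declare.

```python
# Sketch of the reserved-prefix scheme for internally appended metadata fields.
def metadata_column_name(property_name: str) -> str:
    return f"__system_{property_name}"

user_columns = ["id", "name", "partition"]  # the user may even declare "partition"
requested_metadata = ["partition", "offset"]

appended = [metadata_column_name(p) for p in requested_metadata]
schema = user_columns + appended

# The user's "partition" column and the internal one never collide.
print(schema)
```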
I don't see how they relate to the partitioning. I already put both of those cases in the document. We can either partition on a computed column or use a UDF in a PARTITIONED BY clause. I am fine with leaving out the partitioning by UDF in the first version if you still have some concerns.

As for your question, Danny: it depends on which partitioning strategy you use.

For the HASH partitioning strategy I thought it would work as you explained. It would be N = MOD(expr, num). I am not sure though if we should introduce the PARTITIONS clause. Usually Flink does not own the data, and the partitions are already an intrinsic property of the underlying source; e.g. for Kafka we do not create topics, we just describe a pre-existing, pre-partitioned topic.

4. timestamp vs timestamp.field vs connector.field vs ...

I am fine with changing it to timestamp.field to be consistent with the other value.fields and key.fields. Actually, that was also my initial proposal in a first draft I prepared. I changed it afterwards to shorten the key.

Best,

Dawid

On 03/03/2020 09:00, Danny Chan wrote:

Thanks Dawid for bringing up this discussion, I think it is a useful feature ~

About how the metadata outputs from the source

I think it is completely orthogonal; computed column push-down is another topic. This should not be a blocker but a promotion: if we do not have any filters on the computed column, there is no need to do any pushing; the source node just emits the complete record with full metadata with the declared physical schema, and then when generating the virtual columns, we would extract the metadata info and output them as full columns (with full schema).

About the type of the metadata column

Personally I prefer an explicit type instead of CAST. They are semantically equivalent, though an explicit type is more straight-forward and we can declare the nullable attribute there.
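The HASH strategy discussed above reduces to a single expression; a minimal Python sketch (partition count and sample values are made up):

```python
# N = MOD(expr, num): the target partition for the HASH partitioning strategy.
def hash_partition(expr_value: int, num_partitions: int) -> int:
    return expr_value % num_partitions

num = 4
values = [0, 1, 5, 10, 11]
print([hash_partition(v, num) for v in values])
```

For a pre-partitioned Kafka topic, num would simply describe the topic's existing partition count rather than create anything.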
About option A: partitioning based on a computed column VS option B: partitioning with just a function

From the FLIP, it seems that B's partitioning is just a strategy when writing data; the partition column is not included in the table schema, so it's just useless when reading from it.

- Compared to A, we do not need to generate the partition column when selecting from the table (but we do on insert into).
- For A we can also mark the column as STORED when we want to persist it.

So in my opinion they are orthogonal and we can support both. I saw that MySQL/Oracle [1][2] would suggest to also define the PARTITIONS num, and the partitions are managed under a "table namespace": the partition in which the record is stored is partition number N, where N = MOD(expr, num). For your design, in which partition would the record persist?

[1] https://dev.mysql.com/doc/refman/5.7/en/partitioning-hash.html
[2] https://docs.oracle.com/database/121/VLDBG/GUID-F023D3ED-262F-4B19-950A-D3C8F8CDB4F4.htm#VLDBG1270

Best,
Danny Chan

On 2 Mar 2020 at 6:16 PM +0800, Dawid Wysakowicz <dwysakow...@apache.org> wrote:

Hi Jark,

Ad. 2 I added a section to discuss the relation to FLIP-63.

Ad. 3 Yes, I also tried to somewhat keep the hierarchy of properties. Therefore you have the key.format.type. I also considered exactly what you are suggesting (prefixing with connector or kafka). I should've put that into an Option/Rejected alternatives section.

I agree timestamp, key.*, value.* are connector properties.
Why I wanted to suggest not adding that prefix in the first version is that actually all the properties in the WITH section are connector properties. Even format is in the end a connector property, as some of the sources might not have a format, imo. The benefit of not adding the prefix is that it makes the keys a bit shorter. Imagine prefixing all the properties with connector (or, if we go with FLINK-12557, elasticsearch):

elasticsearch.key.format.type: csv
elasticsearch.key.format.field: ....
elasticsearch.key.format.delimiter: ....
elasticsearch.key.format.*: ....

I am fine with doing it though if this is the preferred approach in the community.

Ad in-line comments:

I forgot to update the `value.fields.include` property. It should be value.fields-include.
>>>>>> Which I think you also >>>>>>>>>> suggested >>>>>>>>>>>>>> in >>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>>>>>> comment, >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> right? >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> As for the cast vs >>>>>> declaring output type of >>>>>>>>>> computed >>>>>>>>>>>>>>>>>> column. >>>>>>>>>>>>>>>>>>>> I >>>>>>>>>>>>>>>>>>>>>>>> think >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> it's better not to use >>>>>> CAST, but declare a type >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> of an >>>>>>>>>>>>>>>>>>>> expression >>>>>>>>>>>>>>>>>>>>>>>> and >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> later >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> on infer the output >>>>>> type of SYSTEM_METADATA. The >>>>>>>>>> reason >>>>>>>>>>>>>>> is >>>>>>>>>>>>>>>> I >>>>>>>>>>>>>>>>>>>> think >>>>>>>>>>>>>>>>>>>>>>>> this >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> way >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> it will be easier to >>>>>> implement e.g. filter push >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> downs >>>>>>>>>>>>>>> when >>>>>>>>>>>>>>>>>>>> working >>>>>>>>>>>>>>>>>>>>>>>> with >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> native types of the >>>>>> source, e.g. in case of Kafka's >>>>>>>>>>>>>>>> offset, i >>>>>>>>>>>>>>>>>>>>>>>> think it's >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> better to pushdown long >>>>>> rather than string. This >>>>>>>>>> could >>>>>>>>>>>>>>> let >>>>>>>>>>>>>>>> us >>>>>>>>>>>>>>>>>>>> push >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> expression like e.g. >>>>>> offset > 12345 & offset < >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 59382. >>>>>>>>>>>>>>>>>>>> Otherwise we >>>>>>>>>>>>>>>>>>>>>>>> would >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> have to push down >>>>>> cast(offset, long) > 12345 && >>>>>>>>>>>>>>>> cast(offset, >>>>>>>>>>>>>>>>>>>> long) >>>>>>>>>>>>>>>>>>>>>>>> < >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 59382. 
> Moreover, I think we need to introduce the type for computed columns anyway to support functions that infer the output type based on the expected return type.
>
> As for the computed column push down: yes, SYSTEM_METADATA would have to be pushed down to the source. If it is not possible, the planner should fail. As far as I know, computed columns push down will be part of the source rework, won't it? ;)
>
> As for the persisted computed column: I think it is completely orthogonal. In my current proposal you can also partition by a computed column. The difference between using a udf in partitioned by vs partitioned by a computed column is that when you partition by a computed column, this column must also be computed when reading the table. If you use a udf in the partitioned by, the expression is computed only when inserting into the table.
>
> Hope this answers some of your questions. Looking forward to further suggestions.
>
> Best,
> Dawid
>
> On 02/03/2020 05:18, Jark Wu wrote:
>> Hi,
>>
>> Thanks Dawid for starting such a great discussion. Reading metadata and key-part information from source is an important feature for streaming users.
>>
>> In general, I agree with the proposal of the FLIP. I will leave my thoughts and comments here:
>>
>> 1) +1 to use connector properties instead of introducing a HEADER keyword, for the reason you mentioned in the FLIP.
>> 2) We already introduced PARTITIONED BY in FLIP-63.
>> Maybe we should add a section to explain the relationship between them. Do their concepts conflict? Could INSERT PARTITION be used on the PARTITIONED table in this FLIP?
>> 3) Currently, properties are hierarchical in Flink SQL. Shall we make the newly introduced properties more hierarchical? For example, "timestamp" => "connector.timestamp"? (Actually, I prefer "kafka.timestamp", which is another improvement for properties, FLINK-12557.) A single "timestamp" in properties may mislead users that the field is a rowtime attribute.
>>
>> I also left some minor comments in the FLIP.
>>
>> Thanks,
>> Jark
>>
>> On Sun, 1 Mar 2020 at 22:30, Dawid Wysakowicz <dwysakow...@apache.org> wrote:
>>
>>> Hi,
>>>
>>> I would like to propose an improvement that would enable reading table columns from different parts of source records. Besides the main payload, the majority (if not all) of the sources expose additional information. It can be simply read-only metadata such as offset or ingestion time, or read and write parts of the record that contain data but additionally serve different purposes (partitioning, compaction etc.), e.g. key or timestamp in Kafka.
>>>
>>> We should make it possible to read and write data from all of those locations. In this proposal I discuss reading partitioning data; for completeness, the proposal also discusses partitioning when writing data out.
>>>
>>> I am looking forward to your comments.
>>>
>>> You can access the FLIP here:
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records?src=contextnavpagetreemode
>>>
>>> Best,
>>>
>>> Dawid
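Putting the pieces of the proposal together, a table exposing Kafka metadata and key parts next to the main payload might be declared like this. The syntax is purely illustrative: at this point in the thread no keyword had been voted on, and the key-column handling (user_key) is an assumption, not something the FLIP had fixed.

```sql
-- Hypothetical sketch of what FLIP-107 aims to enable: reading payload,
-- key part, and read-only metadata of a Kafka record as table columns.
CREATE TABLE kafka_orders (
  order_id BIGINT,                                   -- from the value payload
  user_key STRING,                                   -- from the record key part (assumed mapping)
  kafka_offset BIGINT AS SYSTEM_METADATA("offset"),  -- read-only metadata
  ts TIMESTAMP(3) AS SYSTEM_METADATA("timestamp")    -- Kafka record timestamp
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders'
);
```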