“Personally, I still like the computed column design more because it allows full flexibility to compute the final column”

I have the same feeling. The non-standard syntax "timestamp INT SYSTEM_METADATA("ts")" is neither a computed column nor a normal column. It looks very much like a computed column, but it is not (there is no AS keyword). We should be cautious with such syntax, because it uses a function as a column constraint, and no SQL vendor has such a syntax. Can we just use a SQL keyword as a constraint to mark the column as metadata?

timestamp INT METADATA [FROM 'my-timestamp-field'] [VIRTUAL]

Note that the "FROM 'field name'" part is only needed when the metadata field name conflicts with the declared table column name. When there is no conflict, we can simplify it to:

timestamp INT METADATA

By default, the field is non-virtual and can be both read and written; users need to mark the column as VIRTUAL when it is only readable.
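To illustrate, a complete table definition under this proposal could look like the following (just a sketch of the idea; the column names, types, and connector options here are examples, not a final design):

CREATE TABLE kafka_table (
  id BIGINT,
  name STRING,
  -- column name differs from the metadata key, so FROM is required;
  -- non-virtual by default, so it is both readable and writable
  ts TIMESTAMP(3) METADATA FROM 'timestamp',
  -- column name matches the metadata key, so FROM can be omitted;
  -- VIRTUAL marks it as read-only
  offset BIGINT METADATA VIRTUAL
) WITH (
  'connector' = 'kafka',
  'topic' = 'test-topic',
  'format' = 'avro'
);

When reading, both ts and offset would be available in queries; when writing, only ts would have to be provided, because the VIRTUAL column is excluded from the query-to-sink schema.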
Timo Walther <twal...@apache.org> wrote on Wed, Sep 9, 2020 at 6:41 PM:
> Hi everyone,
>
> "key" and "value" in the properties are a special case because they need to configure a format. So key and value are more than just metadata. Jark's example for setting a timestamp would work but, as the FLIP discusses, we have way more metadata fields like headers, epoch-leader, etc. Having a property for all of this metadata would mess up the WITH section entirely. Furthermore, we also want to deal with metadata from the formats. Solving this through properties as well would further complicate the property design.
>
> Personally, I still like the computed column design more because it allows full flexibility to compute the final column:
>
> timestamp AS adjustTimestamp(CAST(SYSTEM_METADATA("ts") AS TIMESTAMP(3)))
>
> Instead of having a helper column and a real column in the table:
>
> helperTimestamp AS CAST(SYSTEM_METADATA("ts") AS TIMESTAMP(3))
> realTimestamp AS adjustTimestamp(helperTimestamp)
>
> But I see that the discussion leans towards:
>
> timestamp INT SYSTEM_METADATA("ts")
>
> Which is fine with me. It is the shortest solution, because we don't need an additional CAST. We can discuss the syntax, so that confusion with computed columns can be avoided.
>
> timestamp INT USING SYSTEM_METADATA("ts")
> timestamp INT FROM SYSTEM_METADATA("ts")
> timestamp INT FROM SYSTEM_METADATA("ts") PERSISTED
>
> We use `SYSTEM_TIME` for temporal tables. I think prefixing with SYSTEM makes it clearer that it comes magically from the system.
>
> What do you think?
>
> Regards,
> Timo
>
> On 09.09.20 11:41, Jark Wu wrote:
> > Hi Danny,
> >
> > This is not the Oracle or MySQL computed column syntax, because there is no "AS" after the type.
> >
> > Hi everyone,
> >
> > If we want to use "offset INT SYSTEM_METADATA("offset")", then I think we must further discuss the "PERSISTED" or "VIRTUAL" keyword for the query-sink schema problem.
> > Personally, I think we can use a shorter keyword "METADATA" for "SYSTEM_METADATA", because "SYSTEM_METADATA" sounds like a system function and confuses users into thinking this is a computed column.
> >
> > Best,
> > Jark
> >
> > On Wed, 9 Sep 2020 at 17:23, Danny Chan <danny0...@apache.org> wrote:
> >
> >> "offset INT SYSTEM_METADATA("offset")"
> >>
> >> This is actually Oracle- or MySQL-style computed column syntax.
> >>
> >> "You are right that one could argue that "timestamp", "headers" are something like "key" and "value""
> >>
> >> I have the same feeling; both key/value and headers/timestamp are *real* data stored in the consumed record, they are not computed or generated.
> >>
> >> "Trying to solve everything via properties sounds rather like a hack to me"
> >>
> >> Things are not that hacky if we can unify the routines or the definitions (all through the computed column way, or all through the table options). I also think it is hacky that we mix two kinds of syntax for different kinds of metadata (read-only and read-write). In this FLIP, we declare the Kafka key fields with table options but use SYSTEM_METADATA for other metadata, which is hacky and inconsistent.
> >>
> >> Kurt Young <ykt...@gmail.com> wrote on Wed, Sep 9, 2020 at 4:48 PM:
> >>
> >>> I would vote for `offset INT SYSTEM_METADATA("offset")`.
> >>>
> >>> I don't think we can stick with the SQL standard in the DDL part forever, especially as there are more and more requirements coming from different connectors and external systems.
> >>>
> >>> Best,
> >>> Kurt
> >>>
> >>> On Wed, Sep 9, 2020 at 4:40 PM Timo Walther <twal...@apache.org> wrote:
> >>>
> >>>> Hi Jark,
> >>>>
> >>>> now we are back at the original design proposed by Dawid :D Yes, we should be cautious about adding new syntax. But the length of this discussion shows that we are looking for a good long-term solution. In this case I would rather vote for a deep integration into the syntax.
> >>>>
> >>>> Computed columns are also not SQL standard compliant. And our DDL is neither, so we have some degree of freedom here.
> >>>>
> >>>> Trying to solve everything via properties sounds rather like a hack to me. You are right that one could argue that "timestamp", "headers" are something like "key" and "value". However, mixing
> >>>>
> >>>> `offset AS SYSTEM_METADATA("offset")`
> >>>>
> >>>> and
> >>>>
> >>>> `'timestamp.field' = 'ts'`
> >>>>
> >>>> looks more confusing to users than an explicit
> >>>>
> >>>> `offset AS CAST(SYSTEM_METADATA("offset") AS INT)`
> >>>>
> >>>> or
> >>>>
> >>>> `offset INT SYSTEM_METADATA("offset")`
> >>>>
> >>>> that is symmetric for both source and sink.
> >>>>
> >>>> What do others think?
> >>>>
> >>>> Regards,
> >>>> Timo
> >>>>
> >>>> On 09.09.20 10:09, Jark Wu wrote:
> >>>>> Hi everyone,
> >>>>>
> >>>>> I think we have a conclusion that the writable metadata shouldn't be defined as a computed column, but as a normal column.
> >>>>>
> >>>>> "timestamp STRING SYSTEM_METADATA('timestamp')" is one of the approaches. However, it is not SQL standard compliant, so we need to be cautious when adding new syntax. Besides, we have to introduce the `PERSISTED` or `VIRTUAL` keyword to resolve the query-sink schema problem if it is read-only metadata. That adds more stuff for users to learn.
> >>>>>
> >>>>> From my point of view, "timestamp" and "headers" are something like "key" and "value" that are stored with the real data. So why not define "timestamp" in the same way as "key", by using a "timestamp.field" connector option?
> >>>>> On the other side, read-only metadata, such as "offset", shouldn't be defined as a normal column. So why not use the existing computed column syntax for such metadata? Then we don't have the query-sink schema problem.
> >>>>> So here is my proposal:
> >>>>>
> >>>>> CREATE TABLE kafka_table (
> >>>>>    id BIGINT,
> >>>>>    name STRING,
> >>>>>    col1 STRING,
> >>>>>    col2 STRING,
> >>>>>    ts TIMESTAMP(3) WITH LOCAL TIME ZONE,  -- ts is a normal field, so it can be read and written.
> >>>>>    offset AS SYSTEM_METADATA("offset")
> >>>>> ) WITH (
> >>>>>    'connector' = 'kafka',
> >>>>>    'topic' = 'test-topic',
> >>>>>    'key.fields' = 'id, name',
> >>>>>    'key.format' = 'csv',
> >>>>>    'value.format' = 'avro',
> >>>>>    'timestamp.field' = 'ts'  -- define the mapping of the Kafka timestamp
> >>>>> );
> >>>>>
> >>>>> INSERT INTO kafka_table
> >>>>> SELECT id, name, col1, col2, rowtime FROM another_table;
> >>>>>
> >>>>> I think this can solve all the problems without introducing any new syntax. The only minor disadvantage is that we separate the definition syntax of read-only metadata from that of read-write fields. However, I don't think this is a big problem.
> >>>>>
> >>>>> Best,
> >>>>> Jark
> >>>>>
> >>>>> On Wed, 9 Sep 2020 at 15:09, Timo Walther <twal...@apache.org> wrote:
> >>>>>
> >>>>>> Hi Kurt,
> >>>>>>
> >>>>>> thanks for sharing your opinion. I'm totally up for not reusing computed columns. I think Jark was a big supporter of this syntax, @Jark are you fine with this as well? The non-computed column approach was only a "slightly rejected alternative".
> >>>>>>
> >>>>>> Furthermore, we would need to think about how such a new design influences the LIKE clause though.
> >>>>>>
> >>>>>> However, we should still keep the `PERSISTED` keyword as it influences the query->sink schema. If you look at the list of metadata for existing connectors and formats, we currently offer only two writable metadata fields. Otherwise, one would need to declare two tables whenever a metadata column is read (one for the source, one for the sink). This can be quite inconvenient, e.g. for just reading the topic.
> >>>>>>
> >>>>>> Regards,
> >>>>>> Timo
> >>>>>>
> >>>>>> On 09.09.20 08:52, Kurt Young wrote:
> >>>>>>> I also share the concern that reusing the computed column syntax but with different semantics would confuse users a lot.
> >>>>>>>
> >>>>>>> Besides, I think metadata fields are conceptually not the same as computed columns. A metadata field is a connector-specific thing, and it only carries the information of where the field comes from (on the source side) or where the field needs to be written to (on the sink side). It's more similar to normal fields, with the assumption that all these fields go to the data part.
> >>>>>>>
> >>>>>>> Thus I lean more towards the rejected alternative that Timo mentioned. And I think we don't need the PERSISTED keyword; SYSTEM_METADATA should be enough.
> >>>>>>>
> >>>>>>> During implementation, the framework only needs to pass such <field, metadata field> information to the connector, and the logic of handling such fields inside the connector should be straightforward.
> >>>>>>>
> >>>>>>> Regarding the downside Timo mentioned:
> >>>>>>>
> >>>>>>>> The disadvantage is that users cannot call UDFs or parse timestamps.
> >>>>>>>
> >>>>>>> I think this is fairly simple to solve.
> >>>>>>> Since the metadata field isn't a computed column anymore, we can support referencing such fields in a computed column. For example:
> >>>>>>>
> >>>>>>> CREATE TABLE kafka_table (
> >>>>>>>    id BIGINT,
> >>>>>>>    name STRING,
> >>>>>>>    timestamp STRING SYSTEM_METADATA("timestamp"), -- get the timestamp field from the metadata
> >>>>>>>    ts AS to_timestamp(timestamp) -- normal computed column, parse the string to a TIMESTAMP type by using the metadata field
> >>>>>>> ) WITH (
> >>>>>>>    ...
> >>>>>>> )
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Kurt
> >>>>>>>
> >>>>>>> On Tue, Sep 8, 2020 at 11:57 PM Timo Walther <twal...@apache.org> wrote:
> >>>>>>>
> >>>>>>>> Hi Leonard,
> >>>>>>>>
> >>>>>>>> the only alternative I see is that we introduce a concept that is completely different to computed columns. This is also mentioned in the rejected alternatives section of the FLIP. Something like:
> >>>>>>>>
> >>>>>>>> CREATE TABLE kafka_table (
> >>>>>>>>    id BIGINT,
> >>>>>>>>    name STRING,
> >>>>>>>>    timestamp INT SYSTEM_METADATA("timestamp") PERSISTED,
> >>>>>>>>    headers MAP<STRING, BYTES> SYSTEM_METADATA("headers") PERSISTED
> >>>>>>>> ) WITH (
> >>>>>>>>    ...
> >>>>>>>> )
> >>>>>>>>
> >>>>>>>> This way we would avoid confusion altogether and can easily map columns to metadata columns. The disadvantage is that users cannot call UDFs or parse timestamps. This would need to be done in a real computed column.
> >>>>>>>>
> >>>>>>>> I'm happy about better alternatives.
> >>>>>>>>
> >>>>>>>> Regards,
> >>>>>>>> Timo
> >>>>>>>>
> >>>>>>>> On 08.09.20 15:37, Leonard Xu wrote:
> >>>>>>>>> Hi, Timo
> >>>>>>>>>
> >>>>>>>>> Thanks for driving this FLIP.
> >>>>>>>>>
> >>>>>>>>> Sorry, but I have a concern about the "Writing metadata via DynamicTableSink" section:
> >>>>>>>>>
> >>>>>>>>> CREATE TABLE kafka_table (
> >>>>>>>>>    id BIGINT,
> >>>>>>>>>    name STRING,
> >>>>>>>>>    timestamp AS CAST(SYSTEM_METADATA("timestamp") AS BIGINT) PERSISTED,
> >>>>>>>>>    headers AS CAST(SYSTEM_METADATA("headers") AS MAP<STRING, BYTES>) PERSISTED
> >>>>>>>>> ) WITH (
> >>>>>>>>>    ...
> >>>>>>>>> )
> >>>>>>>>>
> >>>>>>>>> An insert statement could look like:
> >>>>>>>>>
> >>>>>>>>> INSERT INTO kafka_table VALUES (
> >>>>>>>>>    (1, "ABC", 1599133672, MAP('checksum', computeChecksum(...)))
> >>>>>>>>> )
> >>>>>>>>>
> >>>>>>>>> The proposed INSERT syntax does not make sense to me, because it writes to computed (generated) columns. Both SQL Server and PostgreSQL do not allow inserting values into computed columns even if they are persisted; this breaks the generated column semantics and may confuse users a lot.
> >>>>>>>>>
> >>>>>>>>> For SQL Server computed columns[1]:
> >>>>>>>>>> column_name AS computed_column_expression [ PERSISTED [ NOT NULL ] ]...
> >>>>>>>>>> NOTE: A computed column cannot be the target of an INSERT or UPDATE statement.
> >>>>>>>>>
> >>>>>>>>> For PostgreSQL generated columns[2]:
> >>>>>>>>>> height_in numeric GENERATED ALWAYS AS (height_cm / 2.54) STORED
> >>>>>>>>>> NOTE: A generated column cannot be written to directly. In INSERT or UPDATE commands, a value cannot be specified for a generated column, but the keyword DEFAULT may be specified.
> >>>>>>>>>
> >>>>>>>>> It shouldn't be allowed to set/update values for generated columns. After looking up SQL 2016:
> >>>>>>>>>> <insert statement> ::=
> >>>>>>>>>> INSERT INTO <insertion target> <insert columns and source>
> >>>>>>>>>>
> >>>>>>>>>> If <contextually typed table value constructor> CTTVC is specified, then every <contextually typed row value constructor element> simply contained in CTTVC whose positionally corresponding <column name> in <insert column list> references a column of which some underlying column is a generated column shall be a <default specification>.
> >>>>>>>>>> A <default specification> specifies the default value of some associated item.
> >>>>>>>>>
> >>>>>>>>> [1] https://docs.microsoft.com/en-US/sql/t-sql/statements/alter-table-computed-column-definition-transact-sql?view=sql-server-ver15
> >>>>>>>>> [2] https://www.postgresql.org/docs/12/ddl-generated-columns.html
> >>>>>>>>>
> >>>>>>>>>> On Sep 8, 2020, at 17:31, Timo Walther <twal...@apache.org> wrote:
> >>>>>>>>>>
> >>>>>>>>>> Hi Jark,
> >>>>>>>>>>
> >>>>>>>>>> according to Flink's and Calcite's casting definition in [1][2], TIMESTAMP WITH LOCAL TIME ZONE should be castable from BIGINT. If not, we will make it possible ;-)
> >>>>>>>>>>
> >>>>>>>>>> I'm aware of DeserializationSchema.getProducedType, but I think that this method is actually misplaced. The type should rather be passed to the source itself.
> >>>>>>>>>>
> >>>>>>>>>> For our Kafka SQL source, we will also not use this method because the Kafka source will add its own metadata in addition to the DeserializationSchema. So DeserializationSchema.getProducedType will never be read.
> >>>>>>>>>>
> >>>>>>>>>> For now I suggest leaving out the `DataType` from DecodingFormat.applyReadableMetadata, also because the format's physical type is passed later in `createRuntimeDecoder`. If necessary, it can be computed manually from consumedType + metadata types. We will provide a metadata utility class for that.
> >>>>>>>>>>
> >>>>>>>>>> Regards,
> >>>>>>>>>> Timo
> >>>>>>>>>>
> >>>>>>>>>> [1] https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java#L200
> >>>>>>>>>> [2] https://github.com/apache/calcite/blob/master/core/src/main/java/org/apache/calcite/sql/type/SqlTypeCoercionRule.java#L254
> >>>>>>>>>>
> >>>>>>>>>> On 08.09.20 10:52, Jark Wu wrote:
> >>>>>>>>>>> Hi Timo,
> >>>>>>>>>>> The updated CAST SYSTEM_METADATA behavior sounds good to me. I just noticed that a BIGINT can't be converted to "TIMESTAMP(3) WITH LOCAL TIME ZONE".
> >>>>>>>>>>> So maybe we need to support this, or use "TIMESTAMP(3) WITH LOCAL TIME ZONE" as the defined type of the Kafka timestamp? I think this makes sense, because it represents the milliseconds since epoch.
> >>>>>>>>>>> Regarding "DeserializationSchema doesn't need TypeInfo", I don't think so. The DeserializationSchema implements ResultTypeQueryable, thus the implementation needs to return an output TypeInfo. Besides, FlinkKafkaConsumer also calls DeserializationSchema.getProducedType as the produced type of the source function [1].
> >>>>>>>>>>> Best,
> >>>>>>>>>>> Jark
> >>>>>>>>>>> [1]: https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L1066
> >>>>>>>>>>> On Tue, 8 Sep 2020 at 16:35, Timo Walther <twal...@apache.org> wrote:
> >>>>>>>>>>>> Hi everyone,
> >>>>>>>>>>>>
> >>>>>>>>>>>> I updated the FLIP again and hope that I could address the mentioned concerns.
> >>>>>>>>>>>>
> >>>>>>>>>>>> @Leonard: Thanks for the explanation. I wasn't aware that ts_ms and source.ts_ms have different semantics. I updated the FLIP and exposed the most commonly used properties separately, so frequently used properties are not hidden in the MAP anymore:
> >>>>>>>>>>>>
> >>>>>>>>>>>> debezium-json.ingestion-timestamp
> >>>>>>>>>>>> debezium-json.source.timestamp
> >>>>>>>>>>>> debezium-json.source.database
> >>>>>>>>>>>> debezium-json.source.schema
> >>>>>>>>>>>> debezium-json.source.table
> >>>>>>>>>>>>
> >>>>>>>>>>>> However, since other properties depend on the used connector/vendor, the remaining options are stored in:
> >>>>>>>>>>>>
> >>>>>>>>>>>> debezium-json.source.properties
> >>>>>>>>>>>>
> >>>>>>>>>>>> And accessed with:
> >>>>>>>>>>>>
> >>>>>>>>>>>> CAST(SYSTEM_METADATA('debezium-json.source.properties') AS MAP<STRING, STRING>)['table']
> >>>>>>>>>>>>
> >>>>>>>>>>>> Otherwise it is not possible to figure out the value and column type during validation.
> >>>>>>>>>>>>
> >>>>>>>>>>>> @Jark: You convinced me to relax the CAST constraints. I added a dedicated sub-section to the FLIP:
> >>>>>>>>>>>>
> >>>>>>>>>>>> To make the use of SYSTEM_METADATA easier and avoid nested casting, we allow explicit casting to a target data type:
> >>>>>>>>>>>>
> >>>>>>>>>>>> rowtime AS CAST(SYSTEM_METADATA("timestamp") AS TIMESTAMP(3) WITH LOCAL TIME ZONE)
> >>>>>>>>>>>>
> >>>>>>>>>>>> A connector still produces and consumes the data type returned by `listMetadata()`. The planner will insert necessary explicit casts.
> >>>>>>>>>>>>
> >>>>>>>>>>>> In any case, the user must provide a CAST such that the computed column receives a valid data type when constructing the table schema.
> >>>>>>>>>>>>
> >>>>>>>>>>>> "I don't see a reason why `DecodingFormat#applyReadableMetadata` needs a DataType argument."
> >>>>>>>>>>>>
> >>>>>>>>>>>> Correct, the DeserializationSchema doesn't need TypeInfo; it is always executed locally. It is the source that needs TypeInfo for serializing the record to the next operator. And that is what we provide.
> >>>>>>>>>>>>
> >>>>>>>>>>>> @Danny:
> >>>>>>>>>>>>
> >>>>>>>>>>>> "SYSTEM_METADATA("offset")` returns the NULL type by default"
> >>>>>>>>>>>>
> >>>>>>>>>>>> We can also use some other means to represent an UNKNOWN data type. In the Flink type system, we use the NullType for it. The important part is that the final data type is known for the entire computed column. As I mentioned before, I would avoid the suggested option b) that would be similar to your suggestion. The CAST should be enough and allows for complex expressions in the computed column. Option b) would need parser changes.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Regards,
> >>>>>>>>>>>> Timo
> >>>>>>>>>>>>
> >>>>>>>>>>>> On 08.09.20 06:21, Leonard Xu wrote:
> >>>>>>>>>>>>> Hi, Timo
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks for your explanation and update. I have only one question for the latest FLIP.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> About the MAP<STRING, STRING> DataType of the key 'debezium-json.source': if users want to use the table name metadata, they need to write:
> >>>>>>>>>>>>> tableName STRING AS CAST(SYSTEM_METADATA('debezium-json.source') AS MAP<STRING, STRING>)['table']
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> The expression is a little complex for users. Could we only support the necessary metadata with simple DataTypes, as follows?
> >>>>>>>>>>>>> tableName STRING AS CAST(SYSTEM_METADATA('debezium-json.source.table') AS STRING),
> >>>>>>>>>>>>> transactionTime LONG AS CAST(SYSTEM_METADATA('debezium-json.source.ts_ms') AS BIGINT),
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> In this way we can simplify the expression. From my side, the most commonly used metadata in changelog formats includes 'database', 'table', 'source.ts_ms', and 'ts_ms'; maybe we could only support these in the first version.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Both Debezium and Canal have the above four metadata fields, and I'm willing to take some subtasks in the following development if necessary.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Debezium:
> >>>>>>>>>>>>> {
> >>>>>>>>>>>>>    "before": null,
> >>>>>>>>>>>>>    "after": { "id": 101, "name": "scooter" },
> >>>>>>>>>>>>>    "source": {
> >>>>>>>>>>>>>      "db": "inventory",        # 1. database name the changelog belongs to.
> >>>>>>>>>>>>>      "table": "products",      # 2. table name the changelog belongs to.
> >>>>>>>>>>>>>      "ts_ms": 1589355504100,   # 3. timestamp when the change happened in the database system, i.e. transaction time in the database.
> >>>>>>>>>>>>>      "connector": "mysql",
> >>>>>>>>>>>>>      ....
> >>>>>>>>>>>>>    },
> >>>>>>>>>>>>>    "ts_ms": 1589355606100,     # 4. timestamp when Debezium processed the changelog.
> >>>>>>>>>>>>>    "op": "c",
> >>>>>>>>>>>>>    "transaction": null
> >>>>>>>>>>>>> }
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Canal:
> >>>>>>>>>>>>> {
> >>>>>>>>>>>>>    "data": [{ "id": "102", "name": "car battery" }],
> >>>>>>>>>>>>>    "database": "inventory",    # 1. database name the changelog belongs to.
> >>>>>>>>>>>>>    "table": "products",        # 2. table name the changelog belongs to.
> >>>>>>>>>>>>>    "es": 1589374013000,        # 3. execution time of the change in the database system, i.e. transaction time in the database.
> >>>>>>>>>>>>>    "ts": 1589374013680,        # 4. timestamp when Canal processed the changelog.
> >>>>>>>>>>>>>    "isDdl": false,
> >>>>>>>>>>>>>    "mysqlType": {},
> >>>>>>>>>>>>>    ....
> >>>>>>>>>>>>> }
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Best
> >>>>>>>>>>>>> Leonard
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Sep 8, 2020, at 11:57, Danny Chan <yuzhao....@gmail.com> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thanks Timo ~
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> The FLIP was already in pretty good shape, I have only 2 questions here:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 1. "`CAST(SYSTEM_METADATA("offset") AS INT)` would be a valid read-only computed column for Kafka and can be extracted by the planner."
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> What are the pros of following the SQL Server syntax here? Usually an expression's return type can be inferred automatically. But I guess SQL Server does not have a function like SYSTEM_METADATA, which actually does not have a specific return type.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> And why not use the Oracle or MySQL syntax there?
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> column_name [datatype] [GENERATED ALWAYS] AS (expression) [VIRTUAL]
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> which is more straightforward.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 2. "SYSTEM_METADATA("offset")` returns the NULL type by default"
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> The default type should not be NULL, because only the NULL literal has that type. Usually we use ANY as the type if we do not know the specific type in the SQL context. ANY means the physical value can be any Java object.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> [1] https://oracle-base.com/articles/11g/virtual-columns-11gr1
> >>>>>>>>>>>>>> [2] https://dev.mysql.com/doc/refman/5.7/en/create-table-generated-columns.html
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>> Danny Chan
> >>>>>>>>>>>>>> On Sep 4, 2020 at 4:48 PM (+0800), Timo Walther <twal...@apache.org> wrote:
> >>>>>>>>>>>>>>> Hi everyone,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I completely reworked FLIP-107. It now covers the full story of how to read and write metadata from/to connectors and formats. It considers all of the latest FLIPs, namely FLIP-95, FLIP-132 and FLIP-122. It introduces the concept of PERSISTED computed columns and leaves out partitioning for now.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Looking forward to your feedback.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>> Timo
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On 04.03.20 09:45, Kurt Young wrote:
> >>>>>>>>>>>>>>>> Sorry, forgot one question.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> 4. Can we make the value.fields-include more orthogonal? Like one can specify it as "EXCEPT_KEY, EXCEPT_TIMESTAMP". With the current EXCEPT_KEY and EXCEPT_KEY_TIMESTAMP, users cannot configure it to just ignore the timestamp but keep the key.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>>> Kurt
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On Wed, Mar 4, 2020 at 4:42 PM Kurt Young <ykt...@gmail.com> wrote:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Hi Dawid,
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> I have a couple of questions around key fields. Actually I also have some other questions but want to focus on key fields first.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> 1. I don't fully understand the usage of "key.fields". Is this option only valid during write operations? Because for reading, I can't imagine how such options can be applied. I would expect that there might be a SYSTEM_METADATA("key") to read and assign the key to a normal field?
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> 2. If "key.fields" is only valid in write operations, I want to propose that we simplify the options to avoid introducing key.format.type and other related options. I think a single "key.field" (not fields) would be enough; users can use a UDF to calculate whatever key they want before the sink.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> 3. Also, I don't want to introduce "value.format.type" and "value.format.xxx" with the "value" prefix. Not every connector has a concept of key and values. The old parameter "format.type" is already good enough to use.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>>>> Kurt
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On Tue, Mar 3, 2020 at 10:40 PM Jark Wu <imj...@gmail.com> wrote:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Thanks Dawid,
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> I have two more questions.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> SupportsMetadata
> >>>>>>>>>>>>>>>>>> Introducing SupportsMetadata sounds good to me. But I have some questions regarding this interface.
> >>>>>>>>>>>>>>>>>> 1) How does the source know the expected return type of each metadata field?
> >>>>>>>>>>>>>>>>>> 2) Where do we put the metadata fields? Append them to the existing physical fields?
> >>>>>>>>>>>>>>>>>> If yes, I would suggest changing the signature to `TableSource appendMetadataFields(String[] metadataNames, DataType[] metadataTypes)`.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> SYSTEM_METADATA("partition")
> >>>>>>>>>>>>>>>>>> Can the SYSTEM_METADATA() function be nested in a computed column expression? If yes, how do we specify the return type of SYSTEM_METADATA?
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>>>>> Jark
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> On Tue, 3 Mar 2020 at 17:06, Dawid Wysakowicz <dwysakow...@apache.org> wrote:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Hi,
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> 1. I thought a bit more on how the source would emit the columns and I now see it's not exactly the same as regular columns. I see a need to elaborate a bit more on that in the FLIP, as you asked, Jark.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> I do agree mostly with Danny on how we should do that. One additional thing I would introduce is an
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> interface SupportsMetadata {
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> boolean supportsMetadata(Set<String> metadataFields);
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> TableSource generateMetadataFields(Set<String> metadataFields);
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> }
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> This way the source would have to declare/emit only the requested metadata fields. In order not to clash with user-defined fields, when emitting a metadata field I would prepend the column name with __system_{property_name}. Therefore, when SYSTEM_METADATA("partition") is requested, the source would append a field __system_partition to the schema. This would never be visible to the user, as it would be used only for the subsequent computed columns. If that makes sense to you, I will update the FLIP with this description.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> 2. CAST vs explicit type in computed columns
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Here I agree with Danny. It is also the current state of the proposal.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> 3. Partitioning on computed column vs function
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Here I also agree with Danny. I also think those are orthogonal. I would leave the STORED computed columns out of the discussion, as I don't see how they relate to the partitioning. I already put both of those cases in the document. We can either partition on a computed column or use a UDF in a PARTITIONED BY clause.
> >>>>>>>>>>>>>>>>>>> I am fine with leaving out the partitioning by UDF in the first version if you still have some concerns.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> As for your question, Danny: it depends on which partitioning strategy you use.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> For the HASH partitioning strategy, I thought it would work as you explained. It would be N = MOD(expr, num). I am not sure, though, if we should introduce the PARTITIONS clause. Usually Flink does not own the data, and the partitions are already an intrinsic property of the underlying source; e.g. for Kafka we do not create topics, we just describe a pre-existing, pre-partitioned topic.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> 4. timestamp vs timestamp.field vs connector.field vs ...
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> I am fine with changing it to timestamp.field to be consistent with the other value.fields and key.fields. Actually, that was also my initial proposal in a first draft I prepared. I changed it afterwards to shorten the key.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>>>>>> Dawid
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> On 03/03/2020 09:00, Danny Chan wrote:
> >>>>>>>>>>>>>>>>>>>> Thanks Dawid for bringing up this discussion, I think it is a useful feature ~
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> About how the metadata outputs from the source
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> I think it is completely orthogonal; computed column push-down is another topic. This should not be a blocker but an improvement: if we do not have any filters on the computed column, there is no need to do any pushing. The source node just emits the complete record with full metadata with the declared physical schema, and then, when generating the virtual columns, we would extract the metadata info and output it as full columns (with the full schema).
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> About the type of the metadata column
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Personally I prefer an explicit type instead of CAST. They are semantically equivalent, but an explicit type is more straightforward, and we can declare the nullable attribute there.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> About option A (partitioning based on a computed column) vs option B (partitioning with just a function)
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> From the FLIP, it seems that B's partitioning is just a strategy when writing data; the partition column is not included in the table schema, so it is simply not used when reading from the table.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> - Compared to A, we do not need to generate the partition column when selecting from the table (only when inserting into it)
> >>>>>>>>>>>>>>>>>>>> - For A, we can also mark the column as STORED when we want to persist it
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> So in my opinion they are orthogonal; we can support both. I saw that MySQL/Oracle[1][2] suggest also defining the PARTITIONS number, and the partitions are managed under a "table namespace": the partition in which the record is stored is partition number N, where N = MOD(expr, num). For your design, to which partition would the record be persisted?
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> [1] https://dev.mysql.com/doc/refman/5.7/en/partitioning-hash.html
> >>>>>>>>>>>>>>>>>>>> [2] https://docs.oracle.com/database/121/VLDBG/GUID-F023D3ED-262F-4B19-950A-D3C8F8CDB4F4.htm#VLDBG1270
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>>>>>>> Danny Chan
> >>>>>>>>>>>>>>>>>>>> On Mar 2, 2020 at 6:16 PM (+0800), Dawid Wysakowicz <dwysakow...@apache.org> wrote:
> >>>>>>>>>>>>>>>>>>>>> Hi Jark,
> >>>>>>>>>>>>>>>>>>>>> Ad. 2 I added a section to discuss the relation to FLIP-63.
> >>>>>>>>>>>>>>>>>>>>> Ad. 3 Yes, I also tried to somewhat keep the hierarchy of properties. Therefore you have the key.format.type.
> >>>>>>>>>>>>>>>>>>>>> I also considered exactly what you are suggesting (prefixing with connector or kafka). I should've put that into the Options/Rejected alternatives.
> >>>>>>>>>>>>>>>>>>>>> I agree timestamp, key.*, value.* are connector properties. Why I wanted to suggest not adding that prefix in the first version is that actually all the properties in the WITH section are connector properties. Even format is, in the end, a connector property, as some of the sources might not have a format, imo. The benefit of not adding the prefix is that it makes the keys a bit shorter.
> >>>>>>>>>>>>>>>>>>>>> Imagine prefixing all the properties with connector (or, if we go with FLINK-12557, elasticsearch):
> >>>>>>>>>>>>>>>>>>>>> elasticsearch.key.format.type: csv
> >>>>>>>>>>>>>>>>>>>>> elasticsearch.key.format.field: ....
> >>>>>>>>>>>>>>>>>>>>> elasticsearch.key.format.delimiter: ....
> >>>>>>>>>>>>>>>>>>>>> elasticsearch.key.format.*: ....
> >>>>>>>>>>>>>>>>>>>>> I am fine with doing it, though, if this is the preferred approach in the community.
> >>>>>>>>>>>>>>>>>>>>> Ad in-line comments:
> >>>>>>>>>>>>>>>>>>>>> I forgot to update the `value.fields.include` property. It should be value.fields-include. Which I think you also suggested in the comment, right?
> >>>>>>>>>>>>>>>>>>>>> As for the CAST vs declaring the output type of a computed column: I think it's better not to use CAST, but to declare a type of an expression and later on infer the output type of SYSTEM_METADATA. The reason is that I think this way it will be easier to implement e.g. filter push-downs when working with the native types of the source; e.g. in the case of Kafka's offset, I think it's better to push down a long rather than a string. This could let us push down expressions like offset > 12345 AND offset < 59382. Otherwise we would have to push down cast(offset, long) > 12345 AND cast(offset, long) < 59382. Moreover, I think we need to introduce the type for computed columns anyway, to support functions that infer the output type based on the expected return type.
> >>>>>>>>>>>>>>>>>>>>> As for the computed column push-down: yes, SYSTEM_METADATA would have to be pushed down to the source. If that is not possible, the planner should fail. As far as I know, computed column push-down will be part of the source rework, won't it? ;)
> >>>>>>>>>>>>>>>>>>>>> As for the persisted computed column: I think it is completely orthogonal. In my current proposal you can also partition by a computed column. The difference between using a UDF in PARTITIONED BY vs partitioning by a computed column is that when you partition by a computed column, this column must also be computed when reading the table. If you use a UDF in the PARTITIONED BY, the expression is computed only when inserting into the table.
> >>>>>>>>>>>>>>>>>>>>> Hope this answers some of your questions.
> >>>>>>>>>>>>>>>>>>>>> Looking forward to further suggestions.
> >>>>>>>>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>>>>>>>> Dawid
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> On 02/03/2020 05:18, Jark Wu wrote:
> >>>>>>>>>>>>>>>>>>>>>> Hi,
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Thanks Dawid for starting such a great discussion. Reading metadata and key-part information from the source is an important feature for streaming users.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> In general, I agree with the proposal of the FLIP. I will leave my thoughts and comments here:
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> 1) +1 to using connector properties instead of introducing a HEADER keyword, for the reasons you mentioned in the FLIP.
> >>>>>>>>>>>>>>>>>>>>>> 2) We already introduced PARTITIONED BY in FLIP-63. Maybe we should add a section to explain the relationship between them. Do their concepts conflict? Could INSERT PARTITION be used on the PARTITIONED table in this FLIP?
> >>>>>>>>>>>>>>>>>>>>>> 3) Currently, properties are hierarchical in Flink SQL. Shall we make the newly introduced properties more hierarchical? For example, "timestamp" => "connector.timestamp"? (Actually, I prefer "kafka.timestamp", which is another improvement for properties, FLINK-12557.) A single "timestamp" in the properties may mislead users into thinking that the field is a rowtime attribute.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> I also left some minor comments in the FLIP.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>>>>>>> Jark
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> On Sun, 1 Mar 2020 at 22:30, Dawid Wysakowicz <dwysakow...@apache.org> wrote:
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Hi,
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> I would like to propose an improvement that would enable reading table columns from different parts of source records. Besides the main payload, the majority (if not all) of the sources expose additional information. It can be simply read-only metadata such as the offset or ingestion time, or read-and-write parts of the record that contain data but additionally serve different purposes (partitioning, compaction, etc.), e.g. the key or timestamp in Kafka.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> We should make it possible to read and write data from all of those locations. In this proposal I discuss reading partitioning data; for completeness, this proposal also discusses partitioning when writing data out.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> I am looking forward to your comments.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> You can access the FLIP here:
> >>>>>>>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records?src=contextnavpagetreemode
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>>>>>>>>>> Dawid