Hi Danny,

This is not Oracle or MySQL computed column syntax, because there is no
"AS" after the type.
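For comparison (just a sketch, not final syntax): a MySQL generated column
always carries "AS (expression)" after the type, whereas the proposed
metadata column has no "AS" at all, so a parser can tell them apart:

    -- MySQL generated column: "AS (expression)" follows the type
    price_with_tax DOUBLE GENERATED ALWAYS AS (price * 1.19) VIRTUAL

    -- proposed metadata column: no "AS", only the metadata key
    offset INT SYSTEM_METADATA("offset")

(The column names above are only illustrative.)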
Hi everyone,

If we want to use "offset INT SYSTEM_METADATA("offset")", then I think we
must further discuss the "PERSISTED" or "VIRTUAL" keyword for the
query-sink schema problem (i.e., a read-only column declared in the table
schema would otherwise also appear in the schema a query must produce when
inserting into the table).

Personally, I think we can use the shorter keyword "METADATA" instead of
"SYSTEM_METADATA", because "SYSTEM_METADATA" sounds like a system function
and may confuse users into thinking the column is a computed column.

Best,
Jark

On Wed, 9 Sep 2020 at 17:23, Danny Chan <danny0...@apache.org> wrote:

> "offset INT SYSTEM_METADATA("offset")"
>
> This is actually Oracle or MySQL style computed column syntax.
>
> "You are right that one could argue that "timestamp", "headers" are
> something like "key" and "value""
>
> I have the same feeling, both key/value and headers/timestamp are *real*
> data stored in the consumed record, they are not computed or generated.
>
> "Trying to solve everything via properties sounds rather like a hack to
> me"
>
> Things are not that hacky if we can unify the routines or the
> definitions (all via the computed column way, or all via the table
> options). I also think it is hacky that we mix in 2 kinds of syntax for
> different kinds of metadata (read-only and read-write). In this FLIP, we
> declare the Kafka key fields with table options but use SYSTEM_METADATA
> for other metadata; that is hacky, or at least inconsistent.
>
> On Wed, Sep 9, 2020 at 4:48 PM, Kurt Young <ykt...@gmail.com> wrote:
>
> > I would vote for `offset INT SYSTEM_METADATA("offset")`.
> >
> > I don't think we can stick with the SQL standard in the DDL part
> > forever, especially as there are more and more requirements coming
> > from different connectors and external systems.
> >
> > Best,
> > Kurt
> >
> >
> > On Wed, Sep 9, 2020 at 4:40 PM Timo Walther <twal...@apache.org> wrote:
> >
> > > Hi Jark,
> > >
> > > now we are back at the original design proposed by Dawid :D Yes, we
> > > should be cautious about adding new syntax. But the length of this
> > > discussion shows that we are looking for a good long-term solution.
> > > In this case I would rather vote for a deep integration into the
> > > syntax.
> > >
> > > Computed columns are also not SQL standard compliant. And our DDL is
> > > neither, so we have some degree of freedom here.
> > >
> > > Trying to solve everything via properties sounds rather like a hack
> > > to me. You are right that one could argue that "timestamp", "headers"
> > > are something like "key" and "value". However, mixing
> > >
> > > `offset AS SYSTEM_METADATA("offset")`
> > >
> > > and
> > >
> > > `'timestamp.field' = 'ts'`
> > >
> > > looks more confusing to users than an explicit
> > >
> > > `offset AS CAST(SYSTEM_METADATA("offset") AS INT)`
> > >
> > > or
> > >
> > > `offset INT SYSTEM_METADATA("offset")`
> > >
> > > that is symmetric for both source and sink.
> > >
> > > What do others think?
> > >
> > > Regards,
> > > Timo
> > >
> > >
> > > On 09.09.20 10:09, Jark Wu wrote:
> > > > Hi everyone,
> > > >
> > > > I think we have a conclusion that the writable metadata shouldn't
> > > > be defined as a computed column, but as a normal column.
> > > >
> > > > "timestamp STRING SYSTEM_METADATA('timestamp')" is one of the
> > > > approaches. However, it is not SQL standard compliant; we need to
> > > > be cautious enough when adding new syntax.
> > > > Besides, we have to introduce the `PERSISTED` or `VIRTUAL` keyword
> > > > to resolve the query-sink schema problem if it is read-only
> > > > metadata. That adds more stuff to learn for users.
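> > > > (For illustration only, a sketch of what such a keyword could look
> > > > like, not a concrete proposal:
> > > >
> > > > CREATE TABLE kafka_table (
> > > >    id BIGINT,
> > > >    offset INT SYSTEM_METADATA("offset") VIRTUAL  -- read-only,
> > > >                                                  -- excluded from the
> > > >                                                  -- sink schema
> > > > ) WITH (...);
> > > >
> > > > With VIRTUAL, "INSERT INTO kafka_table SELECT id FROM t" would work
> > > > because the sink schema is just (id BIGINT); without it, the query
> > > > would also have to produce a value for "offset", which cannot be
> > > > written.)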
> > > >
> > > > From my point of view, the "timestamp", "headers" are something like
> > > > "key" and "value" that are stored with the real data. So why not
> > > > define the "timestamp" in the same way as "key", by using a
> > > > "timestamp.field" connector option?
> > > > On the other side, the read-only metadata, such as "offset",
> > > > shouldn't be defined as a normal column. So why not use the existing
> > > > computed column syntax for such metadata? Then we don't have the
> > > > query-sink schema problem. So here is my proposal:
> > > >
> > > > CREATE TABLE kafka_table (
> > > >    id BIGINT,
> > > >    name STRING,
> > > >    col1 STRING,
> > > >    col2 STRING,
> > > >    ts TIMESTAMP(3) WITH LOCAL TIME ZONE,  -- ts is a normal field, so
> > > >                                           -- it can be read and written
> > > >    offset AS SYSTEM_METADATA("offset")
> > > > ) WITH (
> > > >    'connector' = 'kafka',
> > > >    'topic' = 'test-topic',
> > > >    'key.fields' = 'id, name',
> > > >    'key.format' = 'csv',
> > > >    'value.format' = 'avro',
> > > >    'timestamp.field' = 'ts'  -- define the mapping of the Kafka
> > > >                              -- timestamp
> > > > );
> > > >
> > > > INSERT INTO kafka_table
> > > > SELECT id, name, col1, col2, rowtime FROM another_table;
> > > >
> > > > I think this can solve all the problems without introducing any new
> > > > syntax. The only minor disadvantage is that we separate the
> > > > definition way/syntax of read-only metadata and read-write fields.
> > > > However, I don't think this is a big problem.
> > > >
> > > > Best,
> > > > Jark
> > > >
> > > >
> > > > On Wed, 9 Sep 2020 at 15:09, Timo Walther <twal...@apache.org> wrote:
> > > >
> > > >> Hi Kurt,
> > > >>
> > > >> thanks for sharing your opinion. I'm totally up for not reusing
> > > >> computed columns. I think Jark was a big supporter of this syntax,
> > > >> @Jark are you fine with this as well? The non-computed column
> > > >> approach was only a "slightly rejected alternative".
> > > >>
> > > >> Furthermore, we would need to think about how such a new design
> > > >> influences the LIKE clause though.
> > > >>
> > > >> However, we should still keep the `PERSISTED` keyword as it
> > > >> influences the query->sink schema. If you look at the list of
> > > >> metadata for existing connectors and formats, we currently offer
> > > >> only two writable metadata fields. Otherwise, one would need to
> > > >> declare two tables whenever a metadata column is read (one for the
> > > >> source, one for the sink). This can be quite inconvenient, e.g. for
> > > >> just reading the topic.
> > > >>
> > > >> Regards,
> > > >> Timo
> > > >>
> > > >>
> > > >> On 09.09.20 08:52, Kurt Young wrote:
> > > >>> I also share the concern that reusing the computed column syntax
> > > >>> but having different semantics would confuse users a lot.
> > > >>>
> > > >>> Besides, I think metadata fields are conceptually not the same as
> > > >>> computed columns. The metadata field is a connector-specific thing
> > > >>> and it only contains the information where the field comes from
> > > >>> (during source) or where the field needs to be written to (during
> > > >>> sink). It's more similar to normal fields, with the assumption
> > > >>> that all these fields need to go to the data part.
> > > >>>
> > > >>> Thus I lean more toward the rejected alternative that Timo
> > > >>> mentioned. And I think we don't need the PERSISTED keyword,
> > > >>> SYSTEM_METADATA should be enough.
> > > >>>
> > > >>> During implementation, the framework only needs to pass such
> > > >>> <field, metadata field> information to the connector, and the
> > > >>> logic of handling such fields inside the connector should be
> > > >>> straightforward.
> > > >>>
> > > >>> Regarding the downside Timo mentioned:
> > > >>>
> > > >>>> The disadvantage is that users cannot call UDFs or parse
> > > >>>> timestamps.
> > > >>>
> > > >>> I think this is fairly simple to solve. Since the metadata field
> > > >>> isn't a computed column anymore, we can support referencing such
> > > >>> fields in computed columns. For example:
> > > >>>
> > > >>> CREATE TABLE kafka_table (
> > > >>>    id BIGINT,
> > > >>>    name STRING,
> > > >>>    timestamp STRING SYSTEM_METADATA("timestamp"),  -- get the
> > > >>>                           -- timestamp field from the metadata
> > > >>>    ts AS to_timestamp(timestamp)  -- normal computed column, parse
> > > >>>                           -- the string to TIMESTAMP type by using
> > > >>>                           -- the metadata field
> > > >>> ) WITH (
> > > >>>    ...
> > > >>> )
> > > >>>
> > > >>> Best,
> > > >>> Kurt
> > > >>>
> > > >>>
> > > >>> On Tue, Sep 8, 2020 at 11:57 PM Timo Walther <twal...@apache.org>
> > > >>> wrote:
> > > >>>
> > > >>>> Hi Leonard,
> > > >>>>
> > > >>>> the only alternative I see is that we introduce a concept that is
> > > >>>> completely different to computed columns. This is also mentioned
> > > >>>> in the rejected alternatives section of the FLIP. Something like:
> > > >>>>
> > > >>>> CREATE TABLE kafka_table (
> > > >>>>    id BIGINT,
> > > >>>>    name STRING,
> > > >>>>    timestamp INT SYSTEM_METADATA("timestamp") PERSISTED,
> > > >>>>    headers MAP<STRING, BYTES> SYSTEM_METADATA("headers") PERSISTED
> > > >>>> ) WITH (
> > > >>>>    ...
> > > >>>> )
> > > >>>>
> > > >>>> This way we would avoid confusion at all and can easily map
> > > >>>> columns to metadata columns. The disadvantage is that users
> > > >>>> cannot call UDFs or parse timestamps. This would need to be done
> > > >>>> in a real computed column.
> > > >>>>
> > > >>>> I'm happy about better alternatives.
> > > >>>>
> > > >>>> Regards,
> > > >>>> Timo
> > > >>>>
> > > >>>>
> > > >>>> On 08.09.20 15:37, Leonard Xu wrote:
> > > >>>>> Hi, Timo
> > > >>>>>
> > > >>>>> Thanks for driving this FLIP.
> > > >>>>>
> > > >>>>> Sorry, but I have a concern about the "Writing metadata via
> > > >>>>> DynamicTableSink" section:
> > > >>>>>
> > > >>>>> CREATE TABLE kafka_table (
> > > >>>>>    id BIGINT,
> > > >>>>>    name STRING,
> > > >>>>>    timestamp AS CAST(SYSTEM_METADATA("timestamp") AS BIGINT)
> > > >>>>>      PERSISTED,
> > > >>>>>    headers AS CAST(SYSTEM_METADATA("headers") AS MAP<STRING,
> > > >>>>>      BYTES>) PERSISTED
> > > >>>>> ) WITH (
> > > >>>>>    ...
> > > >>>>> )
> > > >>>>>
> > > >>>>> An insert statement could look like:
> > > >>>>>
> > > >>>>> INSERT INTO kafka_table VALUES (
> > > >>>>>    (1, "ABC", 1599133672, MAP('checksum', computeChecksum(...)))
> > > >>>>> )
> > > >>>>>
> > > >>>>> The proposed INSERT syntax does not make sense to me, because it
> > > >>>>> contains computed (generated) columns.
> > > >>>>> Both SQL Server and PostgreSQL do not allow inserting values
> > > >>>>> into computed columns even if they are persisted; this breaks
> > > >>>>> the generated column semantics and may confuse users a lot.
> > > >>>>>
> > > >>>>> For the SQL Server computed column [1]:
> > > >>>>>> column_name AS computed_column_expression
> > > >>>>>>   [ PERSISTED [ NOT NULL ] ]...
> > > >>>>>> NOTE: A computed column cannot be the target of an INSERT or
> > > >>>>>> UPDATE statement.
> > > >>>>>
> > > >>>>> For the PostgreSQL generated column [2]:
> > > >>>>>> height_in numeric GENERATED ALWAYS AS (height_cm / 2.54) STORED
> > > >>>>>> NOTE: A generated column cannot be written to directly. In
> > > >>>>>> INSERT or UPDATE commands, a value cannot be specified for a
> > > >>>>>> generated column, but the keyword DEFAULT may be specified.
> > > >>>>>
> > > >>>>> After looking up SQL 2016, it also shouldn't be allowed to
> > > >>>>> set/update the value of a generated column:
> > > >>>>>> <insert statement> ::=
> > > >>>>>> INSERT INTO <insertion target> <insert columns and source>
> > > >>>>>>
> > > >>>>>> If <contextually typed table value constructor> CTTVC is
> > > >>>>>> specified, then every <contextually typed row value constructor
> > > >>>>>> element> simply contained in CTTVC whose positionally
> > > >>>>>> corresponding <column name> in <insert column list> references
> > > >>>>>> a column of which some underlying column is a generated column
> > > >>>>>> shall be a <default specification>.
> > > >>>>>> A <default specification> specifies the default value of some
> > > >>>>>> associated item.
> > > >>>>>
> > > >>>>>
> > > >>>>> [1]
> > > >>>>> https://docs.microsoft.com/en-US/sql/t-sql/statements/alter-table-computed-column-definition-transact-sql?view=sql-server-ver15
> > > >>>>> [2] https://www.postgresql.org/docs/12/ddl-generated-columns.html
> > > >>>>>
> > > >>>>>> On 8 Sep 2020, at 17:31, Timo Walther <twal...@apache.org> wrote:
> > > >>>>>>
> > > >>>>>> Hi Jark,
> > > >>>>>>
> > > >>>>>> according to Flink's and Calcite's casting definitions in
> > > >>>>>> [1][2], TIMESTAMP WITH LOCAL TIME ZONE should be castable from
> > > >>>>>> BIGINT. If not, we will make it possible ;-)
> > > >>>>>>
> > > >>>>>> I'm aware of DeserializationSchema.getProducedType but I think
> > > >>>>>> that this method is actually misplaced. The type should rather
> > > >>>>>> be passed to the source itself.
> > > >>>>>>
> > > >>>>>> For our Kafka SQL source, we will also not use this method
> > > >>>>>> because the Kafka source will add its own metadata in addition
> > > >>>>>> to the DeserializationSchema. So
> > > >>>>>> DeserializationSchema.getProducedType will never be read.
> > > >>>>>>
> > > >>>>>> For now I suggest to leave out the `DataType` from
> > > >>>>>> DecodingFormat.applyReadableMetadata. Also because the format's
> > > >>>>>> physical type is passed later in `createRuntimeDecoder`. If
> > > >>>>>> necessary, it can be computed manually from consumedType +
> > > >>>>>> metadata types. We will provide a metadata utility class for
> > > >>>>>> that.
> > > >>>>>>
> > > >>>>>> Regards,
> > > >>>>>> Timo
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> [1]
> > > >>>>>> https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java#L200
> > > >>>>>> [2]
> > > >>>>>> https://github.com/apache/calcite/blob/master/core/src/main/java/org/apache/calcite/sql/type/SqlTypeCoercionRule.java#L254
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> On 08.09.20 10:52, Jark Wu wrote:
> > > >>>>>>> Hi Timo,
> > > >>>>>>> The updated CAST SYSTEM_METADATA behavior sounds good to me. I
> > > >>>>>>> just noticed that a BIGINT can't be converted to "TIMESTAMP(3)
> > > >>>>>>> WITH LOCAL TIME ZONE".
> > > >>>>>>> So maybe we need to support this, or use "TIMESTAMP(3) WITH
> > > >>>>>>> LOCAL TIME ZONE" as the defined type of the Kafka timestamp? I
> > > >>>>>>> think this makes sense, because it represents the milliseconds
> > > >>>>>>> since epoch.
> > > >>>>>>> Regarding "DeserializationSchema doesn't need TypeInfo", I
> > > >>>>>>> don't think so. The DeserializationSchema implements
> > > >>>>>>> ResultTypeQueryable, thus the implementation needs to return
> > > >>>>>>> an output TypeInfo.
> > > >>>>>>> Besides, FlinkKafkaConsumer also calls
> > > >>>>>>> DeserializationSchema.getProducedType as the produced type of
> > > >>>>>>> the source function [1].
> > > >>>>>>> Best,
> > > >>>>>>> Jark
> > > >>>>>>> [1]:
> > > >>>>>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L1066
> > > >>>>>>> On Tue, 8 Sep 2020 at 16:35, Timo Walther <twal...@apache.org>
> > > >>>>>>> wrote:
> > > >>>>>>>> Hi everyone,
> > > >>>>>>>>
> > > >>>>>>>> I updated the FLIP again and hope that I could address the
> > > >>>>>>>> mentioned concerns.
> > > >>>>>>>>
> > > >>>>>>>> @Leonard: Thanks for the explanation. I wasn't aware that
> > > >>>>>>>> ts_ms and source.ts_ms have different semantics. I updated
> > > >>>>>>>> the FLIP and expose the most commonly used properties
> > > >>>>>>>> separately. So frequently used properties are not hidden in
> > > >>>>>>>> the MAP anymore:
> > > >>>>>>>>
> > > >>>>>>>> debezium-json.ingestion-timestamp
> > > >>>>>>>> debezium-json.source.timestamp
> > > >>>>>>>> debezium-json.source.database
> > > >>>>>>>> debezium-json.source.schema
> > > >>>>>>>> debezium-json.source.table
> > > >>>>>>>>
> > > >>>>>>>> However, since other properties depend on the used
> > > >>>>>>>> connector/vendor, the remaining options are stored in:
> > > >>>>>>>>
> > > >>>>>>>> debezium-json.source.properties
> > > >>>>>>>>
> > > >>>>>>>> And accessed with:
> > > >>>>>>>>
> > > >>>>>>>> CAST(SYSTEM_METADATA('debezium-json.source.properties') AS
> > > >>>>>>>> MAP<STRING, STRING>)['table']
> > > >>>>>>>>
> > > >>>>>>>> Otherwise it is not possible to figure out the value and
> > > >>>>>>>> column type during validation.
> > > >>>>>>>>
> > > >>>>>>>> @Jark: You convinced me to relax the CAST constraints.
> > > >>>>>>>> I added a dedicated sub-section to the FLIP:
> > > >>>>>>>>
> > > >>>>>>>> For making the use of SYSTEM_METADATA easier and to avoid
> > > >>>>>>>> nested casting we allow explicit casting to a target data
> > > >>>>>>>> type:
> > > >>>>>>>>
> > > >>>>>>>> rowtime AS CAST(SYSTEM_METADATA("timestamp") AS TIMESTAMP(3)
> > > >>>>>>>> WITH LOCAL TIME ZONE)
> > > >>>>>>>>
> > > >>>>>>>> A connector still produces and consumes the data type
> > > >>>>>>>> returned by `listMetadata()`. The planner will insert
> > > >>>>>>>> necessary explicit casts.
> > > >>>>>>>>
> > > >>>>>>>> In any case, the user must provide a CAST such that the
> > > >>>>>>>> computed column receives a valid data type when constructing
> > > >>>>>>>> the table schema.
> > > >>>>>>>>
> > > >>>>>>>> "I don't see a reason why
> > > >>>>>>>> `DecodingFormat#applyReadableMetadata` needs a DataType
> > > >>>>>>>> argument."
> > > >>>>>>>>
> > > >>>>>>>> Correct, the DeserializationSchema doesn't need TypeInfo, it
> > > >>>>>>>> is always executed locally. It is the source that needs
> > > >>>>>>>> TypeInfo for serializing the record to the next operator.
> > > >>>>>>>> And that is what we provide.
> > > >>>>>>>>
> > > >>>>>>>> @Danny:
> > > >>>>>>>>
> > > >>>>>>>> "SYSTEM_METADATA("offset")` returns the NULL type by default"
> > > >>>>>>>>
> > > >>>>>>>> We can also use some other means to represent an UNKNOWN data
> > > >>>>>>>> type. In the Flink type system, we use the NullType for it.
> > > >>>>>>>> The important part is that the final data type is known for
> > > >>>>>>>> the entire computed column. As I mentioned before, I would
> > > >>>>>>>> avoid the suggested option b) that would be similar to your
> > > >>>>>>>> suggestion. The CAST should be enough and allows for complex
> > > >>>>>>>> expressions in the computed column. Option b) would need
> > > >>>>>>>> parser changes.
> > > >>>>>>>>
> > > >>>>>>>> Regards,
> > > >>>>>>>> Timo
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>> On 08.09.20 06:21, Leonard Xu wrote:
> > > >>>>>>>>> Hi, Timo
> > > >>>>>>>>>
> > > >>>>>>>>> Thanks for your explanation and update. I have only one
> > > >>>>>>>>> question about the latest FLIP.
> > > >>>>>>>>>
> > > >>>>>>>>> About the MAP<STRING, STRING> DataType of the key
> > > >>>>>>>>> 'debezium-json.source': if users want to use the table name
> > > >>>>>>>>> metadata, they need to write:
> > > >>>>>>>>> tableName STRING AS
> > > >>>>>>>>> CAST(SYSTEM_METADATA('debezium-json.source') AS MAP<STRING,
> > > >>>>>>>>> STRING>)['table']
> > > >>>>>>>>>
> > > >>>>>>>>> The expression is a little complex for users. Could we only
> > > >>>>>>>>> support the necessary metadata with simple DataTypes, as
> > > >>>>>>>>> follows?
> > > >>>>>>>>> tableName STRING AS
> > > >>>>>>>>> CAST(SYSTEM_METADATA('debezium-json.source.table') AS STRING),
> > > >>>>>>>>> transactionTime BIGINT AS
> > > >>>>>>>>> CAST(SYSTEM_METADATA('debezium-json.source.ts_ms') AS BIGINT),
> > > >>>>>>>>>
> > > >>>>>>>>> In this way, we can simplify the expression. The mainly used
> > > >>>>>>>>> metadata in changelog formats may include 'database',
> > > >>>>>>>>> 'table', 'source.ts_ms', 'ts_ms' from my side;
> > > >>>>>>>>> maybe we could only support them in the first version.
> > > >>>>>>>>>
> > > >>>>>>>>> Both Debezium and Canal have the above four metadata fields,
> > > >>>>>>>>> and I'm willing to take some subtasks in the next
> > > >>>>>>>>> development if necessary.
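> > > >>>>>>>>> Before the raw payloads, a fuller sketch of such a
> > > >>>>>>>>> simplified declaration (for illustration only; the table and
> > > >>>>>>>>> column names here are hypothetical):
> > > >>>>>>>>>
> > > >>>>>>>>> CREATE TABLE dbz_products (
> > > >>>>>>>>>    id BIGINT,
> > > >>>>>>>>>    name STRING,
> > > >>>>>>>>>    db STRING AS
> > > >>>>>>>>>      CAST(SYSTEM_METADATA('debezium-json.source.database')
> > > >>>>>>>>>      AS STRING),
> > > >>>>>>>>>    tbl STRING AS
> > > >>>>>>>>>      CAST(SYSTEM_METADATA('debezium-json.source.table')
> > > >>>>>>>>>      AS STRING)
> > > >>>>>>>>> ) WITH (
> > > >>>>>>>>>    'format' = 'debezium-json',
> > > >>>>>>>>>    ...
> > > >>>>>>>>> );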
> > > >>>>>>>>>
> > > >>>>>>>>> Debezium:
> > > >>>>>>>>> {
> > > >>>>>>>>>   "before": null,
> > > >>>>>>>>>   "after": { "id": 101, "name": "scooter" },
> > > >>>>>>>>>   "source": {
> > > >>>>>>>>>     "db": "inventory",       # 1. database name the changelog
> > > >>>>>>>>>                              #    belongs to.
> > > >>>>>>>>>     "table": "products",     # 2. table name the changelog
> > > >>>>>>>>>                              #    belongs to.
> > > >>>>>>>>>     "ts_ms": 1589355504100,  # 3. timestamp of the change in
> > > >>>>>>>>>                              #    the database system, i.e.
> > > >>>>>>>>>                              #    the transaction time in the
> > > >>>>>>>>>                              #    database.
> > > >>>>>>>>>     "connector": "mysql",
> > > >>>>>>>>>     ....
> > > >>>>>>>>>   },
> > > >>>>>>>>>   "ts_ms": 1589355606100,    # 4. timestamp when Debezium
> > > >>>>>>>>>                              #    processed the changelog.
> > > >>>>>>>>>   "op": "c",
> > > >>>>>>>>>   "transaction": null
> > > >>>>>>>>> }
> > > >>>>>>>>>
> > > >>>>>>>>> Canal:
> > > >>>>>>>>> {
> > > >>>>>>>>>   "data": [{ "id": "102", "name": "car battery" }],
> > > >>>>>>>>>   "database": "inventory",   # 1. database name the changelog
> > > >>>>>>>>>                              #    belongs to.
> > > >>>>>>>>>   "table": "products",       # 2. table name the changelog
> > > >>>>>>>>>                              #    belongs to.
> > > >>>>>>>>>   "es": 1589374013000,       # 3. execution time of the
> > > >>>>>>>>>                              #    change in the database
> > > >>>>>>>>>                              #    system, i.e. the
> > > >>>>>>>>>                              #    transaction time in the
> > > >>>>>>>>>                              #    database.
> > > >>>>>>>>>   "ts": 1589374013680,       # 4. timestamp when Canal
> > > >>>>>>>>>                              #    processed the changelog.
> > > >>>>>>>>>   "isDdl": false,
> > > >>>>>>>>>   "mysqlType": {},
> > > >>>>>>>>>   ....
> > > >>>>>>>>> }
> > > >>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>> Best
> > > >>>>>>>>> Leonard
> > > >>>>>>>>>
> > > >>>>>>>>>> On 8 Sep 2020, at 11:57, Danny Chan <yuzhao....@gmail.com>
> > > >>>>>>>>>> wrote:
> > > >>>>>>>>>>
> > > >>>>>>>>>> Thanks Timo ~
> > > >>>>>>>>>>
> > > >>>>>>>>>> The FLIP was already in pretty good shape, I have only 2
> > > >>>>>>>>>> questions here:
> > > >>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>>>> 1. "`CAST(SYSTEM_METADATA("offset") AS INT)` would be a
> > > >>>>>>>>>> valid read-only computed column for Kafka and can be
> > > >>>>>>>>>> extracted by the planner."
> > > >>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>>>> What are the pros of following the SQL Server syntax here?
> > > >>>>>>>>>> Usually an expression's return type can be inferred
> > > >>>>>>>>>> automatically. But I guess SQL Server does not have a
> > > >>>>>>>>>> function like SYSTEM_METADATA which actually does not have
> > > >>>>>>>>>> a specific return type.
> > > >>>>>>>>>>
> > > >>>>>>>>>> And why not use the Oracle or MySQL syntax there?
> > > >>>>>>>>>>
> > > >>>>>>>>>> column_name [datatype] [GENERATED ALWAYS] AS (expression)
> > > >>>>>>>>>> [VIRTUAL]
> > > >>>>>>>>>>
> > > >>>>>>>>>> Which is more straightforward.
> > > >>>>>>>>>>
> > > >>>>>>>>>> 2. "SYSTEM_METADATA("offset")` returns the NULL type by
> > > >>>>>>>>>> default"
> > > >>>>>>>>>>
> > > >>>>>>>>>> The default type should not be NULL because only the NULL
> > > >>>>>>>>>> literal does that. Usually we use ANY as the type if we do
> > > >>>>>>>>>> not know the specific type in the SQL context. ANY means
> > > >>>>>>>>>> the physical value can be any Java object.
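> > > >>>>>>>>>> For illustration, my understanding of the current proposal,
> > > >>>>>>>>>> as a sketch:
> > > >>>>>>>>>>
> > > >>>>>>>>>> offset AS SYSTEM_METADATA("offset")
> > > >>>>>>>>>> -- column type unknown (NULL type), so this alone is not
> > > >>>>>>>>>> -- valid
> > > >>>>>>>>>>
> > > >>>>>>>>>> offset AS CAST(SYSTEM_METADATA("offset") AS INT)
> > > >>>>>>>>>> -- the CAST fixes the column type, so this is valid
> > > >>>>>>>>>>
> > > >>>>>>>>>> So the open question is only which placeholder type the
> > > >>>>>>>>>> function carries before the CAST is applied.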
> > > >>>>>>>>>>
> > > >>>>>>>>>> [1] https://oracle-base.com/articles/11g/virtual-columns-11gr1
> > > >>>>>>>>>> [2] https://dev.mysql.com/doc/refman/5.7/en/create-table-generated-columns.html
> > > >>>>>>>>>>
> > > >>>>>>>>>> Best,
> > > >>>>>>>>>> Danny Chan
> > > >>>>>>>>>> On 4 Sep 2020 at 4:48 PM +0800, Timo Walther
> > > >>>>>>>>>> <twal...@apache.org>, wrote:
> > > >>>>>>>>>>> Hi everyone,
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> I completely reworked FLIP-107. It now covers the full
> > > >>>>>>>>>>> story of how to read and write metadata from/to connectors
> > > >>>>>>>>>>> and formats. It considers all of the latest FLIPs, namely
> > > >>>>>>>>>>> FLIP-95, FLIP-132 and FLIP-122. It introduces the concept
> > > >>>>>>>>>>> of PERSISTED computed columns and leaves out partitioning
> > > >>>>>>>>>>> for now.
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Looking forward to your feedback.
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Regards,
> > > >>>>>>>>>>> Timo
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> On 04.03.20 09:45, Kurt Young wrote:
> > > >>>>>>>>>>>> Sorry, forgot one question.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> 4. Can we make value.fields-include more orthogonal? Like
> > > >>>>>>>>>>>> one can specify it as "EXCEPT_KEY, EXCEPT_TIMESTAMP".
> > > >>>>>>>>>>>> With the current EXCEPT_KEY and EXCEPT_KEY_TIMESTAMP,
> > > >>>>>>>>>>>> users cannot configure it to just ignore the timestamp
> > > >>>>>>>>>>>> but keep the key.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> Best,
> > > >>>>>>>>>>>> Kurt
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> On Wed, Mar 4, 2020 at 4:42 PM Kurt Young
> > > >>>>>>>>>>>> <ykt...@gmail.com> wrote:
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>> Hi Dawid,
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> I have a couple of questions around key fields; actually
> > > >>>>>>>>>>>>> I also have some other questions but want to focus on
> > > >>>>>>>>>>>>> key fields first.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> 1. I don't fully understand the usage of "key.fields".
> > > >>>>>>>>>>>>> Is this option only valid during write operations?
> > > >>>>>>>>>>>>> Because for reading, I can't imagine how such options
> > > >>>>>>>>>>>>> can be applied. I would expect that there might be a
> > > >>>>>>>>>>>>> SYSTEM_METADATA("key") to read and assign the key to a
> > > >>>>>>>>>>>>> normal field?
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> 2. If "key.fields" is only valid in write operations, I
> > > >>>>>>>>>>>>> want to propose we can simplify the options to not
> > > >>>>>>>>>>>>> introduce key.format.type and other related options. I
> > > >>>>>>>>>>>>> think a single "key.field" (not fields) would be enough;
> > > >>>>>>>>>>>>> users can use a UDF to calculate whatever key they want
> > > >>>>>>>>>>>>> before the sink.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> 3. Also I don't want to introduce "value.format.type"
> > > >>>>>>>>>>>>> and "value.format.xxx" with the "value" prefix. Not
> > > >>>>>>>>>>>>> every connector has a concept of key and value. The old
> > > >>>>>>>>>>>>> parameter "format.type" is already good enough to use.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Best,
> > > >>>>>>>>>>>>> Kurt
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> On Tue, Mar 3, 2020 at 10:40 PM Jark Wu
> > > >>>>>>>>>>>>> <imj...@gmail.com> wrote:
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> Thanks Dawid,
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> I have two more questions.
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> SupportsMetadata
> > > >>>>>>>>>>>>>> Introducing SupportsMetadata sounds good to me. But I
> > > >>>>>>>>>>>>>> have some questions regarding this interface.
> > > >>>>>>>>>>>>>> 1) How does the source know the expected return type of
> > > >>>>>>>>>>>>>> each metadata field?
> > > >>>>>>>>>>>>>> 2) Where to put the metadata fields? Appended to the
> > > >>>>>>>>>>>>>> existing physical fields?
> > > >>>>>>>>>>>>>> If yes, I would suggest changing the signature to
> > > >>>>>>>>>>>>>> `TableSource appendMetadataFields(String[]
> > > >>>>>>>>>>>>>> metadataNames, DataType[] metadataTypes)`
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> SYSTEM_METADATA("partition")
> > > >>>>>>>>>>>>>> Can the SYSTEM_METADATA() function be used nested in a
> > > >>>>>>>>>>>>>> computed column expression? If yes, how to specify the
> > > >>>>>>>>>>>>>> return type of SYSTEM_METADATA?
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> Best,
> > > >>>>>>>>>>>>>> Jark
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> On Tue, 3 Mar 2020 at 17:06, Dawid Wysakowicz
> > > >>>>>>>>>>>>>> <dwysakow...@apache.org> wrote:
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> Hi,
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> 1. I thought a bit more on how the source would emit
> > > >>>>>>>>>>>>>>> the columns and I now see it's not exactly the same as
> > > >>>>>>>>>>>>>>> regular columns. I see a need to elaborate a bit more
> > > >>>>>>>>>>>>>>> on that in the FLIP as you asked, Jark.
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> I mostly agree with Danny on how we should do that.
> > > >>>>>>>>>>>>>>> One additional thing I would introduce is an
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> interface SupportsMetadata {
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>   boolean supportsMetadata(Set<String> metadataFields);
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>   TableSource generateMetadataFields(Set<String>
> > > >>>>>>>>>>>>>>>     metadataFields);
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> }
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> This way the source would have to declare/emit only
> > > >>>>>>>>>>>>>>> the requested metadata fields. In order not to clash
> > > >>>>>>>>>>>>>>> with user-defined fields, when emitting a metadata
> > > >>>>>>>>>>>>>>> field I would prepend the column name with
> > > >>>>>>>>>>>>>>> __system_{property_name}. Therefore when
> > > >>>>>>>>>>>>>>> SYSTEM_METADATA("partition") is requested, the source
> > > >>>>>>>>>>>>>>> would append a field __system_partition to the schema.
> > > >>>>>>>>>>>>>>> This would never be visible to the user as it would be
> > > >>>>>>>>>>>>>>> used only for the subsequent computed columns. If that
> > > >>>>>>>>>>>>>>> makes sense to you, I will update the FLIP with this
> > > >>>>>>>>>>>>>>> description.
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> 2. CAST vs explicit type in computed columns
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> Here I agree with Danny. It is also the current state
> > > >>>>>>>>>>>>>>> of the proposal.
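> > > >>>>>>>>>>>>>>> The two candidate forms side by side, for illustration
> > > >>>>>>>>>>>>>>> only (neither is final syntax):
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> offset AS CAST(SYSTEM_METADATA("offset") AS BIGINT)
> > > >>>>>>>>>>>>>>> -- type derived from the CAST
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> offset BIGINT AS SYSTEM_METADATA("offset")
> > > >>>>>>>>>>>>>>> -- type declared on the column; the output type of
> > > >>>>>>>>>>>>>>> -- SYSTEM_METADATA is inferred from it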
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> 3. Partitioning on a computed column vs a function
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> Here I also agree with Danny. I also think those are
> > > >>>>>>>>>>>>>>> orthogonal. I would leave the STORED computed columns
> > > >>>>>>>>>>>>>>> out of the discussion. I don't see how they relate to
> > > >>>>>>>>>>>>>>> the partitioning. I already put both of those cases in
> > > >>>>>>>>>>>>>>> the document. We can either partition on a computed
> > > >>>>>>>>>>>>>>> column or use a UDF in a PARTITIONED BY clause. I am
> > > >>>>>>>>>>>>>>> fine with leaving out the partitioning by UDF in the
> > > >>>>>>>>>>>>>>> first version if you still have some concerns.
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> As for your question, Danny: it depends which
> > > >>>>>>>>>>>>>>> partitioning strategy you use.
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> For the HASH partitioning strategy I thought it would
> > > >>>>>>>>>>>>>>> work as you explained. It would be N = MOD(expr, num).
> > > >>>>>>>>>>>>>>> I am not sure though if we should introduce the
> > > >>>>>>>>>>>>>>> PARTITIONS clause. Usually Flink does not own the data
> > > >>>>>>>>>>>>>>> and the partitions are already an intrinsic property
> > > >>>>>>>>>>>>>>> of the underlying source, e.g. for Kafka we do not
> > > >>>>>>>>>>>>>>> create topics, we just describe a pre-existing,
> > > >>>>>>>>>>>>>>> pre-partitioned topic.
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> 4. timestamp vs timestamp.field vs connector.field vs ...
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> I am fine with changing it to timestamp.field to be
> > > >>>>>>>>>>>>>>> consistent with the other value.fields and key.fields.
> > > >>>>>>>>>>>>>>> Actually that was also my initial proposal in the
> > > >>>>>>>>>>>>>>> first draft I prepared. I changed it afterwards to
> > > >>>>>>>>>>>>>>> shorten the key.
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> Best,
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> Dawid
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> On 03/03/2020 09:00, Danny Chan wrote:
> > > >>>>>>>>>>>>>>>> Thanks Dawid for bringing up this discussion, I think
> > > >>>>>>>>>>>>>>>> it is a useful feature ~
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> About how the metadata outputs from the source
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> I think it is completely orthogonal; computed column
> > > >>>>>>>>>>>>>>>> push down is another topic, this should not be a
> > > >>>>>>>>>>>>>>>> blocker but a promotion. If we do not have any
> > > >>>>>>>>>>>>>>>> filters on the computed column, there is no need to
> > > >>>>>>>>>>>>>>>> do any pushing; the source node just emits the
> > > >>>>>>>>>>>>>>>> complete record with full metadata with the declared
> > > >>>>>>>>>>>>>>>> physical schema, then when generating the virtual
> > > >>>>>>>>>>>>>>>> columns, we would extract the metadata info and
> > > >>>>>>>>>>>>>>>> output it as full columns (with full schema).
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> About the type of the metadata column
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> Personally I prefer an explicit type instead of CAST;
> > > >>>>>>>>>>>>>>>> they are semantically equivalent though, an explicit
> > > >>>>>>>>>>>>>>>> type is more straightforward and we can declare the
> > > >>>>>>>>>>>>>>>> nullability attribute there.
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> About option A: partitioning based on a computed
> > > >>>>>>>>>>>>>>>> column VS option B: partitioning with just a function
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> From the FLIP, it seems that B's partitioning is just
> > > >>>>>>>>>>>>>>>> a strategy when writing data; the partition column is
> > > >>>>>>>>>>>>>>>> not included in the table schema, so it's just
> > > >>>>>>>>>>>>>>>> useless when reading from that.
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> - Compared to A, we do not need to generate the
> > > >>>>>>>>>>>>>>>>   partition column when selecting from the table (but
> > > >>>>>>>>>>>>>>>>   insert into)
> > > >>>>>>>>>>>>>>>> - For A we can also mark the column as STORED when we
> > > >>>>>>>>>>>>>>>>   want to persist that
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> So in my opinion they are orthogonal, we can support
> > > >>>>>>>>>>>>>>>> both. I saw that MySQL/Oracle [1][2] would suggest
> > > >>>>>>>>>>>>>>>> also defining the PARTITIONS num, and the partitions
> > > >>>>>>>>>>>>>>>> are managed under a "table namespace"; the partition
> > > >>>>>>>>>>>>>>>> in which the record is stored is partition number N,
> > > >>>>>>>>>>>>>>>> where N = MOD(expr, num). For your design, in which
> > > >>>>>>>>>>>>>>>> partition would the record persist?
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> [1] https://dev.mysql.com/doc/refman/5.7/en/partitioning-hash.html
> > > >>>>>>>>>>>>>>>> [2] https://docs.oracle.com/database/121/VLDBG/GUID-F023D3ED-262F-4B19-950A-D3C8F8CDB4F4.htm#VLDBG1270
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> Best,
> > > >>>>>>>>>>>>>>>> Danny Chan
> > > >>>>>>>>>>>>>>>> On 2 Mar 2020 at 6:16 PM +0800, Dawid Wysakowicz
> > > >>>>>>>>>>>>>>>> <dwysakow...@apache.org>, wrote:
> > > >>>>>>>>>>>>>>>>> Hi Jark,
> > > >>>>>>>>>>>>>>>>> Ad. 2 I added a section to discuss the relation to
> > > >>>>>>>>>>>>>>>>> FLIP-63.
> > > >>>>>>>>>>>>>>>>> Ad. 3 Yes, I also tried to somewhat keep the
> > > >>>>>>>>>>>>>>>>> hierarchy of properties. Therefore you have the
> > > >>>>>>>>>>>>>>>>> key.format.type.
> > > >>>>>>>>>>>>>>>>> I also considered exactly what you are suggesting
> > > >>>>>>>>>>>>>>>>> (prefixing with connector or kafka). I should've put
> > > >>>>>>>>>>>>>>>>> that into an Options/Rejected alternatives section.
> > > >>>>>>>>>>>>>>>>> I agree timestamp, key.*, value.* are connector
> > > >>>>>>>>>>>>>>>>> properties. The reason I wanted to suggest not
> > > >>>>>>>>>>>>>>>>> adding that prefix in the first version is that
> > > >>>>>>>>>>>>>>>>> actually all the properties in the WITH section are
> > > >>>>>>>>>>>>>>>>> connector properties.
> > > >>>>>>>>>>>>>>>>> Even format is in the end a connector property, as
> > > >>>>>>>>>>>>>>>>> some of the sources might not have a format, imo.
> > > >>>>>>>>>>>>>>>>> The benefit of not adding the prefix is that it
> > > >>>>>>>>>>>>>>>>> makes the keys a bit shorter. Imagine prefixing all
> > > >>>>>>>>>>>>>>>>> the properties with connector (or if we go with
> > > >>>>>>>>>>>>>>>>> FLINK-12557: elasticsearch):
> > > >>>>>>>>>>>>>>>>> elasticsearch.key.format.type: csv
> > > >>>>>>>>>>>>>>>>> elasticsearch.key.format.field: ....
> > > >>>>>>>>>>>>>>>>> elasticsearch.key.format.delimiter: ....
> > > >>>>>>>>>>>>>>>>> elasticsearch.key.format.*: ....
> > > >>>>>>>>>>>>>>>>> I am fine with doing it though if this is the
> > > >>>>>>>>>>>>>>>>> preferred approach in the community.
> > > >>>>>>>>>>>>>>>>> Ad in-line comments:
> > > >>>>>>>>>>>>>>>>> I forgot to update the `value.fields.include`
> > > >>>>>>>>>>>>>>>>> property. It should be value.fields-include. Which I
> > > >>>>>>>>>>>>>>>>> think you also suggested in the comment, right?
> > > >>>>>>>>>>>>>>>>> As for the cast vs declaring the output type of a
> > > >>>>>>>>>>>>>>>>> computed column: I think it's better not to use
> > > >>>>>>>>>>>>>>>>> CAST, but to declare a type of an expression and
> > > >>>>>>>>>>>>>>>>> later on infer the output type of SYSTEM_METADATA.
> > > >>>>>>>>>>>>>>>>> The reason is I think this way it will be easier to
> > > >>>>>>>>>>>>>>>>> implement e.g. filter push-downs when working with
> > > >>>>>>>>>>>>>>>>> the native types of the source; e.g. in the case of
> > > >>>>>>>>>>>>>>>>> Kafka's offset, I think it's better to push down a
> > > >>>>>>>>>>>>>>>>> long rather than a string. This could let us push
> > > >>>>>>>>>>>>>>>>> down expressions like e.g. offset > 12345 AND offset
> > > >>>>>>>>>>>>>>>>> < 59382. Otherwise we would have to push down
> > > >>>>>>>>>>>>>>>>> cast(offset, long) > 12345 AND cast(offset, long) <
> > > >>>>>>>>>>>>>>>>> 59382.
> > > >>>>>>>>>>>>>>>>> Moreover I think we need to introduce the type for
> > > >>>>>>>>>>>>>>>>> computed columns anyway to support functions that
> > > >>>>>>>>>>>>>>>>> infer the output type based on the expected return
> > > >>>>>>>>>>>>>>>>> type.
> > > >>>>>>>>>>>>>>>>> As for the computed column push down: yes,
> > > >>>>>>>>>>>>>>>>> SYSTEM_METADATA would have to be pushed down to the
> > > >>>>>>>>>>>>>>>>> source. If it is not possible the planner should
> > > >>>>>>>>>>>>>>>>> fail. As far as I know computed column push down
> > > >>>>>>>>>>>>>>>>> will be part of the source rework, won't it? ;)
> > > >>>>>>>>>>>>>>>>> As for the persisted computed column: I think it is
> > > >>>>>>>>>>>>>>>>> completely orthogonal. In my current proposal you
> > > >>>>>>>>>>>>>>>>> can also partition by a computed column. The
> > > >>>>>>>>>>>>>>>>> difference between using a UDF in PARTITIONED BY vs
> > > >>>>>>>>>>>>>>>>> partitioning by a computed column is that when you
> > > >>>>>>>>>>>>>>>>> partition by a computed column, this column must
> > > >>>>>>>>>>>>>>>>> also be computed when reading the table.
> > > >>>>>>>>>>>>>>>>> If you use a UDF in the PARTITIONED BY, the
> > > >>>>>>>>>>>>>>>>> expression is computed only when inserting into the
> > > >>>>>>>>>>>>>>>>> table.
> > > >>>>>>>>>>>>>>>>> Hope this answers some of your questions. Looking
> > > >>>>>>>>>>>>>>>>> forward to further suggestions.
> > > >>>>>>>>>>>>>>>>> Best,
> > > >>>>>>>>>>>>>>>>> Dawid
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> On 02/03/2020 05:18, Jark Wu wrote:
> > > >>>>>>>>>>>>>>>>>> Hi,
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> Thanks Dawid for starting such a great discussion.
> > > >>>>>>>>>>>>>>>>>> Reading metadata and key-part information from the
> > > >>>>>>>>>>>>>>>>>> source is an important feature for streaming users.
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> In general, I agree with the proposal of the FLIP.
> > > >>>>>>>>>>>>>>>>>> I will leave my thoughts and comments here:
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> 1) +1 to use connector properties instead of
> > > >>>>>>>>>>>>>>>>>> introducing a HEADER keyword, for the reason you
> > > >>>>>>>>>>>>>>>>>> mentioned in the FLIP.
> > > >>>>>>>>>>>>>>>>>> 2) We already introduced PARTITIONED BY in FLIP-63.
> > > >>>>>>>>>>>>>>>>>> Maybe we should add a section to explain the
> > > >>>>>>>>>>>>>>>>>> relationship between them.
> > > >>>>>>>>>>>>>>>>>> Do their concepts conflict? Could INSERT PARTITION
> > > >>>>>>>>>>>>>>>>>> be used on the PARTITIONED table in this FLIP?
> > > >>>>>>>>>>>>>>>>>> 3) Currently, properties are hierarchical in Flink
> > > >>>>>>>>>>>>>>>>>> SQL. Shall we make the newly introduced properties
> > > >>>>>>>>>>>>>>>>>> more hierarchical?
> > > >>>>>>>>>>>>>>>>>> For example, "timestamp" => "connector.timestamp"?
> > > >>>>>>>>>>>>>>>>>> (Actually, I prefer "kafka.timestamp", which is
> > > >>>>>>>>>>>>>>>>>> another improvement for properties, FLINK-12557.)
> > > >>>>>>>>>>>>>>>>>> A single "timestamp" in properties may mislead
> > > >>>>>>>>>>>>>>>>>> users that the field is a rowtime attribute.
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> I also left some minor comments in the FLIP.
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> Thanks,
> > > >>>>>>>>>>>>>>>>>> Jark
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> On Sun, 1 Mar 2020 at 22:30, Dawid Wysakowicz
> > > >>>>>>>>>>>>>>>>>> <dwysakow...@apache.org> wrote:
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> Hi,
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> I would like to propose an improvement that would
> > > >>>>>>>>>>>>>>>>>>> enable reading table columns from different parts
> > > >>>>>>>>>>>>>>>>>>> of source records. Besides the main payload, the
> > > >>>>>>>>>>>>>>>>>>> majority (if not all) of the sources expose
> > > >>>>>>>>>>>>>>>>>>> additional information.
> > > >>>>>>>>>>>>>>>>>>> It can simply be read-only metadata such as offset
> > > >>>>>>>>>>>>>>>>>>> or ingestion time, or read- and write-parts of the
> > > >>>>>>>>>>>>>>>>>>> record that contain data but additionally serve
> > > >>>>>>>>>>>>>>>>>>> different purposes (partitioning, compaction
> > > >>>>>>>>>>>>>>>>>>> etc.), e.g. key or timestamp in Kafka.
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> We should make it possible to read and write data
> > > >>>>>>>>>>>>>>>>>>> from all of those locations. In this proposal I
> > > >>>>>>>>>>>>>>>>>>> discuss reading partitioning data; for
> > > >>>>>>>>>>>>>>>>>>> completeness this proposal also discusses
> > > >>>>>>>>>>>>>>>>>>> partitioning when writing data out.
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> I am looking forward to your comments.
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> You can access the FLIP here:
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records?src=contextnavpagetreemode
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> Best,
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> Dawid