Thanks Timo ~ The FLIP was already in pretty good shape, I have only 2 questions here:
1. “`CAST(SYSTEM_METADATA("offset") AS INT)` would be a valid read-only computed column for Kafka and can be extracted by the planner.” What is the pros we follow the SQL-SERVER syntax here ? Usually an expression return type can be inferred automatically. But I guess SQL-SERVER does not have function like SYSTEM_METADATA which actually does not have a specific return type. And why not use the Oracle or MySQL syntax there ? column_name [datatype] [GENERATED ALWAYS] AS (expression) [VIRTUAL] Which is more straight-forward. 2. “SYSTEM_METADATA("offset")` returns the NULL type by default” The default type should not be NULL because only NULL literal does that. Usually we use ANY as the type if we do not know the specific type in the SQL context. ANY means the physical value can be any java object. [1] https://oracle-base.com/articles/11g/virtual-columns-11gr1 [2] https://dev.mysql.com/doc/refman/5.7/en/create-table-generated-columns.html Best, Danny Chan 在 2020年9月4日 +0800 PM4:48,Timo Walther <twal...@apache.org>,写道: > Hi everyone, > > I completely reworked FLIP-107. It now covers the full story how to read > and write metadata from/to connectors and formats. It considers all of > the latest FLIPs, namely FLIP-95, FLIP-132 and FLIP-122. It introduces > the concept of PERSISTED computed columns and leaves out partitioning > for now. > > Looking forward to your feedback. > > Regards, > Timo > > > On 04.03.20 09:45, Kurt Young wrote: > > Sorry, forgot one question. > > > > 4. Can we make the value.fields-include more orthogonal? Like one can > > specify it as "EXCEPT_KEY, EXCEPT_TIMESTAMP". > > With current EXCEPT_KEY and EXCEPT_KEY_TIMESTAMP, users can not config to > > just ignore timestamp but keep key. > > > > Best, > > Kurt > > > > > > On Wed, Mar 4, 2020 at 4:42 PM Kurt Young <ykt...@gmail.com> wrote: > > > > > Hi Dawid, > > > > > > I have a couple of questions around key fields, actually I also have some > > > other questions but want to be focused on key fields first. > > > > > > 1. I don't fully understand the usage of "key.fields". Is this option only > > > valid during write operation? Because for > > > reading, I can't imagine how such options can be applied. I would expect > > > that there might be a SYSTEM_METADATA("key") > > > to read and assign the key to a normal field? > > > > > > 2. If "key.fields" is only valid in write operation, I want to propose we > > > can simplify the options to not introducing key.format.type and > > > other related options. I think a single "key.field" (not fields) would be > > > enough, users can use UDF to calculate whatever key they > > > want before sink. > > > > > > 3. Also I don't want to introduce "value.format.type" and > > > "value.format.xxx" with the "value" prefix. Not every connector has a > > > concept > > > of key and values. The old parameter "format.type" already good enough to > > > use. > > > > > > Best, > > > Kurt > > > > > > > > > On Tue, Mar 3, 2020 at 10:40 PM Jark Wu <imj...@gmail.com> wrote: > > > > > > > Thanks Dawid, > > > > > > > > I have two more questions. > > > > > > > > > SupportsMetadata > > > > Introducing SupportsMetadata sounds good to me. But I have some > > > > questions > > > > regarding to this interface. > > > > 1) How do the source know what the expected return type of each > > > > metadata? > > > > 2) Where to put the metadata fields? Append to the existing physical > > > > fields? > > > > If yes, I would suggest to change the signature to `TableSource > > > > appendMetadataFields(String[] metadataNames, DataType[] metadataTypes)` > > > > > > > > > SYSTEM_METADATA("partition") > > > > Can SYSTEM_METADATA() function be used nested in a computed column > > > > expression? If yes, how to specify the return type of SYSTEM_METADATA? > > > > > > > > Best, > > > > Jark > > > > > > > > On Tue, 3 Mar 2020 at 17:06, Dawid Wysakowicz <dwysakow...@apache.org> > > > > wrote: > > > > > > > > > Hi, > > > > > > > > > > 1. I thought a bit more on how the source would emit the columns and I > > > > > now see its not exactly the same as regular columns. I see a need to > > > > > elaborate a bit more on that in the FLIP as you asked, Jark. > > > > > > > > > > I do agree mostly with Danny on how we should do that. One additional > > > > > things I would introduce is an > > > > > > > > > > interface SupportsMetadata { > > > > > > > > > > boolean supportsMetadata(Set<String> metadataFields); > > > > > > > > > > TableSource generateMetadataFields(Set<String> metadataFields); > > > > > > > > > > } > > > > > > > > > > This way the source would have to declare/emit only the requested > > > > > metadata fields. In order not to clash with user defined fields. When > > > > > emitting the metadata field I would prepend the column name with > > > > > __system_{property_name}. Therefore when requested > > > > > SYSTEM_METADATA("partition") the source would append a field > > > > > __system_partition to the schema. This would be never visible to the > > > > > user as it would be used only for the subsequent computed columns. If > > > > > that makes sense to you, I will update the FLIP with this description. > > > > > > > > > > 2. CAST vs explicit type in computed columns > > > > > > > > > > Here I agree with Danny. It is also the current state of the proposal. > > > > > > > > > > 3. Partitioning on computed column vs function > > > > > > > > > > Here I also agree with Danny. I also think those are orthogonal. I > > > > > would > > > > > leave out the STORED computed columns out of the discussion. I don't > > > > > see > > > > > how do they relate to the partitioning. I already put both of those > > > > > cases in the document. We can either partition on a computed column or > > > > > use a udf in a partioned by clause. I am fine with leaving out the > > > > > partitioning by udf in the first version if you still have some > > > > concerns. > > > > > > > > > > As for your question Danny. It depends which partitioning strategy you > > > > use. > > > > > > > > > > For the HASH partitioning strategy I thought it would work as you > > > > > explained. It would be N = MOD(expr, num). I am not sure though if we > > > > > should introduce the PARTITIONS clause. Usually Flink does not own the > > > > > data and the partitions are already an intrinsic property of the > > > > > underlying source e.g. for kafka we do not create topics, but we just > > > > > describe pre-existing pre-partitioned topic. > > > > > > > > > > 4. timestamp vs timestamp.field vs connector.field vs ... > > > > > > > > > > I am fine with changing it to timestamp.field to be consistent with > > > > > other value.fields and key.fields. Actually that was also my initial > > > > > proposal in a first draft I prepared. I changed it afterwards to > > > > > shorten > > > > > the key. > > > > > > > > > > Best, > > > > > > > > > > Dawid > > > > > > > > > > On 03/03/2020 09:00, Danny Chan wrote: > > > > > > Thanks Dawid for bringing up this discussion, I think it is a useful > > > > > feature ~ > > > > > > > > > > > > About how the metadata outputs from source > > > > > > > > > > > > I think it is completely orthogonal, computed column push down is > > > > > another topic, this should not be a blocker but a promotion, if we do > > > > not > > > > > have any filters on the computed column, there is no need to do any > > > > > pushings; the source node just emit the complete record with full > > > > metadata > > > > > with the declared physical schema, then when generating the virtual > > > > > columns, we would extract the metadata info and output as full > > > > columns(with > > > > > full schema). > > > > > > > > > > > > About the type of metadata column > > > > > > > > > > > > Personally i prefer explicit type instead of CAST, they are symantic > > > > > equivalent though, explict type is more straight-forward and we can > > > > declare > > > > > the nullable attribute there. > > > > > > > > > > > > About option A: partitioning based on acomputed column VS option B: > > > > > partitioning with just a function > > > > > > > > > > > > From the FLIP, it seems that B's partitioning is just a strategy > > > > > > when > > > > > writing data, the partiton column is not included in the table schema, > > > > so > > > > > it's just useless when reading from that. > > > > > > > > > > > > - Compared to A, we do not need to generate the partition column > > > > > > when > > > > > selecting from the table(but insert into) > > > > > > - For A we can also mark the column as STORED when we want to > > > > > > persist > > > > > that > > > > > > > > > > > > So in my opition they are orthogonal, we can support both, i saw > > > > > > that > > > > > MySQL/Oracle[1][2] would suggest to also define the PARTITIONS num, > > > > > and > > > > the > > > > > partitions are managed under a "tablenamespace", the partition in > > > > > which > > > > the > > > > > record is stored is partition number N, where N = MOD(expr, num), for > > > > your > > > > > design, which partiton the record would persist ? > > > > > > > > > > > > [1] https://dev.mysql.com/doc/refman/5.7/en/partitioning-hash.html > > > > > > [2] > > > > > > > > > https://docs.oracle.com/database/121/VLDBG/GUID-F023D3ED-262F-4B19-950A-D3C8F8CDB4F4.htm#VLDBG1270 > > > > > > > > > > > > Best, > > > > > > Danny Chan > > > > > > 在 2020年3月2日 +0800 PM6:16,Dawid Wysakowicz <dwysakow...@apache.org > > > > > ,写道: > > > > > > > Hi Jark, > > > > > > > Ad. 2 I added a section to discuss relation to FLIP-63 > > > > > > > Ad. 3 Yes, I also tried to somewhat keep hierarchy of properties. > > > > > Therefore you have the key.format.type. > > > > > > > I also considered exactly what you are suggesting (prefixing with > > > > > connector or kafka). I should've put that into an Option/Rejected > > > > > alternatives. > > > > > > > I agree timestamp, key.*, value.* are connector properties. Why I > > > > > wanted to suggest not adding that prefix in the first version is that > > > > > actually all the properties in the WITH section are connector > > > > properties. > > > > > Even format is in the end a connector property as some of the sources > > > > might > > > > > not have a format, imo. The benefit of not adding the prefix is that > > > > > it > > > > > makes the keys a bit shorter. Imagine prefixing all the properties > > > > > with > > > > > connector (or if we go with FLINK-12557: elasticsearch): > > > > > > > elasticsearch.key.format.type: csv > > > > > > > elasticsearch.key.format.field: .... > > > > > > > elasticsearch.key.format.delimiter: .... > > > > > > > elasticsearch.key.format.*: .... > > > > > > > I am fine with doing it though if this is a preferred approach in > > > > > > > the > > > > > community. > > > > > > > Ad in-line comments: > > > > > > > I forgot to update the `value.fields.include` property. It should > > > > > > > be > > > > > value.fields-include. Which I think you also suggested in the comment, > > > > > right? > > > > > > > As for the cast vs declaring output type of computed column. I > > > > > > > think > > > > > it's better not to use CAST, but declare a type of an expression and > > > > later > > > > > on infer the output type of SYSTEM_METADATA. The reason is I think > > > > > this > > > > way > > > > > it will be easier to implement e.g. filter push downs when working > > > > > with > > > > the > > > > > native types of the source, e.g. in case of Kafka's offset, i think > > > > > it's > > > > > better to pushdown long rather than string. This could let us push > > > > > expression like e.g. offset > 12345 & offset < 59382. Otherwise we > > > > > would > > > > > have to push down cast(offset, long) > 12345 && cast(offset, long) < > > > > 59382. > > > > > Moreover I think we need to introduce the type for computed columns > > > > anyway > > > > > to support functions that infer output type based on expected return > > > > type. > > > > > > > As for the computed column push down. Yes, SYSTEM_METADATA would > > > > > > > have > > > > > to be pushed down to the source. If it is not possible the planner > > > > should > > > > > fail. As far as I know computed columns push down will be part of > > > > > source > > > > > rework, won't it? ;) > > > > > > > As for the persisted computed column. I think it is completely > > > > > orthogonal. In my current proposal you can also partition by a > > > > > computed > > > > > column. The difference between using a udf in partitioned by vs > > > > partitioned > > > > > by a computed column is that when you partition by a computed column > > > > this > > > > > column must be also computed when reading the table. If you use a udf > > > > > in > > > > > the partitioned by, the expression is computed only when inserting > > > > > into > > > > the > > > > > table. > > > > > > > Hope this answers some of your questions. Looking forward for > > > > > > > further > > > > > suggestions. > > > > > > > Best, > > > > > > > Dawid > > > > > > > > > > > > > > > > > > > > > On 02/03/2020 05:18, Jark Wu wrote: > > > > > > > > Hi, > > > > > > > > > > > > > > > > Thanks Dawid for starting such a great discussion. Reaing > > > > > > > > metadata > > > > and > > > > > > > > key-part information from source is an important feature for > > > > streaming > > > > > > > > users. > > > > > > > > > > > > > > > > In general, I agree with the proposal of the FLIP. > > > > > > > > I will leave my thoughts and comments here: > > > > > > > > > > > > > > > > 1) +1 to use connector properties instead of introducing HEADER > > > > > keyword as > > > > > > > > the reason you mentioned in the FLIP. > > > > > > > > 2) we already introduced PARTITIONED BY in FLIP-63. Maybe we > > > > > > > > should > > > > > add a > > > > > > > > section to explain what's the relationship between them. > > > > > > > > Do their concepts conflict? Could INSERT PARTITION be used on > > > > > > > > the > > > > > > > > PARTITIONED table in this FLIP? > > > > > > > > 3) Currently, properties are hierarchical in Flink SQL. Shall we > > > > make > > > > > the > > > > > > > > new introduced properties more hierarchical? > > > > > > > > For example, "timestamp" => "connector.timestamp"? (actually, I > > > > > prefer > > > > > > > > "kafka.timestamp" which is another improvement for properties > > > > > FLINK-12557) > > > > > > > > A single "timestamp" in properties may mislead users that the > > > > field > > > > > is > > > > > > > > a rowtime attribute. > > > > > > > > > > > > > > > > I also left some minor comments in the FLIP. > > > > > > > > > > > > > > > > Thanks, > > > > > > > > Jark > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Sun, 1 Mar 2020 at 22:30, Dawid Wysakowicz < > > > > dwysakow...@apache.org> > > > > > > > > wrote: > > > > > > > > > > > > > > > > > Hi, > > > > > > > > > > > > > > > > > > I would like to propose an improvement that would enable > > > > > > > > > reading > > > > table > > > > > > > > > columns from different parts of source records. Besides the > > > > > > > > > main > > > > > payload > > > > > > > > > majority (if not all of the sources) expose additional > > > > information. It > > > > > > > > > can be simply a read-only metadata such as offset, ingestion > > > > > > > > > time > > > > or a > > > > > > > > > read and write parts of the record that contain data but > > > > additionally > > > > > > > > > serve different purposes (partitioning, compaction etc.), > > > > > > > > > e.g. key > > > > or > > > > > > > > > timestamp in Kafka. > > > > > > > > > > > > > > > > > > We should make it possible to read and write data from all of > > > > > > > > > those > > > > > > > > > locations. In this proposal I discuss reading partitioning > > > > > > > > > data, > > > > for > > > > > > > > > completeness this proposal discusses also the partitioning > > > > > > > > > when > > > > > writing > > > > > > > > > data out. > > > > > > > > > > > > > > > > > > I am looking forward to your comments. > > > > > > > > > > > > > > > > > > You can access the FLIP here: > > > > > > > > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records?src=contextnavpagetreemode > > > > > > > > > > > > > > > > > > Best, > > > > > > > > > > > > > > > > > > Dawid > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >