Hi Timo,

Thank you very much for the update. It indeed covers the full story in more detail. I agree with the proposal.
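For context for other readers, here is a minimal sketch of the kind of DDL the reworked proposal enables. This is my own illustration with hypothetical column names and connector options; see the FLIP itself for the exact syntax:

    CREATE TABLE kafka_table (
        id BIGINT,
        name STRING,
        -- read-only metadata, e.g. the record offset
        offset AS SYSTEM_METADATA("offset"),
        -- readable and writable metadata; PERSISTED means the value is
        -- also written back to the connector on INSERT INTO
        ts AS SYSTEM_METADATA("timestamp") PERSISTED
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'test-topic',
        'format' = 'json'
    );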
On 04/09/2020 10:48, Timo Walther wrote:
> Hi everyone,
>
> I completely reworked FLIP-107. It now covers the full story of how to
> read and write metadata from/to connectors and formats. It considers
> all of the latest FLIPs, namely FLIP-95, FLIP-132 and FLIP-122. It
> introduces the concept of PERSISTED computed columns and leaves out
> partitioning for now.
>
> Looking forward to your feedback.
>
> Regards,
> Timo
>
>
> On 04.03.20 09:45, Kurt Young wrote:
>> Sorry, forgot one question.
>>
>> 4. Can we make the value.fields-include more orthogonal? Like one can
>> specify it as "EXCEPT_KEY, EXCEPT_TIMESTAMP".
>> With the current EXCEPT_KEY and EXCEPT_KEY_TIMESTAMP, users cannot
>> configure it to just ignore the timestamp but keep the key.
>>
>> Best,
>> Kurt
>>
>>
>> On Wed, Mar 4, 2020 at 4:42 PM Kurt Young <ykt...@gmail.com> wrote:
>>
>>> Hi Dawid,
>>>
>>> I have a couple of questions around key fields; actually I also have
>>> some other questions, but I want to focus on key fields first.
>>>
>>> 1. I don't fully understand the usage of "key.fields". Is this
>>> option only valid during write operations? Because for reading, I
>>> can't imagine how such an option could be applied. I would expect
>>> that there might be a SYSTEM_METADATA("key") to read and assign the
>>> key to a normal field?
>>>
>>> 2. If "key.fields" is only valid in write operations, I want to
>>> propose that we simplify the options by not introducing
>>> key.format.type and other related options. I think a single
>>> "key.field" (not fields) would be enough; users can use a UDF to
>>> calculate whatever key they want before the sink.
>>>
>>> 3. Also, I don't want to introduce "value.format.type" and
>>> "value.format.xxx" with the "value" prefix. Not every connector has
>>> a concept of keys and values. The old parameter "format.type" is
>>> already good enough to use.
>>>
>>> Best,
>>> Kurt
>>>
>>>
>>> On Tue, Mar 3, 2020 at 10:40 PM Jark Wu <imj...@gmail.com> wrote:
>>>
>>>> Thanks Dawid,
>>>>
>>>> I have two more questions.
>>>>
>>>>> SupportsMetadata
>>>> Introducing SupportsMetadata sounds good to me. But I have some
>>>> questions regarding this interface.
>>>> 1) How does the source know the expected return type of each
>>>> metadata field?
>>>> 2) Where do the metadata fields go? Appended to the existing
>>>> physical fields? If yes, I would suggest changing the signature to
>>>> `TableSource appendMetadataFields(String[] metadataNames,
>>>> DataType[] metadataTypes)`
>>>>
>>>>> SYSTEM_METADATA("partition")
>>>> Can the SYSTEM_METADATA() function be nested in a computed column
>>>> expression? If yes, how is the return type of SYSTEM_METADATA
>>>> specified?
>>>>
>>>> Best,
>>>> Jark
>>>>
>>>> On Tue, 3 Mar 2020 at 17:06, Dawid Wysakowicz <dwysakow...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> 1. I thought a bit more about how the source would emit the
>>>>> columns, and I now see it's not exactly the same as for regular
>>>>> columns. I see a need to elaborate a bit more on that in the FLIP,
>>>>> as you asked, Jark.
>>>>>
>>>>> I mostly agree with Danny on how we should do that. One additional
>>>>> thing I would introduce is an
>>>>>
>>>>> interface SupportsMetadata {
>>>>>
>>>>>     boolean supportsMetadata(Set<String> metadataFields);
>>>>>
>>>>>     TableSource generateMetadataFields(Set<String> metadataFields);
>>>>>
>>>>> }
>>>>>
>>>>> This way the source would have to declare/emit only the requested
>>>>> metadata fields.
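To make the above concrete for other readers: with such an interface, a declaration like the following (my own sketch, with hypothetical metadata keys) would make the planner first check supportsMetadata(...) for the requested fields and only then have the source generate them:

    -- hypothetical sketch: both metadata keys below would have to pass
    -- supportsMetadata({"partition", "offset"}) before the planner asks
    -- the source to generate them
    CREATE TABLE t (
        id BIGINT,
        part AS SYSTEM_METADATA("partition"),
        off AS SYSTEM_METADATA("offset")
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'test-topic',
        'format' = 'json'
    );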
>>>>> In order not to clash with user-defined fields, when emitting a
>>>>> metadata field I would prepend the column name with
>>>>> __system_{property_name}. Therefore, when
>>>>> SYSTEM_METADATA("partition") is requested, the source would append
>>>>> a field __system_partition to the schema. This would never be
>>>>> visible to the user, as it would be used only for the subsequent
>>>>> computed columns. If that makes sense to you, I will update the
>>>>> FLIP with this description.
>>>>>
>>>>> 2. CAST vs explicit type in computed columns
>>>>>
>>>>> Here I agree with Danny. It is also the current state of the
>>>>> proposal.
>>>>>
>>>>> 3. Partitioning on a computed column vs a function
>>>>>
>>>>> Here I also agree with Danny. I also think those are orthogonal. I
>>>>> would leave the STORED computed columns out of the discussion; I
>>>>> don't see how they relate to the partitioning. I already put both
>>>>> of those cases in the document. We can either partition on a
>>>>> computed column or use a UDF in a PARTITIONED BY clause. I am fine
>>>>> with leaving out partitioning by UDF in the first version if you
>>>>> still have some concerns.
>>>>>
>>>>> As for your question, Danny: it depends which partitioning strategy
>>>>> you use.
>>>>>
>>>>> For the HASH partitioning strategy I thought it would work as you
>>>>> explained: it would be N = MOD(expr, num). I am not sure, though,
>>>>> whether we should introduce the PARTITIONS clause. Usually Flink
>>>>> does not own the data, and the partitions are already an intrinsic
>>>>> property of the underlying source; e.g. for Kafka we do not create
>>>>> topics, we just describe a pre-existing, pre-partitioned topic.
>>>>>
>>>>> 4. timestamp vs timestamp.field vs connector.field vs ...
>>>>>
>>>>> I am fine with changing it to timestamp.field to be consistent with
>>>>> the other value.fields and key.fields. Actually, that was also my
>>>>> initial proposal in the first draft I prepared. I changed it
>>>>> afterwards to shorten the key.
>>>>>
>>>>> Best,
>>>>>
>>>>> Dawid
>>>>>
>>>>> On 03/03/2020 09:00, Danny Chan wrote:
>>>>>> Thanks Dawid for bringing up this discussion, I think it is a
>>>>>> useful feature ~
>>>>>>
>>>>>> About how the metadata outputs from the source
>>>>>>
>>>>>> I think it is completely orthogonal; computed column push-down is
>>>>>> another topic. This should not be a blocker but a promotion: if we
>>>>>> do not have any filters on the computed column, there is no need
>>>>>> to do any pushing. The source node just emits the complete record
>>>>>> with full metadata with the declared physical schema, and then,
>>>>>> when generating the virtual columns, we would extract the metadata
>>>>>> info and output the full columns (with the full schema).
>>>>>>
>>>>>> About the type of the metadata column
>>>>>>
>>>>>> Personally I prefer an explicit type instead of CAST. They are
>>>>>> semantically equivalent, but an explicit type is more
>>>>>> straightforward and we can declare the nullability attribute
>>>>>> there.
>>>>>>
>>>>>> About option A: partitioning based on a computed column VS
>>>>>> option B: partitioning with just a function
>>>>>>
>>>>>> From the FLIP, it seems that B's partitioning is just a strategy
>>>>>> when writing data; the partition column is not included in the
>>>>>> table schema, so it is simply not usable when reading from the
>>>>>> table.
>>>>>>
>>>>>> - Compared to A, we do not need to generate the partition column
>>>>>> when selecting from the table (only on insert into)
>>>>>> - For A we can also mark the column as STORED when we want to
>>>>>> persist it
>>>>>>
>>>>>> So in my opinion they are orthogonal; we can support both. I saw
>>>>>> that MySQL/Oracle [1][2] suggest also defining the PARTITIONS num,
>>>>>> and the partitions are managed under a "tablenamespace". The
>>>>>> partition in which a record is stored is partition number N, where
>>>>>> N = MOD(expr, num). For your design, in which partition would the
>>>>>> record be persisted?
>>>>>>
>>>>>> [1] https://dev.mysql.com/doc/refman/5.7/en/partitioning-hash.html
>>>>>> [2] https://docs.oracle.com/database/121/VLDBG/GUID-F023D3ED-262F-4B19-950A-D3C8F8CDB4F4.htm#VLDBG1270
>>>>>>
>>>>>> Best,
>>>>>> Danny Chan
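To illustrate the two options being compared here, a side-by-side sketch of my own; option B's syntax is hypothetical, since the FLIP does not finalize it:

    -- Option A: partition on a computed column; the column is part of
    -- the schema and is also computed when reading the table
    CREATE TABLE t_a (
        id BIGINT,
        ts TIMESTAMP(3),
        dt AS DATE_FORMAT(ts, 'yyyy-MM-dd')
    ) PARTITIONED BY (dt) WITH (
        'connector' = 'filesystem',
        'path' = '/tmp/t_a',
        'format' = 'json'
    );

    -- Option B (hypothetical syntax): partition by an expression; it
    -- is evaluated only when writing and is not part of the read schema
    CREATE TABLE t_b (
        id BIGINT,
        ts TIMESTAMP(3)
    ) PARTITIONED BY (DATE_FORMAT(ts, 'yyyy-MM-dd')) WITH (
        'connector' = 'filesystem',
        'path' = '/tmp/t_b',
        'format' = 'json'
    );

Under a HASH strategy, the answer to Danny's question would then be that a record lands in partition N = MOD(expr, num), as in the MySQL/Oracle docs he links.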
>>>>>> On Mar 2, 2020 at 6:16 PM +0800, Dawid Wysakowicz
>>>>>> <dwysakow...@apache.org> wrote:
>>>>>>> Hi Jark,
>>>>>>>
>>>>>>> Ad. 2 I added a section to discuss the relation to FLIP-63.
>>>>>>>
>>>>>>> Ad. 3 Yes, I also tried to somewhat keep the hierarchy of
>>>>>>> properties. Therefore you have the key.format.type.
>>>>>>>
>>>>>>> I also considered exactly what you are suggesting (prefixing with
>>>>>>> connector or kafka). I should have put that into an
>>>>>>> Options/Rejected Alternatives section.
>>>>>>>
>>>>>>> I agree that timestamp, key.*, value.* are connector properties.
>>>>>>> The reason I wanted to suggest not adding that prefix in the
>>>>>>> first version is that actually all the properties in the WITH
>>>>>>> section are connector properties. Even format is, in the end, a
>>>>>>> connector property, as some of the sources might not have a
>>>>>>> format, imo. The benefit of not adding the prefix is that it
>>>>>>> makes the keys a bit shorter. Imagine prefixing all the
>>>>>>> properties with connector (or, if we go with FLINK-12557:
>>>>>>> elasticsearch):
>>>>>>>
>>>>>>> elasticsearch.key.format.type: csv
>>>>>>> elasticsearch.key.format.field: ....
>>>>>>> elasticsearch.key.format.delimiter: ....
>>>>>>> elasticsearch.key.format.*: ....
>>>>>>>
>>>>>>> I am fine with doing it, though, if this is the preferred
>>>>>>> approach in the community.
>>>>>>>
>>>>>>> Ad. the in-line comments:
>>>>>>>
>>>>>>> I forgot to update the `value.fields.include` property. It should
>>>>>>> be value.fields-include. Which I think you also suggested in the
>>>>>>> comment, right?
>>>>>>>
>>>>>>> As for the cast vs declaring the output type of a computed
>>>>>>> column: I think it's better not to use CAST, but to declare the
>>>>>>> type of an expression and later infer the output type of
>>>>>>> SYSTEM_METADATA. The reason is that I think this way it will be
>>>>>>> easier to implement e.g. filter push-downs when working with the
>>>>>>> native types of the source; e.g. in the case of Kafka's offset, I
>>>>>>> think it's better to push down a long rather than a string. This
>>>>>>> would let us push down expressions like offset > 12345 AND
>>>>>>> offset < 59382. Otherwise we would have to push down
>>>>>>> cast(offset, long) > 12345 AND cast(offset, long) < 59382.
>>>>>>> Moreover, I think we need to introduce the type for computed
>>>>>>> columns anyway to support functions that infer the output type
>>>>>>> based on the expected return type.
>>>>>>>
>>>>>>> As for the computed column push-down: yes, SYSTEM_METADATA would
>>>>>>> have to be pushed down to the source. If that is not possible,
>>>>>>> the planner should fail. As far as I know, computed column
>>>>>>> push-down will be part of the source rework, won't it? ;)
>>>>>>>
>>>>>>> As for the persisted computed column: I think it is completely
>>>>>>> orthogonal. In my current proposal you can also partition by a
>>>>>>> computed column. The difference between using a UDF in
>>>>>>> PARTITIONED BY vs partitioning by a computed column is that when
>>>>>>> you partition by a computed column, this column must also be
>>>>>>> computed when reading the table. If you use a UDF in the
>>>>>>> PARTITIONED BY, the expression is computed only when inserting
>>>>>>> into the table.
>>>>>>>
>>>>>>> Hope this answers some of your questions. Looking forward to
>>>>>>> further suggestions.
>>>>>>>
>>>>>>> Best,
>>>>>>> Dawid
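For readers skimming the thread, the option layout under discussion would look roughly like this. The keys follow this thread's proposal (key.fields, key.format.type, value.format.type, value.fields-include), not necessarily the final FLIP; the table itself is my own illustration:

    CREATE TABLE kafka_orders (
        user_id BIGINT,
        amount DOUBLE
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'orders',
        'key.fields' = 'user_id',
        'key.format.type' = 'csv',
        'value.format.type' = 'avro',
        -- exclude the key fields from the value; per Kurt's point 4
        -- this could become a list such as 'EXCEPT_KEY, EXCEPT_TIMESTAMP'
        'value.fields-include' = 'EXCEPT_KEY'
    );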
>>>>>>> On 02/03/2020 05:18, Jark Wu wrote:
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> Thanks Dawid for starting such a great discussion. Reading
>>>>>>>> metadata and key-part information from the source is an
>>>>>>>> important feature for streaming users.
>>>>>>>>
>>>>>>>> In general, I agree with the proposal of the FLIP.
>>>>>>>> I will leave my thoughts and comments here:
>>>>>>>>
>>>>>>>> 1) +1 to use connector properties instead of introducing a
>>>>>>>> HEADER keyword, for the reason you mentioned in the FLIP.
>>>>>>>> 2) We already introduced PARTITIONED BY in FLIP-63. Maybe we
>>>>>>>> should add a section to explain the relationship between them.
>>>>>>>> Do their concepts conflict? Could INSERT PARTITION be used on
>>>>>>>> the PARTITIONED table in this FLIP?
>>>>>>>> 3) Currently, properties are hierarchical in Flink SQL. Shall we
>>>>>>>> make the newly introduced properties more hierarchical?
>>>>>>>> For example, "timestamp" => "connector.timestamp"? (Actually, I
>>>>>>>> prefer "kafka.timestamp", which is another improvement for
>>>>>>>> properties, FLINK-12557.)
>>>>>>>> A single "timestamp" in the properties may mislead users into
>>>>>>>> thinking the field is a rowtime attribute.
>>>>>>>>
>>>>>>>> I also left some minor comments in the FLIP.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Jark
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Sun, 1 Mar 2020 at 22:30, Dawid Wysakowicz
>>>>>>>> <dwysakow...@apache.org> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> I would like to propose an improvement that would enable
>>>>>>>>> reading table columns from different parts of source records.
>>>>>>>>> Besides the main payload, the majority (if not all) of the
>>>>>>>>> sources expose additional information. It can be simply
>>>>>>>>> read-only metadata such as the offset or ingestion time, or
>>>>>>>>> read/write parts of the record that contain data but
>>>>>>>>> additionally serve different purposes (partitioning, compaction
>>>>>>>>> etc.), e.g. the key or timestamp in Kafka.
>>>>>>>>>
>>>>>>>>> We should make it possible to read and write data from all of
>>>>>>>>> those locations. In this proposal I discuss reading
>>>>>>>>> partitioning data; for completeness, this proposal also
>>>>>>>>> discusses partitioning when writing data out.
>>>>>>>>>
>>>>>>>>> I am looking forward to your comments.
>>>>>>>>>
>>>>>>>>> You can access the FLIP here:
>>>>>>>>>
>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records?src=contextnavpagetreemode
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>>
>>>>>>>>> Dawid
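One last illustration of Dawid's push-down argument above, with my own example; it assumes a hypothetical table t that exposes the Kafka offset as a BIGINT metadata column named off:

    -- with a declared BIGINT type, the source can receive the
    -- predicate in its native type
    SELECT id, off
    FROM t
    WHERE off > 12345 AND off < 59382;
    -- with CAST-based typing the planner would instead have to push
    -- down CAST(off AS BIGINT) > 12345, which is harder for sources
    -- to interpret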