Hi Timo,

Thanks a lot for picking up this FLIP. I believe it's a very important one for almost everyone who uses Flink SQL with Kafka. Also +1 to leaving out partitioning for now.
Best,
Konstantin

On Fri, Sep 4, 2020 at 1:37 PM Aljoscha Krettek <aljos...@apache.org> wrote:

I like the proposal! I didn't check the implementation section in detail, but the SQL DDL examples look good, as do the options for specifying how fields are mapped to keys/values.

Aljoscha

On 04.09.20 11:47, Dawid Wysakowicz wrote:

Hi Timo,

Thank you very much for the update. It indeed covers the full story in more detail. I agree with the proposal.

On 04/09/2020 10:48, Timo Walther wrote:

Hi everyone,

I completely reworked FLIP-107. It now covers the full story of how to read and write metadata from/to connectors and formats. It considers all of the latest FLIPs, namely FLIP-95, FLIP-132 and FLIP-122. It introduces the concept of PERSISTED computed columns and leaves out partitioning for now.

Looking forward to your feedback.

Regards,
Timo

On 04.03.20 09:45, Kurt Young wrote:

Sorry, I forgot one question.

4. Can we make value.fields-include more orthogonal? For example, one could specify it as "EXCEPT_KEY, EXCEPT_TIMESTAMP". With the current EXCEPT_KEY and EXCEPT_KEY_TIMESTAMP, users cannot configure it to ignore just the timestamp but keep the key.

Best,
Kurt

On Wed, Mar 4, 2020 at 4:42 PM Kurt Young <ykt...@gmail.com> wrote:

Hi Dawid,

I have a couple of questions around key fields. I actually have some other questions as well, but I want to focus on key fields first.

1. I don't fully understand the usage of "key.fields". Is this option only valid for write operations? For reading, I can't imagine how such an option could be applied. I would expect that there might be a SYSTEM_METADATA("key") to read and assign the key to a normal field?

2. If "key.fields" is only valid for write operations, I want to propose that we simplify the options and not introduce key.format.type and other related options. I think a single "key.field" (not fields) would be enough; users can use a UDF to calculate whatever key they want before the sink.

3. Also, I don't want to introduce "value.format.type" and "value.format.xxx" with the "value" prefix. Not every connector has a concept of keys and values. The old parameter "format.type" is already good enough.

Best,
Kurt

On Tue, Mar 3, 2020 at 10:40 PM Jark Wu <imj...@gmail.com> wrote:

Thanks Dawid,

I have two more questions.

> SupportsMetadata
Introducing SupportsMetadata sounds good to me, but I have some questions regarding this interface.
1) How does the source know the expected return type of each metadata field?
2) Where do the metadata fields go? Are they appended to the existing physical fields? If yes, I would suggest changing the signature to `TableSource appendMetadataFields(String[] metadataNames, DataType[] metadataTypes)`.

> SYSTEM_METADATA("partition")
Can the SYSTEM_METADATA() function be used nested in a computed column expression? If yes, how do we specify the return type of SYSTEM_METADATA?
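To make the question concrete, I am thinking of declarations roughly like the following (draft syntax only, just to show where the return type matters; column names and the millisecond assumption are purely illustrative):

  CREATE TABLE kafka_table (
    id BIGINT,
    -- plain use: the CAST pins down the resulting column type
    kafka_offset AS CAST(SYSTEM_METADATA("offset") AS BIGINT),
    -- nested use: SYSTEM_METADATA("timestamp") sits inside a larger expression
    -- (assuming the raw value is epoch milliseconds), so what is its own type here?
    ts_seconds AS CAST(SYSTEM_METADATA("timestamp") / 1000 AS BIGINT)
  ) WITH (
    ...
  );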
Best,
Jark

On Tue, 3 Mar 2020 at 17:06, Dawid Wysakowicz <dwysakow...@apache.org> wrote:

Hi,

1. I thought a bit more about how the source would emit the columns, and I now see it's not exactly the same as regular columns. I see a need to elaborate a bit more on that in the FLIP, as you asked, Jark.

I mostly agree with Danny on how we should do that. One additional thing I would introduce is an

  interface SupportsMetadata {

      boolean supportsMetadata(Set<String> metadataFields);

      TableSource generateMetadataFields(Set<String> metadataFields);

  }

This way the source would have to declare/emit only the requested metadata fields. In order not to clash with user-defined fields, when emitting a metadata field I would prefix the column name with __system_{property_name}. Therefore, when SYSTEM_METADATA("partition") is requested, the source would append a field __system_partition to the schema. This would never be visible to the user, as it would be used only for the subsequent computed columns. If that makes sense to you, I will update the FLIP with this description.

2. CAST vs explicit type in computed columns

Here I agree with Danny. It is also the current state of the proposal.

3. Partitioning on a computed column vs a function

Here I also agree with Danny. I also think those are orthogonal. I would leave the STORED computed columns out of the discussion; I don't see how they relate to partitioning. I already put both of those cases in the document. We can either partition on a computed column or use a UDF in a PARTITIONED BY clause. I am fine with leaving out partitioning by UDF in the first version if you still have some concerns.

As for your question, Danny: it depends which partitioning strategy you use.

For the HASH partitioning strategy I thought it would work as you explained: the target partition would be N = MOD(expr, num). I am not sure, though, whether we should introduce the PARTITIONS clause. Usually Flink does not own the data and the partitions are already an intrinsic property of the underlying source; e.g. for Kafka we do not create topics, we just describe a pre-existing, pre-partitioned topic.

4. timestamp vs timestamp.field vs connector.field vs ...

I am fine with changing it to timestamp.field to be consistent with the other value.fields and key.fields. Actually, that was also my initial proposal in the first draft I prepared; I changed it afterwards to shorten the key.
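Just to visualize the consistency I have in mind, the options could then line up roughly like this (keys and values here are only illustrative, nothing is final):

  CREATE TABLE kafka_table (
    ...
  ) WITH (
    'connector.type' = 'kafka',
    'timestamp.field' = 'ts',             -- column mapped to the record timestamp
    'key.fields' = 'id',                  -- column(s) forming the key
    'key.format.type' = 'csv',
    'value.format.type' = 'json',
    'value.fields-include' = 'EXCEPT_KEY'
  );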
Best,

Dawid

On 03/03/2020 09:00, Danny Chan wrote:

Thanks Dawid for bringing up this discussion, I think it is a useful feature ~

About how the metadata is output from the source:

I think it is completely orthogonal; computed column push-down is another topic. This should not be a blocker but rather an improvement: if we do not have any filters on the computed column, there is no need to push anything down. The source node just emits the complete record with full metadata according to the declared physical schema, and then, when generating the virtual columns, we extract the metadata info and output the full columns (with the full schema).

About the type of the metadata column:

Personally I prefer an explicit type instead of CAST. They are semantically equivalent, but an explicit type is more straightforward and we can declare the nullability attribute there.

About option A (partitioning based on a computed column) vs option B (partitioning with just a function):

From the FLIP, it seems that B's partitioning is just a strategy used when writing data; the partition column is not included in the table schema, so it cannot be used when reading from the table.

- Compared to A, we do not need to generate the partition column when selecting from the table (only for INSERT INTO).
- For A, we can also mark the column as STORED when we want to persist it.

So in my opinion they are orthogonal and we can support both. I saw that MySQL/Oracle [1][2] suggest also defining the number of PARTITIONS, and the partitions are managed under a "table namespace"; the partition in which a record is stored is partition number N, where N = MOD(expr, num). For your design, in which partition would the record be persisted?

[1] https://dev.mysql.com/doc/refman/5.7/en/partitioning-hash.html
[2] https://docs.oracle.com/database/121/VLDBG/GUID-F023D3ED-262F-4B19-950A-D3C8F8CDB4F4.htm#VLDBG1270

Best,
Danny Chan

On 2 Mar 2020 at 6:16 PM +0800, Dawid Wysakowicz <dwysakow...@apache.org> wrote:

Hi Jark,

Ad. 2: I added a section to discuss the relation to FLIP-63.

Ad. 3: Yes, I also tried to somewhat keep the hierarchy of properties; therefore you have key.format.type. I also considered exactly what you are suggesting (prefixing with connector or kafka). I should have put that into an Options/Rejected alternatives section. I agree that timestamp, key.*, and value.* are connector properties. The reason I wanted to suggest not adding that prefix in the first version is that actually all the properties in the WITH section are connector properties. Even format is, in the end, a connector property, as some sources might not have a format, imo. The benefit of not adding the prefix is that it makes the keys a bit shorter.
Imagine prefixing all the properties with connector (or, if we go with FLINK-12557, with elasticsearch):

  elasticsearch.key.format.type: csv
  elasticsearch.key.format.field: ....
  elasticsearch.key.format.delimiter: ....
  elasticsearch.key.format.*: ....

I am fine with doing it, though, if this is the preferred approach in the community.

Ad in-line comments:

I forgot to update the `value.fields.include` property. It should be value.fields-include, which I think you also suggested in the comment, right?

As for CAST vs declaring the output type of a computed column: I think it's better not to use CAST, but to declare the type of the expression and later infer the output type of SYSTEM_METADATA from it. The reason is that I think this way it will be easier to implement e.g. filter push-down when working with the native types of the source. For example, in the case of Kafka's offset, I think it's better to push down a long rather than a string. This would let us push down expressions like offset > 12345 && offset < 59382; otherwise we would have to push down cast(offset, long) > 12345 && cast(offset, long) < 59382. Moreover, I think we need to introduce the type for computed columns anyway to support functions that infer their output type based on the expected return type.

As for the computed column push-down: yes, SYSTEM_METADATA would have to be pushed down to the source. If that is not possible, the planner should fail. As far as I know, computed column push-down will be part of the source rework, won't it? ;)

As for the persisted computed column: I think it is completely orthogonal. In my current proposal you can also partition by a computed column. The difference between using a UDF in PARTITIONED BY vs partitioning by a computed column is that when you partition by a computed column, this column must also be computed when reading the table. If you use a UDF in the PARTITIONED BY, the expression is computed only when inserting into the table.

Hope this answers some of your questions. Looking forward to further suggestions.

Best,
Dawid

On 02/03/2020 05:18, Jark Wu wrote:

Hi,

Thanks Dawid for starting such a great discussion. Reading metadata and key-part information from the source is an important feature for streaming users.

In general, I agree with the proposal of the FLIP. I will leave my thoughts and comments here:

1) +1 to using connector properties instead of introducing a HEADER keyword, for the reasons you mentioned in the FLIP.
2) We already introduced PARTITIONED BY in FLIP-63. Maybe we should add a section to explain the relationship between them. Do their concepts conflict? Could INSERT PARTITION be used on the PARTITIONED table in this FLIP?
3) Currently, properties are hierarchical in Flink SQL. Shall we make the newly introduced properties more hierarchical as well? For example, "timestamp" => "connector.timestamp"? (Actually, I prefer "kafka.timestamp", which is another improvement for properties, FLINK-12557.) A single "timestamp" in the properties may mislead users into thinking the field is a rowtime attribute.
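Spelled out, the same option could be scoped in any of the following ways (the values are just placeholders):

  'timestamp' = 'ts'            -- flat key, easy to mistake for a rowtime attribute
  'connector.timestamp' = 'ts'  -- scoped under the generic connector.* prefix
  'kafka.timestamp' = 'ts'      -- scoped to the concrete connector (FLINK-12557 style)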
I also left some minor comments in the FLIP.

Thanks,
Jark

On Sun, 1 Mar 2020 at 22:30, Dawid Wysakowicz <dwysakow...@apache.org> wrote:

Hi,

I would like to propose an improvement that would enable reading table columns from different parts of source records. Besides the main payload, the majority (if not all) of sources expose additional information. It can be simple read-only metadata such as the offset or ingestion time, or readable and writable parts of the record that contain data but additionally serve other purposes (partitioning, compaction, etc.), e.g. the key or timestamp in Kafka.

We should make it possible to read and write data from all of those locations. In this proposal I discuss reading partitioning data; for completeness, the proposal also discusses partitioning when writing data out.

I am looking forward to your comments.

You can access the FLIP here:

https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records?src=contextnavpagetreemode

Best,

Dawid

--
Konstantin Knauf

https://twitter.com/snntrable

https://github.com/knaufk