Sorry, forgot one question.

4. Can we make value.fields-include more orthogonal? For example, one could
specify it as "EXCEPT_KEY, EXCEPT_TIMESTAMP". With the current EXCEPT_KEY
and EXCEPT_KEY_TIMESTAMP values, users cannot configure the table to
exclude only the timestamp while keeping the key.
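To illustrate, a sketch of a DDL with the orthogonal variant (the
EXCEPT_TIMESTAMP value is only my suggestion and not part of the current
FLIP; the other option names follow the proposal's draft):

CREATE TABLE kafka_table (
  id BIGINT,
  name STRING
) WITH (
  'connector.type' = 'kafka',
  'format.type' = 'avro',
  'key.format.type' = 'csv',
  'key.fields' = 'id',
  -- hypothetical value: drop only the timestamp from the value, keep the key
  'value.fields-include' = 'EXCEPT_TIMESTAMP'
)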
Best,
Kurt

On Wed, Mar 4, 2020 at 4:42 PM Kurt Young <ykt...@gmail.com> wrote:

> Hi Dawid,
>
> I have a couple of questions around key fields. Actually I also have some
> other questions, but I want to focus on key fields first.
>
> 1. I don't fully understand the usage of "key.fields". Is this option
> only valid during write operations? For reading, I can't imagine how such
> an option could be applied. I would expect that there might be a
> SYSTEM_METADATA("key") to read and assign the key to a normal field?
>
> 2. If "key.fields" is only valid for write operations, I want to propose
> that we simplify the options by not introducing key.format.type and the
> other related options. I think a single "key.field" (not fields) would be
> enough; users can use a UDF to calculate whatever key they want before
> the sink.
>
> 3. I also don't want to introduce "value.format.type" and
> "value.format.xxx" with the "value" prefix. Not every connector has a
> concept of keys and values. The old parameter "format.type" is already
> good enough.
>
> Best,
> Kurt
>
>
> On Tue, Mar 3, 2020 at 10:40 PM Jark Wu <imj...@gmail.com> wrote:
>
>> Thanks Dawid,
>>
>> I have two more questions.
>>
>> > SupportsMetadata
>> Introducing SupportsMetadata sounds good to me. But I have some
>> questions regarding this interface.
>> 1) How does the source know the expected return type of each metadata
>> field?
>> 2) Where do we put the metadata fields? Do we append them to the
>> existing physical fields? If yes, I would suggest changing the signature
>> to `TableSource appendMetadataFields(String[] metadataNames, DataType[]
>> metadataTypes)`.
>>
>> > SYSTEM_METADATA("partition")
>> Can the SYSTEM_METADATA() function be nested in a computed column
>> expression? If yes, how do we specify the return type of
>> SYSTEM_METADATA?
>>
>> Best,
>> Jark
>>
>> On Tue, 3 Mar 2020 at 17:06, Dawid Wysakowicz <dwysakow...@apache.org>
>> wrote:
>>
>> > Hi,
>> >
>> > 1. I thought a bit more about how the source would emit the columns,
>> > and I now see it is not exactly the same as for regular columns. I see
>> > a need to elaborate a bit more on that in the FLIP, as you asked,
>> > Jark.
>> >
>> > I mostly agree with Danny on how we should do that. One additional
>> > thing I would introduce is an
>> >
>> > interface SupportsMetadata {
>> >
>> >     boolean supportsMetadata(Set<String> metadataFields);
>> >
>> >     TableSource generateMetadataFields(Set<String> metadataFields);
>> >
>> > }
>> >
>> > This way the source would have to declare/emit only the requested
>> > metadata fields. In order not to clash with user-defined fields, when
>> > emitting a metadata field I would prepend the column name with
>> > __system_{property_name}. Therefore, when SYSTEM_METADATA("partition")
>> > is requested, the source would append a field __system_partition to
>> > the schema. This field would never be visible to the user, as it would
>> > be used only for the subsequent computed columns. If that makes sense
>> > to you, I will update the FLIP with this description.
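>> >
>> > To make that concrete, a rough sketch of what I have in mind (the
>> > exact DDL syntax for computed columns is still under discussion in
>> > this thread):
>> >
>> > CREATE TABLE kafka_table (
>> >   id BIGINT,
>> >   part INT AS SYSTEM_METADATA("partition")
>> > ) WITH (...)
>> >
>> > -- internally the source would produce (id BIGINT, __system_partition INT)
>> > -- and the computed column would effectively read:
>> > --   part AS __system_partition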
>> > 2. CAST vs explicit type in computed columns
>> >
>> > Here I agree with Danny. It is also the current state of the proposal.
>> >
>> > 3. Partitioning on a computed column vs a function
>> >
>> > Here I also agree with Danny. I also think those are orthogonal. I
>> > would leave the STORED computed columns out of the discussion, as I
>> > don't see how they relate to the partitioning. I already put both of
>> > those cases in the document. We can either partition on a computed
>> > column or use a UDF in a PARTITIONED BY clause. I am fine with leaving
>> > out the partitioning by UDF in the first version if you still have
>> > some concerns.
>> >
>> > As for your question, Danny: it depends on which partitioning strategy
>> > you use.
>> >
>> > For the HASH partitioning strategy I thought it would work as you
>> > explained: it would be N = MOD(expr, num). I am not sure, though,
>> > whether we should introduce the PARTITIONS clause. Usually Flink does
>> > not own the data, and the partitions are already an intrinsic property
>> > of the underlying source; e.g. for Kafka we do not create topics, we
>> > just describe a pre-existing, pre-partitioned topic.
>> >
>> > 4. timestamp vs timestamp.field vs connector.field vs ...
>> >
>> > I am fine with changing it to timestamp.field to be consistent with
>> > the other value.fields and key.fields options. Actually, that was also
>> > my initial proposal in the first draft I prepared. I changed it
>> > afterwards to shorten the key.
>> >
>> > Best,
>> >
>> > Dawid
>> >
>> > On 03/03/2020 09:00, Danny Chan wrote:
>> > > Thanks Dawid for bringing up this discussion, I think it is a useful
>> > > feature ~
>> > >
>> > > About how the metadata outputs from the source
>> > >
>> > > I think it is completely orthogonal; computed column push-down is
>> > > another topic. This should not be a blocker but an optimization: if
>> > > we do not have any filters on the computed column, there is no need
>> > > to do any push-down. The source node just emits the complete record
>> > > with full metadata according to the declared physical schema; then,
>> > > when generating the virtual columns, we would extract the metadata
>> > > info and output the full columns (with the full schema).
>> > >
>> > > About the type of the metadata column
>> > >
>> > > Personally I prefer an explicit type instead of CAST. They are
>> > > semantically equivalent, but an explicit type is more
>> > > straightforward, and we can declare the nullability attribute there.
>> > >
>> > > About option A (partitioning based on a computed column) vs option B
>> > > (partitioning with just a function)
>> > >
>> > > From the FLIP, it seems that B's partitioning is just a strategy
>> > > when writing data; the partition column is not included in the table
>> > > schema, so it is simply unused when reading from the table.
>> > >
>> > > - Compared to A, we do not need to generate the partition column
>> > >   when selecting from the table (only when inserting into it)
>> > > - For A we can also mark the column as STORED when we want to
>> > >   persist it
>> > >
>> > > So in my opinion they are orthogonal and we can support both. I saw
>> > > that MySQL/Oracle [1][2] suggest also defining the PARTITIONS num,
>> > > and the partitions are managed under a "table namespace"; the
>> > > partition in which a record is stored is partition number N, where
>> > > N = MOD(expr, num). For your design, in which partition would the
>> > > record be persisted?
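>> > >
>> > > For reference, a minimal version of the MySQL HASH partitioning
>> > > syntax from [1] (table and column names here are just illustrative):
>> > >
>> > > CREATE TABLE employees (
>> > >   id INT,
>> > >   store_id INT
>> > > )
>> > > PARTITION BY HASH(store_id)
>> > > PARTITIONS 4;
>> > >
>> > > -- a row lands in partition N = MOD(store_id, 4)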
>> > >
>> > > [1] https://dev.mysql.com/doc/refman/5.7/en/partitioning-hash.html
>> > > [2] https://docs.oracle.com/database/121/VLDBG/GUID-F023D3ED-262F-4B19-950A-D3C8F8CDB4F4.htm#VLDBG1270
>> > >
>> > > Best,
>> > > Danny Chan
>> > > On Mar 2, 2020, 6:16 PM +0800, Dawid Wysakowicz <dwysakow...@apache.org> wrote:
>> > >> Hi Jark,
>> > >>
>> > >> Ad. 2 I added a section to discuss the relation to FLIP-63.
>> > >>
>> > >> Ad. 3 Yes, I also tried to somewhat keep a hierarchy of properties;
>> > >> therefore you have the key.format.type. I also considered exactly
>> > >> what you are suggesting (prefixing with connector or kafka). I
>> > >> should have put that into the Options/Rejected alternatives
>> > >> section.
>> > >>
>> > >> I agree that timestamp, key.*, and value.* are connector
>> > >> properties. The reason I wanted to suggest not adding that prefix
>> > >> in the first version is that actually all the properties in the
>> > >> WITH section are connector properties. Even format is, in the end,
>> > >> a connector property, as some of the sources might not have a
>> > >> format, imo. The benefit of not adding the prefix is that it makes
>> > >> the keys a bit shorter. Imagine prefixing all the properties with
>> > >> connector (or, if we go with FLINK-12557, with elasticsearch):
>> > >>
>> > >> elasticsearch.key.format.type: csv
>> > >> elasticsearch.key.format.field: ....
>> > >> elasticsearch.key.format.delimiter: ....
>> > >> elasticsearch.key.format.*: ....
>> > >>
>> > >> I am fine with doing it, though, if this is the preferred approach
>> > >> in the community.
>> > >>
>> > >> Ad the in-line comments:
>> > >>
>> > >> I forgot to update the `value.fields.include` property. It should
>> > >> be value.fields-include, which I think you also suggested in the
>> > >> comment, right?
>> > >>
>> > >> As for CAST vs declaring the output type of a computed column: I
>> > >> think it's better not to use CAST, but to declare the type of the
>> > >> expression and later on infer the output type of SYSTEM_METADATA.
>> > >> The reason is that this way it will be easier to implement e.g.
>> > >> filter push-downs when working with the native types of the
>> > >> source; e.g. in the case of Kafka's offset, I think it's better to
>> > >> push down a long rather than a string. This could let us push down
>> > >> expressions like offset > 12345 AND offset < 59382. Otherwise we
>> > >> would have to push down CAST(offset AS BIGINT) > 12345 AND
>> > >> CAST(offset AS BIGINT) < 59382. Moreover, I think we need to
>> > >> introduce the type for computed columns anyway, to support
>> > >> functions that infer the output type based on the expected return
>> > >> type.
>> > >>
>> > >> As for the computed column push-down: yes, SYSTEM_METADATA would
>> > >> have to be pushed down to the source. If that is not possible, the
>> > >> planner should fail. As far as I know, computed column push-down
>> > >> will be part of the source rework, won't it? ;)
>> > >>
>> > >> As for the persisted computed column: I think it is completely
>> > >> orthogonal. In my current proposal you can also partition by a
>> > >> computed column. The difference between using a UDF in PARTITIONED
>> > >> BY vs partitioning by a computed column is that, when you partition
>> > >> by a computed column, this column must also be computed when
>> > >> reading the table. If you use a UDF in the PARTITIONED BY, the
>> > >> expression is computed only when inserting into the table.
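>> > >>
>> > >> To make the difference concrete, a rough sketch of the two
>> > >> variants (the partitioning UDF and the exact DDL syntax are only
>> > >> illustrative here, not settled):
>> > >>
>> > >> -- A: partition by a computed column; 'part' is computed both when
>> > >> -- writing and when reading the table
>> > >> CREATE TABLE t1 (
>> > >>   id BIGINT,
>> > >>   part AS my_partition_udf(id)
>> > >> ) PARTITIONED BY (part) WITH (...)
>> > >>
>> > >> -- B: UDF directly in PARTITIONED BY; the expression is evaluated
>> > >> -- only when inserting into the table
>> > >> CREATE TABLE t2 (
>> > >>   id BIGINT
>> > >> ) PARTITIONED BY (my_partition_udf(id)) WITH (...)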
>> > >>
>> > >> Hope this answers some of your questions. Looking forward to
>> > >> further suggestions.
>> > >>
>> > >> Best,
>> > >>
>> > >> Dawid
>> > >>
>> > >>
>> > >> On 02/03/2020 05:18, Jark Wu wrote:
>> > >>> Hi,
>> > >>>
>> > >>> Thanks Dawid for starting such a great discussion. Reading
>> > >>> metadata and key-part information from the source is an important
>> > >>> feature for streaming users.
>> > >>>
>> > >>> In general, I agree with the proposal of the FLIP.
>> > >>> I will leave my thoughts and comments here:
>> > >>>
>> > >>> 1) +1 to use connector properties instead of introducing a HEADER
>> > >>> keyword, for the reason you mentioned in the FLIP.
>> > >>> 2) We already introduced PARTITIONED BY in FLIP-63. Maybe we
>> > >>> should add a section to explain the relationship between them.
>> > >>> Do their concepts conflict? Could INSERT PARTITION be used on a
>> > >>> PARTITIONED table as defined in this FLIP?
>> > >>> 3) Currently, properties are hierarchical in Flink SQL. Shall we
>> > >>> make the newly introduced properties more hierarchical too?
>> > >>> For example, "timestamp" => "connector.timestamp"? (Actually, I
>> > >>> prefer "kafka.timestamp", which is another improvement for
>> > >>> properties, FLINK-12557.)
>> > >>> A single "timestamp" in the properties may mislead users into
>> > >>> thinking the field is a rowtime attribute.
>> > >>>
>> > >>> I also left some minor comments in the FLIP.
>> > >>>
>> > >>> Thanks,
>> > >>> Jark
>> > >>>
>> > >>>
>> > >>>
>> > >>> On Sun, 1 Mar 2020 at 22:30, Dawid Wysakowicz
>> > >>> <dwysakow...@apache.org> wrote:
>> > >>>
>> > >>>> Hi,
>> > >>>>
>> > >>>> I would like to propose an improvement that would enable reading
>> > >>>> table columns from different parts of source records. Besides
>> > >>>> the main payload, the majority (if not all) of the sources
>> > >>>> expose additional information. It can be simply read-only
>> > >>>> metadata such as the offset or ingestion time, or read/write
>> > >>>> parts of the record that contain data but additionally serve
>> > >>>> different purposes (partitioning, compaction, etc.), e.g. the
>> > >>>> key or timestamp in Kafka.
>> > >>>>
>> > >>>> We should make it possible to read and write data from all of
>> > >>>> those locations. In this proposal I discuss reading such data;
>> > >>>> for completeness, the proposal also discusses partitioning when
>> > >>>> writing data out.
>> > >>>>
>> > >>>> I am looking forward to your comments.
>> > >>>>
>> > >>>> You can access the FLIP here:
>> > >>>>
>> > >>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records?src=contextnavpagetreemode
>> > >>>>
>> > >>>> Best,
>> > >>>>
>> > >>>> Dawid