Hi Jark,

Ad. 2 I added a section to discuss the relation to FLIP-63.

Ad. 3 Yes, I also tried to somewhat keep the hierarchy of properties. Therefore you have the key.format.type. I also considered exactly what you are suggesting (prefixing with connector or kafka). I should've put that into an Options/Rejected alternatives section. I agree timestamp, key.*, value.* are connector properties. The reason I suggested not adding that prefix in the first version is that actually all the properties in the WITH section are connector properties. Even format is in the end a connector property, as some of the sources might not have a format, imo. The benefit of not adding the prefix is that it makes the keys a bit shorter. Imagine prefixing all the properties with connector (or, if we go with FLINK-12557, elasticsearch):

elasticsearch.key.format.type: csv
elasticsearch.key.format.field: ....
elasticsearch.key.format.delimiter: ....
elasticsearch.key.format.*: ....

I am fine with doing it though if this is the preferred approach in the community.
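To make the two options concrete, roughly how they could look in a WITH clause. This is only a sketch: the table, the columns and all values except csv are placeholders, not the final keys from the FLIP.

-- option a) short keys, as in the current proposal
CREATE TABLE kafka_table (
    id BIGINT,
    name STRING
) WITH (
    'connector.type' = 'kafka',
    'key.format.type' = 'csv',
    'key.format.field' = '....',
    'timestamp' = '....'
);

-- option b) keys prefixed with the connector name (FLINK-12557 style)
CREATE TABLE kafka_table (
    id BIGINT,
    name STRING
) WITH (
    'connector.type' = 'kafka',
    'kafka.key.format.type' = 'csv',
    'kafka.key.format.field' = '....',
    'kafka.timestamp' = '....'
);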
Ad in-line comments:

I forgot to update the `value.fields.include` property. It should be `value.fields-include`, which I think you also suggested in the comment, right?

As for CAST vs declaring the output type of a computed column: I think it's better not to use CAST, but to declare the type of the expression and later on infer the output type of SYSTEM_METADATA from it. The reason is that I think this way it will be easier to implement e.g. filter push-downs when working with the native types of the source; e.g. in the case of Kafka's offset, I think it's better to push down a long rather than a string. This could let us push down an expression like offset > 12345 && offset < 59382. Otherwise we would have to push down cast(offset, long) > 12345 && cast(offset, long) < 59382. Moreover, I think we need to introduce the type for computed columns anyway, to support functions that infer their output type based on the expected return type.

As for the computed column push-down: yes, SYSTEM_METADATA would have to be pushed down to the source. If that is not possible, the planner should fail. As far as I know, computed column push-down will be part of the source rework, won't it? ;)
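Roughly what I have in mind, as a sketch only (the exact SYSTEM_METADATA/computed-column syntax is of course still up for discussion, and the table, column names and connector options are placeholders):

-- a) declared type on the computed column; SYSTEM_METADATA infers its
--    return type from the declared BIGINT
CREATE TABLE kafka_table (
    id BIGINT,
    `offset` BIGINT AS SYSTEM_METADATA("offset")
) WITH ( ... );

-- b) CAST wrapped around the metadata call
CREATE TABLE kafka_table (
    id BIGINT,
    `offset` AS CAST(SYSTEM_METADATA("offset") AS BIGINT)
) WITH ( ... );

-- With a), a predicate such as
SELECT * FROM kafka_table WHERE `offset` > 12345 AND `offset` < 59382;
-- can be pushed to the source on the native long type, while with b) the
-- source would have to deal with the casts around every comparison.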
As for the persisted computed column: I think it is completely orthogonal. In my current proposal you can also partition by a computed column. The difference between using a UDF in PARTITIONED BY vs partitioning by a computed column is that when you partition by a computed column, this column must also be computed when reading the table. If you use a UDF in PARTITIONED BY, the expression is computed only when inserting into the table.

Hope this answers some of your questions. Looking forward to further suggestions.

Best,

Dawid

On 02/03/2020 05:18, Jark Wu wrote:
> Hi,
>
> Thanks Dawid for starting such a great discussion. Reading metadata and
> key-part information from sources is an important feature for streaming
> users.
>
> In general, I agree with the proposal of the FLIP.
> I will leave my thoughts and comments here:
>
> 1) +1 to use connector properties instead of introducing a HEADER keyword,
> for the reason you mentioned in the FLIP.
> 2) We already introduced PARTITIONED BY in FLIP-63. Maybe we should add a
> section to explain the relationship between them.
> Do their concepts conflict? Could INSERT PARTITION be used on the
> PARTITIONED table in this FLIP?
> 3) Currently, properties are hierarchical in Flink SQL. Shall we make the
> newly introduced properties more hierarchical?
> For example, "timestamp" => "connector.timestamp"?
> (Actually, I prefer "kafka.timestamp", which is another improvement for
> properties, FLINK-12557.)
> A single "timestamp" in properties may mislead users into thinking the
> field is a rowtime attribute.
>
> I also left some minor comments in the FLIP.
>
> Thanks,
> Jark
>
>
> On Sun, 1 Mar 2020 at 22:30, Dawid Wysakowicz <dwysakow...@apache.org>
> wrote:
>
>> Hi,
>>
>> I would like to propose an improvement that would enable reading table
>> columns from different parts of source records. Besides the main payload,
>> the majority (if not all) of the sources expose additional information. It
>> can be simply read-only metadata such as offset or ingestion time, or
>> read/write parts of the record that contain data but additionally
>> serve different purposes (partitioning, compaction etc.), e.g. key or
>> timestamp in Kafka.
>>
>> We should make it possible to read and write data from all of those
>> locations. In this proposal I discuss reading partitioning data; for
>> completeness, the proposal also discusses partitioning when writing
>> data out.
>>
>> I am looking forward to your comments.
>>
>> You can access the FLIP here:
>>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records?src=contextnavpagetreemode
>>
>> Best,
>>
>> Dawid