Hi Timo,

Thanks a lot for picking up this FLIP. I believe it's a very important one
for almost everyone who uses Flink SQL with Kafka. Also +1 to leave out
partitioning for now.

Best,

Konstantin

On Fri, Sep 4, 2020 at 1:37 PM Aljoscha Krettek <aljos...@apache.org> wrote:

> I like the proposal! I didn't check the implementation section in detail,
> but the SQL DDL examples look good, as do the options for specifying
> how fields are mapped to keys/values.
>
> Aljoscha
>
> On 04.09.20 11:47, Dawid Wysakowicz wrote:
> > Hi Timo,
> >
> > Thank you very much for the update. It indeed covers the full story in
> > more detail. I agree with the proposal.
> >
> > On 04/09/2020 10:48, Timo Walther wrote:
> >> Hi everyone,
> >>
> >> I completely reworked FLIP-107. It now covers the full story of how to
> >> read and write metadata from/to connectors and formats. It considers
> >> all of the latest FLIPs, namely FLIP-95, FLIP-132 and FLIP-122. It
> >> introduces the concept of PERSISTED computed columns and leaves out
> >> partitioning for now.
> >>
> >> Looking forward to your feedback.
> >>
> >> Regards,
> >> Timo
> >>
> >>
> >> On 04.03.20 09:45, Kurt Young wrote:
> >>> Sorry, forgot one question.
> >>>
> >>> 4. Can we make value.fields-include more orthogonal? For example, one
> >>> could specify it as "EXCEPT_KEY, EXCEPT_TIMESTAMP".
> >>> With the current EXCEPT_KEY and EXCEPT_KEY_TIMESTAMP, users cannot
> >>> configure it to ignore just the timestamp but keep the key.
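
A minimal sketch of the orthogonal option described above, assuming the value is a comma-separated list of independent flags. The class `ValueFieldsIncludeSketch`, the enum `Exclusion`, and the helper methods are hypothetical names for illustration, not Flink API.

```java
import java.util.EnumSet;

// Hypothetical illustration only: not part of Flink.
final class ValueFieldsIncludeSketch {

    // Each flag excludes one part of the record independently of the others.
    enum Exclusion { EXCEPT_KEY, EXCEPT_TIMESTAMP }

    // Parses a comma-separated option value such as "EXCEPT_KEY, EXCEPT_TIMESTAMP".
    // An empty string means that nothing is excluded.
    static EnumSet<Exclusion> parse(String optionValue) {
        EnumSet<Exclusion> result = EnumSet.noneOf(Exclusion.class);
        for (String token : optionValue.split(",")) {
            String trimmed = token.trim();
            if (!trimmed.isEmpty()) {
                // Throws IllegalArgumentException for unknown flags.
                result.add(Exclusion.valueOf(trimmed));
            }
        }
        return result;
    }

    // True if the key fields should remain part of the value.
    static boolean keepsKey(EnumSet<Exclusion> exclusions) {
        return !exclusions.contains(Exclusion.EXCEPT_KEY);
    }

    // True if the timestamp field should remain part of the value.
    static boolean keepsTimestamp(EnumSet<Exclusion> exclusions) {
        return !exclusions.contains(Exclusion.EXCEPT_TIMESTAMP);
    }

    public static void main(String[] args) {
        // The case from the message above: ignore the timestamp but keep the key.
        EnumSet<Exclusion> exclusions = parse("EXCEPT_TIMESTAMP");
        System.out.println("keeps key: " + keepsKey(exclusions));
        System.out.println("keeps timestamp: " + keepsTimestamp(exclusions));
    }
}
```

With independent flags, every combination (neither, key only, timestamp only, both) can be expressed without adding a new combined constant for each case.
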
> >>>
> >>> Best,
> >>> Kurt
> >>>
> >>>
> >>> On Wed, Mar 4, 2020 at 4:42 PM Kurt Young <ykt...@gmail.com> wrote:
> >>>
> >>>> Hi Dawid,
> >>>>
> >>>> I have a couple of questions around key fields. I also have some
> >>>> other questions, but I want to focus on key fields first.
> >>>>
> >>>> 1. I don't fully understand the usage of "key.fields". Is this option
> >>>> only valid during write operations? For reading, I can't imagine how
> >>>> such an option could be applied. I would expect that there might be a
> >>>> SYSTEM_METADATA("key") to read the key and assign it to a normal field?
> >>>>
> >>>> 2. If "key.fields" is only valid for write operations, I propose that
> >>>> we simplify the options by not introducing key.format.type and other
> >>>> related options. I think a single "key.field" (not fields) would be
> >>>> enough; users can use a UDF to calculate whatever key they want
> >>>> before the sink.
> >>>>
> >>>> 3. I also don't want to introduce "value.format.type" and
> >>>> "value.format.xxx" with the "value" prefix. Not every connector has a
> >>>> concept of keys and values. The old parameter "format.type" is already
> >>>> good enough.
> >>>>
> >>>> Best,
> >>>> Kurt
> >>>>
> >>>>
> >>>> On Tue, Mar 3, 2020 at 10:40 PM Jark Wu <imj...@gmail.com> wrote:
> >>>>
> >>>>> Thanks Dawid,
> >>>>>
> >>>>> I have two more questions.
> >>>>>
> >>>>>> SupportsMetadata
> >>>>> Introducing SupportsMetadata sounds good to me. But I have some
> >>>>> questions regarding this interface.
> >>>>> 1) How does the source know the expected return type of each
> >>>>> metadata field?
> >>>>> 2) Where should the metadata fields be put? Appended to the existing
> >>>>> physical fields?
> >>>>> If yes, I would suggest changing the signature to `TableSource
> >>>>> appendMetadataFields(String[] metadataNames, DataType[]
> >>>>> metadataTypes)`
> >>>>>
> >>>>>> SYSTEM_METADATA("partition")
> >>>>> Can the SYSTEM_METADATA() function be nested in a computed column
> >>>>> expression? If yes, how do we specify the return type of
> >>>>> SYSTEM_METADATA?
> >>>>>
> >>>>> Best,
> >>>>> Jark
> >>>>>
> >>>>> On Tue, 3 Mar 2020 at 17:06, Dawid Wysakowicz <dwysakow...@apache.org>
> >>>>> wrote:
> >>>>>
> >>>>>> Hi,
> >>>>>>
> >>>>>> 1. I thought a bit more about how the source would emit the columns,
> >>>>>> and I now see it's not exactly the same as for regular columns. I see
> >>>>>> a need to elaborate a bit more on that in the FLIP, as you asked, Jark.
> >>>>>>
> >>>>>> I mostly agree with Danny on how we should do that. One additional
> >>>>>> thing I would introduce is an
> >>>>>>
> >>>>>> interface SupportsMetadata {
> >>>>>>
> >>>>>>      boolean supportsMetadata(Set<String> metadataFields);
> >>>>>>
> >>>>>>      TableSource generateMetadataFields(Set<String> metadataFields);
> >>>>>>
> >>>>>> }
> >>>>>>
> >>>>>> This way the source would have to declare/emit only the requested
> >>>>>> metadata fields. In order not to clash with user-defined fields, when
> >>>>>> emitting a metadata field I would prefix the column name, i.e. use
> >>>>>> __system_{property_name}. Therefore, when SYSTEM_METADATA("partition")
> >>>>>> is requested, the source would append a field __system_partition to
> >>>>>> the schema. This would never be visible to the user, as it would be
> >>>>>> used only for the subsequent computed columns. If that makes sense to
> >>>>>> you, I will update the FLIP with this description.
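
The naming scheme described above can be sketched as follows. This is only an illustration under stated assumptions: the class `SystemMetadataNaming` and its methods are hypothetical and not part of Flink; only the `__system_` prefix and the "partition" example come from the message.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical helper, not part of Flink: it illustrates the proposed naming only.
final class SystemMetadataNaming {

    // Prefix taken from the message above.
    static final String PREFIX = "__system_";

    // Maps a requested metadata key, e.g. "partition", to the hidden column name.
    static String toColumnName(String metadataKey) {
        return PREFIX + metadataKey;
    }

    // Builds hidden column names for exactly the requested keys, preserving their order.
    static Set<String> hiddenColumns(Set<String> requestedKeys) {
        Set<String> columns = new LinkedHashSet<>();
        for (String key : requestedKeys) {
            columns.add(toColumnName(key));
        }
        return columns;
    }

    public static void main(String[] args) {
        Set<String> requested = new LinkedHashSet<>();
        requested.add("partition");
        requested.add("offset");
        System.out.println(hiddenColumns(requested));
    }
}
```

Because only requested keys are mapped, a source following this scheme would not need to emit metadata that no computed column refers to.
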
> >>>>>>
> >>>>>> 2. CAST vs explicit type in computed columns
> >>>>>>
> >>>>>> Here I agree with Danny. It is also the current state of the
> >>>>>> proposal.
> >>>>>>
> >>>>>> 3. Partitioning on computed column vs function
> >>>>>>
> >>>>>> Here I also agree with Danny. I also think those are orthogonal. I
> >>>>>> would leave the STORED computed columns out of the discussion; I
> >>>>>> don't see how they relate to the partitioning. I already put both of
> >>>>>> those cases in the document. We can either partition on a computed
> >>>>>> column or use a UDF in a PARTITIONED BY clause. I am fine with leaving
> >>>>>> out the partitioning by UDF in the first version if you still have
> >>>>>> some concerns.
> >>>>>>
> >>>>>> As for your question, Danny: it depends on which partitioning
> >>>>>> strategy you use.
> >>>>>>
> >>>>>> For the HASH partitioning strategy I thought it would work as you
> >>>>>> explained. It would be N = MOD(expr, num). I am not sure, though, if
> >>>>>> we should introduce the PARTITIONS clause. Usually Flink does not own
> >>>>>> the data, and the partitions are already an intrinsic property of the
> >>>>>> underlying source, e.g. for Kafka we do not create topics, we just
> >>>>>> describe a pre-existing, pre-partitioned topic.
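
A minimal sketch of the N = MOD(expr, num) rule mentioned above. The class and method names are hypothetical. The use of `Math.floorMod`, which keeps the result non-negative for negative inputs, is an assumption of this sketch; SQL MOD may return a negative value in that case.

```java
// Hypothetical illustration of hash partitioning, not Flink code.
final class HashPartitionSketch {

    // Returns the partition number N = MOD(exprValue, numPartitions).
    // Math.floorMod keeps N in the range [0, numPartitions) even when
    // exprValue is negative (an assumption of this sketch).
    static int partitionFor(long exprValue, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        return (int) Math.floorMod(exprValue, (long) numPartitions);
    }

    public static void main(String[] args) {
        // With 4 partitions, the expression value 10 maps to partition 2.
        System.out.println(partitionFor(10L, 4));
    }
}
```
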
> >>>>>>
> >>>>>> 4. timestamp vs timestamp.field vs connector.field vs ...
> >>>>>>
> >>>>>> I am fine with changing it to timestamp.field to be consistent with
> >>>>>> the other value.fields and key.fields. Actually, that was also my
> >>>>>> initial proposal in a first draft I prepared. I changed it afterwards
> >>>>>> to shorten the key.
> >>>>>>
> >>>>>> Best,
> >>>>>>
> >>>>>> Dawid
> >>>>>>
> >>>>>> On 03/03/2020 09:00, Danny Chan wrote:
> >>>>>>> Thanks Dawid for bringing up this discussion, I think it is a useful
> >>>>>>> feature ~
> >>>>>>>
> >>>>>>> About how the metadata outputs from source
> >>>>>>>
> >>>>>>> I think it is completely orthogonal. Computed column push-down is
> >>>>>>> another topic; it should not be a blocker but an optimization. If we
> >>>>>>> do not have any filters on the computed column, there is no need to
> >>>>>>> push anything down: the source node just emits the complete record
> >>>>>>> with full metadata and the declared physical schema. Then, when
> >>>>>>> generating the virtual columns, we would extract the metadata info
> >>>>>>> and output it as full columns (with the full schema).
> >>>>>>>
> >>>>>>> About the type of metadata column
> >>>>>>>
> >>>>>>> Personally I prefer an explicit type instead of CAST. They are
> >>>>>>> semantically equivalent, but an explicit type is more straightforward
> >>>>>>> and we can declare the nullable attribute there.
> >>>>>>>
> >>>>>>> About option A: partitioning based on a computed column VS option B:
> >>>>>>> partitioning with just a function
> >>>>>>>
> >>>>>>> From the FLIP, it seems that B's partitioning is just a strategy
> >>>>>>> when writing data. The partition column is not included in the table
> >>>>>>> schema, so it is of no use when reading from the table.
> >>>>>>>
> >>>>>>> - Compared to A, we do not need to generate the partition column
> >>>>>>> when selecting from the table (only when inserting into it)
> >>>>>>> - For A we can also mark the column as STORED when we want to
> >>>>>>> persist it
> >>>>>>>
> >>>>>>> So in my opinion they are orthogonal and we can support both. I saw
> >>>>>>> that MySQL/Oracle [1][2] suggest also defining the PARTITIONS num,
> >>>>>>> and the partitions are managed under a "tablenamespace". The
> >>>>>>> partition in which the record is stored is partition number N, where
> >>>>>>> N = MOD(expr, num). In your design, in which partition would the
> >>>>>>> record be persisted?
> >>>>>>>
> >>>>>>> [1] https://dev.mysql.com/doc/refman/5.7/en/partitioning-hash.html
> >>>>>>> [2] https://docs.oracle.com/database/121/VLDBG/GUID-F023D3ED-262F-4B19-950A-D3C8F8CDB4F4.htm#VLDBG1270
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Danny Chan
> >>>>>>> On 2 Mar 2020 at 6:16 PM +0800, Dawid Wysakowicz
> >>>>>>> <dwysakow...@apache.org> wrote:
> >>>>>>>> Hi Jark,
> >>>>>>>> Ad. 2 I added a section to discuss the relation to FLIP-63.
> >>>>>>>> Ad. 3 Yes, I also tried to somewhat keep the hierarchy of properties.
> >>>>>>>> Therefore you have the key.format.type.
> >>>>>>>> I also considered exactly what you are suggesting (prefixing with
> >>>>>>>> connector or kafka). I should've put that into an Option/Rejected
> >>>>>>>> alternatives section.
> >>>>>>>> I agree timestamp, key.*, value.* are connector properties. The
> >>>>>>>> reason I wanted to suggest not adding that prefix in the first
> >>>>>>>> version is that actually all the properties in the WITH section are
> >>>>>>>> connector properties. Even the format is in the end a connector
> >>>>>>>> property, as some of the sources might not have a format, imo. The
> >>>>>>>> benefit of not adding the prefix is that it makes the keys a bit
> >>>>>>>> shorter. Imagine prefixing all the properties with connector (or, if
> >>>>>>>> we go with FLINK-12557, elasticsearch):
> >>>>>>>> elasticsearch.key.format.type: csv
> >>>>>>>> elasticsearch.key.format.field: ....
> >>>>>>>> elasticsearch.key.format.delimiter: ....
> >>>>>>>> elasticsearch.key.format.*: ....
> >>>>>>>> I am fine with doing it, though, if this is the preferred approach
> >>>>>>>> in the community.
> >>>>>>>> Ad in-line comments:
> >>>>>>>> I forgot to update the `value.fields.include` property. It should be
> >>>>>>>> value.fields-include, which I think you also suggested in the
> >>>>>>>> comment, right?
> >>>>>>>> As for the cast vs declaring the output type of a computed column: I
> >>>>>>>> think it's better not to use CAST, but to declare the type of the
> >>>>>>>> expression and later on infer the output type of SYSTEM_METADATA. The
> >>>>>>>> reason is that I think this way it will be easier to implement e.g.
> >>>>>>>> filter push-downs when working with the native types of the source.
> >>>>>>>> E.g. in the case of Kafka's offset, I think it's better to push down
> >>>>>>>> a long rather than a string. This could let us push down expressions
> >>>>>>>> like offset > 12345 && offset < 59382. Otherwise we would have to
> >>>>>>>> push down cast(offset, long) > 12345 && cast(offset, long) < 59382.
> >>>>>>>> Moreover, I think we need to introduce the type for computed columns
> >>>>>>>> anyway to support functions that infer the output type based on the
> >>>>>>>> expected return type.
> >>>>>>>> As for the computed column push-down: yes, SYSTEM_METADATA would have
> >>>>>>>> to be pushed down to the source. If that is not possible, the planner
> >>>>>>>> should fail. As far as I know, computed column push-down will be part
> >>>>>>>> of the source rework, won't it? ;)
> >>>>>>>> As for the persisted computed column: I think it is completely
> >>>>>>>> orthogonal. In my current proposal you can also partition by a
> >>>>>>>> computed column. The difference between using a UDF in PARTITIONED BY
> >>>>>>>> and partitioning by a computed column is that when you partition by a
> >>>>>>>> computed column, this column must also be computed when reading the
> >>>>>>>> table. If you use a UDF in PARTITIONED BY, the expression is computed
> >>>>>>>> only when inserting into the table.
> >>>>>>>> Hope this answers some of your questions. Looking forward to further
> >>>>>>>> suggestions.
> >>>>>>>> Best,
> >>>>>>>> Dawid
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On 02/03/2020 05:18, Jark Wu wrote:
> >>>>>>>>> Hi,
> >>>>>>>>>
> >>>>>>>>> Thanks Dawid for starting such a great discussion. Reading metadata
> >>>>>>>>> and key-part information from the source is an important feature for
> >>>>>>>>> streaming users.
> >>>>>>>>>
> >>>>>>>>> In general, I agree with the proposal of the FLIP.
> >>>>>>>>> I will leave my thoughts and comments here:
> >>>>>>>>>
> >>>>>>>>> 1) +1 to using connector properties instead of introducing a HEADER
> >>>>>>>>> keyword, for the reason you mentioned in the FLIP.
> >>>>>>>>> 2) We already introduced PARTITIONED BY in FLIP-63. Maybe we should
> >>>>>>>>> add a section to explain the relationship between them.
> >>>>>>>>>      Do their concepts conflict? Could INSERT PARTITION be used on
> >>>>>>>>> the PARTITIONED table in this FLIP?
> >>>>>>>>> 3) Currently, properties are hierarchical in Flink SQL. Shall we make
> >>>>>>>>> the newly introduced properties more hierarchical?
> >>>>>>>>>      For example, "timestamp" => "connector.timestamp"? (Actually, I
> >>>>>>>>> prefer "kafka.timestamp", which is another improvement for
> >>>>>>>>> properties, see FLINK-12557.)
> >>>>>>>>>      A single "timestamp" in properties may mislead users into
> >>>>>>>>> thinking that the field is a rowtime attribute.
> >>>>>>>>>
> >>>>>>>>> I also left some minor comments in the FLIP.
> >>>>>>>>>
> >>>>>>>>> Thanks,
> >>>>>>>>> Jark
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Sun, 1 Mar 2020 at 22:30, Dawid Wysakowicz <dwysakow...@apache.org>
> >>>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hi,
> >>>>>>>>>>
> >>>>>>>>>> I would like to propose an improvement that would enable reading
> >>>>>>>>>> table columns from different parts of source records. Besides the
> >>>>>>>>>> main payload, the majority of sources (if not all) expose additional
> >>>>>>>>>> information. It can be simply read-only metadata such as offset or
> >>>>>>>>>> ingestion time, or read-and-write parts of the record that contain
> >>>>>>>>>> data but additionally serve different purposes (partitioning,
> >>>>>>>>>> compaction etc.), e.g. key or timestamp in Kafka.
> >>>>>>>>>>
> >>>>>>>>>> We should make it possible to read and write data from all of those
> >>>>>>>>>> locations. In this proposal I discuss reading partitioning data; for
> >>>>>>>>>> completeness, the proposal also discusses partitioning when writing
> >>>>>>>>>> data out.
> >>>>>>>>>>
> >>>>>>>>>> I am looking forward to your comments.
> >>>>>>>>>>
> >>>>>>>>>> You can access the FLIP here:
> >>>>>>>>>>
> >>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records?src=contextnavpagetreemode
> >>>>>>>>>>
> >>>>>>>>>> Best,
> >>>>>>>>>>
> >>>>>>>>>> Dawid
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>

-- 

Konstantin Knauf

https://twitter.com/snntrable

https://github.com/knaufk
