Hi Timo,

Thank you very much for the update. It indeed covers the full story in
more details. I agree with the proposal.

On 04/09/2020 10:48, Timo Walther wrote:
> Hi everyone,
>
> I completely reworked FLIP-107. It now covers the full story how to
> read and write metadata from/to connectors and formats. It considers
> all of the latest FLIPs, namely FLIP-95, FLIP-132 and FLIP-122. It
> introduces the concept of PERSISTED computed columns and leaves out
> partitioning for now.
>
> Looking forward to your feedback.
>
> Regards,
> Timo
>
>
> On 04.03.20 09:45, Kurt Young wrote:
>> Sorry, forgot one question.
>>
>> 4. Can we make the value.fields-include more orthogonal? Like one can
>> specify it as "EXCEPT_KEY, EXCEPT_TIMESTAMP".
>> With current  EXCEPT_KEY and EXCEPT_KEY_TIMESTAMP, users can not
>> config to
>> just ignore timestamp but keep key.
>>
>> Best,
>> Kurt
>>
>>
>> On Wed, Mar 4, 2020 at 4:42 PM Kurt Young <ykt...@gmail.com> wrote:
>>
>>> Hi Dawid,
>>>
>>> I have a couple of questions around key fields, actually I also have
>>> some
>>> other questions but want to be focused on key fields first.
>>>
>>> 1. I don't fully understand the usage of "key.fields". Is this
>>> option only
>>> valid during write operation? Because for
>>> reading, I can't imagine how such options can be applied. I would
>>> expect
>>> that there might be a SYSTEM_METADATA("key")
>>> to read and assign the key to a normal field?
>>>
>>> 2. If "key.fields" is only valid in write operation, I want to
>>> propose we
>>> can simplify the options to not introducing key.format.type and
>>> other related options. I think a single "key.field" (not fields)
>>> would be
>>> enough, users can use UDF to calculate whatever key they
>>> want before sink.
>>>
>>> 3. Also I don't want to introduce "value.format.type" and
>>> "value.format.xxx" with the "value" prefix. Not every connector has a
>>> concept
>>> of key and values. The old parameter "format.type" already good
>>> enough to
>>> use.
>>>
>>> Best,
>>> Kurt
>>>
>>>
>>> On Tue, Mar 3, 2020 at 10:40 PM Jark Wu <imj...@gmail.com> wrote:
>>>
>>>> Thanks Dawid,
>>>>
>>>> I have two more questions.
>>>>
>>>>> SupportsMetadata
>>>> Introducing SupportsMetadata sounds good to me. But I have some
>>>> questions
>>>> regarding to this interface.
>>>> 1) How do the source know what the expected return type of each
>>>> metadata?
>>>> 2) Where to put the metadata fields? Append to the existing physical
>>>> fields?
>>>> If yes, I would suggest to change the signature to `TableSource
>>>> appendMetadataFields(String[] metadataNames, DataType[]
>>>> metadataTypes)`
>>>>
>>>>> SYSTEM_METADATA("partition")
>>>> Can SYSTEM_METADATA() function be used nested in a computed column
>>>> expression? If yes, how to specify the return type of SYSTEM_METADATA?
>>>>
>>>> Best,
>>>> Jark
>>>>
>>>> On Tue, 3 Mar 2020 at 17:06, Dawid Wysakowicz <dwysakow...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> 1. I thought a bit more on how the source would emit the columns
>>>>> and I
>>>>> now see its not exactly the same as regular columns. I see a need to
>>>>> elaborate a bit more on that in the FLIP as you asked, Jark.
>>>>>
>>>>> I do agree mostly with Danny on how we should do that. One additional
>>>>> things I would introduce is an
>>>>>
>>>>> interface SupportsMetadata {
>>>>>
>>>>>     boolean supportsMetadata(Set<String> metadataFields);
>>>>>
>>>>>     TableSource generateMetadataFields(Set<String> metadataFields);
>>>>>
>>>>> }
>>>>>
>>>>> This way the source would have to declare/emit only the requested
>>>>> metadata fields. In order not to clash with user defined fields. When
>>>>> emitting the metadata field I would prepend the column name with
>>>>> __system_{property_name}. Therefore when requested
>>>>> SYSTEM_METADATA("partition") the source would append a field
>>>>> __system_partition to the schema. This would be never visible to the
>>>>> user as it would be used only for the subsequent computed columns. If
>>>>> that makes sense to you, I will update the FLIP with this
>>>>> description.
>>>>>
>>>>> 2. CAST vs explicit type in computed columns
>>>>>
>>>>> Here I agree with Danny. It is also the current state of the
>>>>> proposal.
>>>>>
>>>>> 3. Partitioning on computed column vs function
>>>>>
>>>>> Here I also agree with Danny. I also think those are orthogonal. I
>>>>> would
>>>>> leave out the STORED computed columns out of the discussion. I
>>>>> don't see
>>>>> how do they relate to the partitioning. I already put both of those
>>>>> cases in the document. We can either partition on a computed
>>>>> column or
>>>>> use a udf in a partioned by clause. I am fine with leaving out the
>>>>> partitioning by udf in the first version if you still have some
>>>> concerns.
>>>>>
>>>>> As for your question Danny. It depends which partitioning strategy
>>>>> you
>>>> use.
>>>>>
>>>>> For the HASH partitioning strategy I thought it would work as you
>>>>> explained. It would be N = MOD(expr, num). I am not sure though if we
>>>>> should introduce the PARTITIONS clause. Usually Flink does not own
>>>>> the
>>>>> data and the partitions are already an intrinsic property of the
>>>>> underlying source e.g. for kafka we do not create topics, but we just
>>>>> describe pre-existing pre-partitioned topic.
>>>>>
>>>>> 4. timestamp vs timestamp.field vs connector.field vs ...
>>>>>
>>>>> I am fine with changing it to timestamp.field to be consistent with
>>>>> other value.fields and key.fields. Actually that was also my initial
>>>>> proposal in a first draft I prepared. I changed it afterwards to
>>>>> shorten
>>>>> the key.
>>>>>
>>>>> Best,
>>>>>
>>>>> Dawid
>>>>>
>>>>> On 03/03/2020 09:00, Danny Chan wrote:
>>>>>> Thanks Dawid for bringing up this discussion, I think it is a useful
>>>>> feature ~
>>>>>>
>>>>>> About how the metadata outputs from source
>>>>>>
>>>>>> I think it is completely orthogonal, computed column push down is
>>>>> another topic, this should not be a blocker but a promotion, if we do
>>>> not
>>>>> have any filters on the computed column, there is no need to do any
>>>>> pushings; the source node just emit the complete record with full
>>>> metadata
>>>>> with the declared physical schema, then when generating the virtual
>>>>> columns, we would extract the metadata info and output as full
>>>> columns(with
>>>>> full schema).
>>>>>>
>>>>>> About the type of metadata column
>>>>>>
>>>>>> Personally i prefer explicit type instead of CAST, they are symantic
>>>>> equivalent though, explict type is more straight-forward and we can
>>>> declare
>>>>> the nullable attribute there.
>>>>>>
>>>>>> About option A: partitioning based on acomputed column VS option B:
>>>>> partitioning with just a function
>>>>>>
>>>>>>  From the FLIP, it seems that B's partitioning is just a strategy
>>>>>> when
>>>>> writing data, the partiton column is not included in the table
>>>>> schema,
>>>> so
>>>>> it's just useless when reading from that.
>>>>>>
>>>>>> - Compared to A, we do not need to generate the partition column
>>>>>> when
>>>>> selecting from the table(but insert into)
>>>>>> - For A we can also mark the column as STORED when we want to
>>>>>> persist
>>>>> that
>>>>>>
>>>>>> So in my opition they are orthogonal, we can support both, i saw
>>>>>> that
>>>>> MySQL/Oracle[1][2] would suggest to also define the PARTITIONS
>>>>> num, and
>>>> the
>>>>> partitions are managed under a "tablenamespace", the partition in
>>>>> which
>>>> the
>>>>> record is stored is partition number N, where N = MOD(expr, num), for
>>>> your
>>>>> design, which partiton the record would persist ?
>>>>>>
>>>>>> [1] https://dev.mysql.com/doc/refman/5.7/en/partitioning-hash.html
>>>>>> [2]
>>>>>
>>>> https://docs.oracle.com/database/121/VLDBG/GUID-F023D3ED-262F-4B19-950A-D3C8F8CDB4F4.htm#VLDBG1270
>>>>
>>>>>>
>>>>>> Best,
>>>>>> Danny Chan
>>>>>> 在 2020年3月2日 +0800 PM6:16,Dawid Wysakowicz
>>>>>> <dwysakow...@apache.org
>>>>> ,写道:
>>>>>>> Hi Jark,
>>>>>>> Ad. 2 I added a section to discuss relation to FLIP-63
>>>>>>> Ad. 3 Yes, I also tried to somewhat keep hierarchy of properties.
>>>>> Therefore you have the key.format.type.
>>>>>>> I also considered exactly what you are suggesting (prefixing with
>>>>> connector or kafka). I should've put that into an Option/Rejected
>>>>> alternatives.
>>>>>>> I agree timestamp, key.*, value.* are connector properties. Why I
>>>>> wanted to suggest not adding that prefix in the first version is that
>>>>> actually all the properties in the WITH section are connector
>>>> properties.
>>>>> Even format is in the end a connector property as some of the sources
>>>> might
>>>>> not have a format, imo. The benefit of not adding the prefix is
>>>>> that it
>>>>> makes the keys a bit shorter. Imagine prefixing all the properties
>>>>> with
>>>>> connector (or if we go with FLINK-12557: elasticsearch):
>>>>>>> elasticsearch.key.format.type: csv
>>>>>>> elasticsearch.key.format.field: ....
>>>>>>> elasticsearch.key.format.delimiter: ....
>>>>>>> elasticsearch.key.format.*: ....
>>>>>>> I am fine with doing it though if this is a preferred approach
>>>>>>> in the
>>>>> community.
>>>>>>> Ad in-line comments:
>>>>>>> I forgot to update the `value.fields.include` property. It
>>>>>>> should be
>>>>> value.fields-include. Which I think you also suggested in the
>>>>> comment,
>>>>> right?
>>>>>>> As for the cast vs declaring output type of computed column. I
>>>>>>> think
>>>>> it's better not to use CAST, but declare a type of an expression and
>>>> later
>>>>> on infer the output type of SYSTEM_METADATA. The reason is I think
>>>>> this
>>>> way
>>>>> it will be easier to implement e.g. filter push downs when working
>>>>> with
>>>> the
>>>>> native types of the source, e.g. in case of Kafka's offset, i
>>>>> think it's
>>>>> better to pushdown long rather than string. This could let us push
>>>>> expression like e.g. offset > 12345 & offset < 59382. Otherwise we
>>>>> would
>>>>> have to push down cast(offset, long) > 12345 && cast(offset, long) <
>>>> 59382.
>>>>> Moreover I think we need to introduce the type for computed columns
>>>> anyway
>>>>> to support functions that infer output type based on expected return
>>>> type.
>>>>>>> As for the computed column push down. Yes, SYSTEM_METADATA would
>>>>>>> have
>>>>> to be pushed down to the source. If it is not possible the planner
>>>> should
>>>>> fail. As far as I know computed columns push down will be part of
>>>>> source
>>>>> rework, won't it? ;)
>>>>>>> As for the persisted computed column. I think it is completely
>>>>> orthogonal. In my current proposal you can also partition by a
>>>>> computed
>>>>> column. The difference between using a udf in partitioned by vs
>>>> partitioned
>>>>> by a computed column is that when you partition by a computed column
>>>> this
>>>>> column must be also computed when reading the table. If you use a
>>>>> udf in
>>>>> the partitioned by, the expression is computed only when inserting
>>>>> into
>>>> the
>>>>> table.
>>>>>>> Hope this answers some of your questions. Looking forward for
>>>>>>> further
>>>>> suggestions.
>>>>>>> Best,
>>>>>>> Dawid
>>>>>>>
>>>>>>>
>>>>>>> On 02/03/2020 05:18, Jark Wu wrote:
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> Thanks Dawid for starting such a great discussion. Reaing metadata
>>>> and
>>>>>>>> key-part information from source is an important feature for
>>>> streaming
>>>>>>>> users.
>>>>>>>>
>>>>>>>> In general, I agree with the proposal of the FLIP.
>>>>>>>> I will leave my thoughts and comments here:
>>>>>>>>
>>>>>>>> 1) +1 to use connector properties instead of introducing HEADER
>>>>> keyword as
>>>>>>>> the reason you mentioned in the FLIP.
>>>>>>>> 2) we already introduced PARTITIONED BY in FLIP-63. Maybe we
>>>>>>>> should
>>>>> add a
>>>>>>>> section to explain what's the relationship between them.
>>>>>>>>     Do their concepts conflict? Could INSERT PARTITION be used
>>>>>>>> on the
>>>>>>>> PARTITIONED table in this FLIP?
>>>>>>>> 3) Currently, properties are hierarchical in Flink SQL. Shall we
>>>> make
>>>>> the
>>>>>>>> new introduced properties more hierarchical?
>>>>>>>>     For example, "timestamp" => "connector.timestamp"?
>>>>>>>> (actually, I
>>>>> prefer
>>>>>>>> "kafka.timestamp" which is another improvement for properties
>>>>> FLINK-12557)
>>>>>>>>     A single "timestamp" in properties may mislead users that the
>>>> field
>>>>> is
>>>>>>>> a rowtime attribute.
>>>>>>>>
>>>>>>>> I also left some minor comments in the FLIP.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Jark
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Sun, 1 Mar 2020 at 22:30, Dawid Wysakowicz <
>>>> dwysakow...@apache.org>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> I would like to propose an improvement that would enable reading
>>>> table
>>>>>>>>> columns from different parts of source records. Besides the main
>>>>> payload
>>>>>>>>> majority (if not all of the sources) expose additional
>>>> information. It
>>>>>>>>> can be simply a read-only metadata such as offset, ingestion time
>>>> or a
>>>>>>>>> read and write  parts of the record that contain data but
>>>> additionally
>>>>>>>>> serve different purposes (partitioning, compaction etc.), e.g.
>>>>>>>>> key
>>>> or
>>>>>>>>> timestamp in Kafka.
>>>>>>>>>
>>>>>>>>> We should make it possible to read and write data from all of
>>>>>>>>> those
>>>>>>>>> locations. In this proposal I discuss reading partitioning data,
>>>> for
>>>>>>>>> completeness this proposal discusses also the partitioning when
>>>>> writing
>>>>>>>>> data out.
>>>>>>>>>
>>>>>>>>> I am looking forward to your comments.
>>>>>>>>>
>>>>>>>>> You can access the FLIP here:
>>>>>>>>>
>>>>>>>>>
>>>>>
>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records?src=contextnavpagetreemode
>>>>
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>>
>>>>>>>>> Dawid
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to