Sorry, forgot one question.

4. Can we make value.fields-include more orthogonal? For example, one could
specify it as "EXCEPT_KEY, EXCEPT_TIMESTAMP".
With the current EXCEPT_KEY and EXCEPT_KEY_TIMESTAMP, users cannot configure
it to ignore only the timestamp while keeping the key.
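
To make this concrete, here is a minimal DDL sketch of the orthogonal
variant (the table, columns, and the combined value are assumptions for
illustration; the current draft only defines EXCEPT_KEY and
EXCEPT_KEY_TIMESTAMP):

  CREATE TABLE kafka_table (
    id BIGINT,
    name STRING
  ) WITH (
    'connector.type' = 'kafka',
    'format.type' = 'json',
    -- hypothetical: ignore only the timestamp in the value, keep the key
    'value.fields-include' = 'EXCEPT_KEY, EXCEPT_TIMESTAMP'
  );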

Best,
Kurt


On Wed, Mar 4, 2020 at 4:42 PM Kurt Young <ykt...@gmail.com> wrote:

> Hi Dawid,
>
> I have a couple of questions around key fields. Actually, I also have some
> other questions, but I want to focus on key fields first.
>
> 1. I don't fully understand the usage of "key.fields". Is this option only
> valid for write operations? For reading, I can't imagine how such an option
> would be applied. I would expect that there might be a SYSTEM_METADATA("key")
> to read and assign the key to a normal field?
>
> 2. If "key.fields" is only valid in write operation, I want to propose we
> can simplify the options to not introducing key.format.type and
> other related options. I think a single "key.field" (not fields) would be
> enough, users can use UDF to calculate whatever key they
> want before sink.
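>
> A rough sketch of that idea (the table names, the my_key_udf function, and
> the single key.field option are assumptions for illustration, not part of
> the FLIP):
>
>   CREATE TABLE kafka_sink (
>     user_id BIGINT,
>     payload STRING,
>     record_key STRING
>   ) WITH (
>     'connector.type' = 'kafka',
>     'format.type' = 'json',
>     -- hypothetical single-field variant of the proposed option
>     'key.field' = 'record_key'
>   );
>
>   -- the key is computed with a UDF when inserting, not configured in DDL
>   INSERT INTO kafka_sink
>   SELECT user_id, payload, my_key_udf(user_id) FROM source_table;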
>
> 3. Also, I don't want to introduce "value.format.type" and
> "value.format.xxx" with the "value" prefix. Not every connector has a
> concept of keys and values. The old parameter "format.type" is already good
> enough to use.
>
> Best,
> Kurt
>
>
> On Tue, Mar 3, 2020 at 10:40 PM Jark Wu <imj...@gmail.com> wrote:
>
>> Thanks Dawid,
>>
>> I have two more questions.
>>
>> > SupportsMetadata
>> Introducing SupportsMetadata sounds good to me. But I have some questions
>> regarding this interface.
>> 1) How does the source know the expected return type of each metadata field?
>> 2) Where do we put the metadata fields? Appended to the existing physical
>> fields?
>> If yes, I would suggest changing the signature to `TableSource
>> appendMetadataFields(String[] metadataNames, DataType[] metadataTypes)`
>>
>> > SYSTEM_METADATA("partition")
>> Can the SYSTEM_METADATA() function be nested in a computed column
>> expression? If yes, how do we specify the return type of SYSTEM_METADATA?
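>>
>> For example (a purely hypothetical DDL sketch, just to make the question
>> concrete; it assumes SYSTEM_METADATA("timestamp") returns a string):
>>
>>   CREATE TABLE kafka_table (
>>     id BIGINT,
>>     -- nested usage: how is the return type of SYSTEM_METADATA known here?
>>     event_time AS TO_TIMESTAMP(SYSTEM_METADATA("timestamp"))
>>   ) WITH (...);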
>>
>> Best,
>> Jark
>>
>> On Tue, 3 Mar 2020 at 17:06, Dawid Wysakowicz <dwysakow...@apache.org>
>> wrote:
>>
>> > Hi,
>> >
>> > 1. I thought a bit more on how the source would emit the columns and I
>> > now see it's not exactly the same as for regular columns. I see a need to
>> > elaborate a bit more on that in the FLIP, as you asked, Jark.
>> >
>> > I mostly agree with Danny on how we should do that. One additional
>> > thing I would introduce is an
>> >
>> > interface SupportsMetadata {
>> >
>> >    // can this source provide all of the requested metadata fields?
>> >    boolean supportsMetadata(Set<String> metadataFields);
>> >
>> >    // returns a source that additionally emits the requested fields
>> >    TableSource generateMetadataFields(Set<String> metadataFields);
>> >
>> > }
>> >
>> > This way the source would have to declare/emit only the requested
>> > metadata fields. In order not to clash with user-defined fields, when
>> > emitting a metadata field I would prepend the column name with
>> > __system_{property_name}. Therefore, when SYSTEM_METADATA("partition")
>> > is requested, the source would append a field __system_partition to the
>> > schema. This would never be visible to the user, as it would be used
>> > only for the subsequent computed columns. If that makes sense to you, I
>> > will update the FLIP with this description.
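>> >
>> > As a sketch of what that would look like (names are illustrative only):
>> >
>> >   CREATE TABLE kafka_table (
>> >     id BIGINT,
>> >     part AS SYSTEM_METADATA("partition")
>> >   ) WITH (...);
>> >
>> >   -- internally the source emits [id BIGINT, __system_partition INT] and
>> >   -- the computed column `part` reads from __system_partition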
>> >
>> > 2. CAST vs explicit type in computed columns
>> >
>> > Here I agree with Danny. It is also the current state of the proposal.
>> >
>> > 3. Partitioning on a computed column vs a function
>> >
>> > Here I also agree with Danny. I also think those are orthogonal. I would
>> > leave the STORED computed columns out of the discussion. I don't see how
>> > they relate to the partitioning. I already put both of those cases in
>> > the document. We can either partition on a computed column or use a UDF
>> > in a PARTITIONED BY clause. I am fine with leaving out partitioning by
>> > UDF in the first version if you still have some concerns.
>> >
>> > As for your question, Danny: it depends on which partitioning strategy
>> > you use.
>> >
>> > For the HASH partitioning strategy I thought it would work as you
>> > explained: it would be N = MOD(expr, num). I am not sure, though, if we
>> > should introduce the PARTITIONS clause. Usually Flink does not own the
>> > data and the partitions are already an intrinsic property of the
>> > underlying source; e.g. for Kafka we do not create topics, but just
>> > describe a pre-existing, pre-partitioned topic.
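>> >
>> > A concrete sketch of what I had in mind (the DDL syntax is hypothetical,
>> > as the FLIP has not settled on it):
>> >
>> >   CREATE TABLE kafka_table (
>> >     user_id BIGINT,
>> >     payload STRING
>> >   ) PARTITIONED BY (HASH(user_id)) WITH (...);
>> >
>> >   -- with a pre-existing 4-partition topic, a record with user_id = 10
>> >   -- goes to partition MOD(10, 4) = 2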
>> >
>> > 4. timestamp vs timestamp.field vs connector.field vs ...
>> >
>> > I am fine with changing it to timestamp.field to be consistent with the
>> > other value.fields and key.fields options. Actually, that was also my
>> > initial proposal in the first draft I prepared. I changed it afterwards
>> > to shorten the key.
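>> >
>> > That would give a consistent family of keys along these lines (the
>> > exact names are still to be settled in the FLIP):
>> >
>> >   'key.fields' = 'id'
>> >   'timestamp.field' = 'ts'
>> >   'value.fields-include' = 'EXCEPT_KEY'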
>> >
>> > Best,
>> >
>> > Dawid
>> >
>> > On 03/03/2020 09:00, Danny Chan wrote:
>> > > Thanks Dawid for bringing up this discussion, I think it is a useful
>> > > feature ~
>> > >
>> > > About how the metadata is output from the source
>> > >
>> > > I think it is completely orthogonal; computed column push-down is
>> > > another topic. This should not be a blocker but an improvement: if we
>> > > do not have any filters on the computed column, there is no need to do
>> > > any pushing. The source node just emits the complete record, with full
>> > > metadata, with the declared physical schema; then, when generating the
>> > > virtual columns, we would extract the metadata info and output it as
>> > > full columns (with the full schema).
>> > >
>> > > About the type of the metadata column
>> > >
>> > > Personally I prefer an explicit type instead of CAST. They are
>> > > semantically equivalent, but an explicit type is more straightforward
>> > > and we can declare the nullability attribute there.
>> > >
>> > > About option A (partitioning based on a computed column) vs option B
>> > > (partitioning with just a function)
>> > >
>> > > From the FLIP, it seems that B's partitioning is just a strategy used
>> > > when writing data; the partition column is not included in the table
>> > > schema, so it is simply unavailable when reading from the table.
>> > >
>> > > - Compared to A, we do not need to generate the partition column when
>> > > selecting from the table (only on insert into)
>> > > - For A, we can also mark the column as STORED when we want to persist
>> > > it
>> > >
>> > > So in my opinion they are orthogonal and we can support both. I saw
>> > > that MySQL/Oracle [1][2] also suggest defining the PARTITIONS number;
>> > > the partitions are managed under a table namespace, and the partition
>> > > in which a record is stored is partition number N, where
>> > > N = MOD(expr, num). For your design, in which partition would the
>> > > record be persisted?
>> > >
>> > > [1] https://dev.mysql.com/doc/refman/5.7/en/partitioning-hash.html
>> > > [2]
>> >
>> https://docs.oracle.com/database/121/VLDBG/GUID-F023D3ED-262F-4B19-950A-D3C8F8CDB4F4.htm#VLDBG1270
>> > >
>> > > Best,
>> > > Danny Chan
>> > > On Mar 2, 2020 at 6:16 PM +0800, Dawid Wysakowicz
>> > > <dwysakow...@apache.org> wrote:
>> > >> Hi Jark,
>> > >> Ad. 2 I added a section to discuss the relation to FLIP-63.
>> > >> Ad. 3 Yes, I also tried to somewhat keep a hierarchy of properties.
>> > >> That is why you have key.format.type.
>> > >> I also considered exactly what you are suggesting (prefixing with
>> > >> connector or kafka). I should've put that into an Options/Rejected
>> > >> alternatives section.
>> > >> I agree that timestamp, key.*, and value.* are connector properties.
>> > >> The reason I suggested not adding that prefix in the first version is
>> > >> that actually all the properties in the WITH section are connector
>> > >> properties. Even format is in the end a connector property, as some
>> > >> of the sources might not have a format, imo. The benefit of not
>> > >> adding the prefix is that it makes the keys a bit shorter. Imagine
>> > >> prefixing all the properties with connector (or, if we go with
>> > >> FLINK-12557, elasticsearch):
>> > >> elasticsearch.key.format.type: csv
>> > >> elasticsearch.key.format.field: ....
>> > >> elasticsearch.key.format.delimiter: ....
>> > >> elasticsearch.key.format.*: ....
>> > >> I am fine with doing it, though, if this is the preferred approach
>> > >> in the community.
>> > >> Ad the in-line comments:
>> > >> I forgot to update the `value.fields.include` property. It should be
>> > >> value.fields-include, which I think you also suggested in the
>> > >> comment, right?
>> > >> As for the cast vs declaring the output type of a computed column: I
>> > >> think it's better not to use CAST, but to declare the type of the
>> > >> expression and later on infer the output type of SYSTEM_METADATA. The
>> > >> reason is that I think this way it will be easier to implement e.g.
>> > >> filter push-downs when working with the native types of the source;
>> > >> e.g. in the case of Kafka's offset, I think it's better to push down
>> > >> a long rather than a string. This would let us push down expressions
>> > >> like offset > 12345 AND offset < 59382. Otherwise we would have to
>> > >> push down CAST(offset AS BIGINT) > 12345 AND CAST(offset AS BIGINT) <
>> > >> 59382. Moreover, I think we need to introduce the type for computed
>> > >> columns anyway, to support functions that infer the output type based
>> > >> on the expected return type.
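>> > >>
>> > >> A sketch of the two alternatives side by side (the DDL syntax is
>> > >> hypothetical, following the FLIP's SYSTEM_METADATA examples):
>> > >>
>> > >>   -- declared type: the planner sees `offset` natively as BIGINT
>> > >>   offset BIGINT AS SYSTEM_METADATA("offset")
>> > >>
>> > >>   -- CAST variant: the native type is hidden behind the cast
>> > >>   offset AS CAST(SYSTEM_METADATA("offset") AS BIGINT)
>> > >>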
>> > >> As for the computed column push-down: yes, SYSTEM_METADATA would
>> > >> have to be pushed down to the source. If that is not possible, the
>> > >> planner should fail. As far as I know, computed column push-down will
>> > >> be part of the source rework, won't it? ;)
>> > >> As for the persisted computed column: I think it is completely
>> > >> orthogonal. In my current proposal you can also partition by a
>> > >> computed column. The difference between using a UDF in PARTITIONED BY
>> > >> vs partitioning by a computed column is that when you partition by a
>> > >> computed column, this column must also be computed when reading the
>> > >> table. If you use a UDF in the PARTITIONED BY, the expression is
>> > >> computed only when inserting into the table.
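>> > >>
>> > >> To illustrate the two variants (the DDL syntax is hypothetical and
>> > >> my_udf is an assumed function):
>> > >>
>> > >>   -- computed column: `bucket` is part of the schema and is also
>> > >>   -- computed when reading the table
>> > >>   CREATE TABLE t1 (id BIGINT, bucket AS my_udf(id))
>> > >>     PARTITIONED BY (bucket) WITH (...);
>> > >>
>> > >>   -- UDF directly in PARTITIONED BY: evaluated only on insert
>> > >>   CREATE TABLE t2 (id BIGINT)
>> > >>     PARTITIONED BY (my_udf(id)) WITH (...);
>> > >>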
>> > >> Hope this answers some of your questions. Looking forward to further
>> > >> suggestions.
>> > >> Best,
>> > >> Dawid
>> > >>
>> > >>
>> > >> On 02/03/2020 05:18, Jark Wu wrote:
>> > >>> Hi,
>> > >>>
>> > >>> Thanks Dawid for starting such a great discussion. Reading metadata
>> > >>> and key-part information from the source is an important feature
>> > >>> for streaming users.
>> > >>>
>> > >>> In general, I agree with the proposal of the FLIP.
>> > >>> I will leave my thoughts and comments here:
>> > >>>
>> > >>> 1) +1 to using connector properties instead of introducing a HEADER
>> > >>> keyword, for the reason you mentioned in the FLIP.
>> > >>> 2) We already introduced PARTITIONED BY in FLIP-63. Maybe we should
>> > >>> add a section to explain the relationship between them.
>> > >>>    Do their concepts conflict? Could INSERT PARTITION be used on a
>> > >>> PARTITIONED table in this FLIP?
>> > >>> 3) Currently, properties are hierarchical in Flink SQL. Shall we
>> > >>> make the newly introduced properties more hierarchical as well?
>> > >>>    For example, "timestamp" => "connector.timestamp"? (Actually, I
>> > >>> prefer "kafka.timestamp", which is another improvement for
>> > >>> properties, FLINK-12557.)
>> > >>>    A single "timestamp" in the properties may mislead users into
>> > >>> thinking that the field is a rowtime attribute.
>> > >>>
>> > >>> I also left some minor comments in the FLIP.
>> > >>>
>> > >>> Thanks,
>> > >>> Jark
>> > >>>
>> > >>>
>> > >>>
>> > >>> On Sun, 1 Mar 2020 at 22:30, Dawid Wysakowicz <
>> dwysakow...@apache.org>
>> > >>> wrote:
>> > >>>
>> > >>>> Hi,
>> > >>>>
>> > >>>> I would like to propose an improvement that would enable reading
>> > >>>> table columns from different parts of source records. Besides the
>> > >>>> main payload, the majority (if not all) of the sources expose
>> > >>>> additional information. It can be simply read-only metadata such as
>> > >>>> the offset or ingestion time, or read/write parts of the record
>> > >>>> that contain data but additionally serve different purposes
>> > >>>> (partitioning, compaction, etc.), e.g. the key or timestamp in
>> > >>>> Kafka.
>> > >>>>
>> > >>>> We should make it possible to read and write data from all of
>> > >>>> those locations. In this proposal I discuss reading partitioning
>> > >>>> data; for completeness, the proposal also discusses partitioning
>> > >>>> when writing data out.
>> > >>>>
>> > >>>> I am looking forward to your comments.
>> > >>>>
>> > >>>> You can access the FLIP here:
>> > >>>>
>> > >>>>
>> >
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records?src=contextnavpagetreemode
>> > >>>>
>> > >>>> Best,
>> > >>>>
>> > >>>> Dawid
>> > >>>>
>> > >>>>
>> > >>>>
>> >
>> >
>>
>
