Hi Timo,

1.  "`Map<String, DataType> listReadableMetadata()` only allows one
possible DataType for a metadata key."
I think the main purpose of the metadata feature is to access the Kafka
timestamp and use it as a rowtime attribute.
If we force users to use the specific type, then this feature might be
tricky to use,
e.g. rowtime AS CAST(CAST(SYSTEM_METADATA("timestamp") AS BIGINT) AS
TIMESTAMP(3) WITH LOCAL TIME ZONE). It will be super long.

My suggestion would be either we use "TIMESTAMP(3) WITH LOCAL TIME ZONE" as
the defined type of Kafka timestamp,
or allow different types in the CAST iff the defined type can be casted to
the cast type, for example,
CAST(SYSTEM_METADATA("partition") AS BIGINT) can be valid, because the
defined type "INT" can be casted to the cast type "BIGINT".

The former one is more concise, the later one is more flexible. What do you
think?

2. "I don't see a reason why `DecodingFormat#applyReadableMetadata` needs a
DataType argument."
Do you mean the output TypeInformation of the `DeserializationSchema` can
be calculated via producedDataType + metadata columns?
Then maybe we also don't need the outputDataType argument for
`SupportsReadingMetadata#applyReadableMetadata`.
I guess the outputDataType is needed here because
of SupportsComputedColumnPushDown?  Btw, Shengkai started a discussion in
the
mailing list to merge `SupportsComputedColumnPushDown` and
`SupportsWatermarkPushdown` interfaces [1].

3. "list the metadata keys"
LGTM. Maybe we can expand the properties in the "source", e.g. allow
"source.ts_ms" metadata, this is the most commonly used one.


Best,
Jark

[1]:
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Merge-SupportsComputedColumnPushDown-and-SupportsWatermarkPushDown-td44387.html



On Mon, 7 Sep 2020 at 23:51, Timo Walther <twal...@apache.org> wrote:

> Hi Jark,
>
> 1. "`Map<String, DataType> listReadableMetadata()` only allows one
> possible DataType for a metadata key."
> I was thinking about this topic a lot today. My conclusion is: yes, we
> should force users to specify the type as documented. Users can further
> cast or compute using expressions to more specific types. I decided for
> BIGINT instead of TIMESTAMP(3) for Kafka timestamps, I think for
> metadata we should directly forward the underlying atomic type of the
> external system. And for a Kafka consumer record this is BIGINT without
> any timezone interpretation. Users can further cast to TIMESTAMP(3) if
> necessary. I wouldn't introduce too much magic here. What do you think?
>
> 2. I don't see a reason why `DecodingFormat#applyReadableMetadata` needs
> a DataType argument. This argument would need to be created by the
> source then. Do you have an example in mind? In any case the format
> could also calculate it later via: producedDataType + metadata columns
>
> 3. "list the metadata keys"
> I went through the list of current connectors and formats. I updated the
> FLIP for the Kafka and Debezium. For the key design, I used the FLIP-122
> naming schema. For HBase, Elasticsearch and others I could not find
> metadata that might be important for users.
>
> 4. "sub-expression"
> Yes, sub-expression like the ones you mentioned would be allowed.
> We will push down only one "headers" metadata.
>
> Regards,
> Timo
>
>
> On 07.09.20 14:41, Jark Wu wrote:
> > Sorry, I forgot to ask one more question.
> >
> > 4. Do we allow to use the SYSTEM_METADATA as a sub-expression? For
> example,
> >
> > checksum AS CAST(CAST(SYSTEM_METADATA("headers") AS MAP<STRING,
> > BYTES>)['checksum'] AS STRING),
> > myvalue AS CAST(CAST(SYSTEM_METADATA("headers") AS MAP<STRING,
> > BYTES>)['mykey'] AS BIGINT)
> >
> > And we will push down only one "headers" metadata, right?
> >
> > Best,
> > Jark
> >
> >
> >
> > On Mon, 7 Sep 2020 at 19:55, Jark Wu <imj...@gmail.com> wrote:
> >
> >> Thanks Timo,
> >>
> >> I think this FLIP is already in great shape!
> >>
> >> I have following questions:
> >>
> >> 1. `Map<String, DataType> listReadableMetadata()` only allows one
> possible
> >> DataType for a metadata key.
> >> However, users may expect to use different types, e.g. for "timestamp"
> >> metadata, users may use it as BIGINT, or TIMESTAMP(6) WITH LOCAL TIME
> ZONE
> >>   or TIMESTAMP(3) WITH LOCAL TIME ZONE.
> >> Do we force users to use the specific types or can use several types in
> >> the CAST?
> >>
> >> 2. Why does the `DecodingFormat#applyReadableMetadata(List<String>
> >> metadataKeys)` don't need the `DataType outputDataType` parameter?
> >>
> >> 3. I think it would be great if we can list the metadata keys (and
> >> readable/writable) we want to expose in the first version. I think they
> are
> >> also important public APIs, like connector options?
> >>
> >> Best,
> >> Jark
> >>
> >> On Mon, 7 Sep 2020 at 18:28, Timo Walther <twal...@apache.org> wrote:
> >>
> >>> Hi Leonard,
> >>>
> >>> thanks for your feedback.
> >>>
> >>> (1) Actually, I discuss this already in the FLIP. But let me summarize
> >>> our options again if it was not clear enough in the FLIP:
> >>>
> >>> a) CREATE TABLE t (a AS CAST(SYSTEM_METADATA("offset") AS INT))
> >>> pro: readable, complex arithmetic possible, more SQL compliant, SQL
> >>> Server compliant
> >>> con: long
> >>>
> >>> b) CREATE TABLE t (a INT AS SYSTEM_METADATA("offset"))
> >>> pro: shorter, not SQL nor SQL Server compliant
> >>> con: requires parser changes, no complex arithmetic like
> >>> `computeSomeThing(SYSTEM_METADATA("offset"))` possible
> >>>
> >>> c) CREATE TABLE t (a AS SYSTEM_METADATA("offset", INT))
> >>> pro: shorter, very readable, complex arithmetic possible
> >>> con: non SQL expression, requires parser changes
> >>>
> >>> So I decided for a) with less disadvantages.
> >>>
> >>> 2) Yes, a format can expose its metadata through the mentioned
> >>> interfaces in the FLIP. I added an example to the FLIP.
> >>>
> >>> 3) The concept of a key or value format is connector specific. And
> since
> >>> the table source/table sinks are responsible for returning the metadata
> >>> columns. We can allow this in the future due to the flexibility of the
> >>> design. But I also don't think that we need this case for now. I think
> >>> we can focus on the value format and ignore metadata from the key.
> >>>
> >>> Regards,
> >>> Timo
> >>>
> >>>
> >>> On 07.09.20 11:03, Leonard Xu wrote:
> >>>> Ignore  my question(4), I’ve  found the answer in the doc :
> >>> 'value.fields-include' = ‘EXCEPT_KEY' (all fields of the schema minus
> >>> fields of the key)
> >>>>
> >>>>> 在 2020年9月7日,16:33,Leonard Xu <xbjt...@gmail.com> 写道:
> >>>>>
> >>>>> (4) About Reading and writing from key and value section, we bind
> that
> >>> the fields of key part must belong to the fields of value part
> according to
> >>> the options 'key.fields' = 'id, name' and 'value.fields-include' =
> 'ALL',
> >>> Is this by design? I think the key fields and value fields are
> independent
> >>> each other in Kafka.
> >>>>>
> >>>>
> >>>>
> >>>
> >>>
> >
>
>

Reply via email to