Hi Leonard,

The only alternative I see is to introduce a concept that is completely different from computed columns. This is also mentioned in the rejected alternatives section of the FLIP. Something like:

CREATE TABLE kafka_table (
    id BIGINT,
    name STRING,
    timestamp INT SYSTEM_METADATA("timestamp") PERSISTED,
    headers MAP<STRING, BYTES> SYSTEM_METADATA("headers") PERSISTED
) WITH (
   ...
)

This way we would avoid confusion altogether and could easily map columns to metadata columns. The disadvantage is that users cannot call UDFs or parse timestamps; that would need to be done in a real computed column.

I'm happy about better alternatives.

Regards,
Timo


On 08.09.20 15:37, Leonard Xu wrote:
Hi Timo,

Thanks for driving this FLIP.

Sorry, but I have a concern about the "Writing metadata via DynamicTableSink" section:

CREATE TABLE kafka_table (
   id BIGINT,
   name STRING,
   timestamp AS CAST(SYSTEM_METADATA("timestamp") AS BIGINT) PERSISTED,
   headers AS CAST(SYSTEM_METADATA("headers") AS MAP<STRING, BYTES>) PERSISTED
) WITH (
   ...
)
An insert statement could look like:

INSERT INTO kafka_table VALUES (
   (1, "ABC", 1599133672, MAP('checksum', computeChecksum(...)))
)

The proposed INSERT syntax does not make sense to me, because it writes to
computed (generated) columns.
Neither SQL Server nor PostgreSQL allows inserting values into computed columns,
even if they are persisted. This breaks the generated column semantics and may
confuse users a lot.

For SQL Server computed columns [1]:
column_name AS computed_column_expression [ PERSISTED [ NOT NULL ] ]...
NOTE: A computed column cannot be the target of an INSERT or UPDATE statement.

For PostgreSQL generated columns [2]:
  height_in numeric GENERATED ALWAYS AS (height_cm / 2.54) STORED
NOTE: A generated column cannot be written to directly. In INSERT or UPDATE 
commands, a value cannot be specified for a generated column, but the keyword 
DEFAULT may be specified.

After looking up SQL:2016, I think setting or updating a value for a generated
column should not be allowed either:
<insert statement> ::=
INSERT INTO <insertion target> <insert columns and source>

If <contextually typed table value constructor> CTTVC is specified, then every
<contextually typed row value constructor element> simply contained in CTTVC
whose positionally corresponding <column name> in <insert column list>
references a column of which some underlying column is a generated column shall
be a <default specification>.
A <default specification> specifies the default value of some associated item.
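
The rule quoted above can be sketched in a few lines. This is only an illustration under stated assumptions, not planner code: the class GeneratedColumnInsertCheck and the method isValidInsert are hypothetical names, and values are modelled as plain strings in which the keyword DEFAULT stands for a <default specification>.

```java
import java.util.List;
import java.util.Set;

// Hypothetical helper for illustration; not part of Flink, SQL Server, or PostgreSQL.
final class GeneratedColumnInsertCheck {

    // Returns true if every value that targets a generated column is the keyword DEFAULT.
    static boolean isValidInsert(
            List<String> insertColumns, List<String> values, Set<String> generatedColumns) {
        if (insertColumns.size() != values.size()) {
            throw new IllegalArgumentException("column list and value list differ in length");
        }
        for (int i = 0; i < insertColumns.size(); i++) {
            boolean generated = generatedColumns.contains(insertColumns.get(i));
            boolean isDefault = "DEFAULT".equalsIgnoreCase(values.get(i).trim());
            if (generated && !isDefault) {
                return false; // an explicit value for a generated column is rejected
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Set<String> generated = Set.of("height_in");
        List<String> columns = List.of("height_cm", "height_in");
        System.out.println(isValidInsert(columns, List.of("180", "DEFAULT"), generated));
        System.out.println(isValidInsert(columns, List.of("180", "70.9"), generated));
    }
}
```

With the PostgreSQL example from above, supplying DEFAULT for height_in passes the check, while supplying an explicit value does not.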


[1] https://docs.microsoft.com/en-US/sql/t-sql/statements/alter-table-computed-column-definition-transact-sql?view=sql-server-ver15
[2] https://www.postgresql.org/docs/12/ddl-generated-columns.html

On 8 Sep 2020, at 17:31, Timo Walther <twal...@apache.org> wrote:

Hi Jark,

according to Flink's and Calcite's casting definition in [1][2] TIMESTAMP WITH 
LOCAL TIME ZONE should be castable from BIGINT. If not, we will make it 
possible ;-)

I'm aware of DeserializationSchema.getProducedType but I think that this method 
is actually misplaced. The type should rather be passed to the source itself.

For our Kafka SQL source, we will also not use this method because the Kafka
source will add its own metadata in addition to the DeserializationSchema. So
DeserializationSchema.getProducedType will never be read.

For now I suggest leaving out the `DataType` from
DecodingFormat.applyReadableMetadata, also because the format's physical type
is passed later in `createRuntimeDecoder`. If necessary, it can be computed
manually from consumedType + metadata types. We will provide a metadata utility
class for that.

Regards,
Timo


[1] 
https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java#L200
[2] 
https://github.com/apache/calcite/blob/master/core/src/main/java/org/apache/calcite/sql/type/SqlTypeCoercionRule.java#L254


On 08.09.20 10:52, Jark Wu wrote:
Hi Timo,

The updated CAST SYSTEM_METADATA behavior sounds good to me. I just noticed
that a BIGINT can't be converted to "TIMESTAMP(3) WITH LOCAL TIME ZONE".
So maybe we need to support this, or use "TIMESTAMP(3) WITH LOCAL TIME
ZONE" as the defined type of the Kafka timestamp? I think this makes sense,
because it represents the milliseconds since epoch.

Regarding "DeserializationSchema doesn't need TypeInfo", I don't think so.
The DeserializationSchema implements ResultTypeQueryable, thus the
implementation needs to return an output TypeInfo.
Besides, FlinkKafkaConsumer also calls
DeserializationSchema.getProducedType as the produced type of the
source function [1].

Best,
Jark

[1]
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L1066

On Tue, 8 Sep 2020 at 16:35, Timo Walther <twal...@apache.org> wrote:
Hi everyone,

I updated the FLIP again and hope that I could address the mentioned
concerns.

@Leonard: Thanks for the explanation. I wasn't aware that ts_ms and
source.ts_ms have different semantics. I updated the FLIP and expose the
most commonly used properties separately. So frequently used properties
are not hidden in the MAP anymore:

debezium-json.ingestion-timestamp
debezium-json.source.timestamp
debezium-json.source.database
debezium-json.source.schema
debezium-json.source.table

However, since other properties depend on the used connector/vendor, the
remaining options are stored in:

debezium-json.source.properties

And accessed with:

CAST(SYSTEM_METADATA('debezium-json.source.properties') AS MAP<STRING,
STRING>)['table']

Otherwise it is not possible to figure out the value and column type
during validation.

@Jark: You convinced me to relax the CAST constraints. I added a
dedicated sub-section to the FLIP:

To make the use of SYSTEM_METADATA easier and to avoid nested casting, we
allow explicit casting to a target data type:

rowtime AS CAST(SYSTEM_METADATA("timestamp") AS TIMESTAMP(3) WITH LOCAL
TIME ZONE)

A connector still produces and consumes the data type returned by
`listMetadata()`. The planner will insert necessary explicit casts.

In any case, the user must provide a CAST such that the computed column
receives a valid data type when constructing the table schema.

"I don't see a reason why `DecodingFormat#applyReadableMetadata` needs a
DataType argument."

Correct, the DeserializationSchema doesn't need TypeInfo; it is always
executed locally. It is the source that needs TypeInfo for serializing
the record to the next operator. And this is what we provide.

@Danny:

“`SYSTEM_METADATA("offset")` returns the NULL type by default”

We can also use some other means to represent an UNKNOWN data type. In
the Flink type system, we use the NullType for it. The important part is
that the final data type is known for the entire computed column. As I
mentioned before, I would avoid the suggested option b) that would be
similar to your suggestion. The CAST should be enough and allows for
complex expressions in the computed column. Option b) would need parser
changes.

Regards,
Timo



On 08.09.20 06:21, Leonard Xu wrote:
Hi, Timo

Thanks for your explanation and update. I have only one question about
the latest FLIP.

About the MAP<STRING, STRING> data type of the key 'debezium-json.source': if
users want to use the table name metadata, they need to write:
tableName STRING AS CAST(SYSTEM_METADATA('debezium-json.source') AS
MAP<STRING, STRING>)['table']

This expression is a little complex for users. Could we support only the
necessary metadata, with simple data types, as follows?
tableName STRING AS CAST(SYSTEM_METADATA('debezium-json.source.table') AS
STRING),
transactionTime BIGINT AS
CAST(SYSTEM_METADATA('debezium-json.source.ts_ms') AS BIGINT),

This way we can simplify the expression. From my side, the most commonly
used metadata in changelog formats are 'database', 'table', 'source.ts_ms'
and 'ts_ms'; maybe we could support only these in the first version.

Both Debezium and Canal have the above four metadata fields, and I'm willing
to take some subtasks in the upcoming development if necessary.

Debezium:
{
    "before": null,
    "after": {  "id": 101,"name": "scooter"},
    "source": {
      "db": "inventory",                  # 1. database name the changelog belongs to.
      "table": "products",                # 2. table name the changelog belongs to.
      "ts_ms": 1589355504100,             # 3. timestamp of the change in the database system, i.e. transaction time in the database.
      "connector": "mysql",
      ….
    },
    "ts_ms": 1589355606100,              # 4. timestamp when Debezium processed the changelog.
    "op": "c",
    "transaction": null
}

Canal:
{
    "data": [{  "id": "102", "name": "car battery" }],
    "database": "inventory",      # 1. database name the changelog belongs to.
    "table": "products",          # 2. table name the changelog belongs to.
    "es": 1589374013000,          # 3. execution time of the change in the database system, i.e. transaction time in the database.
    "ts": 1589374013680,          # 4. timestamp when Canal processed the changelog.
    "isDdl": false,
    "mysqlType": {},
    ....
}


Best
Leonard

On 8 Sep 2020, at 11:57, Danny Chan <yuzhao....@gmail.com> wrote:

Thanks Timo ~

The FLIP was already in pretty good shape, I have only 2 questions here:


1. “`CAST(SYSTEM_METADATA("offset") AS INT)` would be a valid read-only
computed column for Kafka and can be extracted by the planner.”


What are the advantages of following the SQL Server syntax here? Usually an
expression's return type can be inferred automatically. But I guess
SQL Server does not have a function like SYSTEM_METADATA, which actually does
not have a specific return type.

And why not use the Oracle or MySQL syntax [1][2] here?

column_name [datatype] [GENERATED ALWAYS] AS (expression) [VIRTUAL]
which is more straightforward.

2. “`SYSTEM_METADATA("offset")` returns the NULL type by default”

The default type should not be NULL, because only the NULL literal has that
type. Usually we use ANY as the type if we do not know the specific type in
the SQL context. ANY means the physical value can be any Java object.

[1] https://oracle-base.com/articles/11g/virtual-columns-11gr1
[2]
https://dev.mysql.com/doc/refman/5.7/en/create-table-generated-columns.html

Best,
Danny Chan
On 4 Sep 2020 at 4:48 PM +0800, Timo Walther <twal...@apache.org> wrote:
Hi everyone,

I completely reworked FLIP-107. It now covers the full story of how to read
and write metadata from/to connectors and formats. It considers all of
the latest FLIPs, namely FLIP-95, FLIP-132 and FLIP-122. It introduces
the concept of PERSISTED computed columns and leaves out partitioning
for now.

Looking forward to your feedback.

Regards,
Timo


On 04.03.20 09:45, Kurt Young wrote:
Sorry, forgot one question.

4. Can we make value.fields-include more orthogonal? For example, one could
specify it as "EXCEPT_KEY, EXCEPT_TIMESTAMP".
With the current EXCEPT_KEY and EXCEPT_KEY_TIMESTAMP, users cannot configure
it to ignore just the timestamp but keep the key.
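
To illustrate what an orthogonal option could look like, here is a minimal sketch. It assumes the option value is a comma-separated list of independent flags and that "ALL" (or an empty value) excludes nothing; the class ValueFieldsIncludeSketch and its parse method are hypothetical and not part of the FLIP.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical parser for illustration; the FLIP does not define this class.
final class ValueFieldsIncludeSketch {

    // Each flag excludes exactly one part of the record from the value format.
    enum Exclusion { EXCEPT_KEY, EXCEPT_TIMESTAMP }

    // Parses e.g. "EXCEPT_KEY, EXCEPT_TIMESTAMP" into a set of independent flags.
    // Assumption: "ALL" or an empty string means that nothing is excluded.
    static Set<Exclusion> parse(String option) {
        EnumSet<Exclusion> result = EnumSet.noneOf(Exclusion.class);
        if (option == null) {
            return result;
        }
        for (String token : option.split(",")) {
            String trimmed = token.trim();
            if (trimmed.isEmpty() || trimmed.equals("ALL")) {
                continue;
            }
            // valueOf throws IllegalArgumentException for unknown flags
            result.add(Exclusion.valueOf(trimmed));
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(parse("EXCEPT_TIMESTAMP"));
        System.out.println(parse("EXCEPT_KEY, EXCEPT_TIMESTAMP"));
    }
}
```

With flags like these, "ignore the timestamp but keep the key" is simply the single flag EXCEPT_TIMESTAMP, which the combined EXCEPT_KEY_TIMESTAMP value cannot express.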

Best,
Kurt


On Wed, Mar 4, 2020 at 4:42 PM Kurt Young <ykt...@gmail.com> wrote:

Hi Dawid,

I have a couple of questions around key fields. I also have some other
questions, but I want to focus on key fields first.

1. I don't fully understand the usage of "key.fields". Is this option only
valid during write operations? For reading, I can't imagine how such
options can be applied. I would expect that there might be a
SYSTEM_METADATA("key") to read the key and assign it to a normal field?

2. If "key.fields" is only valid in write operations, I want to propose that
we simplify the options by not introducing key.format.type and
other related options. I think a single "key.field" (not fields) would be
enough; users can use a UDF to calculate whatever key they
want before the sink.

3. Also I don't want to introduce "value.format.type" and
"value.format.xxx" with the "value" prefix. Not every connector has a
concept of keys and values. The old parameter "format.type" is already good
enough to use.

Best,
Kurt


On Tue, Mar 3, 2020 at 10:40 PM Jark Wu <imj...@gmail.com> wrote:

Thanks Dawid,

I have two more questions.

SupportsMetadata
Introducing SupportsMetadata sounds good to me, but I have some questions
regarding this interface.
1) How does the source know the expected return type of each metadata field?
2) Where do we put the metadata fields? Do we append them to the existing
physical fields?
If yes, I would suggest changing the signature to `TableSource
appendMetadataFields(String[] metadataNames, DataType[] metadataTypes)`.

SYSTEM_METADATA("partition")
Can the SYSTEM_METADATA() function be nested in a computed column
expression? If yes, how do we specify the return type of SYSTEM_METADATA?

Best,
Jark

On Tue, 3 Mar 2020 at 17:06, Dawid Wysakowicz <dwysakow...@apache.org> wrote:

Hi,

1. I thought a bit more about how the source would emit the columns, and I
now see it's not exactly the same as regular columns. I see a need to
elaborate a bit more on that in the FLIP, as you asked, Jark.

I mostly agree with Danny on how we should do that. One additional
thing I would introduce is an

interface SupportsMetadata {

boolean supportsMetadata(Set<String> metadataFields);

TableSource generateMetadataFields(Set<String> metadataFields);

}

This way the source would have to declare/emit only the requested
metadata fields. In order not to clash with user-defined fields, when
emitting a metadata field I would name the column
__system_{property_name}. Therefore, when SYSTEM_METADATA("partition") is
requested, the source would append a field __system_partition to the
schema. This would never be visible to the user, as it would be used only
for the subsequent computed columns. If that makes sense to you, I will
update the FLIP with this description.
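
A rough sketch of the naming scheme described above, just to make it concrete. It models the schema as a plain list of field names; the class SystemMetadataFieldsSketch and the method appendMetadataFields are hypothetical and do not correspond to an existing Flink API.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration of the __system_ prefix; not an existing Flink class.
final class SystemMetadataFieldsSketch {

    static final String PREFIX = "__system_";

    // Appends one prefixed field per requested metadata key to the physical schema.
    static List<String> appendMetadataFields(
            List<String> physicalFields, Set<String> requestedMetadata) {
        List<String> schema = new ArrayList<>(physicalFields);
        for (String key : requestedMetadata) {
            String fieldName = PREFIX + key;
            if (schema.contains(fieldName)) {
                // The prefix is meant to avoid clashes; fail loudly if one still occurs.
                throw new IllegalStateException("field already exists: " + fieldName);
            }
            schema.add(fieldName);
        }
        return schema;
    }

    public static void main(String[] args) {
        // LinkedHashSet keeps the request order, so the output order is deterministic.
        Set<String> requested = new LinkedHashSet<>(List.of("partition", "offset"));
        System.out.println(appendMetadataFields(List.of("id", "name"), requested));
    }
}
```

Only the requested keys are appended, so a query that never calls SYSTEM_METADATA leaves the physical schema unchanged.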

2. CAST vs explicit type in computed columns

Here I agree with Danny. It is also the current state of the proposal.

3. Partitioning on computed column vs function

Here I also agree with Danny. I also think those are orthogonal. I would
leave the STORED computed columns out of the discussion. I don't see
how they relate to the partitioning. I already put both of those
cases in the document. We can either partition on a computed column or
use a UDF in a PARTITIONED BY clause. I am fine with leaving out the
partitioning by UDF in the first version if you still have some
concerns.

As for your question, Danny: it depends on which partitioning strategy you
use.

For the HASH partitioning strategy I thought it would work as you
explained. It would be N = MOD(expr, num). I am not sure though if we
should introduce the PARTITIONS clause. Usually Flink does not own the
data, and the partitions are already an intrinsic property of the
underlying source. E.g. for Kafka we do not create topics; we just
describe a pre-existing, pre-partitioned topic.
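
For reference, N = MOD(expr, num) can be sketched as below. The class HashPartitionSketch is a hypothetical name, and using Math.floorMod is my assumption so that negative expression values still map into the range 0..num-1; a plain SQL MOD may return a negative remainder instead.

```java
// Hypothetical illustration of HASH partitioning; not an existing Flink class.
final class HashPartitionSketch {

    // Computes the partition number N = MOD(expr, num) for an evaluated expression.
    // Assumption: floorMod is used so that negative values map into [0, num).
    static int partitionFor(long exprValue, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        return (int) Math.floorMod(exprValue, (long) numPartitions);
    }

    public static void main(String[] args) {
        System.out.println(partitionFor(12345L, 4)); // 12345 = 4 * 3086 + 1, prints 1
        System.out.println(partitionFor(-1L, 4));    // floorMod keeps it non-negative, prints 3
    }
}
```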

4. timestamp vs timestamp.field vs connector.field vs ...

I am fine with changing it to timestamp.field to be consistent with
other value.fields and key.fields. Actually that was also my initial
proposal in a first draft I prepared. I changed it afterwards to shorten
the key.

Best,

Dawid

On 03/03/2020 09:00, Danny Chan wrote:
Thanks Dawid for bringing up this discussion, I think it is a useful
feature ~

About how the metadata outputs from source

I think it is completely orthogonal. Computed column push-down is another
topic; it should not be a blocker but an improvement. If we do not have any
filters on the computed column, there is no need to push anything down: the
source node just emits the complete record with full metadata using the
declared physical schema. Then, when generating the virtual columns, we
extract the metadata info and output it as full columns (with full schema).

About the type of metadata column

Personally I prefer an explicit type instead of CAST. They are semantically
equivalent, but an explicit type is more straightforward and we can declare
the nullable attribute there.

About option A: partitioning based on a computed column vs. option B:
partitioning with just a function

From the FLIP, it seems that B's partitioning is just a strategy when
writing data. The partition column is not included in the table schema, so
it is useless when reading from the table.

- Compared to A, we do not need to generate the partition column when
selecting from the table (only when inserting into it)
- For A we can also mark the column as STORED when we want to persist it

So in my opinion they are orthogonal and we can support both. I saw that
MySQL/Oracle [1][2] suggest also defining the number of PARTITIONS, and the
partitions are managed under a "tablenamespace". The partition in which the
record is stored is partition number N, where N = MOD(expr, num). For your
design, in which partition would the record be persisted?

[1] https://dev.mysql.com/doc/refman/5.7/en/partitioning-hash.html
[2] https://docs.oracle.com/database/121/VLDBG/GUID-F023D3ED-262F-4B19-950A-D3C8F8CDB4F4.htm#VLDBG1270

Best,
Danny Chan
On 2 Mar 2020 at 6:16 PM +0800, Dawid Wysakowicz <dwysakow...@apache.org> wrote:
Hi Jark,

Ad. 2 I added a section to discuss the relation to FLIP-63.

Ad. 3 Yes, I also tried to somewhat keep the hierarchy of properties.
Therefore you have the key.format.type.
I also considered exactly what you are suggesting (prefixing with
connector or kafka). I should have put that into an Option/Rejected
alternatives section.
I agree timestamp, key.*, value.* are connector properties. The reason I
wanted to suggest not adding that prefix in the first version is that
actually all the properties in the WITH section are connector properties.
Even format is in the end a connector property, as some of the sources
might not have a format, imo. The benefit of not adding the prefix is that
it makes the keys a bit shorter. Imagine prefixing all the properties with
connector (or, if we go with FLINK-12557, elasticsearch):

elasticsearch.key.format.type: csv
elasticsearch.key.format.field: ....
elasticsearch.key.format.delimiter: ....
elasticsearch.key.format.*: ....

I am fine with doing it though if this is a preferred approach in the
community.

Ad in-line comments:

I forgot to update the `value.fields.include` property. It should be
value.fields-include, which I think you also suggested in the comment,
right?

As for the cast vs declaring the output type of a computed column: I think
it's better not to use CAST, but to declare the type of an expression and
later on infer the output type of SYSTEM_METADATA. The reason is that I
think this way it will be easier to implement e.g. filter push-downs when
working with the native types of the source. E.g. in the case of Kafka's
offset, I think it's better to push down long rather than string. This could
let us push down expressions like offset > 12345 && offset < 59382.
Otherwise we would have to push down cast(offset, long) > 12345 &&
cast(offset, long) < 59382. Moreover, I think we need to introduce the type
for computed columns anyway to support functions that infer the output type
based on the expected return type.

As for the computed column push-down: yes, SYSTEM_METADATA would have to be
pushed down to the source. If that is not possible, the planner should
fail. As far as I know, computed column push-down will be part of the source
rework, won't it? ;)

As for the persisted computed column: I think it is completely orthogonal.
In my current proposal you can also partition by a computed column. The
difference between using a UDF in PARTITIONED BY vs partitioning by a
computed column is that when you partition by a computed column, this
column must also be computed when reading the table. If you use a UDF in
PARTITIONED BY, the expression is computed only when inserting into the
table.

Hope this answers some of your questions. Looking forward to further
suggestions.

Best,
Dawid


On 02/03/2020 05:18, Jark Wu wrote:
Hi,

Thanks Dawid for starting such a great discussion. Reading metadata and
key-part information from the source is an important feature for streaming
users.

In general, I agree with the proposal of the FLIP.
I will leave my thoughts and comments here:

1) +1 to use connector properties instead of introducing a HEADER keyword,
for the reason you mentioned in the FLIP.
2) We already introduced PARTITIONED BY in FLIP-63. Maybe we should add a
section to explain the relationship between them.
Do their concepts conflict? Could INSERT PARTITION be used on the
PARTITIONED table in this FLIP?
3) Currently, properties are hierarchical in Flink SQL. Shall we make the
newly introduced properties more hierarchical?
For example, "timestamp" => "connector.timestamp"? (Actually, I prefer
"kafka.timestamp", which is another improvement for properties, see
FLINK-12557.)
A single "timestamp" in the properties may mislead users into thinking that
the field is a rowtime attribute.

I also left some minor comments in the FLIP.

Thanks,
Jark



On Sun, 1 Mar 2020 at 22:30, Dawid Wysakowicz <dwysakow...@apache.org> wrote:

Hi,

I would like to propose an improvement that would enable reading table
columns from different parts of source records. Besides the main payload,
the majority of sources (if not all) expose additional information. It
can be simply read-only metadata such as offset or ingestion time, or
readable and writable parts of the record that contain data but
additionally serve different purposes (partitioning, compaction etc.), e.g.
key or timestamp in Kafka.

We should make it possible to read and write data from all of those
locations. In this proposal I discuss reading partitioning data; for
completeness, it also discusses partitioning when writing data out.

I am looking forward to your comments.

You can access the FLIP here:

https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records?src=contextnavpagetreemode

Best,

Dawid