Thanks for driving this, Timo, +1 for voting ~
Best,
Danny Chan
On Sep 10, 2020, 3:47 PM +0800, Timo Walther <twal...@apache.org> wrote:
Thanks everyone for this healthy discussion. I updated the FLIP with the outcome. I think the result is very powerful but also very easy to declare. Thanks for all the contributions.

If there are no objections, I would continue with a vote.

What do you think?
Regards,
Timo
On 09.09.20 16:52, Timo Walther wrote:
"If virtual by default, when a user types "timestamp int" ==>
persisted
column, then adds a "metadata" after that ==> virtual column, then
adds
a "persisted" after that ==> persisted column."
Thanks for this nice mental model explanation, Jark. This makes total
sense to me. Also making the the most common case as short at just
adding `METADATA` is a very good idea. Thanks, Danny!
Let me update the FLIP again with all these ideas.
Regards,
Timo
On 09.09.20 15:03, Jark Wu wrote:
I'm also +1 to Danny's proposal: timestamp INT METADATA [FROM 'my-timestamp-field'] [VIRTUAL]

Especially I like the shortcut: timestamp INT METADATA. This makes the most common case supported in the simplest way.

I also think the default should be "PERSISTED", so VIRTUAL is optional when you are accessing read-only metadata. Because:

1. The "timestamp INT METADATA" should be a normal column, because "METADATA" is just a modifier to indicate it comes from metadata, and a normal column should be persisted. If virtual by default, when a user types "timestamp int" ==> persisted column, then adds "metadata" after that ==> virtual column, then adds "persisted" after that ==> persisted column. I think this flips back and forth several times and makes users confused. Physical fields are also prefixed with "fieldName TYPE", so it is very straightforward that "timestamp INT METADATA" is persisted.

2. From the collected user questions [1], we can see that "timestamp" is the most common use case. "timestamp" is a read-write metadata. Persisted by default doesn't break the reading behavior.
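For illustration, a minimal sketch of a declaration under this proposal (column names, connector options, and the exact keywords are illustrative and still under discussion in this thread):

CREATE TABLE kafka_table (
  id BIGINT,
  name STRING,
  -- read-write metadata, persisted by default, also written on INSERT
  event_time TIMESTAMP(3) METADATA FROM 'timestamp',
  -- read-only metadata, excluded from the sink schema
  record_offset BIGINT METADATA FROM 'offset' VIRTUAL
) WITH (
  'connector' = 'kafka',
  'topic' = 'test-topic',
  'format' = 'json'
);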
Best,
Jark
[1]: https://issues.apache.org/jira/browse/FLINK-15869
On Wed, 9 Sep 2020 at 20:56, Leonard Xu <xbjt...@gmail.com> wrote:
Thanks @Dawid for the nice summary, I think you caught all opinions of the long discussion well.

@Danny
"timestamp INT METADATA [FROM 'my-timestamp-field'] [VIRTUAL]
Note that the "FROM 'field name'" is only needed when the name conflicts with the declared table column name; when there are no conflicts, we can simplify it to
timestamp INT METADATA"

I really like the proposal, there is no confusion with computed columns any more, and it's concise enough.

@Timo @Dawid
"We use `SYSTEM_TIME` for temporal tables. I think prefixing with SYSTEM makes it clearer that it comes magically from the system."
"As for the issue of shortening the SYSTEM_METADATA to METADATA. Here I very much prefer the SYSTEM_ prefix."

I think `SYSTEM_TIME` is quite different from `SYSTEM_METADATA`. First of all, the word `TIME` has broad meanings but the word `METADATA` does not; `METADATA` has a specific meaning. Secondly, `FOR SYSTEM_TIME AS OF` exists in the SQL standard but `SYSTEM_METADATA` does not. Personally, I prefer the simpler way; sometimes less is more.
Best,
Leonard
Timo Walther <twal...@apache.org> wrote on Wed, Sep 9, 2020 at 6:41 PM:
Hi everyone,

"key" and "value" in the properties are a special case because they need to configure a format. So key and value are more than just metadata. Jark's example for setting a timestamp would work, but as the FLIP discusses, we have way more metadata fields like headers, epoch-leader, etc. Having a property for all of this metadata would mess up the WITH section entirely. Furthermore, we also want to deal with metadata from the formats. Solving this through properties as well would further complicate the property design.

Personally, I still like the computed column design more because it allows full flexibility to compute the final column:

timestamp AS adjustTimestamp(CAST(SYSTEM_METADATA("ts") AS TIMESTAMP(3)))

instead of having a helper column and a real column in the table:

helperTimestamp AS CAST(SYSTEM_METADATA("ts") AS TIMESTAMP(3))
realTimestamp AS adjustTimestamp(helperTimestamp)

But I see that the discussion leans towards:

timestamp INT SYSTEM_METADATA("ts")

Which is fine with me. It is the shortest solution, because we don't need an additional CAST. We can discuss the syntax, so that confusion with computed columns can be avoided:

timestamp INT USING SYSTEM_METADATA("ts")
timestamp INT FROM SYSTEM_METADATA("ts")
timestamp INT FROM SYSTEM_METADATA("ts") PERSISTED

We use `SYSTEM_TIME` for temporal tables. I think prefixing with SYSTEM makes it clearer that it comes magically from the system.
What do you think?
Regards,
Timo
On 09.09.20 11:41, Jark Wu wrote:
Hi Danny,

This is not the Oracle and MySQL computed column syntax, because there is no "AS" after the type.

Hi everyone,

If we want to use "offset INT SYSTEM_METADATA("offset")", then I think we must further discuss the "PERSISTED" or "VIRTUAL" keyword for the query-sink schema problem.

Personally, I think we can use a shorter keyword "METADATA" for "SYSTEM_METADATA", because "SYSTEM_METADATA" sounds like a system function and confuses users into thinking this is a computed column.
Best,
Jark
On Wed, 9 Sep 2020 at 17:23, Danny Chan <danny0...@apache.org> wrote:
"offset INT SYSTEM_METADATA("offset")"
This is actually Oracle or MySQL style computed column
syntax.
"You are right that one could argue that "timestamp",
"headers" are
something like "key" and "value""
I have the same feeling, both key value and headers
timestamp are
*real*
data
stored in the consumed record, they are not computed or
generated.
"Trying to solve everything via properties sounds rather
like a hack
to
me"
Things are not that hack if we can unify the routines or
the
definitions
(all from the computed column way or all from the table
options), i
also
think that it is a hacky that we mix in 2 kinds of syntax
for
different
kinds of metadata (read-only and read-write). In this
FLIP, we
declare
the
Kafka key fields with table options but SYSTEM_METADATA
for other
metadata,
that is a hacky thing or something in-consistent.
Kurt Young <ykt...@gmail.com> wrote on Wed, Sep 9, 2020 at 4:48 PM:
I would vote for `offset INT SYSTEM_METADATA("offset")`.

I don't think we can stick with the SQL standard in the DDL part forever, especially as there are more and more requirements coming from different connectors and external systems.
Best,
Kurt
On Wed, Sep 9, 2020 at 4:40 PM Timo Walther <twal...@apache.org> wrote:
Hi Jark,

now we are back at the original design proposed by Dawid :D Yes, we should be cautious about adding new syntax. But the length of this discussion shows that we are looking for a good long-term solution. In this case I would rather vote for a deep integration into the syntax.

Computed columns are also not SQL standard compliant. And our DDL is neither, so we have some degree of freedom here.

Trying to solve everything via properties sounds rather like a hack to me. You are right that one could argue that "timestamp", "headers" are something like "key" and "value". However, mixing

`offset AS SYSTEM_METADATA("offset")`

and

`'timestamp.field' = 'ts'`

looks more confusing to users than an explicit

`offset AS CAST(SYSTEM_METADATA("offset") AS INT)`

or

`offset INT SYSTEM_METADATA("offset")`

that is symmetric for both source and sink.
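As a rough sketch of that symmetry (assuming a read-write timestamp metadata column and the last variant above; names and options are illustrative, the exact keyword is still under discussion):

CREATE TABLE kafka_table (
  id BIGINT,
  ts TIMESTAMP(3) SYSTEM_METADATA("timestamp")
) WITH (
  'connector' = 'kafka',
  ...
);

-- reading: ts is filled from the record's metadata
SELECT id, ts FROM kafka_table;

-- writing: the very same column is persisted back into the record's metadata
INSERT INTO kafka_table SELECT id, ts FROM another_table;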
What do others think?
Regards,
Timo
On 09.09.20 10:09, Jark Wu wrote:
Hi everyone,

I think we have a conclusion that the writable metadata shouldn't be defined as a computed column, but as a normal column.

"timestamp STRING SYSTEM_METADATA('timestamp')" is one of the approaches. However, it is not SQL standard compliant, and we need to be cautious enough when adding new syntax. Besides, we have to introduce the `PERSISTED` or `VIRTUAL` keyword to resolve the query-sink schema problem if it is read-only metadata. That adds more stuff for users to learn.

From my point of view, "timestamp" and "headers" are something like "key" and "value" that are stored with the real data. So why not define "timestamp" in the same way as "key", by using a "timestamp.field" connector option? On the other side, read-only metadata, such as "offset", shouldn't be defined as a normal column. So why not use the existing computed column syntax for such metadata? Then we don't have the query-sink schema problem.

So here is my proposal:

CREATE TABLE kafka_table (
  id BIGINT,
  name STRING,
  col1 STRING,
  col2 STRING,
  ts TIMESTAMP(3) WITH LOCAL TIME ZONE,   -- ts is a normal field, so it can be read and written.
  offset AS SYSTEM_METADATA("offset")
) WITH (
  'connector' = 'kafka',
  'topic' = 'test-topic',
  'key.fields' = 'id, name',
  'key.format' = 'csv',
  'value.format' = 'avro',
  'timestamp.field' = 'ts'    -- define the mapping of the Kafka timestamp
);

INSERT INTO kafka_table
SELECT id, name, col1, col2, rowtime FROM another_table;

I think this can solve all the problems without introducing any new syntax. The only minor disadvantage is that we separate the definition way/syntax of read-only metadata and read-write fields. However, I don't think this is a big problem.
Best,
Jark
On Wed, 9 Sep 2020 at 15:09, Timo Walther <twal...@apache.org> wrote:
Hi Kurt,

thanks for sharing your opinion. I'm totally up for not reusing computed columns. I think Jark was a big supporter of this syntax, @Jark are you fine with this as well? The non-computed-column approach was only a "slightly rejected alternative".

Furthermore, we would need to think about how such a new design influences the LIKE clause though.

However, we should still keep the `PERSISTED` keyword as it influences the query->sink schema. If you look at the list of metadata for existing connectors and formats, we currently offer only two writable metadata fields. Otherwise, one would need to declare two tables whenever a metadata column is read (one for the source, one for the sink). This can be quite inconvenient, e.g. for just reading the topic.
Regards,
Timo
On 09.09.20 08:52, Kurt Young wrote:
I also share the concern that reusing the computed column syntax with different semantics would confuse users a lot.

Besides, I think metadata fields are conceptually not the same as computed columns. The metadata field is a connector-specific thing and it only contains the information of where the field comes from (during source) or where the field needs to be written to (during sink). It's more similar to normal fields, with the assumption that all these fields go to the data part.

Thus I lean more towards the rejected alternative that Timo mentioned. And I think we don't need the PERSISTED keyword, SYSTEM_METADATA should be enough. During implementation, the framework only needs to pass such <field, metadata field> information to the connector, and the logic of handling such fields inside the connector should be straightforward.

Regarding the downside Timo mentioned:

"The disadvantage is that users cannot call UDFs or parse timestamps."

I think this is fairly simple to solve. Since the metadata field isn't a computed column anymore, we can support referencing such fields in a computed column. For example:

CREATE TABLE kafka_table (
  id BIGINT,
  name STRING,
  timestamp STRING SYSTEM_METADATA("timestamp"),  -- get the timestamp field from metadata
  ts AS to_timestamp(timestamp)  -- normal computed column, parse the string to TIMESTAMP type by using the metadata field
) WITH (
  ...
)
Best,
Kurt
On Tue, Sep 8, 2020 at 11:57 PM Timo Walther <twal...@apache.org> wrote:
Hi Leonard,

the only alternative I see is that we introduce a concept that is completely different from computed columns. This is also mentioned in the rejected alternatives section of the FLIP. Something like:

CREATE TABLE kafka_table (
  id BIGINT,
  name STRING,
  timestamp INT SYSTEM_METADATA("timestamp") PERSISTED,
  headers MAP<STRING, BYTES> SYSTEM_METADATA("headers") PERSISTED
) WITH (
  ...
)

This way we would avoid confusion entirely and can easily map columns to metadata columns. The disadvantage is that users cannot call UDFs or parse timestamps. This would need to be done in a real computed column.

I'm happy about better alternatives.
Regards,
Timo
On 08.09.20 15:37, Leonard Xu wrote:
Hi, Timo

Thanks for driving this FLIP. Sorry, but I have a concern about the "Writing metadata via DynamicTableSink" section:

CREATE TABLE kafka_table (
  id BIGINT,
  name STRING,
  timestamp AS CAST(SYSTEM_METADATA("timestamp") AS BIGINT) PERSISTED,
  headers AS CAST(SYSTEM_METADATA("headers") AS MAP<STRING, BYTES>) PERSISTED
) WITH (
  ...
)

An insert statement could look like:

INSERT INTO kafka_table VALUES (
  (1, "ABC", 1599133672, MAP('checksum', computeChecksum(...)))
)

The proposed INSERT syntax does not make sense to me, because it contains computed (generated) columns. Both SQL Server and PostgreSQL do not allow inserting values into computed columns even if they are persisted; this breaks the generated column semantics and may confuse users a lot.

For the SQL Server computed column [1]:

column_name AS computed_column_expression [ PERSISTED [ NOT NULL ] ]...
NOTE: A computed column cannot be the target of an INSERT or UPDATE statement.

For the PostgreSQL generated column [2]:

height_in numeric GENERATED ALWAYS AS (height_cm / 2.54) STORED
NOTE: A generated column cannot be written to directly. In INSERT or UPDATE commands, a value cannot be specified for a generated column, but the keyword DEFAULT may be specified.
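As a concrete illustration of the PostgreSQL behavior quoted above, a minimal sketch reusing the documented height_cm/height_in example:

CREATE TABLE people (
  height_cm numeric,
  height_in numeric GENERATED ALWAYS AS (height_cm / 2.54) STORED
);

INSERT INTO people (height_cm) VALUES (180);                     -- OK, height_in is derived
INSERT INTO people (height_cm, height_in) VALUES (180, 70.9);    -- rejected, a generated column cannot be written to
INSERT INTO people (height_cm, height_in) VALUES (180, DEFAULT); -- allowed, per the note above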
It shouldn't be allowed to set/update values for generated columns; after looking up SQL 2016:

<insert statement> ::=
INSERT INTO <insertion target> <insert columns and source>

If <contextually typed table value constructor> CTTVC is specified, then every <contextually typed row value constructor element> simply contained in CTTVC whose positionally corresponding <column name> in <insert column list> references a column of which some underlying column is a generated column shall be a <default specification>.
A <default specification> specifies the default value of some associated item.

[1] https://docs.microsoft.com/en-US/sql/t-sql/statements/alter-table-computed-column-definition-transact-sql?view=sql-server-ver15
[2] https://www.postgresql.org/docs/12/ddl-generated-columns.html
On Sep 8, 2020, at 17:31, Timo Walther <twal...@apache.org> wrote:
Hi Jark,

according to Flink's and Calcite's casting definition in [1][2], TIMESTAMP WITH LOCAL TIME ZONE should be castable from BIGINT. If not, we will make it possible ;-)

I'm aware of DeserializationSchema.getProducedType but I think that this method is actually misplaced. The type should rather be passed to the source itself.

For our Kafka SQL source, we will also not use this method because the Kafka source will add its own metadata in addition to the DeserializationSchema. So DeserializationSchema.getProducedType will never be read.

For now I suggest leaving out the `DataType` from DecodingFormat.applyReadableMetadata, also because the format's physical type is passed later in `createRuntimeDecoder`. If necessary, it can be computed manually from consumedType + metadata types. We will provide a metadata utility class for that.

Regards,
Timo

[1] https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java#L200
[2] https://github.com/apache/calcite/blob/master/core/src/main/java/org/apache/calcite/sql/type/SqlTypeCoercionRule.java#L254
On 08.09.20 10:52, Jark Wu wrote:
Hi Timo,

The updated CAST SYSTEM_METADATA behavior sounds good to me. I just noticed that a BIGINT can't be converted to "TIMESTAMP(3) WITH LOCAL TIME ZONE". So maybe we need to support this, or use "TIMESTAMP(3) WITH LOCAL TIME ZONE" as the defined type of the Kafka timestamp? I think this makes sense, because it represents the milliseconds since epoch.

Regarding "DeserializationSchema doesn't need TypeInfo", I don't think so. The DeserializationSchema implements ResultTypeQueryable, thus the implementation needs to return an output TypeInfo. Besides, FlinkKafkaConsumer also calls DeserializationSchema.getProducedType as the produced type of the source function [1].

Best,
Jark

[1]: https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L1066
On Tue, 8 Sep 2020 at 16:35, Timo Walther <twal...@apache.org> wrote:
Hi everyone,

I updated the FLIP again and hope that I could address the mentioned concerns.

@Leonard: Thanks for the explanation. I wasn't aware that ts_ms and source.ts_ms have different semantics. I updated the FLIP and exposed the most commonly used properties separately, so frequently used properties are not hidden in the MAP anymore:

debezium-json.ingestion-timestamp
debezium-json.source.timestamp
debezium-json.source.database
debezium-json.source.schema
debezium-json.source.table

However, since other properties depend on the used connector/vendor, the remaining options are stored in:

debezium-json.source.properties

And accessed with:

CAST(SYSTEM_METADATA('debezium-json.source.properties') AS MAP<STRING, STRING>)['table']

Otherwise it is not possible to figure out the value and column type during validation.

@Jark: You convinced me to relax the CAST constraints. I added a dedicated sub-section to the FLIP:

For making the use of SYSTEM_METADATA easier and to avoid nested casting we allow explicit casting to a target data type:

rowtime AS CAST(SYSTEM_METADATA("timestamp") AS TIMESTAMP(3) WITH LOCAL TIME ZONE)

A connector still produces and consumes the data type returned by `listMetadata()`. The planner will insert necessary explicit casts. In any case, the user must provide a CAST such that the computed column receives a valid data type when constructing the table schema.

"I don't see a reason why `DecodingFormat#applyReadableMetadata` needs a DataType argument."

Correct, the DeserializationSchema doesn't need TypeInfo, it is always executed locally. It is the source that needs TypeInfo for serializing the record to the next operator. And that is what we provide.

@Danny:

"SYSTEM_METADATA("offset")` returns the NULL type by default"

We can also use some other means to represent an UNKNOWN data type. In the Flink type system, we use the NullType for it. The important part is that the final data type is known for the entire computed column. As I mentioned before, I would avoid the suggested option b) that would be similar to your suggestion. The CAST should be enough and allows for complex expressions in the computed column. Option b) would need parser changes.
Regards,
Timo
On 08.09.20 06:21, Leonard Xu wrote:
Hi, Timo

Thanks for your explanation and update, I have only one question about the latest FLIP.

About the MAP<STRING, STRING> DataType of the key 'debezium-json.source': if users want to use the table name metadata, they need to write:

tableName STRING AS CAST(SYSTEM_METADATA('debezium-json.source') AS MAP<STRING, STRING>)['table']

The expression is a little complex for users. Could we only support the necessary metas with simple DataTypes as follows?

tableName STRING AS CAST(SYSTEM_METADATA('debezium-json.source.table') AS STRING),
transactionTime LONG AS CAST(SYSTEM_METADATA('debezium-json.source.ts_ms') AS BIGINT),

In this way, we can simplify the expression. The mainly used metadata in changelog formats may include 'database', 'table', 'source.ts_ms', 'ts_ms' from my side, so maybe we could only support them in the first version. Both Debezium and Canal have the above four metadata, and I'm willing to take some subtasks in the next development if necessary.

Debezium:
{
  "before": null,
  "after": { "id": 101, "name": "scooter" },
  "source": {
    "db": "inventory",        # 1. database name the changelog belongs to.
    "table": "products",      # 2. table name the changelog belongs to.
    "ts_ms": 1589355504100,   # 3. timestamp of the change in the database system, i.e.: transaction time in the database.
    "connector": "mysql",
    ….
  },
  "ts_ms": 1589355606100,     # 4. timestamp when Debezium processed the changelog.
  "op": "c",
  "transaction": null
}

Canal:
{
  "data": [{ "id": "102", "name": "car battery" }],
  "database": "inventory",    # 1. database name the changelog belongs to.
  "table": "products",        # 2. table name the changelog belongs to.
  "es": 1589374013000,        # 3. execution time of the change in the database system, i.e.: transaction time in the database.
  "ts": 1589374013680,        # 4. timestamp when Canal processed the changelog.
  "isDdl": false,
  "mysqlType": {},
  ....
}
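For illustration, a possible declaration of these four commonly used metadata fields, tying them to the property keys Timo listed above (keys, names, and syntax are illustrative and not final):

CREATE TABLE debezium_table (
  id BIGINT,
  name STRING,
  database_name AS CAST(SYSTEM_METADATA('debezium-json.source.database') AS STRING),
  table_name AS CAST(SYSTEM_METADATA('debezium-json.source.table') AS STRING),
  transaction_time AS CAST(SYSTEM_METADATA('debezium-json.source.timestamp') AS BIGINT),
  ingestion_time AS CAST(SYSTEM_METADATA('debezium-json.ingestion-timestamp') AS BIGINT)
) WITH (
  'connector' = 'kafka',
  'format' = 'debezium-json',
  ...
);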
Best
Leonard
On Sep 8, 2020, at 11:57, Danny Chan <yuzhao....@gmail.com> wrote:
Thanks Timo ~

The FLIP is already in pretty good shape, I have only 2 questions here:

1. "`CAST(SYSTEM_METADATA("offset") AS INT)` would be a valid read-only computed column for Kafka and can be extracted by the planner."

What are the pros of following the SQL Server syntax here? Usually an expression's return type can be inferred automatically. But I guess SQL Server does not have a function like SYSTEM_METADATA which actually does not have a specific return type. And why not use the Oracle or MySQL syntax there?

column_name [datatype] [GENERATED ALWAYS] AS (expression) [VIRTUAL]

Which is more straightforward.

2. "SYSTEM_METADATA("offset")` returns the NULL type by default"

The default type should not be NULL because only the NULL literal does that. Usually we use ANY as the type if we do not know the specific type in the SQL context. ANY means the physical value can be any Java object.

[1] https://oracle-base.com/articles/11g/virtual-columns-11gr1
[2] https://dev.mysql.com/doc/refman/5.7/en/create-table-generated-columns.html
Best,
Danny Chan
On Sep 4, 2020, 4:48 PM +0800, Timo Walther <twal...@apache.org> wrote:
Hi everyone,

I completely reworked FLIP-107. It now covers the full story of how to read and write metadata from/to connectors and formats. It considers all of the latest FLIPs, namely FLIP-95, FLIP-132 and FLIP-122. It introduces the concept of PERSISTED computed columns and leaves out partitioning for now.

Looking forward to your feedback.
Regards,
Timo
On 04.03.20 09:45, Kurt Young wrote:
Sorry, forgot one question.

4. Can we make the value.fields-include more orthogonal? Like one can specify it as "EXCEPT_KEY, EXCEPT_TIMESTAMP". With the current EXCEPT_KEY and EXCEPT_KEY_TIMESTAMP, users cannot configure it to just ignore the timestamp but keep the key.
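A small sketch of what such a more orthogonal configuration could look like; the EXCEPT_TIMESTAMP value is hypothetical and not part of the current proposal:

CREATE TABLE kafka_table (
  ...
) WITH (
  'connector' = 'kafka',
  'key.fields' = 'id',
  -- hypothetical orthogonal flag: drop only the timestamp from the value part, keep the key fields in it
  'value.fields-include' = 'EXCEPT_TIMESTAMP'
);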
Best,
Kurt
On Wed, Mar 4, 2020 at 4:42 PM Kurt Young <ykt...@gmail.com> wrote:
Hi Dawid,

I have a couple of questions around key fields. Actually I also have some other questions but want to focus on key fields first.

1. I don't fully understand the usage of "key.fields". Is this option only valid during write operations? Because for reading, I can't imagine how such options can be applied. I would expect that there might be a SYSTEM_METADATA("key") to read and assign the key to a normal field?

2. If "key.fields" is only valid in write operations, I want to propose that we simplify the options and not introduce key.format.type and other related options. I think a single "key.field" (not fields) would be enough; users can use a UDF to calculate whatever key they want before the sink.

3. Also, I don't want to introduce "value.format.type" and "value.format.xxx" with the "value" prefix. Not every connector has a concept of keys and values. The old parameter "format.type" is already good enough to use.
Best,
Kurt
On Tue, Mar 3, 2020 at 10:40 PM Jark Wu <imj...@gmail.com> wrote:
Thanks Dawid,

I have two more questions.

SupportsMetadata

Introducing SupportsMetadata sounds good to me. But I have some questions regarding this interface.
1) How does the source know the expected return type of each metadata?
2) Where to put the metadata fields? Append to the existing physical fields? If yes, I would suggest changing the signature to `TableSource appendMetadataFields(String[] metadataNames, DataType[] metadataTypes)`

SYSTEM_METADATA("partition")

Can the SYSTEM_METADATA() function be used nested in a computed column expression? If yes, how to specify the return type of SYSTEM_METADATA?
Best,
Jark
On Tue, 3 Mar 2020 at 17:06, Dawid Wysakowicz <dwysakow...@apache.org> wrote:
Hi,

1. I thought a bit more on how the source would emit the columns and I now see it is not exactly the same as regular columns. I see a need to elaborate a bit more on that in the FLIP as you asked, Jark.

I do agree mostly with Danny on how we should do that. One additional thing I would introduce is an interface

SupportsMetadata {
  boolean supportsMetadata(Set<String> metadataFields);
  TableSource generateMetadataFields(Set<String> metadataFields);
}

This way the source would have to declare/emit only the requested metadata fields. In order not to clash with user-defined fields, when emitting a metadata field I would prepend the column name with __system_{property_name}. Therefore when SYSTEM_METADATA("partition") is requested, the source would append a field __system_partition to the schema. This would never be visible to the user as it would be used only for the subsequent computed columns. If that makes sense to you, I will update the FLIP with this description.

2. CAST vs explicit type in computed columns

Here I agree with Danny. It is also the current state of the proposal.

3. Partitioning on computed column vs function

Here I also agree with Danny. I also think those are orthogonal. I would leave the STORED computed columns out of the discussion; I don't see how they relate to the partitioning. I already put both of those cases in the document. We can either partition on a computed column or use a udf in a partitioned by clause. I am fine with leaving out the partitioning by udf in the first version if you still have some concerns.

As for your question, Danny: it depends which partitioning strategy you use. For the HASH partitioning strategy I thought it would work as you explained. It would be N = MOD(expr, num). I am not sure though if we should introduce the PARTITIONS clause. Usually Flink does not own the data and the partitions are already an intrinsic property of the underlying source, e.g. for Kafka we do not create topics, we just describe a pre-existing pre-partitioned topic.

4. timestamp vs timestamp.field vs connector.field vs ...

I am fine with changing it to timestamp.field to be consistent with the other value.fields and key.fields. Actually that was also my initial proposal in a first draft I prepared. I changed it afterwards to shorten the key.
Best,
Dawid
On 03/03/2020 09:00, Danny Chan wrote:
Thanks Dawid for bringing up this discussion, I think it is a useful feature ~

About how the metadata outputs from the source

I think it is completely orthogonal; computed column push down is another topic, this should not be a blocker but a promotion. If we do not have any filters on the computed column, there is no need to do any pushings; the source node just emits the complete record with full metadata with the declared physical schema, then when generating the virtual columns, we would extract the metadata info and output as full columns (with full schema).

About the type of metadata column

Personally I prefer an explicit type instead of CAST. They are semantically equivalent though; an explicit type is more straightforward and we can declare the nullable attribute there.

About option A: partitioning based on a computed column VS option B: partitioning with just a function

From the FLIP, it seems that B's partitioning is just a strategy when writing data, the partition column is not included in the table schema, so it's just useless when reading from that.

- Compared to A, we do not need to generate the partition column when selecting from the table (but insert into)
- For A we can also mark the column as STORED when we want to persist that

So in my opinion they are orthogonal, we can support both. I saw that MySQL/Oracle [1][2] suggest to also define the PARTITIONS num, and the partitions are managed under a "table namespace". The partition in which the record is stored is partition number N, where N = MOD(expr, num). For your design, in which partition would the record persist?

[1] https://dev.mysql.com/doc/refman/5.7/en/partitioning-hash.html
[2] https://docs.oracle.com/database/121/VLDBG/GUID-F023D3ED-262F-4B19-950A-D3C8F8CDB4F4.htm#VLDBG1270
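For reference, the MySQL-style hash partitioning from [1] looks roughly like this (illustrative table):

CREATE TABLE orders (
  id INT,
  store_id INT
)
PARTITION BY HASH(store_id)
PARTITIONS 4;
-- a row lands in partition N = MOD(store_id, 4)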
Best,
Danny Chan
On Mar 2, 2020, 6:16 PM +0800, Dawid Wysakowicz <dwysakow...@apache.org> wrote:
Hi Jark,

Ad. 2 I added a section to discuss the relation to FLIP-63.

Ad. 3 Yes, I also tried to somewhat keep the hierarchy of properties. Therefore you have the key.format.type.

I also considered exactly what you are suggesting (prefixing with connector or kafka). I should've put that into an Option/Rejected alternatives section. I agree timestamp, key.*, value.* are connector properties. Why I wanted to suggest not adding that prefix in the first version is that actually all the properties in the WITH section are connector properties. Even format is in the end a connector property as some of the sources might not have a format, imo. The benefit of not adding the prefix is that it makes the keys a bit shorter. Imagine prefixing all the properties with connector (or if we go with FLINK-12557: elasticsearch):

elasticsearch.key.format.type: csv
elasticsearch.key.format.field: ....
elasticsearch.key.format.delimiter: ....
elasticsearch.key.format.*: ....

I am fine with doing it though if this is a preferred approach in the community.

Ad in-line comments:

I forgot to update the `value.fields.include` property. It should be value.fields-include. Which I think you also suggested in the comment, right?

As for the cast vs declaring the output type of a computed column: I think it's better not to use CAST, but to declare a type of an expression and later on infer the output type of SYSTEM_METADATA. The reason is that I think this way it will be easier to implement e.g. filter push downs when working with the native types of the source; e.g. in case of Kafka's offset, I think it's better to push down long rather than string. This could let us push down expressions like e.g. offset > 12345 & offset < 59382. Otherwise we would have to push down cast(offset, long) > 12345 && cast(offset, long) < 59382. Moreover, I think we need to introduce the type for computed columns anyway to support functions that infer the output type based on the expected return type.

As for the computed column push down: yes, SYSTEM_METADATA would have to be pushed down to the source. If it is not possible the planner should fail. As far as I know computed columns push down will be part of the source rework, won't it? ;)

As for the persisted computed column: I think it is completely orthogonal. In my current proposal you can also partition by a computed column. The difference between using a udf in partitioned by vs partitioned by a computed column is that when you partition by a computed column this column must also be computed when reading the table. If you use a udf in the partitioned by, the expression is computed only when inserting into the table.

Hope this answers some of your questions. Looking forward to further suggestions.
Best,
Dawid
On 02/03/2020 05:18, Jark Wu wrote:
Hi,

Thanks Dawid for starting such a great discussion. Reading metadata and key-part information from the source is an important feature for streaming users.

In general, I agree with the proposal of the FLIP. I will leave my thoughts and comments here:

1) +1 to use connector properties instead of introducing a HEADER keyword, for the reason you mentioned in the FLIP.
2) We already introduced PARTITIONED BY in FLIP-63. Maybe we should add a section to explain what the relationship between them is. Do their concepts conflict? Could INSERT PARTITION be used on the PARTITIONED table in this FLIP?
3) Currently, properties are hierarchical in Flink SQL. Shall we make the newly introduced properties more hierarchical? For example, "timestamp" => "connector.timestamp"? (Actually, I prefer "kafka.timestamp", which is another improvement for properties, FLINK-12557.) A single "timestamp" in properties may mislead users into thinking the field is a rowtime attribute.

I also left some minor comments in the FLIP.
Thanks,
Jark
On Sun, 1 Mar 2020 at 22:30, Dawid Wysakowicz <dwysakow...@apache.org> wrote:
Hi,

I would like to propose an improvement that would enable reading table columns from different parts of source records. Besides the main payload, the majority (if not all) of the sources expose additional information. It can be simply read-only metadata such as offset or ingestion time, or read and write parts of the record that contain data but additionally serve different purposes (partitioning, compaction etc.), e.g. key or timestamp in Kafka.

We should make it possible to read and write data from all of those locations. In this proposal I discuss reading partitioning data; for completeness this proposal also discusses the partitioning when writing data out.

I am looking forward to your comments.

You can access the FLIP here:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records?src=contextnavpagetreemode
Best,
Dawid