+1 for voting

Regards,
Timo

On 23.10.20 09:07, Jark Wu wrote:
Thanks Shengkai!

+1 to start voting.

Best,
Jark

On Fri, 23 Oct 2020 at 15:02, Shengkai Fang <fskm...@gmail.com> wrote:

One more message: I have already updated the FLIP [1].

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-149%3A+Introduce+the+upsert-kafka+Connector

Shengkai Fang <fskm...@gmail.com> wrote on Fri, Oct 23, 2020 at 2:55 PM:

Hi, all.
It seems we have reached a consensus on the FLIP. If no one has other
objections, I would like to start the vote for FLIP-149.

Best,
Shengkai

Jingsong Li <jingsongl...@gmail.com> wrote on Fri, Oct 23, 2020 at 2:25 PM:

Thanks for the explanation.

I am OK with `upsert`. Yes, its concept has been accepted by many systems.

Best,
Jingsong

On Fri, Oct 23, 2020 at 12:38 PM Jark Wu <imj...@gmail.com> wrote:

Hi Timo,

I have some concerns about `kafka-cdc`:
1) "cdc" is an abbreviation of Change Data Capture, which is commonly used for databases, not for message queues.
2) Usually, cdc produces the full content of the changelog, including UPDATE_BEFORE; however, "upsert kafka" doesn't.
3) `kafka-cdc` sounds like native support for the `debezium-json` format; however, it is not, and we don't even want "upsert kafka" to support "debezium-json".


Hi Jingsong,

I think the terminology of "upsert" is fine, because Kafka also uses "upsert" to define such behavior in their official documentation [1]:

"a data record in a changelog stream is interpreted as an UPSERT aka INSERT/UPDATE"

Materialize uses the "UPSERT" keyword to define such behavior too [2].
Users have been requesting such a feature using the "upsert kafka" terminology on the user mailing lists [3][4].
Many other systems support an "UPSERT" statement natively, such as Impala [5], SAP [6], Phoenix [7], Oracle NoSQL [8], etc.

Therefore, I think we don't need to be afraid of introducing the "upsert" terminology; it is widely accepted by users.
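For illustration, this is the same interpretation Kafka Streams applies when a topic is read as a KTable. A minimal sketch, just to show the semantics (not part of the FLIP; serdes are assumed to come from the Streams config):

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;

public class KTableUpsertExample {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        // Each record on the topic is interpreted as an upsert on its key;
        // a record with a null value is treated as a delete (tombstone).
        KTable<String, String> users = builder.table("users");
    }
}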

Best,
Jark


[1]: https://kafka.apache.org/20/documentation/streams/developer-guide/dsl-api.html#streams_concepts_ktable
[2]: https://materialize.io/docs/sql/create-source/text-kafka/#upsert-on-a-kafka-topic
[3]: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/SQL-materialized-upsert-tables-td18482.html#a18503
[4]: http://apache-flink.147419.n8.nabble.com/Kafka-Sink-AppendStreamTableSink-doesn-t-support-consuming-update-changes-td5959.html
[5]: https://impala.apache.org/docs/build/html/topics/impala_upsert.html
[6]: https://help.sap.com/viewer/7c78579ce9b14a669c1f3295b0d8ca16/Cloud/en-US/ea8b6773be584203bcd99da76844c5ed.html
[7]: https://phoenix.apache.org/atomic_upsert.html
[8]: https://docs.oracle.com/en/database/other-databases/nosql-database/18.3/sqlfornosql/adding-table-rows-using-insert-and-upsert-statements.html

On Fri, 23 Oct 2020 at 10:36, Jingsong Li <jingsongl...@gmail.com>
wrote:

The `kafka-cdc` name looks good to me.
We could even provide options to indicate whether to turn on compaction, because compaction is just an optimization?

- ktable: makes me think of KSQL.
- kafka-compacted: it is not just about compaction; more than that, it still has CDC capability.
- upsert-kafka: "upsert" is back, and I don't really want to see it again since we have CDC.

Best,
Jingsong

On Fri, Oct 23, 2020 at 2:21 AM Timo Walther <twal...@apache.org>
wrote:

Hi Jark,

I would be fine with `connector=upsert-kafka`. Another idea would be to align the name with other available Flink connectors [1]:

`connector=kafka-cdc`.

Regards,
Timo

[1] https://github.com/ververica/flink-cdc-connectors

On 22.10.20 17:17, Jark Wu wrote:
Another name is "connector=upsert-kafka"; I think this can solve Timo's concern about the "compacted" word.

Materialize also uses the "ENVELOPE UPSERT" [1] keyword to identify such kafka sources.
I think "upsert" is a well-known terminology widely used in many systems and matches the behavior of how we handle the kafka messages.

What do you think?

Best,
Jark

[1]: https://materialize.io/docs/sql/create-source/text-kafka/#upsert-on-a-kafka-topic




On Thu, 22 Oct 2020 at 22:53, Kurt Young <ykt...@gmail.com>
wrote:

Good validation messages can't fix the broken user experience, especially since such an update mode option would implicitly make half of the current kafka options invalid or meaningless.

Best,
Kurt


On Thu, Oct 22, 2020 at 10:31 PM Jark Wu <imj...@gmail.com>
wrote:

Hi Timo, Seth,

The default value "inserting" for "mode" might not be suitable, because "debezium-json" emits changelog messages which include updates.

On Thu, 22 Oct 2020 at 22:10, Seth Wiesman <s...@ververica.com> wrote:

+1 for supporting upsert results into Kafka.

I have no comments on the implementation details.

As far as configuration goes, I tend to favor Timo's option where we add a "mode" property to the existing Kafka table with the default value "inserting". If the mode is set to "updating", then the validation changes to the new requirements. I personally find it more intuitive than a separate connector; my fear is users won't understand it's the same physical kafka sink under the hood, and it will lead to other confusion, like does it offer the same persistence guarantees? I think we are capable of adding good validation messaging that solves Jark's and Kurt's concerns.


On Thu, Oct 22, 2020 at 8:51 AM Timo Walther <twal...@apache.org> wrote:

Hi Jark,

"calling it "kafka-compacted" can even remind users to enable log compaction"

But sometimes users like to store a lineage of changes in their topics, independent of any ktable/kstream interpretation.

I'll let the majority decide on this topic so as not to further block this effort. But we might find a better name like:

connector = kafka
mode = updating/inserting

OR

connector = kafka-updating

...

Regards,
Timo




On 22.10.20 15:24, Jark Wu wrote:
Hi Timo,

Thanks for your opinions.

1) Implementation
We will have a stateful operator to generate INSERT and UPDATE_BEFORE. This operator is keyed (primary key as the shuffle key) after the source operator.
The implementation of this operator is very similar to the existing `DeduplicateKeepLastRowFunction`.
The operator will register a value state using the primary key fields as keys.
When the value state is empty under the current key, we will emit INSERT for the input row.
When the value state is not empty under the current key, we will emit UPDATE_BEFORE using the row in state, and emit UPDATE_AFTER using the input row.
When the input row is DELETE, we will clear the state and emit the DELETE row.
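A rough sketch of this operator, just for illustration (this is not the actual `DeduplicateKeepLastRowFunction`; the class and field names here are made up):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;

// Runs keyed by the primary key fields, right after the source operator.
public class UpsertNormalizeFunction extends KeyedProcessFunction<RowData, RowData, RowData> {

    private transient ValueState<RowData> previousRow;

    @Override
    public void open(Configuration parameters) {
        previousRow = getRuntimeContext().getState(
                new ValueStateDescriptor<>("previous-row", RowData.class));
    }

    @Override
    public void processElement(RowData row, Context ctx, Collector<RowData> out) throws Exception {
        if (row.getRowKind() == RowKind.DELETE) {
            // Clear the state and forward the DELETE row.
            previousRow.clear();
            out.collect(row);
            return;
        }
        RowData previous = previousRow.value();
        if (previous == null) {
            // First record under this key: interpret it as an INSERT.
            row.setRowKind(RowKind.INSERT);
            out.collect(row);
        } else {
            // Key already known: emit the previous image as UPDATE_BEFORE,
            // then the new image as UPDATE_AFTER.
            previous.setRowKind(RowKind.UPDATE_BEFORE);
            out.collect(previous);
            row.setRowKind(RowKind.UPDATE_AFTER);
            out.collect(row);
        }
        previousRow.update(row);
    }
}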

2) new option vs. new connector
We recently simplified the table options to a minimum amount of characters to be as concise as possible in the DDL.
I think this is the reason why we want to introduce a new connector: because we can simplify the options in the DDL.
For example, if using a new option, the DDL may look like this:

CREATE TABLE users (
     user_id BIGINT,
     user_name STRING,
     user_level STRING,
     region STRING,
     PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
     'connector' = 'kafka',
     'model' = 'table',
     'topic' = 'pageviews_per_region',
     'properties.bootstrap.servers' = '...',
     'properties.group.id' = 'testGroup',
     'scan.startup.mode' = 'earliest',
     'key.format' = 'csv',
     'key.fields' = 'user_id',
     'value.format' = 'avro',
     'sink.partitioner' = 'hash'
);

If using a new connector, we can have different default values for the options and remove unnecessary options; the DDL can look like this, which is much more concise:

CREATE TABLE pageviews_per_region (
     user_id BIGINT,
     user_name STRING,
     user_level STRING,
     region STRING,
     PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
     'connector' = 'kafka-compacted',
     'topic' = 'pageviews_per_region',
     'properties.bootstrap.servers' = '...',
     'key.format' = 'csv',
     'value.format' = 'avro'
);

"When people read `connector=kafka-compacted` they might not know that it has ktable semantics. You don't need to enable log compaction in order to use a KTable as far as I know."

We don't need to let users know it has ktable semantics; as Konstantin mentioned, this may carry more implicit meaning than we want to imply here. I agree users don't need to enable log compaction, but from the production perspective, log compaction should always be enabled if the topic is used for this purpose.
Calling it "kafka-compacted" can even remind users to enable log compaction.

I don't agree with introducing a "model = table/stream" option, or "connector=kafka-table", because this means we are introducing the Table vs. Stream concept from KSQL.
However, we don't have such a top-level concept in Flink SQL now; this will further confuse users.
In Flink SQL, all things are STREAMs; the differences are whether it is bounded or unbounded, and whether it is insert-only or a changelog.


Best,
Jark


On Thu, 22 Oct 2020 at 20:39, Timo Walther <twal...@apache.org> wrote:

Hi Shengkai, Hi Jark,

thanks for this great proposal. It is time to finally connect the changelog processor with a compacted Kafka topic.

"The operator will produce INSERT rows, or additionally generate UPDATE_BEFORE rows for the previous image, or produce DELETE rows with all columns filled with values."

Could you elaborate a bit on the implementation details in the FLIP? How are UPDATE_BEFOREs generated? How much state is required to perform this operation?

From a conceptual and semantical point of view, I'm fine with the proposal. But I would like to share my opinion about how we expose this feature:

ktable vs kafka-compacted

I'm against having an additional connector like `ktable` or `kafka-compacted`. We recently simplified the table options to a minimum amount of characters to be as concise as possible in the DDL. Therefore, I would keep `connector=kafka` and introduce an additional option, because a user wants to read "from Kafka", and the "how" should be determined in the lower options.

When people read `connector=ktable` they might not know that this is Kafka. Or they wonder where `kstream` is?

When people read `connector=kafka-compacted` they might not know that it has ktable semantics. You don't need to enable log compaction in order to use a KTable as far as I know. Log compaction and table semantics are orthogonal topics.

In the end we will need 3 types of information when declaring a Kafka connector:

CREATE TABLE ... WITH (
     connector = kafka       -- Some information about the connector
     end-offset = XXXX       -- Some information about the boundedness
     model = table/stream    -- Some information about the interpretation
)


We can still apply all the constraints mentioned in the FLIP when `model` is set to `table`.

What do you think?

Regards,
Timo


On 21.10.20 14:19, Jark Wu wrote:
Hi,

IMO, if we are going to mix them in one connector:
1) Either users need to set some options to a specific value explicitly, e.g. "scan.startup.mode=earliest", "sink.partitioner=hash", etc.
This makes the connector awkward to use; users may have to fix options one by one according to the exceptions.
Besides, in the future it is still possible to use "sink.partitioner=fixed" (to reduce network cost) if users are aware of the partition routing; however, it's error-prone to have "fixed" as the default for the compacted mode.

2) Or we give those options different default values when "compacted=true".
This would be more confusing and unpredictable if the default values of options change according to other options.
What happens if we have a third mode in the future?

In terms of usage and options, it's very different from the original "kafka" connector.
It would be handier to use and less error-prone if we separate them into two connectors.
In the implementation layer, we can reuse code as much as possible.

Therefore, I'm still +1 to have a new connector.
The "kafka-compacted" name sounds good to me.

Best,
Jark


On Wed, 21 Oct 2020 at 17:58, Konstantin Knauf <kna...@apache.org> wrote:

Hi Kurt, Hi Shengkai,

thanks for answering my questions and the additional clarifications. I don't have a strong opinion on whether to extend the "kafka" connector or to introduce a new connector. So, from my perspective, feel free to go with a separate connector. If we do introduce a new connector, I wouldn't call it "ktable" for the aforementioned reasons (in addition, it might suggest that there is also a "kstreams" connector, for symmetry reasons). I don't have a good alternative name, though; maybe "kafka-compacted" or "compacted-kafka".

Thanks,

Konstantin


On Wed, Oct 21, 2020 at 4:43 AM Kurt Young <ykt...@gmail.com> wrote:

Hi all,

I want to describe the discussion process that drove us to this conclusion; it might make some of the design choices easier to understand and keep everyone on the same page.

Back to the motivation: what functionality do we want to provide in the first place? We got a lot of feedback and questions from the mailing lists that people want to write not-insert-only messages into kafka. These might be intentional or by accident, e.g. they wrote a non-windowed aggregate query or a non-windowed left outer join. And some users from the KSQL world also asked why Flink didn't leverage the key concept of every kafka topic and treat kafka as a dynamically changing keyed table.

To work with kafka better, we were thinking of extending the functionality of the current kafka connector by letting it accept updates and deletions. But due to the limitations of kafka, the update has to be "update by key", i.e. a table with a primary key.

This introduces a couple of conflicts with the current kafka table's options:
1. key.fields: as said above, we need the kafka table to have the primary key constraint. Users can also configure key.fields freely, which might cause friction. (Sure, we can do some sanity checks on this, but that also creates friction.)
2. sink.partitioner: to make the semantics right, we need to make sure all the updates on the same key are written to the same kafka partition, so we should force a hash-by-key partitioner inside such a table. Again, this conflicts and creates friction with the current user options.

The above things are solvable, though not perfectly and not in the most user-friendly way.

Let's take a look at the reading side. The keyed kafka table contains two kinds of messages: upserts and deletions. What upsert means is "if the key doesn't exist yet, it's an insert record; otherwise it's an update record". For the sake of correctness or simplicity, the Flink SQL engine also needs such information. If we interpret all messages as "update record", some queries or operators may not work properly. It's weird to see an update record when you haven't seen the insert record before.

So what Flink should do is: after reading the records out of such a table, it needs to create state to record which messages have been seen and then generate the correct row type correspondingly. This kind of couples the state and the data of the message queue, and it also creates conflicts with the current kafka connector.

Think about what happens if users suspend a running job (which now contains some reading state) and then change the start offset of the reader. By changing the reading offset, it actually changes the whole story of which records should be insert messages and which records should be update messages. It will also force Flink to deal with another weird situation: it might receive a deletion for a non-existing message.

We were unsatisfied with all the friction and conflicts it would create if we enabled "upsert & deletion" support in the current kafka connector. Later we began to realize that we shouldn't treat it as a normal message queue, but should treat it as a changing keyed table. We should be able to always get the whole data of such a table (by disabling the start offset option), and we can also read the changelog out of such a table. It's like an HBase table with binlog support but without random access capability (which can be fulfilled by Flink's state).

So our intention was, instead of telling and persuading users what kind of options they should or should not use when extending the current kafka connector to enable upsert support, to actually create a whole new and different connector that has a totally different abstraction in the SQL layer and should be treated completely differently from the current kafka connector.

Hope this can clarify some of the concerns.

Best,
Kurt


On Tue, Oct 20, 2020 at 5:20 PM Shengkai Fang <fskm...@gmail.com> wrote:

Hi devs,

As many people are still confused about the different option behaviours between the Kafka connector and the KTable connector, Jark and I have listed the differences in the doc [1].

Best,
Shengkai

[1] https://docs.google.com/document/d/13oAWAwQez0lZLsyfV21BfTEze1fc2cz4AZKiNOyBNPk/edit

Shengkai Fang <fskm...@gmail.com> wrote on Tue, Oct 20, 2020 at 12:05 PM:

Hi Konstantin,

Thanks for your reply.

"It uses the "kafka" connector and does not specify a primary key."
The dimensional table `users` is a ktable connector, and we can specify the pk on the KTable.

"Will it be possible to use a "ktable" as a dimensional table in FLIP-132?"
Yes. We can specify the watermark on the KTable, and it can be used as a dimension table in a temporal join.

Introduce a new connector vs. introduce a new property
The main reason is that the KTable connector has almost no common options with the Kafka connector. The options that can be reused by the KTable connector are 'topic', 'properties.bootstrap.servers' and 'value.fields-include'. We can't set a cdc format for 'key.format' and 'value.format' in the KTable connector now, which is available in the Kafka connector. Considering the difference between the options we can use, it's more suitable to introduce another connector rather than a property.

We are also fine with using "compacted-kafka" as the name of the new connector. What do you think?

Best,
Shengkai

Konstantin Knauf <kna...@apache.org> wrote on Mon, Oct 19, 2020 at 10:15 PM:

Hi Shengkai,

Thank you for driving this effort. I believe this is a very important feature for many users who use Kafka and Flink SQL together. A few questions and thoughts:

* Is your example "Use KTable as a reference/dimension table" correct? It uses the "kafka" connector and does not specify a primary key.

* Will it be possible to use a "ktable" table directly as a dimensional table in a temporal join (*based on event time*) (FLIP-132)? This is not completely clear to me from the FLIP.

* I'd personally prefer not to introduce a new connector and instead to extend the Kafka connector. We could add an additional property "compacted" = "true"|"false". If it is set to "true", we can add additional validation logic (e.g. "scan.startup.mode" can not be set, primary key required, etc.; see the sketch after this list). If we stick to a separate connector, I'd not call it "ktable", but rather "compacted-kafka" or similar. KTable seems to carry more implicit meaning than we want to imply here.

* I agree that this is not a bounded source. If we want to support a bounded mode, this is an orthogonal concern that also applies to other unbounded sources.
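As a rough sketch of the extra validation mentioned above (the option key and helper class here are hypothetical, only to illustrate the idea, not an actual Flink class):

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;

// Sketch only: extra checks that could run when 'compacted' = 'true'.
public final class CompactedModeValidator {

    // Hypothetical option key, mirroring the existing Kafka connector option.
    private static final ConfigOption<String> SCAN_STARTUP_MODE =
            ConfigOptions.key("scan.startup.mode").stringType().noDefaultValue();

    public static void validate(ReadableConfig options, TableSchema schema) {
        if (options.getOptional(SCAN_STARTUP_MODE).isPresent()) {
            throw new ValidationException(
                    "'scan.startup.mode' must not be set when 'compacted' = 'true'.");
        }
        if (!schema.getPrimaryKey().isPresent()) {
            throw new ValidationException(
                    "A PRIMARY KEY is required when 'compacted' = 'true'.");
        }
    }

    private CompactedModeValidator() {}
}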

Best,

Konstantin

On Mon, Oct 19, 2020 at 3:26 PM Jark Wu <imj...@gmail.com> wrote:

Hi Danny,

First of all, we didn't introduce any concepts from KSQL (e.g. the Stream vs. Table notion).
This new connector will produce a changelog stream, so it's still a dynamic table and doesn't conflict with Flink's core concepts.

The "ktable" is just a connector name; we can also call it "compacted-kafka" or something else.
Calling it "ktable" is just so that KSQL users can migrate to Flink SQL easily.

Regarding why we introduce a new connector vs. a new property in the existing kafka connector:

I think the main reason is that we want to have a clear separation between these two use cases, because they are very different.
We also listed reasons in the FLIP, including:

1) It's hard to explain what the behavior is when users specify the start offset from a middle position (e.g. how to process delete events for keys that don't exist). It's dangerous if users do that, so we don't provide the offset option in the new connector at the moment.
2) It's a different perspective/abstraction on the same kafka topic (append vs. upsert). It would be easier to understand if we separate them instead of mixing them in one connector. The new connector requires a hash sink partitioner, a declared primary key, and a regular format. If we mix them in one connector, it might be confusing how to use the options correctly.
3) The semantics of the KTable connector are just the same as KTable in Kafka Streams, so it's very handy for Kafka Streams and KSQL users. We have seen several questions in the mailing list asking how to model a KTable and how to join a KTable in Flink SQL.

Best,
Jark

On Mon, 19 Oct 2020 at 19:53, Jark Wu <imj...@gmail.com> wrote:

Hi Jingsong,

As the FLIP describes, "KTable connector produces a changelog stream, where each data record represents an update or delete event."
Therefore, a ktable source is an unbounded stream source. Selecting from a ktable source is similar to selecting from a kafka source with the debezium-json format, in that it never ends and the results are continuously updated.

It's possible to have a bounded ktable source in the future, for example by adding an option 'bounded=true' or 'end-offset=xxx'.
In this way, the ktable will produce a bounded changelog stream.
So I think this can be a compatible feature in the future.

I don't think we should associate it with KSQL-related concepts. Actually, we didn't introduce any concepts from KSQL (e.g. the Stream vs. Table notion).
The "ktable" is just a connector name; we can also call it "compacted-kafka" or something else.
Calling it "ktable" is just so that KSQL users can migrate to Flink SQL easily.

Regarding "value.fields-include": this is an option introduced in FLIP-107 for the Kafka connector.
I think we should keep the same behavior as the Kafka connector. I'm not sure what the default behavior of KSQL is, but I guess it also stores the keys in the value, judging from this example in the docs (see the "users_original" table) [1].

Best,
Jark

[1]: https://docs.confluent.io/current/ksqldb/tutorials/basics-local.html#create-a-stream-and-table


On Mon, 19 Oct 2020 at 18:17, Danny Chan <yuzhao....@gmail.com> wrote:

The concept seems to conflict with the Flink abstraction "dynamic table": in Flink we see both "stream" and "table" as a dynamic table.

I think we should first make clear how to express stream- and table-specific features on one "dynamic table". It is more natural for KSQL because KSQL takes stream and table as different abstractions for representing collections; in KSQL, only a table is mutable and can have a primary key.

Does this connector belong to the "table" scope or the "stream" scope?

Some of the concepts (such as the primary key on a stream) should be suitable for all the connectors, not just Kafka. Shouldn't this be an extension of the existing Kafka connector instead of a totally new connector? What about the other connectors?

Because this touches the core abstraction of Flink, we'd better have a top-down overall design; following KSQL directly is not the answer.

P.S. For the source:
Shouldn't this be an extension of the existing Kafka connector instead of a totally new connector?

How could we achieve that (e.g. set up the parallelism correctly)?

Best,
Danny Chan
On Oct 19, 2020 at 5:17 PM +0800, Jingsong Li <jingsongl...@gmail.com> wrote:
Thanks Shengkai for your proposal.

+1 for this feature.

Future Work: Support bounded KTable source

I don't think it should be future work; I think it is one of the important concepts of this FLIP. We need to understand it now.

Intuitively, a ktable in my opinion is a bounded table rather than a stream, so a SELECT should produce a bounded table by default.

I think we can list the Kafka-related knowledge, because the word `ktable` is easy to associate with KSQL-related concepts. (If possible, it's better to unify with it.)

What do you think?

value.fields-include

What about the default behavior of KSQL?

Best,
Jingsong

On Mon, Oct 19, 2020 at 4:33 PM Shengkai Fang <fskm...@gmail.com> wrote:

Hi devs,

Jark and I want to start a new FLIP to introduce the KTable connector. KTable is short for "Kafka Table"; it also has the same semantics as the KTable notion in Kafka Streams.

FLIP-149: https://cwiki.apache.org/confluence/display/FLINK/FLIP-149%3A+Introduce+the+KTable+Connector

Currently, many users have expressed their need for upsert Kafka on the mailing lists and in issues. The KTable connector has several benefits for users:

1. Users are able to interpret a compacted Kafka topic as an upsert stream in Apache Flink, and are also able to write a changelog stream to Kafka (into a compacted topic).
2. As a part of a real-time pipeline, store join or aggregate results (which may contain updates) in a Kafka topic for further calculation.
3. The semantics of the KTable connector are just the same as KTable in Kafka Streams, so it's very handy for Kafka Streams and KSQL users. We have seen several questions in the mailing list asking how to model a KTable and how to join a KTable in Flink SQL.

We hope it can expand the usage of Flink with Kafka.

I'm looking forward to your feedback.

Best,
Shengkai



--
Best, Jingsong Lee





--

Konstantin Knauf

https://twitter.com/snntrable

https://github.com/knaufk






--

Konstantin Knauf

https://twitter.com/snntrable

https://github.com/knaufk








--

Seth Wiesman | Solutions Architect

+1 314 387 1463

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink Conference

Stream Processing | Event Driven | Real Time







--
Best, Jingsong Lee




--
Best, Jingsong Lee




