Hi Timo,
Thanks for your opinions.
1) Implementation
We will have an stateful operator to generate INSERT and UPDATE_BEFORE.
This operator is keyby-ed (primary key as the shuffle key) after the source
operator.
The implementation of this operator is very similar to the existing
`DeduplicateKeepLastRowFunction`.
The operator will register a value state using the primary key fields as
keys.
When the value state is empty under current key, we will emit INSERT for
the input row.
When the value state is not empty under current key, we will emit
UPDATE_BEFORE using the row in state,
and emit UPDATE_AFTER using the input row.
When the input row is DELETE, we will clear state and emit DELETE row.
2) new option vs new connector
We recently simplified the table options to a minimum amount of
characters to be as concise as possible in the DDL.
I think this is the reason why we want to introduce a new connector,
because we can simplify the options in DDL.
For example, if using a new option, the DDL may look like this:
CREATE TABLE users (
user_id BIGINT,
user_name STRING,
user_level STRING,
region STRING,
PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'model' = 'table',
'topic' = 'pageviews_per_region',
'properties.bootstrap.servers' = '...',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest',
'key.format' = 'csv',
'key.fields' = 'user_id',
'value.format' = 'avro',
'sink.partitioner' = 'hash'
);
If using a new connector, we can have a different default value for the
options and remove unnecessary options,
the DDL can look like this which is much more concise:
CREATE TABLE pageviews_per_region (
user_id BIGINT,
user_name STRING,
user_level STRING,
region STRING,
PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
'connector' = 'kafka-compacted',
'topic' = 'pageviews_per_region',
'properties.bootstrap.servers' = '...',
'key.format' = 'csv',
'value.format' = 'avro'
);
When people read `connector=kafka-compacted` they might not know that it
has ktable semantics. You don't need to enable log compaction in order
to use a KTable as far as I know.
We don't need to let users know it has ktable semantics, as Konstantin
mentioned this may carry more implicit
meaning than we want to imply here. I agree users don't need to enable log
compaction, but from the production perspective,
log compaction should always be enabled if it is used in this purpose.
Calling it "kafka-compacted" can even remind users to enable log compaction.
I don't agree to introduce "model = table/stream" option, or
"connector=kafka-table",
because this means we are introducing Table vs Stream concept from KSQL.
However, we don't have such top-level concept in Flink SQL now, this will
further confuse users.
In Flink SQL, all the things are STREAM, the differences are whether it is
bounded or unbounded,
whether it is insert-only or changelog.
Best,
Jark
On Thu, 22 Oct 2020 at 20:39, Timo Walther <twal...@apache.org> wrote:
Hi Shengkai, Hi Jark,
thanks for this great proposal. It is time to finally connect the
changelog processor with a compacted Kafka topic.
"The operator will produce INSERT rows, or additionally generate
UPDATE_BEFORE rows for the previous image, or produce DELETE rows with
all columns filled with values."
Could you elaborate a bit on the implementation details in the FLIP? How
are UPDATE_BEFOREs are generated. How much state is required to perform
this operation.
From a conceptual and semantical point of view, I'm fine with the
proposal. But I would like to share my opinion about how we expose this
feature:
ktable vs kafka-compacted
I'm against having an additional connector like `ktable` or
`kafka-compacted`. We recently simplified the table options to a minimum
amount of characters to be as concise as possible in the DDL. Therefore,
I would keep the `connector=kafka` and introduce an additional option.
Because a user wants to read "from Kafka". And the "how" should be
determined in the lower options.
When people read `connector=ktable` they might not know that this is
Kafka. Or they wonder where `kstream` is?
When people read `connector=kafka-compacted` they might not know that it
has ktable semantics. You don't need to enable log compaction in order
to use a KTable as far as I know. Log compaction and table semantics are
orthogonal topics.
In the end we will need 3 types of information when declaring a Kafka
connector:
CREATE TABLE ... WITH (
connector=kafka -- Some information about the connector
end-offset = XXXX -- Some information about the boundedness
model = table/stream -- Some information about interpretation
)
We can still apply all the constraints mentioned in the FLIP. When
`model` is set to `table`.
What do you think?
Regards,
Timo
On 21.10.20 14:19, Jark Wu wrote:
Hi,
IMO, if we are going to mix them in one connector,
1) either users need to set some options to a specific value explicitly,
e.g. "scan.startup.mode=earliest", "sink.partitioner=hash", etc..
This makes the connector awkward to use. Users may face to fix options
one
by one according to the exception.
Besides, in the future, it is still possible to use
"sink.partitioner=fixed" (reduce network cost) if users are aware of
the partition routing,
however, it's error-prone to have "fixed" as default for compacted mode.
2) or make those options a different default value when "compacted=true".
This would be more confusing and unpredictable if the default value of
options will change according to other options.
What happens if we have a third mode in the future?
In terms of usage and options, it's very different from the
original "kafka" connector.
It would be more handy to use and less fallible if separating them into
two
connectors.
In the implementation layer, we can reuse code as much as possible.
Therefore, I'm still +1 to have a new connector.
The "kafka-compacted" name sounds good to me.
Best,
Jark
On Wed, 21 Oct 2020 at 17:58, Konstantin Knauf <kna...@apache.org>
wrote:
Hi Kurt, Hi Shengkai,
thanks for answering my questions and the additional clarifications. I
don't have a strong opinion on whether to extend the "kafka" connector
or
to introduce a new connector. So, from my perspective feel free to go
with
a separate connector. If we do introduce a new connector I wouldn't
call it
"ktable" for aforementioned reasons (In addition, we might suggest that
there is also a "kstreams" connector for symmetry reasons). I don't
have a
good alternative name, though, maybe "kafka-compacted" or
"compacted-kafka".
Thanks,
Konstantin
On Wed, Oct 21, 2020 at 4:43 AM Kurt Young <ykt...@gmail.com> wrote:
Hi all,
I want to describe the discussion process which drove us to have such
conclusion, this might make some of
the design choices easier to understand and keep everyone on the same
page.
Back to the motivation, what functionality do we want to provide in the
first place? We got a lot of feedback and
questions from mailing lists that people want to write Not-Insert-Only
messages into kafka. They might be
intentional or by accident, e.g. wrote an non-windowed aggregate query
or
non-windowed left outer join. And
some users from KSQL world also asked about why Flink didn't leverage
the
Key concept of every kafka topic
and make kafka as a dynamic changing keyed table.
To work with kafka better, we were thinking to extend the functionality
of
the current kafka connector by letting it
accept updates and deletions. But due to the limitation of kafka, the
update has to be "update by key", aka a table
with primary key.
This introduces a couple of conflicts with current kafka table's
options:
1. key.fields: as said above, we need the kafka table to have the
primary
key constraint. And users can also configure
key.fields freely, this might cause friction. (Sure we can do some
sanity
check on this but it also creates friction.)
2. sink.partitioner: to make the semantics right, we need to make sure
all
the updates on the same key are written to
the same kafka partition, such we should force to use a hash by key
partition inside such table. Again, this has conflicts
and creates friction with current user options.
The above things are solvable, though not perfect or most user
friendly.
Let's take a look at the reading side. The keyed kafka table contains
two
kinds of messages: upsert or deletion. What upsert
means is "If the key doesn't exist yet, it's an insert record.
Otherwise
it's an update record". For the sake of correctness or
simplicity, the Flink SQL engine also needs such information. If we
interpret all messages to "update record", some queries or
operators may not work properly. It's weird to see an update record but
you
haven't seen the insert record before.
So what Flink should do is after reading out the records from such
table,
it needs to create a state to record which messages have
been seen and then generate the correct row type correspondingly. This
kind
of couples the state and the data of the message
queue, and it also creates conflicts with current kafka connector.
Think about if users suspend a running job (which contains some reading
state now), and then change the start offset of the reader.
By changing the reading offset, it actually change the whole story of
"which records should be insert messages and which records
should be update messages). And it will also make Flink to deal with
another weird situation that it might receive a deletion
on a non existing message.
We were unsatisfied with all the frictions and conflicts it will create
if
we enable the "upsert & deletion" support to the current kafka
connector. And later we begin to realize that we shouldn't treat it as
a
normal message queue, but should treat it as a changing keyed
table. We should be able to always get the whole data of such table (by
disabling the start offset option) and we can also read the
changelog out of such table. It's like a HBase table with binlog
support
but doesn't have random access capability (which can be fulfilled
by Flink's state).
So our intention was instead of telling and persuading users what kind
of
options they should or should not use by extending
current kafka connector when enable upsert support, we are actually
create
a whole new and different connector that has total
different abstractions in SQL layer, and should be treated totally
different with current kafka connector.
Hope this can clarify some of the concerns.
Best,
Kurt
On Tue, Oct 20, 2020 at 5:20 PM Shengkai Fang <fskm...@gmail.com>
wrote:
Hi devs,
As many people are still confused about the difference option
behaviours
between the Kafka connector and KTable connector, Jark and I list the
differences in the doc[1].
Best,
Shengkai
[1]
https://docs.google.com/document/d/13oAWAwQez0lZLsyfV21BfTEze1fc2cz4AZKiNOyBNPk/edit
Shengkai Fang <fskm...@gmail.com> 于2020年10月20日周二 下午12:05写道:
Hi Konstantin,
Thanks for your reply.
It uses the "kafka" connector and does not specify a primary key.
The dimensional table `users` is a ktable connector and we can
specify
the
pk on the KTable.
Will it possible to use a "ktable" as a dimensional table in
FLIP-132
Yes. We can specify the watermark on the KTable and it can be used
as a
dimension table in temporal join.
Introduce a new connector vs introduce a new property
The main reason behind is that the KTable connector almost has no
common
options with the Kafka connector. The options that can be reused by
KTable
connectors are 'topic', 'properties.bootstrap.servers' and
'value.fields-include' . We can't set cdc format for 'key.format' and
'value.format' in KTable connector now, which is available in Kafka
connector. Considering the difference between the options we can use,
it's
more suitable to introduce an another connector rather than a
property.
We are also fine to use "compacted-kafka" as the name of the new
connector. What do you think?
Best,
Shengkai
Konstantin Knauf <kna...@apache.org> 于2020年10月19日周一 下午10:15写道:
Hi Shengkai,
Thank you for driving this effort. I believe this a very important
feature
for many users who use Kafka and Flink SQL together. A few questions
and
thoughts:
* Is your example "Use KTable as a reference/dimension table"
correct?
It
uses the "kafka" connector and does not specify a primary key.
* Will it be possible to use a "ktable" table directly as a
dimensional
table in temporal join (*based on event time*) (FLIP-132)? This is
not
completely clear to me from the FLIP.
* I'd personally prefer not to introduce a new connector and instead
to
extend the Kafka connector. We could add an additional property
"compacted"
= "true"|"false". If it is set to "true", we can add additional
validation
logic (e.g. "scan.startup.mode" can not be set, primary key
required,
etc.). If we stick to a separate connector I'd not call it "ktable",
but
rather "compacted-kafka" or similar. KTable seems to carry more
implicit
meaning than we want to imply here.
* I agree that this is not a bounded source. If we want to support a
bounded mode, this is an orthogonal concern that also applies to
other
unbounded sources.
Best,
Konstantin
On Mon, Oct 19, 2020 at 3:26 PM Jark Wu <imj...@gmail.com> wrote:
Hi Danny,
First of all, we didn't introduce any concepts from KSQL (e.g.
Stream
vs
Table notion).
This new connector will produce a changelog stream, so it's still
a
dynamic
table and doesn't conflict with Flink core concepts.
The "ktable" is just a connector name, we can also call it
"compacted-kafka" or something else.
Calling it "ktable" is just because KSQL users can migrate to
Flink
SQL
easily.
Regarding to why introducing a new connector vs a new property in
existing
kafka connector:
I think the main reason is that we want to have a clear separation
for
such
two use cases, because they are very different.
We also listed reasons in the FLIP, including:
1) It's hard to explain what's the behavior when users specify the
start
offset from a middle position (e.g. how to process non exist
delete
events).
It's dangerous if users do that. So we don't provide the
offset
option
in the new connector at the moment.
2) It's a different perspective/abstraction on the same kafka
topic
(append
vs. upsert). It would be easier to understand if we can separate
them
instead of mixing them in one connector. The new connector
requires
hash sink partitioner, primary key declared, regular format.
If we mix them in one connector, it might be confusing how to
use
the
options correctly.
3) The semantic of the KTable connector is just the same as KTable
in
Kafka
Stream. So it's very handy for Kafka Stream and KSQL users.
We have seen several questions in the mailing list asking how
to
model
a KTable and how to join a KTable in Flink SQL.
Best,
Jark
On Mon, 19 Oct 2020 at 19:53, Jark Wu <imj...@gmail.com> wrote:
Hi Jingsong,
As the FLIP describes, "KTable connector produces a changelog
stream,
where each data record represents an update or delete event.".
Therefore, a ktable source is an unbounded stream source.
Selecting
a
ktable source is similar to selecting a kafka source with
debezium-json
format
that it never ends and the results are continuously updated.
It's possible to have a bounded ktable source in the future, for
example,
add an option 'bounded=true' or 'end-offset=xxx'.
In this way, the ktable will produce a bounded changelog stream.
So I think this can be a compatible feature in the future.
I don't think we should associate with ksql related concepts.
Actually,
we
didn't introduce any concepts from KSQL (e.g. Stream vs Table
notion).
The "ktable" is just a connector name, we can also call it
"compacted-kafka" or something else.
Calling it "ktable" is just because KSQL users can migrate to
Flink
SQL
easily.
Regarding the "value.fields-include", this is an option
introduced
in
FLIP-107 for Kafka connector.
I think we should keep the same behavior with the Kafka
connector.
I'm
not
sure what's the default behavior of KSQL.
But I guess it also stores the keys in value from this example
docs
(see
the "users_original" table) [1].
Best,
Jark
[1]:
https://docs.confluent.io/current/ksqldb/tutorials/basics-local.html#create-a-stream-and-table
On Mon, 19 Oct 2020 at 18:17, Danny Chan <yuzhao....@gmail.com>
wrote:
The concept seems conflicts with the Flink abstraction “dynamic
table”,
in Flink we see both “stream” and “table” as a dynamic table,
I think we should make clear first how to express stream and
table
specific features on one “dynamic table”,
it is more natural for KSQL because KSQL takes stream and table
as
different abstractions for representing collections. In KSQL,
only
table is
mutable and can have a primary key.
Does this connector belongs to the “table” scope or “stream”
scope
?
Some of the concepts (such as the primary key on stream) should
be
suitable for all the connectors, not just Kafka, Shouldn’t this
be
an
extension of existing Kafka connector instead of a totally new
connector ?
What about the other connectors ?
Because this touches the core abstraction of Flink, we better
have
a
top-down overall design, following the KSQL directly is not the
answer.
P.S. For the source
Shouldn’t this be an extension of existing Kafka connector
instead
of
a
totally new connector ?
How could we achieve that (e.g. set up the parallelism
correctly) ?
Best,
Danny Chan
在 2020年10月19日 +0800 PM5:17,Jingsong Li <jingsongl...@gmail.com
,写道:
Thanks Shengkai for your proposal.
+1 for this feature.
Future Work: Support bounded KTable source
I don't think it should be a future work, I think it is one
of
the
important concepts of this FLIP. We need to understand it
now.
Intuitively, a ktable in my opinion is a bounded table rather
than
a
stream, so select should produce a bounded table by default.
I think we can list Kafka related knowledge, because the word
`ktable`
is
easy to associate with ksql related concepts. (If possible,
it's
better
to
unify with it)
What do you think?
value.fields-include
What about the default behavior of KSQL?
Best,
Jingsong
On Mon, Oct 19, 2020 at 4:33 PM Shengkai Fang <
fskm...@gmail.com
wrote:
Hi, devs.
Jark and I want to start a new FLIP to introduce the KTable
connector. The
KTable is a shortcut of "Kafka Table", it also has the same
semantics
with
the KTable notion in Kafka Stream.
FLIP-149:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-149%3A+Introduce+the+KTable+Connector
Currently many users have expressed their needs for the
upsert
Kafka
by
mail lists and issues. The KTable connector has several
benefits
for
users:
1. Users are able to interpret a compacted Kafka Topic as
an
upsert
stream
in Apache Flink. And also be able to write a changelog
stream
to
Kafka
(into a compacted topic).
2. As a part of the real time pipeline, store join or
aggregate
result (may
contain updates) into a Kafka topic for further
calculation;
3. The semantic of the KTable connector is just the same as
KTable
in
Kafka
Stream. So it's very handy for Kafka Stream and KSQL users.
We
have
seen
several questions in the mailing list asking how to model a
KTable
and how
to join a KTable in Flink SQL.
We hope it can expand the usage of the Flink with Kafka.
I'm looking forward to your feedback.
Best,
Shengkai
--
Best, Jingsong Lee
--
Konstantin Knauf
https://twitter.com/snntrable
https://github.com/knaufk
--
Konstantin Knauf
https://twitter.com/snntrable
https://github.com/knaufk