Hi,

IMO, if we are going to mix them in one connector,
1) either users need to set some options to a specific value explicitly,
e.g. "scan.startup.mode=earliest", "sink.partitioner=hash", etc..
This makes the connector awkward to use. Users may face to fix options one
by one according to the exception.
Besides, in the future, it is still possible to use
"sink.partitioner=fixed" (reduce network cost) if users are aware of
the partition routing,
however, it's error-prone to have "fixed" as default for compacted mode.

2) or make those options a different default value when "compacted=true".
This would be more confusing and unpredictable if the default value of
options will change according to other options.
What happens if we have a third mode in the future?

In terms of usage and options, it's very different from the
original "kafka" connector.
It would be more handy to use and less fallible if separating them into two
connectors.
In the implementation layer, we can reuse code as much as possible.

Therefore, I'm still +1 to have a new connector.
The "kafka-compacted" name sounds good to me.

Best,
Jark


On Wed, 21 Oct 2020 at 17:58, Konstantin Knauf <kna...@apache.org> wrote:

> Hi Kurt, Hi Shengkai,
>
> thanks for answering my questions and the additional clarifications. I
> don't have a strong opinion on whether to extend the "kafka" connector or
> to introduce a new connector. So, from my perspective feel free to go with
> a separate connector. If we do introduce a new connector I wouldn't call it
> "ktable" for aforementioned reasons (In addition, we might suggest that
> there is also a "kstreams" connector for symmetry reasons). I don't have a
> good alternative name, though, maybe "kafka-compacted" or
> "compacted-kafka".
>
> Thanks,
>
> Konstantin
>
>
> On Wed, Oct 21, 2020 at 4:43 AM Kurt Young <ykt...@gmail.com> wrote:
>
> > Hi all,
> >
> > I want to describe the discussion process which drove us to have such
> > conclusion, this might make some of
> > the design choices easier to understand and keep everyone on the same
> page.
> >
> > Back to the motivation, what functionality do we want to provide in the
> > first place? We got a lot of feedback and
> > questions from mailing lists that people want to write Not-Insert-Only
> > messages into kafka. They might be
> > intentional or by accident, e.g. wrote an non-windowed aggregate query or
> > non-windowed left outer join. And
> > some users from KSQL world also asked about why Flink didn't leverage the
> > Key concept of every kafka topic
> > and make kafka as a dynamic changing keyed table.
> >
> > To work with kafka better, we were thinking to extend the functionality
> of
> > the current kafka connector by letting it
> > accept updates and deletions. But due to the limitation of kafka, the
> > update has to be "update by key", aka a table
> > with primary key.
> >
> > This introduces a couple of conflicts with current kafka table's options:
> > 1. key.fields: as said above, we need the kafka table to have the primary
> > key constraint. And users can also configure
> > key.fields freely, this might cause friction. (Sure we can do some sanity
> > check on this but it also creates friction.)
> > 2. sink.partitioner: to make the semantics right, we need to make sure
> all
> > the updates on the same key are written to
> > the same kafka partition, such we should force to use a hash by key
> > partition inside such table. Again, this has conflicts
> > and creates friction with current user options.
> >
> > The above things are solvable, though not perfect or most user friendly.
> >
> > Let's take a look at the reading side. The keyed kafka table contains two
> > kinds of messages: upsert or deletion. What upsert
> > means is "If the key doesn't exist yet, it's an insert record. Otherwise
> > it's an update record". For the sake of correctness or
> > simplicity, the Flink SQL engine also needs such information. If we
> > interpret all messages to "update record", some queries or
> > operators may not work properly. It's weird to see an update record but
> you
> > haven't seen the insert record before.
> >
> > So what Flink should do is after reading out the records from such table,
> > it needs to create a state to record which messages have
> > been seen and then generate the correct row type correspondingly. This
> kind
> > of couples the state and the data of the message
> > queue, and it also creates conflicts with current kafka connector.
> >
> > Think about if users suspend a running job (which contains some reading
> > state now), and then change the start offset of the reader.
> > By changing the reading offset, it actually change the whole story of
> > "which records should be insert messages and which records
> > should be update messages). And it will also make Flink to deal with
> > another weird situation that it might receive a deletion
> > on a non existing message.
> >
> > We were unsatisfied with all the frictions and conflicts it will create
> if
> > we enable the "upsert & deletion" support to the current kafka
> > connector. And later we begin to realize that we shouldn't treat it as a
> > normal message queue, but should treat it as a changing keyed
> > table. We should be able to always get the whole data of such table (by
> > disabling the start offset option) and we can also read the
> > changelog out of such table. It's like a HBase table with binlog support
> > but doesn't have random access capability (which can be fulfilled
> > by Flink's state).
> >
> > So our intention was instead of telling and persuading users what kind of
> > options they should or should not use by extending
> > current kafka connector when enable upsert support, we are actually
> create
> > a whole new and different connector that has total
> > different abstractions in SQL layer, and should be treated totally
> > different with current kafka connector.
> >
> > Hope this can clarify some of the concerns.
> >
> > Best,
> > Kurt
> >
> >
> > On Tue, Oct 20, 2020 at 5:20 PM Shengkai Fang <fskm...@gmail.com> wrote:
> >
> > > Hi devs,
> > >
> > > As many people are still confused about the difference option
> behaviours
> > > between the Kafka connector and KTable connector, Jark and I list the
> > > differences in the doc[1].
> > >
> > > Best,
> > > Shengkai
> > >
> > > [1]
> > >
> > >
> >
> https://docs.google.com/document/d/13oAWAwQez0lZLsyfV21BfTEze1fc2cz4AZKiNOyBNPk/edit
> > >
> > > Shengkai Fang <fskm...@gmail.com> 于2020年10月20日周二 下午12:05写道:
> > >
> > > > Hi Konstantin,
> > > >
> > > > Thanks for your reply.
> > > >
> > > > > It uses the "kafka" connector and does not specify a primary key.
> > > > The dimensional table `users` is a ktable connector and we can
> specify
> > > the
> > > > pk on the KTable.
> > > >
> > > > > Will it possible to use a "ktable" as a dimensional table in
> FLIP-132
> > > > Yes. We can specify the watermark on the KTable and it can be used
> as a
> > > > dimension table in temporal join.
> > > >
> > > > >Introduce a new connector vs introduce a new property
> > > > The main reason behind is that the KTable connector almost has no
> > common
> > > > options with the Kafka connector. The options that can be reused by
> > > KTable
> > > > connectors are 'topic', 'properties.bootstrap.servers' and
> > > > 'value.fields-include' . We can't set cdc format for 'key.format' and
> > > > 'value.format' in KTable connector now, which is  available in Kafka
> > > > connector. Considering the difference between the options we can use,
> > > it's
> > > > more suitable to introduce an another connector rather than a
> property.
> > > >
> > > > We are also fine to use "compacted-kafka" as the name of the new
> > > > connector. What do you think?
> > > >
> > > > Best,
> > > > Shengkai
> > > >
> > > > Konstantin Knauf <kna...@apache.org> 于2020年10月19日周一 下午10:15写道:
> > > >
> > > >> Hi Shengkai,
> > > >>
> > > >> Thank you for driving this effort. I believe this a very important
> > > feature
> > > >> for many users who use Kafka and Flink SQL together. A few questions
> > and
> > > >> thoughts:
> > > >>
> > > >> * Is your example "Use KTable as a reference/dimension table"
> correct?
> > > It
> > > >> uses the "kafka" connector and does not specify a primary key.
> > > >>
> > > >> * Will it be possible to use a "ktable" table directly as a
> > dimensional
> > > >> table in temporal join (*based on event time*) (FLIP-132)? This is
> not
> > > >> completely clear to me from the FLIP.
> > > >>
> > > >> * I'd personally prefer not to introduce a new connector and instead
> > to
> > > >> extend the Kafka connector. We could add an additional property
> > > >> "compacted"
> > > >> = "true"|"false". If it is set to "true", we can add additional
> > > validation
> > > >> logic (e.g. "scan.startup.mode" can not be set, primary key
> required,
> > > >> etc.). If we stick to a separate connector I'd not call it "ktable",
> > but
> > > >> rather "compacted-kafka" or similar. KTable seems to carry more
> > implicit
> > > >> meaning than we want to imply here.
> > > >>
> > > >> * I agree that this is not a bounded source. If we want to support a
> > > >> bounded mode, this is an orthogonal concern that also applies to
> other
> > > >> unbounded sources.
> > > >>
> > > >> Best,
> > > >>
> > > >> Konstantin
> > > >>
> > > >> On Mon, Oct 19, 2020 at 3:26 PM Jark Wu <imj...@gmail.com> wrote:
> > > >>
> > > >> > Hi Danny,
> > > >> >
> > > >> > First of all, we didn't introduce any concepts from KSQL (e.g.
> > Stream
> > > vs
> > > >> > Table notion).
> > > >> > This new connector will produce a changelog stream, so it's still
> a
> > > >> dynamic
> > > >> > table and doesn't conflict with Flink core concepts.
> > > >> >
> > > >> > The "ktable" is just a connector name, we can also call it
> > > >> > "compacted-kafka" or something else.
> > > >> > Calling it "ktable" is just because KSQL users can migrate to
> Flink
> > > SQL
> > > >> > easily.
> > > >> >
> > > >> > Regarding to why introducing a new connector vs a new property in
> > > >> existing
> > > >> > kafka connector:
> > > >> >
> > > >> > I think the main reason is that we want to have a clear separation
> > for
> > > >> such
> > > >> > two use cases, because they are very different.
> > > >> > We also listed reasons in the FLIP, including:
> > > >> >
> > > >> > 1) It's hard to explain what's the behavior when users specify the
> > > start
> > > >> > offset from a middle position (e.g. how to process non exist
> delete
> > > >> > events).
> > > >> >     It's dangerous if users do that. So we don't provide the
> offset
> > > >> option
> > > >> > in the new connector at the moment.
> > > >> > 2) It's a different perspective/abstraction on the same kafka
> topic
> > > >> (append
> > > >> > vs. upsert). It would be easier to understand if we can separate
> > them
> > > >> >     instead of mixing them in one connector. The new connector
> > > requires
> > > >> > hash sink partitioner, primary key declared, regular format.
> > > >> >     If we mix them in one connector, it might be confusing how to
> > use
> > > >> the
> > > >> > options correctly.
> > > >> > 3) The semantic of the KTable connector is just the same as KTable
> > in
> > > >> Kafka
> > > >> > Stream. So it's very handy for Kafka Stream and KSQL users.
> > > >> >     We have seen several questions in the mailing list asking how
> to
> > > >> model
> > > >> > a KTable and how to join a KTable in Flink SQL.
> > > >> >
> > > >> > Best,
> > > >> > Jark
> > > >> >
> > > >> > On Mon, 19 Oct 2020 at 19:53, Jark Wu <imj...@gmail.com> wrote:
> > > >> >
> > > >> > > Hi Jingsong,
> > > >> > >
> > > >> > > As the FLIP describes, "KTable connector produces a changelog
> > > stream,
> > > >> > > where each data record represents an update or delete event.".
> > > >> > > Therefore, a ktable source is an unbounded stream source.
> > Selecting
> > > a
> > > >> > > ktable source is similar to selecting a kafka source with
> > > >> debezium-json
> > > >> > > format
> > > >> > > that it never ends and the results are continuously updated.
> > > >> > >
> > > >> > > It's possible to have a bounded ktable source in the future, for
> > > >> example,
> > > >> > > add an option 'bounded=true' or 'end-offset=xxx'.
> > > >> > > In this way, the ktable will produce a bounded changelog stream.
> > > >> > > So I think this can be a compatible feature in the future.
> > > >> > >
> > > >> > > I don't think we should associate with ksql related concepts.
> > > >> Actually,
> > > >> > we
> > > >> > > didn't introduce any concepts from KSQL (e.g. Stream vs Table
> > > notion).
> > > >> > > The "ktable" is just a connector name, we can also call it
> > > >> > > "compacted-kafka" or something else.
> > > >> > > Calling it "ktable" is just because KSQL users can migrate to
> > Flink
> > > >> SQL
> > > >> > > easily.
> > > >> > >
> > > >> > > Regarding the "value.fields-include", this is an option
> introduced
> > > in
> > > >> > > FLIP-107 for Kafka connector.
> > > >> > > I think we should keep the same behavior with the Kafka
> connector.
> > > I'm
> > > >> > not
> > > >> > > sure what's the default behavior of KSQL.
> > > >> > > But I guess it also stores the keys in value from this example
> > docs
> > > >> (see
> > > >> > > the "users_original" table) [1].
> > > >> > >
> > > >> > > Best,
> > > >> > > Jark
> > > >> > >
> > > >> > > [1]:
> > > >> > >
> > > >> >
> > > >>
> > >
> >
> https://docs.confluent.io/current/ksqldb/tutorials/basics-local.html#create-a-stream-and-table
> > > >> > >
> > > >> > >
> > > >> > > On Mon, 19 Oct 2020 at 18:17, Danny Chan <yuzhao....@gmail.com>
> > > >> wrote:
> > > >> > >
> > > >> > >> The concept seems conflicts with the Flink abstraction “dynamic
> > > >> table”,
> > > >> > >> in Flink we see both “stream” and “table” as a dynamic table,
> > > >> > >>
> > > >> > >> I think we should make clear first how to express stream and
> > table
> > > >> > >> specific features on one “dynamic table”,
> > > >> > >> it is more natural for KSQL because KSQL takes stream and table
> > as
> > > >> > >> different abstractions for representing collections. In KSQL,
> > only
> > > >> > table is
> > > >> > >> mutable and can have a primary key.
> > > >> > >>
> > > >> > >> Does this connector belongs to the “table” scope or “stream”
> > scope
> > > ?
> > > >> > >>
> > > >> > >> Some of the concepts (such as the primary key on stream) should
> > be
> > > >> > >> suitable for all the connectors, not just Kafka, Shouldn’t this
> > be
> > > an
> > > >> > >> extension of existing Kafka connector instead of a totally new
> > > >> > connector ?
> > > >> > >> What about the other connectors ?
> > > >> > >>
> > > >> > >> Because this touches the core abstraction of Flink, we better
> > have
> > > a
> > > >> > >> top-down overall design, following the KSQL directly is not the
> > > >> answer.
> > > >> > >>
> > > >> > >> P.S. For the source
> > > >> > >> > Shouldn’t this be an extension of existing Kafka connector
> > > instead
> > > >> of
> > > >> > a
> > > >> > >> totally new connector ?
> > > >> > >>
> > > >> > >> How could we achieve that (e.g. set up the parallelism
> > correctly) ?
> > > >> > >>
> > > >> > >> Best,
> > > >> > >> Danny Chan
> > > >> > >> 在 2020年10月19日 +0800 PM5:17,Jingsong Li <jingsongl...@gmail.com
> > > >,写道:
> > > >> > >> > Thanks Shengkai for your proposal.
> > > >> > >> >
> > > >> > >> > +1 for this feature.
> > > >> > >> >
> > > >> > >> > > Future Work: Support bounded KTable source
> > > >> > >> >
> > > >> > >> > I don't think it should be a future work, I think it is one
> of
> > > the
> > > >> > >> > important concepts of this FLIP. We need to understand it
> now.
> > > >> > >> >
> > > >> > >> > Intuitively, a ktable in my opinion is a bounded table rather
> > > than
> > > >> a
> > > >> > >> > stream, so select should produce a bounded table by default.
> > > >> > >> >
> > > >> > >> > I think we can list Kafka related knowledge, because the word
> > > >> `ktable`
> > > >> > >> is
> > > >> > >> > easy to associate with ksql related concepts. (If possible,
> > it's
> > > >> > better
> > > >> > >> to
> > > >> > >> > unify with it)
> > > >> > >> >
> > > >> > >> > What do you think?
> > > >> > >> >
> > > >> > >> > > value.fields-include
> > > >> > >> >
> > > >> > >> > What about the default behavior of KSQL?
> > > >> > >> >
> > > >> > >> > Best,
> > > >> > >> > Jingsong
> > > >> > >> >
> > > >> > >> > On Mon, Oct 19, 2020 at 4:33 PM Shengkai Fang <
> > fskm...@gmail.com
> > > >
> > > >> > >> wrote:
> > > >> > >> >
> > > >> > >> > > Hi, devs.
> > > >> > >> > >
> > > >> > >> > > Jark and I want to start a new FLIP to introduce the KTable
> > > >> > >> connector. The
> > > >> > >> > > KTable is a shortcut of "Kafka Table", it also has the same
> > > >> > semantics
> > > >> > >> with
> > > >> > >> > > the KTable notion in Kafka Stream.
> > > >> > >> > >
> > > >> > >> > > FLIP-149:
> > > >> > >> > >
> > > >> > >> > >
> > > >> > >>
> > > >> >
> > > >>
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-149%3A+Introduce+the+KTable+Connector
> > > >> > >> > >
> > > >> > >> > > Currently many users have expressed their needs for the
> > upsert
> > > >> Kafka
> > > >> > >> by
> > > >> > >> > > mail lists and issues. The KTable connector has several
> > > benefits
> > > >> for
> > > >> > >> users:
> > > >> > >> > >
> > > >> > >> > > 1. Users are able to interpret a compacted Kafka Topic as
> an
> > > >> upsert
> > > >> > >> stream
> > > >> > >> > > in Apache Flink. And also be able to write a changelog
> stream
> > > to
> > > >> > Kafka
> > > >> > >> > > (into a compacted topic).
> > > >> > >> > > 2. As a part of the real time pipeline, store join or
> > aggregate
> > > >> > >> result (may
> > > >> > >> > > contain updates) into a Kafka topic for further
> calculation;
> > > >> > >> > > 3. The semantic of the KTable connector is just the same as
> > > >> KTable
> > > >> > in
> > > >> > >> Kafka
> > > >> > >> > > Stream. So it's very handy for Kafka Stream and KSQL users.
> > We
> > > >> have
> > > >> > >> seen
> > > >> > >> > > several questions in the mailing list asking how to model a
> > > >> KTable
> > > >> > >> and how
> > > >> > >> > > to join a KTable in Flink SQL.
> > > >> > >> > >
> > > >> > >> > > We hope it can expand the usage of the Flink with Kafka.
> > > >> > >> > >
> > > >> > >> > > I'm looking forward to your feedback.
> > > >> > >> > >
> > > >> > >> > > Best,
> > > >> > >> > > Shengkai
> > > >> > >> > >
> > > >> > >> >
> > > >> > >> >
> > > >> > >> > --
> > > >> > >> > Best, Jingsong Lee
> > > >> > >>
> > > >> > >
> > > >> >
> > > >>
> > > >>
> > > >> --
> > > >>
> > > >> Konstantin Knauf
> > > >>
> > > >> https://twitter.com/snntrable
> > > >>
> > > >> https://github.com/knaufk
> > > >>
> > > >
> > >
> >
>
>
> --
>
> Konstantin Knauf
>
> https://twitter.com/snntrable
>
> https://github.com/knaufk
>

Reply via email to