Hi Kurt, Hi Shengkai,

thanks for answering my questions and the additional clarifications. I
don't have a strong opinion on whether to extend the "kafka" connector or
to introduce a new connector. So, from my perspective feel free to go with
a separate connector. If we do introduce a new connector, I wouldn't call it
"ktable" for the aforementioned reasons (in addition, the name might suggest
that there is also a "kstreams" connector, for symmetry). I don't have a
good alternative name, though; maybe "kafka-compacted" or
"compacted-kafka".

Thanks,

Konstantin


On Wed, Oct 21, 2020 at 4:43 AM Kurt Young <ykt...@gmail.com> wrote:

> Hi all,
>
> I want to describe the discussion process that led us to this
> conclusion; this might make some of
> the design choices easier to understand and keep everyone on the same page.
>
> Back to the motivation: what functionality do we want to provide in the
> first place? We got a lot of feedback and
> questions on the mailing lists from people who want to write Not-Insert-Only
> messages into Kafka. This might be
> intentional or by accident, e.g. by writing a non-windowed aggregate query or
> a non-windowed left outer join. And
> some users from the KSQL world also asked why Flink doesn't leverage the
> key concept of every Kafka topic
> and treat Kafka as a dynamically changing keyed table.
>
> To work better with Kafka, we were thinking of extending the functionality of
> the current kafka connector by letting it
> accept updates and deletions. But due to the limitations of Kafka, an
> update has to be "update by key", i.e. the table must have
> a primary key.
>
> This introduces a couple of conflicts with the current kafka table's options:
> 1. key.fields: as said above, we need the kafka table to have a primary
> key constraint. But users can also configure
> key.fields freely, which might cause friction. (Sure, we can do some sanity
> checks on this, but that also creates friction.)
> 2. sink.partitioner: to make the semantics right, we need to make sure all
> the updates on the same key are written to
> the same kafka partition, so we would have to force a hash-by-key
> partitioner for such a table. Again, this conflicts
> with, and creates friction around, the current user options.
>
> The above things are solvable, though not perfect or most user friendly.
>
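The hash-by-key requirement from point 2 can be sketched as follows. This is only a minimal illustration, not the connector's partitioner: the function name `partition_for_key` is hypothetical, and `zlib.crc32` is a stand-in for whatever hash the real partitioner would use. The only point is that the partition is a pure function of the key, so all updates for one key land in the same partition and keep their order.

```python
import zlib


def partition_for_key(key: bytes, num_partitions: int) -> int:
    """Map a record key to a partition deterministically (hypothetical helper)."""
    if num_partitions <= 0:
        raise ValueError("num_partitions must be positive")
    # crc32 is stable across processes, unlike Python's built-in hash() on bytes.
    return zlib.crc32(key) % num_partitions


# Three updates, two of them on the same key.
updates = [(b"user-1", "a"), (b"user-2", "b"), (b"user-1", "c")]
assignments = [(key, partition_for_key(key, 4)) for key, _ in updates]

# Both updates on b"user-1" are assigned to the same partition.
same_partition = assignments[0][1] == assignments[2][1]
```

A user-configurable partitioner (round-robin, for example) could break this guarantee, which is the friction described above.
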
> Let's take a look at the reading side. The keyed kafka table contains two
> kinds of messages: upsert or deletion. What upsert
> means is "If the key doesn't exist yet, it's an insert record. Otherwise
> it's an update record". For the sake of correctness or
> simplicity, the Flink SQL engine also needs this information. If we
> interpret all messages as "update record", some queries or
> operators may not work properly. It's weird to see an update record when
> no insert record has been seen before.
>
> So after reading the records out of such a table, Flink
> needs to keep state recording which messages have
> been seen, and then generate the correct row type accordingly. This
> couples the state with the data in the message
> queue, and it also creates conflicts with the current kafka connector.
>
> Think about what happens if users suspend a running job (which now contains
> some reading state) and then change the start offset of the reader.
> Changing the reading offset actually changes the whole story of
> "which records should be insert messages and which records
> should be update messages". It will also force Flink to deal with
> another weird situation: it might receive a deletion
> for a non-existing message.
>
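The reading-side problem above can be sketched in a few lines. This is a simplified model under stated assumptions, not Flink's implementation: `to_changelog` is a hypothetical helper, a `None` value stands for a deletion, the change kinds are named after Flink's row kinds only for readability, and silently dropping a deletion for an unseen key is just one possible policy.

```python
from typing import Dict, Iterable, List, Optional, Tuple

Record = Tuple[str, Optional[str]]        # (key, value); value None marks a deletion
Change = Tuple[str, str, Optional[str]]   # (kind, key, value)


def to_changelog(records: Iterable[Record]) -> List[Change]:
    """Turn upsert/deletion records into a changelog using per-key state."""
    state: Dict[str, str] = {}
    out: List[Change] = []
    for key, value in records:
        if value is None:
            if key in state:
                out.append(("DELETE", key, state.pop(key)))
            # Deletion for a key never seen: nothing to retract, dropped here.
            continue
        if key in state:
            out.append(("UPDATE_BEFORE", key, state[key]))
            out.append(("UPDATE_AFTER", key, value))
        else:
            out.append(("INSERT", key, value))
        state[key] = value
    return out


topic = [("k1", "a"), ("k1", "b"), ("k1", None)]

full = to_changelog(topic)               # read from the beginning
late = to_changelog(topic[1:])           # start one record later
deletion_only = to_changelog(topic[2:])  # start at the deletion
```

Reading from the beginning yields an insert, an update pair and a delete. Starting one record later turns the former update into an insert, and starting at the last record leaves only a deletion for a key that was never seen. The same topic tells a different story depending on the start offset.
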
> We were unsatisfied with all the friction and conflicts it would create if
> we added "upsert & deletion" support to the current kafka
> connector. Later we began to realize that we shouldn't treat it as a
> normal message queue, but should treat it as a changing keyed
> table. We should always be able to get the whole data of such a table (by
> disabling the start offset option), and we can also read the
> changelog out of such a table. It's like an HBase table with binlog support
> but without random access capability (which can be provided
> by Flink's state).
>
> So our intention was, instead of extending the
> current kafka connector and then telling and persuading users which
> options they should or should not use when upsert support is enabled, to create
> a whole new and different connector that has totally
> different abstractions in the SQL layer and should be treated totally
> differently from the current kafka connector.
>
> Hope this can clarify some of the concerns.
>
> Best,
> Kurt
>
>
> On Tue, Oct 20, 2020 at 5:20 PM Shengkai Fang <fskm...@gmail.com> wrote:
>
> > Hi devs,
> >
> > As many people are still confused about the different option behaviours
> > of the Kafka connector and the KTable connector, Jark and I have listed the
> > differences in the doc[1].
> >
> > Best,
> > Shengkai
> >
> > [1]
> >
> >
> https://docs.google.com/document/d/13oAWAwQez0lZLsyfV21BfTEze1fc2cz4AZKiNOyBNPk/edit
> >
> > On Tue, Oct 20, 2020 at 12:05 PM Shengkai Fang <fskm...@gmail.com> wrote:
> >
> > > Hi Konstantin,
> > >
> > > Thanks for your reply.
> > >
> > > > It uses the "kafka" connector and does not specify a primary key.
> > > The dimension table `users` uses the ktable connector, and we can specify
> > > the primary key on the KTable.
> > >
> > > > Will it be possible to use a "ktable" as a dimensional table in FLIP-132
> > > Yes. We can specify the watermark on the KTable and it can be used as a
> > > dimension table in temporal join.
> > >
> > > >Introduce a new connector vs introduce a new property
> > > The main reason is that the KTable connector has almost no options in
> > > common with the Kafka connector. The options that can be reused by the
> > > KTable connector are 'topic', 'properties.bootstrap.servers' and
> > > 'value.fields-include'. We can't set a CDC format for 'key.format' and
> > > 'value.format' in the KTable connector for now, which is possible in the
> > > Kafka connector. Considering the differences between the available options,
> > > it's more suitable to introduce another connector rather than a property.
> > >
> > > We are also fine with using "compacted-kafka" as the name of the new
> > > connector. What do you think?
> > >
> > > Best,
> > > Shengkai
> > >
> > > On Mon, Oct 19, 2020 at 10:15 PM Konstantin Knauf <kna...@apache.org> wrote:
> > >
> > >> Hi Shengkai,
> > >>
> > >> Thank you for driving this effort. I believe this is a very important
> > >> feature for many users who use Kafka and Flink SQL together. A few
> > >> questions and thoughts:
> > >>
> > >> * Is your example "Use KTable as a reference/dimension table" correct?
> > It
> > >> uses the "kafka" connector and does not specify a primary key.
> > >>
> > >> * Will it be possible to use a "ktable" table directly as a
> dimensional
> > >> table in temporal join (*based on event time*) (FLIP-132)? This is not
> > >> completely clear to me from the FLIP.
> > >>
> > >> * I'd personally prefer not to introduce a new connector and instead
> to
> > >> extend the Kafka connector. We could add an additional property
> > >> "compacted"
> > >> = "true"|"false". If it is set to "true", we can add additional
> > validation
> > >> logic (e.g. "scan.startup.mode" can not be set, primary key required,
> > >> etc.). If we stick to a separate connector I'd not call it "ktable",
> but
> > >> rather "compacted-kafka" or similar. KTable seems to carry more
> implicit
> > >> meaning than we want to imply here.
> > >>
> > >> * I agree that this is not a bounded source. If we want to support a
> > >> bounded mode, this is an orthogonal concern that also applies to other
> > >> unbounded sources.
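
The validation idea from the third bullet above (a "compacted" property on the existing connector) could look roughly like this. It is a hedged sketch only: `validate_options` is a hypothetical helper, not part of Flink, and it checks just the two rules named in the bullet ("scan.startup.mode" must not be set, a primary key is required).

```python
from typing import Dict, List, Sequence


def validate_options(options: Dict[str, str], primary_key: Sequence[str]) -> List[str]:
    """Return validation errors for a table definition (hypothetical helper)."""
    errors: List[str] = []
    # Without 'compacted' = 'true' the table behaves like a plain kafka table.
    if options.get("compacted", "false") != "true":
        return errors
    if "scan.startup.mode" in options:
        errors.append("'scan.startup.mode' cannot be set when 'compacted' = 'true'")
    if not primary_key:
        errors.append("a primary key is required when 'compacted' = 'true'")
    return errors


bad = validate_options(
    {"connector": "kafka", "compacted": "true", "scan.startup.mode": "earliest-offset"},
    primary_key=[],
)
good = validate_options({"connector": "kafka", "compacted": "true"}, primary_key=["user_id"])
```

Here `bad` collects both violations and `good` is empty. Each rule is a cross-option dependency that users would have to learn, which is the trade-off discussed in the replies.
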
> > >>
> > >> Best,
> > >>
> > >> Konstantin
> > >>
> > >> On Mon, Oct 19, 2020 at 3:26 PM Jark Wu <imj...@gmail.com> wrote:
> > >>
> > >> > Hi Danny,
> > >> >
> > >> > First of all, we didn't introduce any concepts from KSQL (e.g.
> Stream
> > vs
> > >> > Table notion).
> > >> > This new connector will produce a changelog stream, so it's still a
> > >> dynamic
> > >> > table and doesn't conflict with Flink core concepts.
> > >> >
> > >> > The "ktable" is just a connector name, we can also call it
> > >> > "compacted-kafka" or something else.
> > >> > Calling it "ktable" is just because KSQL users can migrate to Flink
> > SQL
> > >> > easily.
> > >> >
> > >> > Regarding why we introduce a new connector vs. a new property in the
> > >> > existing kafka connector:
> > >> >
> > >> > I think the main reason is that we want to have a clear separation
> for
> > >> such
> > >> > two use cases, because they are very different.
> > >> > We also listed reasons in the FLIP, including:
> > >> >
> > >> > 1) It's hard to explain what the behavior is when users specify a
> > >> > start offset from a middle position (e.g. how to process delete
> > >> > events for records that do not exist).
> > >> >     It's dangerous if users do that. So we don't provide the offset
> > >> > option in the new connector at the moment.
> > >> > 2) It's a different perspective/abstraction on the same kafka topic
> > >> > (append vs. upsert). It would be easier to understand if we separate them
> > >> >     instead of mixing them in one connector. The new connector requires
> > >> > a hash sink partitioner, a declared primary key, and a regular format.
> > >> >     If we mix them in one connector, it might be confusing how to use
> > >> > the options correctly.
> > >> > 3) The semantics of the KTable connector are just the same as KTable in
> > >> > Kafka Streams. So it's very handy for Kafka Streams and KSQL users.
> > >> >     We have seen several questions on the mailing list asking how to
> > >> > model a KTable and how to join a KTable in Flink SQL.
> > >> >
> > >> > Best,
> > >> > Jark
> > >> >
> > >> > On Mon, 19 Oct 2020 at 19:53, Jark Wu <imj...@gmail.com> wrote:
> > >> >
> > >> > > Hi Jingsong,
> > >> > >
> > >> > > As the FLIP describes, "KTable connector produces a changelog stream,
> > >> > > where each data record represents an update or delete event.".
> > >> > > Therefore, a ktable source is an unbounded stream source. Selecting
> > >> > > from a ktable source is similar to selecting from a kafka source with
> > >> > > the debezium-json format, in that it never ends and the results are
> > >> > > continuously updated.
> > >> > >
> > >> > > It's possible to have a bounded ktable source in the future, for
> > >> example,
> > >> > > add an option 'bounded=true' or 'end-offset=xxx'.
> > >> > > In this way, the ktable will produce a bounded changelog stream.
> > >> > > So I think this can be a compatible feature in the future.
> > >> > >
> > >> > > I don't think we should associate with ksql related concepts.
> > >> Actually,
> > >> > we
> > >> > > didn't introduce any concepts from KSQL (e.g. Stream vs Table
> > notion).
> > >> > > The "ktable" is just a connector name, we can also call it
> > >> > > "compacted-kafka" or something else.
> > >> > > Calling it "ktable" is just because KSQL users can migrate to
> Flink
> > >> SQL
> > >> > > easily.
> > >> > >
> > >> > > Regarding the "value.fields-include", this is an option introduced
> > in
> > >> > > FLIP-107 for Kafka connector.
> > >> > > I think we should keep the same behavior with the Kafka connector.
> > I'm
> > >> > not
> > >> > > sure what's the default behavior of KSQL.
> > >> > > But I guess it also stores the keys in value from this example
> docs
> > >> (see
> > >> > > the "users_original" table) [1].
> > >> > >
> > >> > > Best,
> > >> > > Jark
> > >> > >
> > >> > > [1]:
> > >> > >
> > >> >
> > >>
> >
> https://docs.confluent.io/current/ksqldb/tutorials/basics-local.html#create-a-stream-and-table
> > >> > >
> > >> > >
> > >> > > On Mon, 19 Oct 2020 at 18:17, Danny Chan <yuzhao....@gmail.com>
> > >> wrote:
> > >> > >
> > >> > >> The concept seems to conflict with the Flink abstraction “dynamic
> > >> > >> table”; in Flink we see both “stream” and “table” as a dynamic table.
> > >> > >>
> > >> > >> I think we should make clear first how to express stream and
> table
> > >> > >> specific features on one “dynamic table”,
> > >> > >> it is more natural for KSQL because KSQL takes stream and table
> as
> > >> > >> different abstractions for representing collections. In KSQL,
> only
> > >> > table is
> > >> > >> mutable and can have a primary key.
> > >> > >>
> > >> > >> Does this connector belong to the “table” scope or the “stream” scope?
> > >> > >>
> > >> > >> Some of the concepts (such as the primary key on a stream) should be
> > >> > >> suitable for all the connectors, not just Kafka. Shouldn’t this be an
> > >> > >> extension of the existing Kafka connector instead of a totally new
> > >> > >> connector? What about the other connectors?
> > >> > >>
> > >> > >> Because this touches the core abstraction of Flink, we had better have a
> > >> > >> top-down overall design; following KSQL directly is not the answer.
> > >> > >>
> > >> > >> P.S. For the source
> > >> > >> > Shouldn’t this be an extension of existing Kafka connector
> > instead
> > >> of
> > >> > a
> > >> > >> totally new connector ?
> > >> > >>
> > >> > >> How could we achieve that (e.g. set up the parallelism
> correctly) ?
> > >> > >>
> > >> > >> Best,
> > >> > >> Danny Chan
> > >> > >> On Oct 19, 2020 at 5:17 PM +0800, Jingsong Li <jingsongl...@gmail.com> wrote:
> > >> > >> > Thanks Shengkai for your proposal.
> > >> > >> >
> > >> > >> > +1 for this feature.
> > >> > >> >
> > >> > >> > > Future Work: Support bounded KTable source
> > >> > >> >
> > >> > >> > I don't think it should be a future work, I think it is one of
> > the
> > >> > >> > important concepts of this FLIP. We need to understand it now.
> > >> > >> >
> > >> > >> > Intuitively, a ktable in my opinion is a bounded table rather
> > than
> > >> a
> > >> > >> > stream, so select should produce a bounded table by default.
> > >> > >> >
> > >> > >> > I think we can list Kafka related knowledge, because the word
> > >> `ktable`
> > >> > >> is
> > >> > >> > easy to associate with ksql related concepts. (If possible,
> it's
> > >> > better
> > >> > >> to
> > >> > >> > unify with it)
> > >> > >> >
> > >> > >> > What do you think?
> > >> > >> >
> > >> > >> > > value.fields-include
> > >> > >> >
> > >> > >> > What about the default behavior of KSQL?
> > >> > >> >
> > >> > >> > Best,
> > >> > >> > Jingsong
> > >> > >> >
> > >> > >> > On Mon, Oct 19, 2020 at 4:33 PM Shengkai Fang <
> fskm...@gmail.com
> > >
> > >> > >> wrote:
> > >> > >> >
> > >> > >> > > Hi, devs.
> > >> > >> > >
> > >> > >> > > Jark and I want to start a new FLIP to introduce the KTable
> > >> > >> > > connector. KTable is short for "Kafka Table"; it also has the same
> > >> > >> > > semantics as the KTable notion in Kafka Streams.
> > >> > >> > >
> > >> > >> > > FLIP-149:
> > >> > >> > >
> > >> > >> > >
> > >> > >>
> > >> >
> > >>
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-149%3A+Introduce+the+KTable+Connector
> > >> > >> > >
> > >> > >> > > Many users have expressed their need for upsert Kafka support on the
> > >> > >> > > mailing lists and in issues. The KTable connector has several
> > >> > >> > > benefits for users:
> > >> > >> > >
> > >> > >> > > 1. Users are able to interpret a compacted Kafka Topic as an
> > >> upsert
> > >> > >> stream
> > >> > >> > > in Apache Flink. And also be able to write a changelog stream
> > to
> > >> > Kafka
> > >> > >> > > (into a compacted topic).
> > >> > >> > > 2. As a part of the real time pipeline, store join or
> aggregate
> > >> > >> result (may
> > >> > >> > > contain updates) into a Kafka topic for further calculation;
> > >> > >> > > 3. The semantics of the KTable connector are just the same as KTable
> > >> > >> > > in Kafka Streams. So it's very handy for Kafka Streams and KSQL users.
> > >> > >> > > We have seen several questions on the mailing list asking how to
> > >> > >> > > model a KTable and how to join a KTable in Flink SQL.
> > >> > >> > >
> > >> > >> > > We hope it can expand the usage of the Flink with Kafka.
> > >> > >> > >
> > >> > >> > > I'm looking forward to your feedback.
> > >> > >> > >
> > >> > >> > > Best,
> > >> > >> > > Shengkai
> > >> > >> > >
> > >> > >> >
> > >> > >> >
> > >> > >> > --
> > >> > >> > Best, Jingsong Lee
> > >> > >>
> > >> > >
> > >> >
> > >>
> > >>
> > >> --
> > >>
> > >> Konstantin Knauf
> > >>
> > >> https://twitter.com/snntrable
> > >>
> > >> https://github.com/knaufk
> > >>
> > >
> >
>


-- 

Konstantin Knauf

https://twitter.com/snntrable

https://github.com/knaufk
