Hi devs,

As many people are still confused about the different option behaviours
of the Kafka connector and the KTable connector, Jark and I have listed
the differences in the doc[1].

Best,
Shengkai

[1]
https://docs.google.com/document/d/13oAWAwQez0lZLsyfV21BfTEze1fc2cz4AZKiNOyBNPk/edit

On Tue, Oct 20, 2020 at 12:05 PM, Shengkai Fang <fskm...@gmail.com> wrote:

> Hi Konstantin,
>
> Thanks for your reply.
>
> > It uses the "kafka" connector and does not specify a primary key.
> The dimension table `users` uses the ktable connector, and we can specify
> the primary key on the KTable.
>
> > Will it be possible to use a "ktable" as a dimensional table in FLIP-132
> Yes. We can specify the watermark on the KTable, and it can then be used as
> a dimension table in a temporal join.
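> For illustration, an event-time temporal join against such a table could
> look roughly like the following (the table and column names here are made
> up for the example, not taken from the FLIP):
>
> ```sql
> -- `users` is a ktable-backed table with a primary key and a watermark;
> -- `orders` is a regular append-only stream with its own event time.
> SELECT o.order_id, o.amount, u.region
> FROM orders AS o
> JOIN users FOR SYSTEM_TIME AS OF o.order_time AS u
>   ON o.user_id = u.user_id;
> ```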
>
> >Introduce a new connector vs introduce a new property
> The main reason is that the KTable connector has almost no options in
> common with the Kafka connector. The only options that can be reused by the
> KTable connector are 'topic', 'properties.bootstrap.servers' and
> 'value.fields-include'. We also can't set a CDC format for 'key.format' or
> 'value.format' in the KTable connector now, which is available in the Kafka
> connector. Considering the difference between the usable options, it's more
> suitable to introduce another connector rather than a property.
>
> We are also fine with using "compacted-kafka" as the name of the new
> connector. What do you think?
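> To make the option difference concrete, a table using the proposed
> connector might be declared roughly as follows (the connector name and the
> exact option set are still under discussion, so treat this as a sketch):
>
> ```sql
> CREATE TABLE users (
>   user_id BIGINT,
>   user_name STRING,
>   region STRING,
>   -- a primary key is required by the proposed connector
>   PRIMARY KEY (user_id) NOT ENFORCED
> ) WITH (
>   'connector' = 'ktable',
>   'topic' = 'users',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'key.format' = 'json',
>   'value.format' = 'json'
> );
> ```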
>
> Best,
> Shengkai
>
> On Mon, Oct 19, 2020 at 10:15 PM, Konstantin Knauf <kna...@apache.org> wrote:
>
>> Hi Shengkai,
>>
>> Thank you for driving this effort. I believe this is a very important
>> feature for many users who use Kafka and Flink SQL together. A few
>> questions and thoughts:
>>
>> * Is your example "Use KTable as a reference/dimension table" correct? It
>> uses the "kafka" connector and does not specify a primary key.
>>
>> * Will it be possible to use a "ktable" table directly as a dimensional
>> table in temporal join (*based on event time*) (FLIP-132)? This is not
>> completely clear to me from the FLIP.
>>
>> * I'd personally prefer not to introduce a new connector and instead
>> extend the Kafka connector. We could add an additional property
>> "compacted" = "true"|"false". If it is set to "true", we can add
>> additional validation logic (e.g. "scan.startup.mode" cannot be set, a
>> primary key is required, etc.). If we stick to a separate connector, I'd
>> not call it "ktable", but rather "compacted-kafka" or similar. "KTable"
>> seems to carry more implicit meaning than we want to imply here.
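>> For comparison, the property-based alternative I have in mind would look
>> something like this (just a sketch; the "compacted" option is only a
>> proposal at this point):
>>
>> ```sql
>> CREATE TABLE users (
>>   user_id BIGINT,
>>   user_name STRING,
>>   -- a primary key would be required when 'compacted' = 'true'
>>   PRIMARY KEY (user_id) NOT ENFORCED
>> ) WITH (
>>   'connector' = 'kafka',
>>   'topic' = 'users',
>>   'properties.bootstrap.servers' = 'localhost:9092',
>>   'compacted' = 'true',
>>   'value.format' = 'json'
>> );
>> ```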
>>
>> * I agree that this is not a bounded source. If we want to support a
>> bounded mode, this is an orthogonal concern that also applies to other
>> unbounded sources.
>>
>> Best,
>>
>> Konstantin
>>
>> On Mon, Oct 19, 2020 at 3:26 PM Jark Wu <imj...@gmail.com> wrote:
>>
>> > Hi Danny,
>> >
>> > First of all, we didn't introduce any concepts from KSQL (e.g. the
>> > Stream vs Table notion). This new connector will produce a changelog
>> > stream, so it's still a dynamic table and doesn't conflict with Flink's
>> > core concepts.
>> >
>> > The "ktable" is just a connector name; we could also call it
>> > "compacted-kafka" or something else. Calling it "ktable" is just so that
>> > KSQL users can migrate to Flink SQL easily.
>> >
>> > Regarding why we introduce a new connector instead of a new property in
>> > the existing kafka connector:
>> >
>> > I think the main reason is that we want a clear separation between these
>> > two use cases, because they are very different. We also listed the
>> > reasons in the FLIP, including:
>> >
>> > 1) It's hard to explain what the behavior is when users specify a start
>> > offset from a middle position (e.g. how to process delete events for
>> > keys that don't exist). It's dangerous if users do that, so we don't
>> > provide the offset option in the new connector at the moment.
>> > 2) It's a different perspective/abstraction on the same kafka topic
>> > (append vs. upsert). It would be easier to understand if we separate
>> > them instead of mixing them in one connector. The new connector requires
>> > the hash sink partitioner, a declared primary key, and a regular format.
>> > If we mix them in one connector, it might be confusing how to use the
>> > options correctly.
>> > 3) The semantics of the KTable connector are just the same as KTable in
>> > Kafka Streams, so it's very handy for Kafka Streams and KSQL users. We
>> > have seen several questions on the mailing list asking how to model a
>> > KTable and how to join a KTable in Flink SQL.
>> >
>> > Best,
>> > Jark
>> >
>> > On Mon, 19 Oct 2020 at 19:53, Jark Wu <imj...@gmail.com> wrote:
>> >
>> > > Hi Jingsong,
>> > >
>> > > As the FLIP describes, "KTable connector produces a changelog stream,
>> > > where each data record represents an update or delete event."
>> > > Therefore, a ktable source is an unbounded stream source. Selecting
>> > > from a ktable source is similar to selecting from a kafka source with
>> > > the debezium-json format, in that it never ends and the results are
>> > > continuously updated.
>> > >
>> > > It's possible to have a bounded ktable source in the future, for
>> > > example by adding an option 'bounded=true' or 'end-offset=xxx'. In
>> > > this way, the ktable would produce a bounded changelog stream. So I
>> > > think this can be added as a compatible feature in the future.
>> > >
>> > > I don't think we should associate it with KSQL-related concepts.
>> > > Actually, we didn't introduce any concepts from KSQL (e.g. the Stream
>> > > vs Table notion). The "ktable" is just a connector name; we could also
>> > > call it "compacted-kafka" or something else. Calling it "ktable" is
>> > > just so that KSQL users can migrate to Flink SQL easily.
>> > >
>> > > Regarding "value.fields-include", this is an option introduced in
>> > > FLIP-107 for the Kafka connector. I think we should keep the same
>> > > behavior as the Kafka connector. I'm not sure what the default
>> > > behavior of KSQL is, but I guess it also stores the keys in the value,
>> > > judging from this example in the docs (see the "users_original"
>> > > table) [1].
>> > >
>> > > Best,
>> > > Jark
>> > >
>> > > [1]:
>> > > https://docs.confluent.io/current/ksqldb/tutorials/basics-local.html#create-a-stream-and-table
>> > >
>> > >
>> > > On Mon, 19 Oct 2020 at 18:17, Danny Chan <yuzhao....@gmail.com> wrote:
>> > >
>> > >> The concept seems to conflict with the Flink abstraction of a
>> > >> “dynamic table”; in Flink we see both “stream” and “table” as a
>> > >> dynamic table.
>> > >>
>> > >> I think we should first make clear how to express stream- and
>> > >> table-specific features on one “dynamic table”. It is more natural
>> > >> for KSQL because KSQL takes stream and table as different
>> > >> abstractions for representing collections; in KSQL, only a table is
>> > >> mutable and can have a primary key.
>> > >>
>> > >> Does this connector belong to the “table” scope or the “stream” scope?
>> > >>
>> > >> Some of the concepts (such as the primary key on a stream) should be
>> > >> suitable for all connectors, not just Kafka. Shouldn’t this be an
>> > >> extension of the existing Kafka connector instead of a totally new
>> > >> connector? What about the other connectors?
>> > >>
>> > >> Because this touches the core abstraction of Flink, we had better
>> > >> have a top-down overall design; following KSQL directly is not the
>> > >> answer.
>> > >>
>> > >> P.S. For the source:
>> > >> > Shouldn’t this be an extension of the existing Kafka connector
>> > >> > instead of a totally new connector?
>> > >>
>> > >> How could we achieve that (e.g. set up the parallelism correctly)?
>> > >>
>> > >> Best,
>> > >> Danny Chan
>> > >> On Mon, Oct 19, 2020 at 5:17 PM (+0800), Jingsong Li <jingsongl...@gmail.com> wrote:
>> > >> > Thanks Shengkai for your proposal.
>> > >> >
>> > >> > +1 for this feature.
>> > >> >
>> > >> > > Future Work: Support bounded KTable source
>> > >> >
>> > >> > I don't think it should be future work; I think it is one of the
>> > >> > important concepts of this FLIP. We need to understand it now.
>> > >> >
>> > >> > Intuitively, a ktable in my opinion is a bounded table rather than
>> > >> > a stream, so a select should produce a bounded table by default.
>> > >> >
>> > >> > I think we can list the Kafka-related knowledge, because the word
>> > >> > `ktable` is easy to associate with KSQL-related concepts. (If
>> > >> > possible, it's better to unify with it.)
>> > >> >
>> > >> > What do you think?
>> > >> >
>> > >> > > value.fields-include
>> > >> >
>> > >> > What about the default behavior of KSQL?
>> > >> >
>> > >> > Best,
>> > >> > Jingsong
>> > >> >
>> > >> > On Mon, Oct 19, 2020 at 4:33 PM Shengkai Fang <fskm...@gmail.com>
>> > >> wrote:
>> > >> >
>> > >> > > Hi, devs.
>> > >> > >
>> > >> > > Jark and I want to start a new FLIP to introduce the KTable
>> > >> > > connector. KTable is short for "Kafka Table"; it also has the
>> > >> > > same semantics as the KTable notion in Kafka Streams.
>> > >> > >
>> > >> > > FLIP-149:
>> > >> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-149%3A+Introduce+the+KTable+Connector
>> > >> > >
>> > >> > > Currently many users have expressed their need for upsert Kafka
>> > >> > > via the mailing lists and issues. The KTable connector has
>> > >> > > several benefits for users:
>> > >> > >
>> > >> > > 1. Users are able to interpret a compacted Kafka topic as an
>> > >> > > upsert stream in Apache Flink, and are also able to write a
>> > >> > > changelog stream to Kafka (into a compacted topic).
>> > >> > > 2. As part of a real-time pipeline, they can store join or
>> > >> > > aggregation results (which may contain updates) into a Kafka
>> > >> > > topic for further calculation.
>> > >> > > 3. The semantics of the KTable connector are just the same as
>> > >> > > KTable in Kafka Streams, so it's very handy for Kafka Streams
>> > >> > > and KSQL users. We have seen several questions in the mailing
>> > >> > > list asking how to model a KTable and how to join a KTable in
>> > >> > > Flink SQL.
>> > >> > >
>> > >> > > We hope it can expand the usage of Flink with Kafka.
>> > >> > >
>> > >> > > I'm looking forward to your feedback.
>> > >> > >
>> > >> > > Best,
>> > >> > > Shengkai
>> > >> > >
>> > >> >
>> > >> >
>> > >> > --
>> > >> > Best, Jingsong Lee
>> > >>
>> > >
>> >
>>
>>
>> --
>>
>> Konstantin Knauf
>>
>> https://twitter.com/snntrable
>>
>> https://github.com/knaufk
>>
>
