Hi Konstantin,

Thanks for your reply.

> It uses the "kafka" connector and does not specify a primary key.
The dimension table `users` uses the ktable connector, and we can specify the
primary key on the KTable.

> Will it be possible to use a "ktable" as a dimensional table in FLIP-132
Yes. We can specify the watermark on the KTable, and it can then be used as a
dimension table in a temporal join.
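
As a rough illustration of what "dimension table in a temporal join" means
here, the following Python sketch models the ktable as a versioned table keyed
by primary key and looks up the version that was valid at each order's event
time. It is a toy model, not Flink code: `VersionedTable`, `apply`, `lookup`
and the sample data are made up for illustration, records are assumed to
arrive in event-time order per key, and watermark handling is left out.

```python
from bisect import bisect_right
from collections import defaultdict


class VersionedTable:
    """Toy model of an upsert changelog keyed by primary key (hypothetical)."""

    def __init__(self):
        # key -> parallel lists of event times and values, in arrival order
        self._times = defaultdict(list)
        self._values = defaultdict(list)

    def apply(self, key, value, event_time):
        """Apply one record; a value of None models a delete (tombstone)."""
        # Assumption: records for one key arrive in event-time order.
        self._times[key].append(event_time)
        self._values[key].append(value)

    def lookup(self, key, event_time):
        """Return the version of `key` valid at `event_time`, or None."""
        times = self._times.get(key, [])
        idx = bisect_right(times, event_time) - 1
        return self._values[key][idx] if idx >= 0 else None


users = VersionedTable()
users.apply("u1", {"region": "EU"}, 10)   # insert
users.apply("u1", {"region": "US"}, 20)   # update
users.apply("u1", None, 30)               # delete

# (order id, user id, event time) rows from the probe side of the join
orders = [("o1", "u1", 15), ("o2", "u1", 25), ("o3", "u1", 35)]
joined = [(oid, users.lookup(uid, ts)) for oid, uid, ts in orders]
```

Each order sees the user row as of its own event time, so `o1` joins the EU
version, `o2` the US version, and `o3` finds no row because the user was
deleted by then.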

> Introduce a new connector vs. introduce a new property
The main reason is that the KTable connector has almost no options in common
with the Kafka connector. The only options that the KTable connector can reuse
are 'topic', 'properties.bootstrap.servers' and 'value.fields-include'. For
now we can't set a CDC format for 'key.format' and 'value.format' in the
KTable connector, which is possible in the Kafka connector. Given how different
the usable options are, it's more suitable to introduce another connector
rather than a property.
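
To make the option differences concrete, here is a small Python sketch of the
checks discussed in this thread (primary key required, no 'scan.startup.mode',
no CDC format for 'key.format' and 'value.format'). `validate_ktable_options`
and the `CDC_FORMATS` set are hypothetical and only illustrate the idea; they
are not the connector's actual validation code.

```python
# Illustrative, not exhaustive: formats that already encode a changelog.
CDC_FORMATS = {"debezium-json", "canal-json"}


def validate_ktable_options(options, primary_key):
    """Return a list of problems for a hypothetical 'ktable' table definition."""
    problems = []
    if not primary_key:
        problems.append("a primary key is required")
    if "scan.startup.mode" in options:
        problems.append("'scan.startup.mode' is not supported")
    for name in ("key.format", "value.format"):
        if options.get(name) in CDC_FORMATS:
            problems.append(f"'{name}' must not be a CDC format")
    return problems


ok = validate_ktable_options(
    {
        "topic": "users",
        "properties.bootstrap.servers": "localhost:9092",
        "key.format": "json",
        "value.format": "json",
    },
    primary_key=["user_id"],
)
bad = validate_ktable_options(
    {
        "topic": "users",
        "scan.startup.mode": "earliest-offset",
        "value.format": "debezium-json",
    },
    primary_key=[],
)
```

Here `ok` is an empty list, while `bad` reports all three problems.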

We are also fine with using "compacted-kafka" as the name of the new connector.
What do you think?

Best,
Shengkai

On Mon, Oct 19, 2020 at 10:15 PM Konstantin Knauf <kna...@apache.org> wrote:

> Hi Shengkai,
>
> Thank you for driving this effort. I believe this is a very important feature
> for many users who use Kafka and Flink SQL together. A few questions and
> thoughts:
>
> * Is your example "Use KTable as a reference/dimension table" correct? It
> uses the "kafka" connector and does not specify a primary key.
>
> * Will it be possible to use a "ktable" table directly as a dimensional
> table in temporal join (*based on event time*) (FLIP-132)? This is not
> completely clear to me from the FLIP.
>
> * I'd personally prefer not to introduce a new connector and instead to
> extend the Kafka connector. We could add an additional property "compacted"
> = "true"|"false". If it is set to "true", we can add additional validation
> logic (e.g. "scan.startup.mode" can not be set, primary key required,
> etc.). If we stick to a separate connector I'd not call it "ktable", but
> rather "compacted-kafka" or similar. KTable seems to carry more implicit
> meaning than we want to imply here.
>
> * I agree that this is not a bounded source. If we want to support a
> bounded mode, this is an orthogonal concern that also applies to other
> unbounded sources.
>
> Best,
>
> Konstantin
>
> On Mon, Oct 19, 2020 at 3:26 PM Jark Wu <imj...@gmail.com> wrote:
>
> > Hi Danny,
> >
> > First of all, we didn't introduce any concepts from KSQL (e.g. Stream vs
> > Table notion).
> > This new connector will produce a changelog stream, so it's still a
> > dynamic table and doesn't conflict with Flink core concepts.
> >
> > The "ktable" is just a connector name, we can also call it
> > "compacted-kafka" or something else.
> > Calling it "ktable" is just because KSQL users can migrate to Flink SQL
> > easily.
> >
> > Regarding why we introduce a new connector vs. a new property in the
> > existing kafka connector:
> >
> > I think the main reason is that we want to have a clear separation between
> > these two use cases, because they are very different.
> > We also listed reasons in the FLIP, including:
> >
> > 1) It's hard to explain what the behavior is when users specify a start
> > offset from a middle position (e.g. how to process delete events for
> > records that don't exist). It's dangerous if users do that, so we don't
> > provide the offset option in the new connector at the moment.
> > 2) It's a different perspective/abstraction on the same kafka topic
> > (append vs. upsert). It would be easier to understand if we separate them
> > instead of mixing them in one connector. The new connector requires a
> > hash sink partitioner, a declared primary key, and a regular format.
> > If we mix them in one connector, it might be confusing how to use the
> > options correctly.
> > 3) The semantics of the KTable connector are the same as those of KTable in
> > Kafka Streams, so it's very handy for Kafka Streams and KSQL users.
> > We have seen several questions on the mailing list asking how to model
> > a KTable and how to join a KTable in Flink SQL.
> >
> > Best,
> > Jark
> >
> > On Mon, 19 Oct 2020 at 19:53, Jark Wu <imj...@gmail.com> wrote:
> >
> > > Hi Jingsong,
> > >
> > > As the FLIP describes, "KTable connector produces a changelog stream,
> > > where each data record represents an update or delete event.".
> > > Therefore, a ktable source is an unbounded stream source. Selecting from a
> > > ktable source is similar to selecting from a kafka source with the
> > > debezium-json format, in that it never ends and the results are
> > > continuously updated.
> > >
> > > It's possible to have a bounded ktable source in the future, for
> > > example by adding an option 'bounded=true' or 'end-offset=xxx'.
> > > In this way, the ktable will produce a bounded changelog stream.
> > > So I think this can be a compatible feature in the future.
> > >
> > > I don't think we should associate it with ksql related concepts. Actually,
> > > we didn't introduce any concepts from KSQL (e.g. Stream vs Table notion).
> > > The "ktable" is just a connector name, we can also call it
> > > "compacted-kafka" or something else.
> > > Calling it "ktable" is just because KSQL users can migrate to Flink SQL
> > > easily.
> > >
> > > Regarding "value.fields-include", this is an option introduced in
> > > FLIP-107 for the Kafka connector.
> > > I think we should keep the same behavior as the Kafka connector. I'm
> > > not sure what the default behavior of KSQL is.
> > > But I guess it also stores the keys in the value, judging from this example
> > > in the docs (see the "users_original" table) [1].
> > >
> > > Best,
> > > Jark
> > >
> > > [1]:
> > > https://docs.confluent.io/current/ksqldb/tutorials/basics-local.html#create-a-stream-and-table
> > >
> > >
> > > On Mon, 19 Oct 2020 at 18:17, Danny Chan <yuzhao....@gmail.com> wrote:
> > >
> > >> The concept seems to conflict with the Flink abstraction “dynamic table”;
> > >> in Flink we see both “stream” and “table” as a dynamic table.
> > >>
> > >> I think we should first make clear how to express stream-specific and
> > >> table-specific features on one “dynamic table”.
> > >> This is more natural for KSQL because KSQL takes stream and table as
> > >> different abstractions for representing collections. In KSQL, only
> > >> a table is mutable and can have a primary key.
> > >>
> > >> Does this connector belong to the “table” scope or the “stream” scope?
> > >>
> > >> Some of the concepts (such as the primary key on a stream) should be
> > >> suitable for all connectors, not just Kafka. Shouldn’t this be an
> > >> extension of the existing Kafka connector instead of a totally new
> > >> connector? What about the other connectors?
> > >>
> > >> Because this touches the core abstraction of Flink, we had better have a
> > >> top-down overall design; following KSQL directly is not the answer.
> > >>
> > >> P.S. For the source
> > >> > Shouldn’t this be an extension of the existing Kafka connector instead
> > >> > of a totally new connector?
> > >>
> > >> How could we achieve that (e.g. set up the parallelism correctly)?
> > >>
> > >> Best,
> > >> Danny Chan
> > >> On Oct 19, 2020 at 5:17 PM +0800, Jingsong Li <jingsongl...@gmail.com> wrote:
> > >> > Thanks Shengkai for your proposal.
> > >> >
> > >> > +1 for this feature.
> > >> >
> > >> > > Future Work: Support bounded KTable source
> > >> >
> > >> > I don't think it should be future work; I think it is one of the
> > >> > important concepts of this FLIP. We need to understand it now.
> > >> >
> > >> > Intuitively, a ktable in my opinion is a bounded table rather than a
> > >> > stream, so select should produce a bounded table by default.
> > >> >
> > >> > I think we can list Kafka related knowledge, because the word `ktable`
> > >> > is easy to associate with ksql related concepts. (If possible, it's
> > >> > better to unify with it)
> > >> >
> > >> > What do you think?
> > >> >
> > >> > > value.fields-include
> > >> >
> > >> > What about the default behavior of KSQL?
> > >> >
> > >> > Best,
> > >> > Jingsong
> > >> >
> > >> > On Mon, Oct 19, 2020 at 4:33 PM Shengkai Fang <fskm...@gmail.com> wrote:
> > >> >
> > >> > > Hi, devs.
> > >> > >
> > >> > > Jark and I want to start a new FLIP to introduce the KTable connector.
> > >> > > KTable is short for "Kafka Table"; it also has the same semantics as
> > >> > > the KTable notion in Kafka Streams.
> > >> > >
> > >> > > FLIP-149:
> > >> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-149%3A+Introduce+the+KTable+Connector
> > >> > >
> > >> > > Many users have expressed their need for upsert Kafka support on the
> > >> > > mailing lists and in issues. The KTable connector has several benefits
> > >> > > for users:
> > >> > >
> > >> > > 1. Users are able to interpret a compacted Kafka topic as an upsert
> > >> > > stream in Apache Flink, and are also able to write a changelog stream
> > >> > > to Kafka (into a compacted topic).
> > >> > > 2. As part of a real-time pipeline, users can store join or aggregate
> > >> > > results (which may contain updates) in a Kafka topic for further
> > >> > > calculation.
> > >> > > 3. The semantics of the KTable connector are the same as those of KTable
> > >> > > in Kafka Streams, so it's very handy for Kafka Streams and KSQL users.
> > >> > > We have seen several questions on the mailing list asking how to model
> > >> > > a KTable and how to join a KTable in Flink SQL.
> > >> > >
> > >> > > We hope it can expand the usage of Flink with Kafka.
> > >> > >
> > >> > > I'm looking forward to your feedback.
> > >> > >
> > >> > > Best,
> > >> > > Shengkai
> > >> > >
> > >> >
> > >> >
> > >> > --
> > >> > Best, Jingsong Lee
> > >>
> > >
> >
>
>
> --
>
> Konstantin Knauf
>
> https://twitter.com/snntrable
>
> https://github.com/knaufk
>
