Hi Shengkai,

Thank you for driving this effort. I believe this is a very important feature
for many users who use Kafka and Flink SQL together. A few questions and
thoughts:

* Is your example "Use KTable as a reference/dimension table" correct? It
uses the "kafka" connector and does not specify a primary key.

* Will it be possible to use a "ktable" table directly as a dimension
table in a temporal join (*based on event time*) (FLIP-132)? This is not
completely clear to me from the FLIP.

* I'd personally prefer not to introduce a new connector and instead
extend the Kafka connector. We could add an additional property "compacted"
= "true"|"false". If it is set to "true", we can add additional validation
logic (e.g. "scan.startup.mode" cannot be set, primary key required,
etc.). If we stick to a separate connector, I'd not call it "ktable", but
rather "compacted-kafka" or similar. KTable seems to carry more implicit
meaning than we want to imply here.

* I agree that this is not a bounded source. If we want to support a
bounded mode, this is an orthogonal concern that also applies to other
unbounded sources.
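
To make the "compacted" property idea above a bit more concrete, here is a
minimal sketch of the kind of validation I have in mind. It is plain Python,
not Flink code: the function name `validate_kafka_table` and the error
messages are hypothetical, and "compacted" is only the property proposed
above, not an existing option. Only "scan.startup.mode" is a real Kafka
connector option.

```python
# Hypothetical sketch of the proposed validation; this is not Flink code.
# "compacted" is the property suggested in this mail, not an existing option.

def validate_kafka_table(options, primary_key):
    """Return a list of validation errors for a Kafka table definition.

    options:     dict of connector options as strings
    primary_key: list of primary key column names (empty if none declared)
    """
    errors = []
    compacted = options.get("compacted", "false")
    if compacted not in ("true", "false"):
        errors.append('"compacted" must be "true" or "false"')
        return errors

    if compacted == "true":
        # An upsert interpretation needs a key to upsert on.
        if not primary_key:
            errors.append("a compacted table requires a primary key")
        # Starting in the middle of the topic is not allowed in this mode.
        if "scan.startup.mode" in options:
            errors.append('"scan.startup.mode" cannot be set on a compacted table')
    return errors
```

With this shape, a plain append-only table (no "compacted" option) passes
unchanged, so existing Kafka tables would not be affected by the new checks.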

Best,

Konstantin

On Mon, Oct 19, 2020 at 3:26 PM Jark Wu <imj...@gmail.com> wrote:

> Hi Danny,
>
> First of all, we didn't introduce any concepts from KSQL (e.g. Stream vs
> Table notion).
> This new connector will produce a changelog stream, so it's still a dynamic
> table and doesn't conflict with Flink core concepts.
>
> "ktable" is just a connector name; we can also call it
> "compacted-kafka" or something else.
> We call it "ktable" only so that KSQL users can migrate to Flink SQL
> easily.
>
> Regarding why we introduce a new connector instead of a new property in the
> existing kafka connector:
>
> I think the main reason is that we want a clear separation between these
> two use cases, because they are very different.
> We also listed reasons in the FLIP, including:
>
> 1) It's hard to explain the behavior when users specify a start offset in
>     the middle of the topic (e.g. how to process delete events for records
>     that do not exist). It's dangerous if users do that, so we don't provide
>     the offset option in the new connector at the moment.
> 2) It's a different perspective/abstraction on the same kafka topic (append
>     vs. upsert). It would be easier to understand if we separate them
>     instead of mixing them in one connector. The new connector requires a
>     hash sink partitioner, a declared primary key, and a regular format.
>     If we mix them in one connector, it might be confusing how to use the
>     options correctly.
> 3) The semantics of the KTable connector are the same as KTable in Kafka
>     Streams, so it's very handy for Kafka Streams and KSQL users.
>     We have seen several questions on the mailing list asking how to model
>     a KTable and how to join a KTable in Flink SQL.
>
> Best,
> Jark
>
> On Mon, 19 Oct 2020 at 19:53, Jark Wu <imj...@gmail.com> wrote:
>
> > Hi Jingsong,
> >
> > As the FLIP describes, "KTable connector produces a changelog stream,
> > where each data record represents an update or delete event.".
> > Therefore, a ktable source is an unbounded stream source. Selecting from a
> > ktable source is similar to selecting from a kafka source with the
> > debezium-json format: the query never ends and the results are
> > continuously updated.
> >
> > It's possible to have a bounded ktable source in the future, for example,
> > add an option 'bounded=true' or 'end-offset=xxx'.
> > In this way, the ktable will produce a bounded changelog stream.
> > So I think this can be a compatible feature in the future.
> >
> > I don't think we should associate with ksql related concepts. Actually,
> > we didn't introduce any concepts from KSQL (e.g. Stream vs Table notion).
> > The "ktable" is just a connector name, we can also call it
> > "compacted-kafka" or something else.
> > Calling it "ktable" is just because KSQL users can migrate to Flink SQL
> > easily.
> >
> > Regarding "value.fields-include", this is an option introduced in
> > FLIP-107 for the Kafka connector.
> > I think we should keep the same behavior as the Kafka connector. I'm not
> > sure what the default behavior of KSQL is.
> > But I guess it also stores the keys in the value, judging from this example
> > in the docs (see the "users_original" table) [1].
> >
> > Best,
> > Jark
> >
> > [1]: https://docs.confluent.io/current/ksqldb/tutorials/basics-local.html#create-a-stream-and-table
> >
> >
> > On Mon, 19 Oct 2020 at 18:17, Danny Chan <yuzhao....@gmail.com> wrote:
> >
> >> The concept seems to conflict with the Flink abstraction “dynamic table”;
> >> in Flink we see both “stream” and “table” as a dynamic table.
> >>
> >> I think we should first make clear how to express stream-specific and
> >> table-specific features on one “dynamic table”.
> >> It is more natural for KSQL because KSQL takes stream and table as
> >> different abstractions for representing collections. In KSQL, only a
> >> table is mutable and can have a primary key.
> >>
> >> Does this connector belong to the “table” scope or the “stream” scope?
> >>
> >> Some of the concepts (such as the primary key on a stream) should be
> >> suitable for all connectors, not just Kafka. Shouldn’t this be an
> >> extension of the existing Kafka connector instead of a totally new
> >> connector? What about the other connectors?
> >>
> >> Because this touches the core abstraction of Flink, we had better have a
> >> top-down overall design; following KSQL directly is not the answer.
> >>
> >> P.S. For the source
> >> > Shouldn’t this be an extension of existing Kafka connector instead of a
> >> > totally new connector ?
> >>
> >> How could we achieve that (e.g. set up the parallelism correctly) ?
> >>
> >> Best,
> >> Danny Chan
> >> On Oct 19, 2020, 5:17 PM +0800, Jingsong Li <jingsongl...@gmail.com> wrote:
> >> > Thanks Shengkai for your proposal.
> >> >
> >> > +1 for this feature.
> >> >
> >> > > Future Work: Support bounded KTable source
> >> >
> >> > I don't think it should be future work; I think it is one of the
> >> > important concepts of this FLIP. We need to understand it now.
> >> >
> >> > Intuitively, a ktable in my opinion is a bounded table rather than a
> >> > stream, so select should produce a bounded table by default.
> >> >
> >> > I think we can list Kafka related knowledge, because the word `ktable`
> >> > is easy to associate with ksql related concepts. (If possible, it's
> >> > better to unify with it)
> >> >
> >> > What do you think?
> >> >
> >> > > value.fields-include
> >> >
> >> > What about the default behavior of KSQL?
> >> >
> >> > Best,
> >> > Jingsong
> >> >
> >> > On Mon, Oct 19, 2020 at 4:33 PM Shengkai Fang <fskm...@gmail.com> wrote:
> >> >
> >> > > Hi, devs.
> >> > >
> >> > > Jark and I want to start a new FLIP to introduce the KTable
> >> > > connector. KTable is short for "Kafka Table"; it also has the same
> >> > > semantics as the KTable notion in Kafka Streams.
> >> > >
> >> > > FLIP-149:
> >> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-149%3A+Introduce+the+KTable+Connector
> >> > >
> >> > > Many users have asked for upsert Kafka support on the mailing
> >> > > lists and in issues. The KTable connector has several benefits for
> >> > > users:
> >> > >
> >> > > 1. Users are able to interpret a compacted Kafka topic as an upsert
> >> > > stream in Apache Flink, and also to write a changelog stream to Kafka
> >> > > (into a compacted topic).
> >> > > 2. As part of a real-time pipeline, users can store join or aggregate
> >> > > results (which may contain updates) in a Kafka topic for further
> >> > > calculation.
> >> > > 3. The semantics of the KTable connector are the same as KTable in
> >> > > Kafka Streams, so it's very handy for Kafka Streams and KSQL users. We
> >> > > have seen several questions on the mailing list asking how to model a
> >> > > KTable and how to join a KTable in Flink SQL.
> >> > >
> >> > > We hope it can expand the usage of Flink with Kafka.
> >> > >
> >> > > I'm looking forward to your feedback.
> >> > >
> >> > > Best,
> >> > > Shengkai
> >> > >
> >> >
> >> >
> >> > --
> >> > Best, Jingsong Lee
> >>
> >
>


-- 

Konstantin Knauf

https://twitter.com/snntrable

https://github.com/knaufk
