Hi Shengkai,

Thank you for driving this effort. I believe this is a very important feature for many users who use Kafka and Flink SQL together. A few questions and thoughts:
* Is your example "Use KTable as a reference/dimension table" correct? It
  uses the "kafka" connector and does not specify a primary key.
* Will it be possible to use a "ktable" table directly as a dimension table
  in a temporal join (*based on event time*) (FLIP-132)? This is not
  completely clear to me from the FLIP.
* I'd personally prefer not to introduce a new connector and instead to
  extend the Kafka connector. We could add an additional property
  "compacted" = "true"|"false". If it is set to "true", we can add
  additional validation logic (e.g. "scan.startup.mode" cannot be set, a
  primary key is required, etc.). If we stick to a separate connector, I'd
  not call it "ktable", but rather "compacted-kafka" or similar. KTable
  seems to carry more implicit meaning than we want to imply here.
* I agree that this is not a bounded source. If we want to support a
  bounded mode, this is an orthogonal concern that also applies to other
  unbounded sources.

Best,

Konstantin

On Mon, Oct 19, 2020 at 3:26 PM Jark Wu <imj...@gmail.com> wrote:

> Hi Danny,
>
> First of all, we didn't introduce any concepts from KSQL (e.g. the Stream
> vs. Table notion). This new connector will produce a changelog stream, so
> it's still a dynamic table and doesn't conflict with Flink core concepts.
>
> The "ktable" is just a connector name; we can also call it
> "compacted-kafka" or something else. Calling it "ktable" is just so that
> KSQL users can migrate to Flink SQL easily.
>
> Regarding why we introduce a new connector instead of a new property in
> the existing kafka connector:
>
> I think the main reason is that we want a clear separation between the two
> use cases, because they are very different. We also listed reasons in the
> FLIP, including:
>
> 1) It's hard to explain what the behavior is when users specify a start
> offset at a middle position (e.g. how to process delete events for keys
> that don't exist). It's dangerous if users do that.
> So we don't provide the offset option in the new connector at the moment.
>
> 2) It's a different perspective/abstraction on the same Kafka topic
> (append vs. upsert). It would be easier to understand if we separate them
> instead of mixing them in one connector. The new connector requires a hash
> sink partitioner, a declared primary key, and a regular format. If we mix
> them in one connector, it might be confusing how to use the options
> correctly.
>
> 3) The semantics of the KTable connector are just the same as those of
> KTable in Kafka Streams, so it's very handy for Kafka Streams and KSQL
> users. We have seen several questions on the mailing list asking how to
> model a KTable and how to join a KTable in Flink SQL.
>
> Best,
> Jark
>
> On Mon, 19 Oct 2020 at 19:53, Jark Wu <imj...@gmail.com> wrote:
>
> > Hi Jingsong,
> >
> > As the FLIP describes, the "KTable connector produces a changelog
> > stream, where each data record represents an update or delete event."
> > Therefore, a ktable source is an unbounded stream source. Selecting from
> > a ktable source is similar to selecting from a kafka source with the
> > debezium-json format: it never ends and the results are continuously
> > updated.
> >
> > It's possible to have a bounded ktable source in the future, for example
> > via an option 'bounded=true' or 'end-offset=xxx'. In this way, the
> > ktable will produce a bounded changelog stream, so I think this can be
> > added as a compatible feature in the future.
> >
> > I don't think we should associate it with KSQL-related concepts.
> > Actually, we didn't introduce any concepts from KSQL (e.g. the Stream
> > vs. Table notion). The "ktable" is just a connector name; we can also
> > call it "compacted-kafka" or something else. Calling it "ktable" is just
> > so that KSQL users can migrate to Flink SQL easily.
> >
> > Regarding "value.fields-include", this is an option introduced in
> > FLIP-107 for the Kafka connector. I think we should keep the same
> > behavior as the Kafka connector.
> > I'm not sure what the default behavior of KSQL is, but I guess it also
> > stores the keys in the value, judging from this example in the docs (see
> > the "users_original" table) [1].
> >
> > Best,
> > Jark
> >
> > [1]:
> > https://docs.confluent.io/current/ksqldb/tutorials/basics-local.html#create-a-stream-and-table
> >
> > On Mon, 19 Oct 2020 at 18:17, Danny Chan <yuzhao....@gmail.com> wrote:
> >
> >> The concept seems to conflict with the Flink abstraction “dynamic
> >> table”: in Flink we see both “stream” and “table” as a dynamic table.
> >>
> >> I think we should first make clear how to express stream- and
> >> table-specific features on one “dynamic table”. This is more natural
> >> for KSQL because KSQL takes stream and table as different abstractions
> >> for representing collections. In KSQL, only a table is mutable and can
> >> have a primary key.
> >>
> >> Does this connector belong to the “table” scope or the “stream” scope?
> >>
> >> Some of the concepts (such as the primary key on a stream) should be
> >> suitable for all connectors, not just Kafka. Shouldn’t this be an
> >> extension of the existing Kafka connector instead of a totally new
> >> connector? What about the other connectors?
> >>
> >> Because this touches the core abstraction of Flink, we'd better have a
> >> top-down overall design; following KSQL directly is not the answer.
> >>
> >> P.S. For the source:
> >> > Shouldn’t this be an extension of the existing Kafka connector
> >> > instead of a totally new connector?
> >>
> >> How could we achieve that (e.g. set up the parallelism correctly)?
> >>
> >> Best,
> >> Danny Chan
> >> On Oct 19, 2020, 5:17 PM +0800, Jingsong Li <jingsongl...@gmail.com> wrote:
> >> > Thanks Shengkai for your proposal.
> >> >
> >> > +1 for this feature.
> >> >
> >> > > Future Work: Support bounded KTable source
> >> >
> >> > I don't think it should be future work; I think it is one of the
> >> > important concepts of this FLIP. We need to understand it now.
> >> > Intuitively, a ktable in my opinion is a bounded table rather than a
> >> > stream, so a select should produce a bounded table by default.
> >> >
> >> > I think we can list the related Kafka knowledge, because the word
> >> > `ktable` is easy to associate with KSQL-related concepts. (If
> >> > possible, it's better to unify with them.)
> >> >
> >> > What do you think?
> >> >
> >> > > value.fields-include
> >> >
> >> > What about the default behavior of KSQL?
> >> >
> >> > Best,
> >> > Jingsong
> >> >
> >> > On Mon, Oct 19, 2020 at 4:33 PM Shengkai Fang <fskm...@gmail.com>
> >> > wrote:
> >> >
> >> > > Hi, devs.
> >> > >
> >> > > Jark and I want to start a new FLIP to introduce the KTable
> >> > > connector. The KTable is a shortcut for "Kafka Table"; it also has
> >> > > the same semantics as the KTable notion in Kafka Streams.
> >> > >
> >> > > FLIP-149:
> >> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-149%3A+Introduce+the+KTable+Connector
> >> > >
> >> > > Currently many users have expressed their need for upsert Kafka
> >> > > support on the mailing lists and in issues. The KTable connector
> >> > > has several benefits for users:
> >> > >
> >> > > 1. Users are able to interpret a compacted Kafka topic as an upsert
> >> > > stream in Apache Flink, and are also able to write a changelog
> >> > > stream to Kafka (into a compacted topic).
> >> > > 2. As part of a real-time pipeline, store a join or aggregate
> >> > > result (which may contain updates) in a Kafka topic for further
> >> > > calculation.
> >> > > 3. The semantics of the KTable connector are just the same as those
> >> > > of KTable in Kafka Streams, so it's very handy for Kafka Streams
> >> > > and KSQL users. We have seen several questions on the mailing list
> >> > > asking how to model a KTable and how to join a KTable in Flink SQL.
> >> > >
> >> > > We hope it can expand the usage of Flink with Kafka.
> >> > > I'm looking forward to your feedback.
> >> > >
> >> > > Best,
> >> > > Shengkai
> >> >
> >> > --
> >> > Best, Jingsong Lee

--
Konstantin Knauf

https://twitter.com/snntrable
https://github.com/knaufk
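To make the semantics debated in this thread concrete: interpreting a compacted Kafka topic as an upsert stream means replaying a key/value changelog where the latest value per key wins and a null value acts as a delete (tombstone). The following is a minimal Python sketch of that replay logic, not Flink or Kafka Streams code; the function name and sample data are illustrative only.

```python
def materialize(changelog):
    """Replay an upsert changelog into a table snapshot.

    Each event is a (key, value) pair. The latest value per key wins,
    and a value of None is a tombstone that deletes the key.
    """
    table = {}
    for key, value in changelog:
        if value is None:
            # A tombstone for an absent key is simply ignored here. This is
            # exactly the ambiguity the thread raises about starting to read
            # from a middle offset: the delete may refer to a key whose
            # insert was never seen.
            table.pop(key, None)
        else:
            table[key] = value
    return table


events = [
    ("user1", "Alice"),
    ("user2", "Bob"),
    ("user1", "Alicia"),  # update: overwrites the old value for user1
    ("user2", None),      # tombstone: deletes user2
]
print(materialize(events))  # {'user1': 'Alicia'}
```

This also illustrates why the proposal requires a primary key and a hash sink partitioner: replay is only correct if all events for one key land in one partition and are read in order.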