Hi devs,

As many people are still confused about how option behaviours differ between the Kafka connector and the KTable connector, Jark and I have listed the differences in the doc [1].
Best,
Shengkai

[1] https://docs.google.com/document/d/13oAWAwQez0lZLsyfV21BfTEze1fc2cz4AZKiNOyBNPk/edit

On Tue, Oct 20, 2020 at 12:05 PM Shengkai Fang <fskm...@gmail.com> wrote:

> Hi Konstantin,
>
> Thanks for your reply.
>
> > It uses the "kafka" connector and does not specify a primary key.
> The dimensional table `users` uses the ktable connector, and we can specify the
> pk on the KTable.
>
> > Will it be possible to use a "ktable" as a dimensional table in FLIP-132
> Yes. We can specify the watermark on the KTable and it can be used as a
> dimension table in temporal join.
>
> > Introduce a new connector vs introduce a new property
> The main reason is that the KTable connector has almost no options in common
> with the Kafka connector. The options that can be reused by the KTable
> connector are 'topic', 'properties.bootstrap.servers' and
> 'value.fields-include'. We currently can't set a CDC format for 'key.format' and
> 'value.format' in the KTable connector, whereas this is possible in the Kafka
> connector. Considering the difference between the options we can use, it's
> more suitable to introduce another connector rather than a property.
>
> We are also fine with using "compacted-kafka" as the name of the new
> connector. What do you think?
>
> Best,
> Shengkai
>
> On Mon, Oct 19, 2020 at 10:15 PM Konstantin Knauf <kna...@apache.org> wrote:
>
>> Hi Shengkai,
>>
>> Thank you for driving this effort. I believe this is a very important feature
>> for many users who use Kafka and Flink SQL together. A few questions and
>> thoughts:
>>
>> * Is your example "Use KTable as a reference/dimension table" correct? It
>> uses the "kafka" connector and does not specify a primary key.
>>
>> * Will it be possible to use a "ktable" table directly as a dimensional
>> table in temporal join (*based on event time*) (FLIP-132)? This is not
>> completely clear to me from the FLIP.
>>
>> * I'd personally prefer not to introduce a new connector and instead to
>> extend the Kafka connector.
>> We could add an additional property "compacted" = "true"|"false".
>> If it is set to "true", we can add additional validation
>> logic (e.g. "scan.startup.mode" can not be set, primary key required,
>> etc.). If we stick to a separate connector I'd not call it "ktable", but
>> rather "compacted-kafka" or similar. KTable seems to carry more implicit
>> meaning than we want to imply here.
>>
>> * I agree that this is not a bounded source. If we want to support a
>> bounded mode, this is an orthogonal concern that also applies to other
>> unbounded sources.
>>
>> Best,
>>
>> Konstantin
>>
>> On Mon, Oct 19, 2020 at 3:26 PM Jark Wu <imj...@gmail.com> wrote:
>>
>> > Hi Danny,
>> >
>> > First of all, we didn't introduce any concepts from KSQL (e.g. Stream vs
>> > Table notion).
>> > This new connector will produce a changelog stream, so it's still a dynamic
>> > table and doesn't conflict with Flink core concepts.
>> >
>> > The "ktable" is just a connector name, we can also call it
>> > "compacted-kafka" or something else.
>> > We call it "ktable" only so that KSQL users can migrate to Flink SQL
>> > easily.
>> >
>> > Regarding why we introduce a new connector vs a new property in the existing
>> > kafka connector:
>> >
>> > I think the main reason is that we want a clear separation between these
>> > two use cases, because they are very different.
>> > We also listed reasons in the FLIP, including:
>> >
>> > 1) It's hard to explain what the behavior is when users specify the start
>> > offset from a middle position (e.g. how to process delete events for
>> > non-existent records).
>> > It's dangerous if users do that. So we don't provide the offset option
>> > in the new connector at the moment.
>> > 2) It's a different perspective/abstraction on the same kafka topic (append
>> > vs. upsert). It would be easier to understand if we can separate them
>> > instead of mixing them in one connector.
>> > The new connector requires a
>> > hash sink partitioner, a declared primary key, and a regular format.
>> > If we mix them in one connector, it might be confusing how to use the
>> > options correctly.
>> > 3) The semantics of the KTable connector are just the same as KTable in Kafka
>> > Streams. So it's very handy for Kafka Streams and KSQL users.
>> > We have seen several questions on the mailing list asking how to model
>> > a KTable and how to join a KTable in Flink SQL.
>> >
>> > Best,
>> > Jark
>> >
>> > On Mon, 19 Oct 2020 at 19:53, Jark Wu <imj...@gmail.com> wrote:
>> >
>> > > Hi Jingsong,
>> > >
>> > > As the FLIP describes, "KTable connector produces a changelog stream,
>> > > where each data record represents an update or delete event.".
>> > > Therefore, a ktable source is an unbounded stream source. Selecting a
>> > > ktable source is similar to selecting a kafka source with debezium-json
>> > > format, in that it never ends and the results are continuously updated.
>> > >
>> > > It's possible to have a bounded ktable source in the future, for example,
>> > > by adding an option 'bounded=true' or 'end-offset=xxx'.
>> > > In this way, the ktable will produce a bounded changelog stream.
>> > > So I think this can be a compatible feature in the future.
>> > >
>> > > I don't think we should associate it with ksql related concepts. Actually, we
>> > > didn't introduce any concepts from KSQL (e.g. Stream vs Table notion).
>> > > The "ktable" is just a connector name, we can also call it
>> > > "compacted-kafka" or something else.
>> > > We call it "ktable" only so that KSQL users can migrate to Flink SQL
>> > > easily.
>> > >
>> > > Regarding the "value.fields-include", this is an option introduced in
>> > > FLIP-107 for the Kafka connector.
>> > > I think we should keep the same behavior as the Kafka connector. I'm not
>> > > sure what the default behavior of KSQL is.
>> > > But I guess it also stores the keys in the value, judging from this example in the docs (see
>> > > the "users_original" table) [1].
>> > >
>> > > Best,
>> > > Jark
>> > >
>> > > [1]:
>> > > https://docs.confluent.io/current/ksqldb/tutorials/basics-local.html#create-a-stream-and-table
>> > >
>> > >
>> > > On Mon, 19 Oct 2020 at 18:17, Danny Chan <yuzhao....@gmail.com> wrote:
>> > >
>> > >> The concept seems to conflict with the Flink abstraction “dynamic table”.
>> > >> In Flink we see both “stream” and “table” as a dynamic table.
>> > >>
>> > >> I think we should first make clear how to express stream and table
>> > >> specific features on one “dynamic table”.
>> > >> It is more natural for KSQL because KSQL takes stream and table as
>> > >> different abstractions for representing collections. In KSQL, only a table is
>> > >> mutable and can have a primary key.
>> > >>
>> > >> Does this connector belong to the “table” scope or the “stream” scope?
>> > >>
>> > >> Some of the concepts (such as the primary key on stream) should be
>> > >> suitable for all the connectors, not just Kafka. Shouldn’t this be an
>> > >> extension of the existing Kafka connector instead of a totally new connector?
>> > >> What about the other connectors?
>> > >>
>> > >> Because this touches the core abstraction of Flink, we had better have a
>> > >> top-down overall design; following KSQL directly is not the answer.
>> > >>
>> > >> P.S. For the source
>> > >> > Shouldn’t this be an extension of existing Kafka connector instead of a
>> > >> totally new connector ?
>> > >>
>> > >> How could we achieve that (e.g. set up the parallelism correctly)?
>> > >>
>> > >> Best,
>> > >> Danny Chan
>> > >> On Oct 19, 2020, 5:17 PM +0800, Jingsong Li <jingsongl...@gmail.com> wrote:
>> > >> > Thanks Shengkai for your proposal.
>> > >> >
>> > >> > +1 for this feature.
>> > >> >
>> > >> > > Future Work: Support bounded KTable source
>> > >> >
>> > >> > I don't think it should be future work; I think it is one of the
>> > >> > important concepts of this FLIP. We need to understand it now.
>> > >> >
>> > >> > Intuitively, a ktable in my opinion is a bounded table rather than a
>> > >> > stream, so select should produce a bounded table by default.
>> > >> >
>> > >> > I think we can list Kafka related knowledge, because the word `ktable` is
>> > >> > easy to associate with ksql related concepts. (If possible, it's better to
>> > >> > unify with it)
>> > >> >
>> > >> > What do you think?
>> > >> >
>> > >> > > value.fields-include
>> > >> >
>> > >> > What about the default behavior of KSQL?
>> > >> >
>> > >> > Best,
>> > >> > Jingsong
>> > >> >
>> > >> > On Mon, Oct 19, 2020 at 4:33 PM Shengkai Fang <fskm...@gmail.com> wrote:
>> > >> >
>> > >> > > Hi, devs.
>> > >> > >
>> > >> > > Jark and I want to start a new FLIP to introduce the KTable connector.
>> > >> > > KTable is short for "Kafka Table"; it also has the same semantics as
>> > >> > > the KTable notion in Kafka Streams.
>> > >> > >
>> > >> > > FLIP-149:
>> > >> > >
>> > >> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-149%3A+Introduce+the+KTable+Connector
>> > >> > >
>> > >> > > Currently many users have expressed their need for upsert Kafka on the
>> > >> > > mailing lists and in issues. The KTable connector has several benefits for users:
>> > >> > >
>> > >> > > 1. Users are able to interpret a compacted Kafka topic as an upsert stream
>> > >> > > in Apache Flink, and are also able to write a changelog stream to Kafka
>> > >> > > (into a compacted topic).
>> > >> > > 2. As part of a real-time pipeline, users can store a join or aggregate result (which may
>> > >> > > contain updates) into a Kafka topic for further calculation.
>> > >> > > 3.
>> > >> > >    The semantics of the KTable connector are just the same as KTable in Kafka
>> > >> > > Streams. So it's very handy for Kafka Streams and KSQL users. We have seen
>> > >> > > several questions on the mailing list asking how to model a KTable and how
>> > >> > > to join a KTable in Flink SQL.
>> > >> > >
>> > >> > > We hope it can expand the usage of Flink with Kafka.
>> > >> > >
>> > >> > > I'm looking forward to your feedback.
>> > >> > >
>> > >> > > Best,
>> > >> > > Shengkai
>> > >> > >
>> > >> >
>> > >> >
>> > >> > --
>> > >> > Best, Jingsong Lee
>> > >>
>> > >
>> >
>>
>> --
>>
>> Konstantin Knauf
>>
>> https://twitter.com/snntrable
>>
>> https://github.com/knaufk
>>
>
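
As a rough illustration of the upsert semantics discussed in this thread, the following Python sketch (plain Python, not Flink code) turns a keyed record stream into a changelog and a materialized table. It assumes, following the usual convention for compacted Kafka topics, that a record with a null value is a tombstone that deletes its key. The function name `to_changelog`, the record layout, and the sample data are hypothetical and only serve the example. It also shows the concern Jark raises in point 1): reading from a middle offset can yield a delete for a key the reader has never seen.

```python
from typing import Dict, Iterable, List, Optional, Tuple

# Hypothetical record layout: (key, value); value None marks a tombstone.
Record = Tuple[str, Optional[str]]
Change = Tuple[str, str, Optional[str]]  # (kind, key, value)


def to_changelog(records: Iterable[Record]) -> Tuple[List[Change], Dict[str, str]]:
    """Interpret keyed records as upserts/deletes and materialize a table."""
    table: Dict[str, str] = {}
    changelog: List[Change] = []
    for key, value in records:
        if value is None:
            # Tombstone: delete the key; the old value is None if never seen.
            old = table.pop(key, None)
            changelog.append(("DELETE", key, old))
        elif key in table:
            table[key] = value
            changelog.append(("UPDATE", key, value))
        else:
            table[key] = value
            changelog.append(("INSERT", key, value))
    return changelog, table


# Sample topic contents (made-up data).
topic: List[Record] = [
    ("u1", "alice"),
    ("u2", "bob"),
    ("u1", "alicia"),
    ("u2", None),
    ("u3", "carol"),
]

# Reading from the beginning gives a consistent changelog.
full_log, full_table = to_changelog(topic)

# Reading from a middle offset starts with a delete for an unknown key.
mid_log, mid_table = to_changelog(topic[3:])

for change in mid_log:
    print(change)
```

In the second run the first change is a delete for `u2` with no known previous value, which is the case that is hard to give clear semantics to when a start offset is allowed.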