Hi Kurt, Hi Shengkai,

Thanks for answering my questions and for the additional clarifications. I don't have a strong opinion on whether to extend the "kafka" connector or to introduce a new connector, so from my perspective feel free to go with a separate connector. If we do introduce a new connector, I wouldn't call it "ktable" for the aforementioned reasons (in addition, the name might suggest that there is also a "kstreams" connector, for symmetry reasons). I don't have a good alternative name, though; maybe "kafka-compacted" or "compacted-kafka".
Thanks,

Konstantin

On Wed, Oct 21, 2020 at 4:43 AM Kurt Young <ykt...@gmail.com> wrote:

> Hi all,
>
> I want to describe the discussion process that led us to this conclusion; it might make some of the design choices easier to understand and keep everyone on the same page.
>
> Back to the motivation: what functionality do we want to provide in the first place? We got a lot of feedback and questions on the mailing lists from people who want to write not-insert-only messages into Kafka. They might do so intentionally or by accident, e.g. by writing a non-windowed aggregate query or a non-windowed left outer join. Some users from the KSQL world also asked why Flink didn't leverage the key concept of every Kafka topic and treat Kafka as a dynamically changing keyed table.
>
> To work with Kafka better, we were thinking of extending the functionality of the current kafka connector by letting it accept updates and deletions. But due to the limitations of Kafka, an update has to be an "update by key", i.e. a table with a primary key.
>
> This introduces a couple of conflicts with the current kafka table's options:
> 1. key.fields: as said above, we need the kafka table to have a primary key constraint, but users can also configure key.fields freely, which might cause friction. (Sure, we can do some sanity checks on this, but that also creates friction.)
> 2. sink.partitioner: to make the semantics right, we need to make sure all updates on the same key are written to the same Kafka partition, so we would have to force a hash-by-key partitioner inside such a table. Again, this conflicts with and creates friction around the current user options.
>
> The above issues are solvable, though not perfectly and not in the most user-friendly way.
>
> Let's take a look at the reading side. The keyed Kafka table contains two kinds of messages: upserts and deletions. What upsert means is: "If the key doesn't exist yet, it's an insert record; otherwise it's an update record." For the sake of correctness and simplicity, the Flink SQL engine also needs this information. If we interpret all messages as "update record", some queries or operators may not work properly; it's weird to see an update record when you haven't seen the corresponding insert record before.
>
> So what Flink should do is, after reading the records from such a table, create a state that records which keys have been seen, and then generate the correct row kind accordingly. This kind of couples the state with the data of the message queue, and it also creates conflicts with the current kafka connector.
>
> Think about what happens if users suspend a running job (which now contains some reading state) and then change the start offset of the reader. By changing the reading offset, they actually change the whole story of "which records should be insert messages and which records should be update messages". It would also force Flink to deal with another weird situation: it might receive a deletion for a non-existing message.
>
> We were unsatisfied with all the friction and conflicts that enabling "upsert & deletion" support in the current kafka connector would create. Later we began to realize that we shouldn't treat it as a normal message queue, but should treat it as a changing keyed table. We should always be able to get the whole data of such a table (by disabling the start offset option), and we can also read the changelog out of such a table. It's like an HBase table with binlog support, but without the random access capability (which can be fulfilled by Flink's state).
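>
> To make that concrete: a DDL for such a new connector might look like the following. (This is just a sketch for illustration; the connector name, the column names, and the exact set of options are still under discussion, so treat them as hypothetical.)
>
>     CREATE TABLE users (
>       user_id BIGINT,
>       user_name STRING,
>       region STRING,
>       PRIMARY KEY (user_id) NOT ENFORCED
>     ) WITH (
>       'connector' = 'compacted-kafka',   -- hypothetical name
>       'topic' = 'users',
>       'properties.bootstrap.servers' = 'localhost:9092',
>       'key.format' = 'json',
>       'value.format' = 'json'
>     );
>
> Note that there is no 'key.fields', no 'sink.partitioner' and no 'scan.startup.mode': the primary key would determine the Kafka message key and the hash-by-key partitioning, and reading would always start from the earliest offset.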
>
> So our intention was: instead of telling and persuading users which options they should or should not use when extending the current kafka connector with upsert support, we are actually creating a whole new and different connector that has totally different abstractions in the SQL layer, and it should be treated totally differently from the current kafka connector.
>
> Hope this clarifies some of the concerns.
>
> Best,
> Kurt
>
> On Tue, Oct 20, 2020 at 5:20 PM Shengkai Fang <fskm...@gmail.com> wrote:
>
> > Hi devs,
> >
> > As many people are still confused about the different option behaviours between the Kafka connector and the KTable connector, Jark and I have listed the differences in the doc [1].
> >
> > Best,
> > Shengkai
> >
> > [1] https://docs.google.com/document/d/13oAWAwQez0lZLsyfV21BfTEze1fc2cz4AZKiNOyBNPk/edit
> >
> > On Tue, Oct 20, 2020 at 12:05 PM Shengkai Fang <fskm...@gmail.com> wrote:
> >
> > > Hi Konstantin,
> > >
> > > Thanks for your reply.
> > >
> > > > It uses the "kafka" connector and does not specify a primary key.
> > > The dimension table `users` uses the ktable connector, and we can specify the pk on the KTable.
> > >
> > > > Will it be possible to use a "ktable" as a dimension table in FLIP-132?
> > > Yes. We can specify the watermark on the KTable, and it can then be used as a dimension table in a temporal join.
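> > >
> > > For example (just a sketch; the `orders` table and the column names are made up for illustration), an event-time temporal join against such a table would look like:
> > >
> > >     SELECT o.order_id, o.amount, u.region
> > >     FROM orders AS o
> > >     LEFT JOIN users FOR SYSTEM_TIME AS OF o.order_time AS u
> > >       ON o.user_id = u.user_id;
> > >
> > > Here `users` would be the ktable-backed table with a primary key on `user_id` and a watermark declared on its event-time column.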
> > >
> > > > Introduce a new connector vs. introduce a new property
> > > The main reason behind this is that the KTable connector has almost no options in common with the Kafka connector. The only options that the KTable connector can reuse are 'topic', 'properties.bootstrap.servers' and 'value.fields-include'. We also can't set a CDC format for 'key.format' and 'value.format' in the KTable connector right now, which is possible in the Kafka connector. Considering how different the usable options are, it's more suitable to introduce another connector rather than a property.
> > >
> > > We are also fine with using "compacted-kafka" as the name of the new connector. What do you think?
> > >
> > > Best,
> > > Shengkai
> > >
> > > On Mon, Oct 19, 2020 at 10:15 PM Konstantin Knauf <kna...@apache.org> wrote:
> > >
> > > > Hi Shengkai,
> > > >
> > > > Thank you for driving this effort. I believe this is a very important feature for many users who use Kafka and Flink SQL together. A few questions and thoughts:
> > > >
> > > > * Is your example "Use KTable as a reference/dimension table" correct? It uses the "kafka" connector and does not specify a primary key.
> > > >
> > > > * Will it be possible to use a "ktable" table directly as a dimensional table in a temporal join (*based on event time*) (FLIP-132)? This is not completely clear to me from the FLIP.
> > > >
> > > > * I'd personally prefer not to introduce a new connector and instead extend the Kafka connector. We could add an additional property "compacted" = "true"|"false". If it is set to "true", we can add additional validation logic (e.g. "scan.startup.mode" cannot be set, a primary key is required, etc.); see the sketch after this list. If we stick to a separate connector, I'd not call it "ktable", but rather "compacted-kafka" or similar. KTable seems to carry more implicit meaning than we want to imply here.
> > > >
> > > > * I agree that this is not a bounded source. If we want to support a bounded mode, this is an orthogonal concern that also applies to other unbounded sources.
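> > > >
> > > > To make that alternative concrete, a sketch (the "compacted" property is hypothetical, named only for this discussion):
> > > >
> > > >     CREATE TABLE users (
> > > >       user_id BIGINT,
> > > >       user_name STRING,
> > > >       PRIMARY KEY (user_id) NOT ENFORCED
> > > >     ) WITH (
> > > >       'connector' = 'kafka',
> > > >       'compacted' = 'true',    -- hypothetical property
> > > >       'topic' = 'users',
> > > >       'properties.bootstrap.servers' = 'localhost:9092',
> > > >       'key.format' = 'json',
> > > >       'value.format' = 'json'
> > > >     );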
> > > >
> > > > Best,
> > > >
> > > > Konstantin
> > > >
> > > > On Mon, Oct 19, 2020 at 3:26 PM Jark Wu <imj...@gmail.com> wrote:
> > > >
> > > > > Hi Danny,
> > > > >
> > > > > First of all, we didn't introduce any concepts from KSQL (e.g. the Stream vs Table notion). This new connector will produce a changelog stream, so it's still a dynamic table and doesn't conflict with Flink's core concepts.
> > > > >
> > > > > The "ktable" is just a connector name; we could also call it "compacted-kafka" or something else. Calling it "ktable" is just so that KSQL users can migrate to Flink SQL easily.
> > > > >
> > > > > Regarding why we introduce a new connector vs. a new property in the existing kafka connector: I think the main reason is that we want a clear separation between these two use cases, because they are very different. We also listed reasons in the FLIP, including:
> > > > >
> > > > > 1) It's hard to explain what the behavior is when users specify a start offset from a middle position (e.g. how to process delete events for non-existent keys). It's dangerous if users do that, so we don't provide the offset option in the new connector at the moment.
> > > > > 2) It's a different perspective/abstraction on the same kafka topic (append vs. upsert). It would be easier to understand if we separate them instead of mixing them in one connector. The new connector requires a hash sink partitioner, a declared primary key, and a regular format. If we mix them in one connector, it might be confusing how to use the options correctly.
> > > > > 3) The semantics of the KTable connector are just the same as KTable in Kafka Streams, so it's very handy for Kafka Streams and KSQL users. We have seen several questions on the mailing list asking how to model a KTable and how to join a KTable in Flink SQL.
> > > > >
> > > > > Best,
> > > > > Jark
> > > > >
> > > > > On Mon, 19 Oct 2020 at 19:53, Jark Wu <imj...@gmail.com> wrote:
> > > > >
> > > > > > Hi Jingsong,
> > > > > >
> > > > > > As the FLIP describes, "KTable connector produces a changelog stream, where each data record represents an update or delete event.". Therefore, a ktable source is an unbounded stream source. Selecting from a ktable source is similar to selecting from a kafka source with the debezium-json format: it never ends, and the results are continuously updated.
> > > > > >
> > > > > > It's possible to have a bounded ktable source in the future, for example by adding an option 'bounded=true' or 'end-offset=xxx'. In that way, the ktable would produce a bounded changelog stream. So I think this can be added as a compatible feature in the future.
> > > > > >
> > > > > > I don't think we should associate it with KSQL-related concepts. Actually, we didn't introduce any concepts from KSQL (e.g. the Stream vs Table notion). The "ktable" is just a connector name; we could also call it "compacted-kafka" or something else. Calling it "ktable" is just so that KSQL users can migrate to Flink SQL easily.
> > > > > >
> > > > > > Regarding "value.fields-include": this is an option introduced in FLIP-107 for the Kafka connector. I think we should keep the same behavior as the Kafka connector. I'm not sure what the default behavior of KSQL is, but I guess it also stores the keys in the value, judging from this example in the docs (see the "users_original" table) [1].
> > > > > >
> > > > > > Best,
> > > > > > Jark
> > > > > >
> > > > > > [1]: https://docs.confluent.io/current/ksqldb/tutorials/basics-local.html#create-a-stream-and-table
> > > > > >
> > > > > > On Mon, 19 Oct 2020 at 18:17, Danny Chan <yuzhao....@gmail.com> wrote:
> > > > > >
> > > > > > > The concept seems to conflict with the Flink abstraction "dynamic table": in Flink we see both "stream" and "table" as a dynamic table.
> > > > > > >
> > > > > > > I think we should first make clear how to express stream-specific and table-specific features on one "dynamic table". It is more natural for KSQL, because KSQL takes stream and table as different abstractions for representing collections; in KSQL, only a table is mutable and can have a primary key.
> > > > > > >
> > > > > > > Does this connector belong to the "table" scope or the "stream" scope?
> > > > > > >
> > > > > > > Some of the concepts (such as a primary key on a stream) should be suitable for all connectors, not just Kafka. Shouldn't this be an extension of the existing Kafka connector instead of a totally new connector? What about the other connectors?
> > > > > > >
> > > > > > > Because this touches the core abstraction of Flink, we'd better have a top-down overall design; following KSQL directly is not the answer.
> > > > > > >
> > > > > > > P.S. For the source:
> > > > > > > > Shouldn't this be an extension of the existing Kafka connector instead of a totally new connector?
> > > > > > > How could we achieve that (e.g. set up the parallelism correctly)?
> > > > > > >
> > > > > > > Best,
> > > > > > > Danny Chan
> > > > > > >
> > > > > > > On Mon, Oct 19, 2020 at 5:17 PM (+0800), Jingsong Li <jingsongl...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Thanks Shengkai for your proposal.
> > > > > > > >
> > > > > > > > +1 for this feature.
> > > > > > > >
> > > > > > > > > Future Work: Support bounded KTable source
> > > > > > > >
> > > > > > > > I don't think this should be future work; I think it is one of the important concepts of this FLIP, and we need to understand it now.
> > > > > > > >
> > > > > > > > Intuitively, a ktable in my opinion is a bounded table rather than a stream, so a select should produce a bounded table by default.
> > > > > > > >
> > > > > > > > I think we should list the related Kafka knowledge, because the word `ktable` is easy to associate with KSQL-related concepts. (If possible, it's better to unify with them.)
> > > > > > > >
> > > > > > > > What do you think?
> > > > > > > >
> > > > > > > > > value.fields-include
> > > > > > > >
> > > > > > > > What about the default behavior of KSQL?
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Jingsong
> > > > > > > >
> > > > > > > > On Mon, Oct 19, 2020 at 4:33 PM Shengkai Fang <fskm...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > Hi, devs.
> > > > > > > > >
> > > > > > > > > Jark and I want to start a new FLIP to introduce the KTable connector. KTable is short for "Kafka Table"; it also has the same semantics as the KTable notion in Kafka Streams.
> > > > > > > > >
> > > > > > > > > FLIP-149: https://cwiki.apache.org/confluence/display/FLINK/FLIP-149%3A+Introduce+the+KTable+Connector
> > > > > > > > >
> > > > > > > > > Currently many users have expressed their need for upsert Kafka via mailing lists and issues. The KTable connector has several benefits for users:
> > > > > > > > >
> > > > > > > > > 1. Users are able to interpret a compacted Kafka topic as an upsert stream in Apache Flink, and are also able to write a changelog stream to Kafka (into a compacted topic).
> > > > > > > > > 2. As part of a real-time pipeline, they can store a join or aggregate result (which may contain updates) in a Kafka topic for further calculation; see the sketch after this list.
> > > > > > > > > 3. The semantics of the KTable connector are just the same as KTable in Kafka Streams, so it's very handy for Kafka Streams and KSQL users. We have seen several questions on the mailing list asking how to model a KTable and how to join a KTable in Flink SQL.
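> > > > > > > > >
> > > > > > > > > As an example of (2), here is a sketch (the table names are made up for illustration): a non-windowed aggregate produces an updating result, which could be written into such a table:
> > > > > > > > >
> > > > > > > > >     INSERT INTO pageviews_per_user
> > > > > > > > >     SELECT user_id, COUNT(*) AS pv_count
> > > > > > > > >     FROM pageviews
> > > > > > > > >     GROUP BY user_id;
> > > > > > > > >
> > > > > > > > > Here `pageviews_per_user` would be a KTable-connector table with `user_id` as the primary key, and `pageviews` an ordinary append-only kafka table.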
> > > > > > > > >
> > > > > > > > > We hope it can expand the usage of Flink with Kafka.
> > > > > > > > >
> > > > > > > > > I'm looking forward to your feedback.
> > > > > > > > >
> > > > > > > > > Best,
> > > > > > > > > Shengkai
> > > > > > > >
> > > > > > > > --
> > > > > > > > Best, Jingsong Lee
> > > >
> > > > --
> > > >
> > > > Konstantin Knauf
> > > >
> > > > https://twitter.com/snntrable
> > > >
> > > > https://github.com/knaufk

--

Konstantin Knauf

https://twitter.com/snntrable

https://github.com/knaufk