Another name is "connector=upsert-kafka"; I think this can solve Timo's concern about the "compacted" word.
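For illustration, declaring such a table and writing a changelog into it might look like this (an editorial sketch, not from the FLIP: only the connector name is the proposal here; the other options mirror the "kafka-compacted" example further down this thread, and the schema and the `pageviews` source table are made up):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class UpsertKafkaSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // The proposed connector name; a PRIMARY KEY is required.
        tEnv.executeSql(
                "CREATE TABLE pageviews_per_region ("
                        + " region STRING,"
                        + " view_count BIGINT,"
                        + " PRIMARY KEY (region) NOT ENFORCED"
                        + ") WITH ("
                        + " 'connector' = 'upsert-kafka',"
                        + " 'topic' = 'pageviews_per_region',"
                        + " 'properties.bootstrap.servers' = '...',"
                        + " 'key.format' = 'csv',"
                        + " 'value.format' = 'avro')");

        // A non-windowed aggregate emits updates; the upsert table absorbs
        // them by keeping the latest row per key (hypothetical `pageviews`).
        tEnv.executeSql(
                "INSERT INTO pageviews_per_region "
                        + "SELECT region, COUNT(*) FROM pageviews GROUP BY region");
    }
}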
Materialize also uses the "ENVELOPE UPSERT" [1] keyword to identify such kafka sources. I think "upsert" is a well-known term widely used in many systems, and it matches the behavior of how we handle the kafka messages.

What do you think?

Best,
Jark

[1]: https://materialize.io/docs/sql/create-source/text-kafka/#upsert-on-a-kafka-topic

On Thu, 22 Oct 2020 at 22:53, Kurt Young <[email protected]> wrote:

> Good validation messages can't solve the broken user experience, especially
> when such an update-mode option implicitly makes half of the current kafka
> options invalid or nonsensical.
>
> Best,
> Kurt
>
> On Thu, Oct 22, 2020 at 10:31 PM Jark Wu <[email protected]> wrote:
>
> > Hi Timo, Seth,
> >
> > The default value "inserting" of "mode" might not be suitable,
> > because "debezium-json" emits changelog messages which include updates.
> >
> > On Thu, 22 Oct 2020 at 22:10, Seth Wiesman <[email protected]> wrote:
> >
> > > +1 for supporting upsert results into Kafka.
> > >
> > > I have no comments on the implementation details.
> > >
> > > As far as configuration goes, I tend to favor Timo's option where we
> > > add a "mode" property to the existing Kafka table with default value
> > > "inserting". If the mode is set to "updating" then the validation
> > > changes to the new requirements. I personally find it more intuitive
> > > than a separate connector; my fear is users won't understand it's the
> > > same physical kafka sink under the hood, and it will lead to other
> > > confusion, like: does it offer the same persistence guarantees? I
> > > think we are capable of adding good validation messaging that solves
> > > Jark's and Kurt's concerns.
> > >
> > > On Thu, Oct 22, 2020 at 8:51 AM Timo Walther <[email protected]> wrote:
> > >
> > > > Hi Jark,
> > > >
> > > > "calling it "kafka-compacted" can even remind users to enable log
> > > > compaction"
> > > >
> > > > But sometimes users like to store a lineage of changes in their
> > > > topics, independent of any ktable/kstream interpretation.
> > > >
> > > > I'll let the majority decide on this topic to not further block this
> > > > effort. But we might find a better name like:
> > > >
> > > > connector = kafka
> > > > mode = updating/inserting
> > > >
> > > > OR
> > > >
> > > > connector = kafka-updating
> > > >
> > > > ...
> > > >
> > > > Regards,
> > > > Timo
> > > >
> > > > On 22.10.20 15:24, Jark Wu wrote:
> > > > > Hi Timo,
> > > > >
> > > > > Thanks for your opinions.
> > > > >
> > > > > 1) Implementation
> > > > > We will have a stateful operator to generate INSERT and
> > > > > UPDATE_BEFORE. This operator is keyBy-ed (primary key as the
> > > > > shuffle key) after the source operator.
> > > > > The implementation of this operator is very similar to the
> > > > > existing `DeduplicateKeepLastRowFunction`.
> > > > > The operator will register a value state using the primary key
> > > > > fields as keys.
> > > > > When the value state is empty under the current key, we will emit
> > > > > INSERT for the input row.
> > > > > When the value state is not empty under the current key, we will
> > > > > emit UPDATE_BEFORE using the row in state, and emit UPDATE_AFTER
> > > > > using the input row.
> > > > > When the input row is a DELETE, we will clear the state and emit a
> > > > > DELETE row.
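> > > > > A minimal sketch of this operator (an editorial illustration, not
> > > > > the FLIP's actual code): it assumes a KeyedProcessFunction applied
> > > > > after a keyBy on the primary-key fields, and it elides the explicit
> > > > > TypeInformation a real implementation would supply for the row type.
> > > > >
> > > > > import org.apache.flink.api.common.state.ValueState;
> > > > > import org.apache.flink.api.common.state.ValueStateDescriptor;
> > > > > import org.apache.flink.configuration.Configuration;
> > > > > import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
> > > > > import org.apache.flink.table.data.RowData;
> > > > > import org.apache.flink.types.RowKind;
> > > > > import org.apache.flink.util.Collector;
> > > > >
> > > > > public class UpsertToChangelogFunction
> > > > >         extends KeyedProcessFunction<RowData, RowData, RowData> {
> > > > >
> > > > >     // Last image per primary key.
> > > > >     private transient ValueState<RowData> lastRow;
> > > > >
> > > > >     @Override
> > > > >     public void open(Configuration parameters) {
> > > > >         // Simplified: a real implementation would register explicit
> > > > >         // TypeInformation/serializers for the row type.
> > > > >         lastRow = getRuntimeContext().getState(
> > > > >                 new ValueStateDescriptor<>("last-row", RowData.class));
> > > > >     }
> > > > >
> > > > >     @Override
> > > > >     public void processElement(RowData row, Context ctx, Collector<RowData> out)
> > > > >             throws Exception {
> > > > >         RowData previous = lastRow.value();
> > > > >         if (row.getRowKind() == RowKind.DELETE) {
> > > > >             // Tombstone: emit a DELETE with all columns filled from
> > > > >             // the last image, then clear the state.
> > > > >             if (previous != null) {
> > > > >                 previous.setRowKind(RowKind.DELETE);
> > > > >                 out.collect(previous);
> > > > >             }
> > > > >             lastRow.clear();
> > > > >         } else if (previous == null) {
> > > > >             // First message under this key: emit INSERT.
> > > > >             row.setRowKind(RowKind.INSERT);
> > > > >             lastRow.update(row);
> > > > >             out.collect(row);
> > > > >         } else {
> > > > >             // Key seen before: emit the old image as UPDATE_BEFORE
> > > > >             // and the input row as UPDATE_AFTER.
> > > > >             previous.setRowKind(RowKind.UPDATE_BEFORE);
> > > > >             out.collect(previous);
> > > > >             row.setRowKind(RowKind.UPDATE_AFTER);
> > > > >             lastRow.update(row);
> > > > >             out.collect(row);
> > > > >         }
> > > > >     }
> > > > > }
> > > > >
> > > > > For a single key, the record sequence (k, a), (k, b), tombstone(k)
> > > > > would then come out as +I(k, a), then -U(k, a) and +U(k, b), then
> > > > > -D(k, b).
> > > > >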
> > > > > 2) new option vs. new connector
> > > > >> We recently simplified the table options to a minimum amount of
> > > > >> characters to be as concise as possible in the DDL.
> > > > > I think this is the reason why we want to introduce a new
> > > > > connector: because we can simplify the options in the DDL.
> > > > > For example, if using a new option, the DDL may look like this:
> > > > >
> > > > > CREATE TABLE users (
> > > > >   user_id BIGINT,
> > > > >   user_name STRING,
> > > > >   user_level STRING,
> > > > >   region STRING,
> > > > >   PRIMARY KEY (user_id) NOT ENFORCED
> > > > > ) WITH (
> > > > >   'connector' = 'kafka',
> > > > >   'model' = 'table',
> > > > >   'topic' = 'pageviews_per_region',
> > > > >   'properties.bootstrap.servers' = '...',
> > > > >   'properties.group.id' = 'testGroup',
> > > > >   'scan.startup.mode' = 'earliest',
> > > > >   'key.format' = 'csv',
> > > > >   'key.fields' = 'user_id',
> > > > >   'value.format' = 'avro',
> > > > >   'sink.partitioner' = 'hash'
> > > > > );
> > > > >
> > > > > If using a new connector, we can have different default values for
> > > > > the options and remove unnecessary options; the DDL can look like
> > > > > this, which is much more concise:
> > > > >
> > > > > CREATE TABLE pageviews_per_region (
> > > > >   user_id BIGINT,
> > > > >   user_name STRING,
> > > > >   user_level STRING,
> > > > >   region STRING,
> > > > >   PRIMARY KEY (user_id) NOT ENFORCED
> > > > > ) WITH (
> > > > >   'connector' = 'kafka-compacted',
> > > > >   'topic' = 'pageviews_per_region',
> > > > >   'properties.bootstrap.servers' = '...',
> > > > >   'key.format' = 'csv',
> > > > >   'value.format' = 'avro'
> > > > > );
> > > > >
> > > > >> When people read `connector=kafka-compacted` they might not know
> > > > >> that it has ktable semantics. You don't need to enable log
> > > > >> compaction in order to use a KTable as far as I know.
> > > > > We don't need to let users know it has ktable semantics; as
> > > > > Konstantin mentioned, this may carry more implicit meaning than we
> > > > > want to imply here. I agree users don't need to enable log
> > > > > compaction, but from the production perspective, log compaction
> > > > > should always be enabled if the topic is used for this purpose.
> > > > > Calling it "kafka-compacted" can even remind users to enable log
> > > > > compaction.
> > > > >
> > > > > I don't agree with introducing a "model = table/stream" option, or
> > > > > "connector=kafka-table", because this means we are introducing the
> > > > > Table vs. Stream concept from KSQL.
> > > > > However, we don't have such a top-level concept in Flink SQL now;
> > > > > this will further confuse users.
> > > > > In Flink SQL, everything is a STREAM; the differences are whether
> > > > > it is bounded or unbounded, and whether it is insert-only or a
> > > > > changelog.
> > > > >
> > > > > Best,
> > > > > Jark
> > > > >
> > > > > On Thu, 22 Oct 2020 at 20:39, Timo Walther <[email protected]> wrote:
> > > > >
> > > > >> Hi Shengkai, Hi Jark,
> > > > >>
> > > > >> thanks for this great proposal. It is time to finally connect the
> > > > >> changelog processor with a compacted Kafka topic.
> > > > >>
> > > > >> "The operator will produce INSERT rows, or additionally generate
> > > > >> UPDATE_BEFORE rows for the previous image, or produce DELETE rows
> > > > >> with all columns filled with values."
> > > > >>
> > > > >> Could you elaborate a bit on the implementation details in the
> > > > >> FLIP? How are UPDATE_BEFOREs generated? How much state is required
> > > > >> to perform this operation?
> > > > >>
> > > > >> From a conceptual and semantic point of view, I'm fine with the
> > > > >> proposal. But I would like to share my opinion about how we expose
> > > > >> this feature:
> > > > >>
> > > > >> ktable vs. kafka-compacted
> > > > >>
> > > > >> I'm against having an additional connector like `ktable` or
> > > > >> `kafka-compacted`. We recently simplified the table options to a
> > > > >> minimum amount of characters to be as concise as possible in the
> > > > >> DDL. Therefore, I would keep `connector=kafka` and introduce an
> > > > >> additional option. A user wants to read "from Kafka", and the
> > > > >> "how" should be determined in the lower options.
> > > > >>
> > > > >> When people read `connector=ktable` they might not know that this
> > > > >> is Kafka. Or they wonder where `kstream` is.
> > > > >>
> > > > >> When people read `connector=kafka-compacted` they might not know
> > > > >> that it has ktable semantics. You don't need to enable log
> > > > >> compaction in order to use a KTable as far as I know. Log
> > > > >> compaction and table semantics are orthogonal topics.
> > > > >>
> > > > >> In the end we will need three types of information when declaring
> > > > >> a Kafka connector:
> > > > >>
> > > > >> CREATE TABLE ... WITH (
> > > > >>   connector = kafka      -- Some information about the connector
> > > > >>   end-offset = XXXX      -- Some information about the boundedness
> > > > >>   model = table/stream   -- Some information about interpretation
> > > > >> )
> > > > >>
> > > > >> We can still apply all the constraints mentioned in the FLIP when
> > > > >> `model` is set to `table`.
> > > > >>
> > > > >> What do you think?
> > > > >>
> > > > >> Regards,
> > > > >> Timo
> > > > >>
> > > > >> On 21.10.20 14:19, Jark Wu wrote:
> > > > >>> Hi,
> > > > >>>
> > > > >>> IMO, if we are going to mix them in one connector,
> > > > >>> 1) either users need to set some options to a specific value
> > > > >>> explicitly, e.g. "scan.startup.mode=earliest",
> > > > >>> "sink.partitioner=hash", etc.
> > > > >>> This makes the connector awkward to use. Users may have to fix
> > > > >>> options one by one according to the exceptions.
> > > > >>> Besides, in the future, it is still possible to use
> > > > >>> "sink.partitioner=fixed" (to reduce network cost) if users are
> > > > >>> aware of the partition routing; however, it's error-prone to have
> > > > >>> "fixed" as the default for the compacted mode.
> > > > >>>
> > > > >>> 2) or make those options take different default values when
> > > > >>> "compacted=true".
> > > > >>> This would be more confusing and unpredictable if the default
> > > > >>> values of options changed according to other options.
> > > > >>> What happens if we have a third mode in the future?
> > > > >>>
> > > > >>> In terms of usage and options, it's very different from the
> > > > >>> original "kafka" connector.
> > > > >>> It would be handier to use and less error-prone to separate them
> > > > >>> into two connectors.
> > > > >>> In the implementation layer, we can reuse code as much as
> > > > >>> possible.
> > > > >>>
> > > > >>> Therefore, I'm still +1 to have a new connector.
> > > > >>> The "kafka-compacted" name sounds good to me.
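> > > > >>>
> > > > >>> (An editorial sketch to make the friction concrete; the option
> > > > >>> keys and the 'hash' value appear in this thread, but the helper
> > > > >>> itself is hypothetical, not Flink's actual factory validation:)
> > > > >>>
> > > > >>> import java.util.Map;
> > > > >>>
> > > > >>> public final class UpsertModeValidation {
> > > > >>>
> > > > >>>     // Extra checks a mixed "kafka" connector would need in upsert mode.
> > > > >>>     public static void validate(Map<String, String> options, boolean hasPrimaryKey) {
> > > > >>>         if (!hasPrimaryKey) {
> > > > >>>             throw new IllegalArgumentException(
> > > > >>>                     "Upsert mode requires a PRIMARY KEY on the table.");
> > > > >>>         }
> > > > >>>         if (options.containsKey("scan.startup.mode")) {
> > > > >>>             // Reading from a middle offset breaks the
> > > > >>>             // insert/update classification of records.
> > > > >>>             throw new IllegalArgumentException(
> > > > >>>                     "'scan.startup.mode' is not supported in upsert mode.");
> > > > >>>         }
> > > > >>>         String partitioner = options.getOrDefault("sink.partitioner", "hash");
> > > > >>>         if (!"hash".equals(partitioner)) {
> > > > >>>             // Updates on the same key must go to the same kafka partition.
> > > > >>>             throw new IllegalArgumentException(
> > > > >>>                     "Upsert mode requires 'sink.partitioner' = 'hash', but was: "
> > > > >>>                             + partitioner);
> > > > >>>         }
> > > > >>>     }
> > > > >>> }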
> > > > >>>
> > > > >>> Best,
> > > > >>> Jark
> > > > >>>
> > > > >>> On Wed, 21 Oct 2020 at 17:58, Konstantin Knauf <[email protected]> wrote:
> > > > >>>
> > > > >>>> Hi Kurt, Hi Shengkai,
> > > > >>>>
> > > > >>>> thanks for answering my questions and the additional
> > > > >>>> clarifications. I don't have a strong opinion on whether to
> > > > >>>> extend the "kafka" connector or to introduce a new connector.
> > > > >>>> So, from my perspective, feel free to go with a separate
> > > > >>>> connector. If we do introduce a new connector, I wouldn't call
> > > > >>>> it "ktable" for the aforementioned reasons (in addition, it
> > > > >>>> might suggest that there is also a "kstreams" connector for
> > > > >>>> symmetry reasons). I don't have a good alternative name, though;
> > > > >>>> maybe "kafka-compacted" or "compacted-kafka".
> > > > >>>>
> > > > >>>> Thanks,
> > > > >>>>
> > > > >>>> Konstantin
> > > > >>>>
> > > > >>>> On Wed, Oct 21, 2020 at 4:43 AM Kurt Young <[email protected]> wrote:
> > > > >>>>
> > > > >>>>> Hi all,
> > > > >>>>>
> > > > >>>>> I want to describe the discussion process which drove us to
> > > > >>>>> such a conclusion; this might make some of the design choices
> > > > >>>>> easier to understand and keep everyone on the same page.
> > > > >>>>>
> > > > >>>>> Back to the motivation: what functionality do we want to
> > > > >>>>> provide in the first place? We got a lot of feedback and
> > > > >>>>> questions from the mailing lists that people want to write
> > > > >>>>> not-insert-only messages into kafka. They might do so
> > > > >>>>> intentionally or by accident, e.g. by writing a non-windowed
> > > > >>>>> aggregate query or a non-windowed left outer join. And some
> > > > >>>>> users from the KSQL world also asked why Flink didn't leverage
> > > > >>>>> the key concept of every kafka topic and treat kafka as a
> > > > >>>>> dynamically changing keyed table.
> > > > >>>>>
> > > > >>>>> To work with kafka better, we were thinking of extending the
> > > > >>>>> functionality of the current kafka connector by letting it
> > > > >>>>> accept updates and deletions. But due to the limitations of
> > > > >>>>> kafka, the update has to be "update by key", aka a table with a
> > > > >>>>> primary key.
> > > > >>>>>
> > > > >>>>> This introduces a couple of conflicts with the current kafka
> > > > >>>>> table's options:
> > > > >>>>> 1. key.fields: as said above, we need the kafka table to have
> > > > >>>>> the primary key constraint. But users can also configure
> > > > >>>>> key.fields freely, which might cause friction. (Sure, we can do
> > > > >>>>> some sanity checks on this, but that also creates friction.)
> > > > >>>>> 2. sink.partitioner: to make the semantics right, we need to
> > > > >>>>> make sure all the updates on the same key are written to the
> > > > >>>>> same kafka partition, so we should force a hash-by-key
> > > > >>>>> partitioner inside such a table. Again, this conflicts and
> > > > >>>>> creates friction with current user options.
> > > > >>>>>
> > > > >>>>> The above issues are solvable, though neither perfectly nor in
> > > > >>>>> the most user-friendly way.
> > > > >>>>>
> > > > >>>>> Let's take a look at the reading side.
The keyed kafka table
> > > > >>>>> contains two kinds of messages: upserts and deletions. What
> > > > >>>>> upsert means is: "if the key doesn't exist yet, it's an insert
> > > > >>>>> record; otherwise it's an update record". For the sake of
> > > > >>>>> correctness and simplicity, the Flink SQL engine also needs
> > > > >>>>> this information. If we interpreted all messages as "update
> > > > >>>>> records", some queries or operators might not work properly.
> > > > >>>>> It's weird to see an update record when you haven't seen the
> > > > >>>>> insert record before.
> > > > >>>>>
> > > > >>>>> So what Flink should do is, after reading the records from such
> > > > >>>>> a table, create state to record which messages have been seen
> > > > >>>>> and then generate the correct row type accordingly. This
> > > > >>>>> somewhat couples the state with the data of the message queue,
> > > > >>>>> and it also creates conflicts with the current kafka connector.
> > > > >>>>>
> > > > >>>>> Think about what happens if users suspend a running job (which
> > > > >>>>> now contains some reading state) and then change the start
> > > > >>>>> offset of the reader. By changing the reading offset, they
> > > > >>>>> actually change the whole story of which records should be
> > > > >>>>> insert messages and which records should be update messages.
> > > > >>>>> And it will also make Flink deal with another weird situation:
> > > > >>>>> it might receive a deletion for a non-existing message.
> > > > >>>>>
> > > > >>>>> We were unsatisfied with all the friction and conflicts it
> > > > >>>>> would create if we enabled "upsert & deletion" support in the
> > > > >>>>> current kafka connector. And later we began to realize that we
> > > > >>>>> shouldn't treat it as a normal message queue, but as a changing
> > > > >>>>> keyed table. We should always be able to get the whole data of
> > > > >>>>> such a table (by disabling the start offset option), and we can
> > > > >>>>> also read the changelog out of such a table. It's like an HBase
> > > > >>>>> table with binlog support but without random access capability
> > > > >>>>> (which can be fulfilled by Flink's state).
> > > > >>>>>
> > > > >>>>> So our intention was, instead of telling and persuading users
> > > > >>>>> which options they should or should not use when extending the
> > > > >>>>> current kafka connector with upsert support, to actually create
> > > > >>>>> a whole new and different connector that has a totally
> > > > >>>>> different abstraction in the SQL layer and should be treated
> > > > >>>>> totally differently from the current kafka connector.
> > > > >>>>>
> > > > >>>>> Hope this can clarify some of the concerns.
> > > > >>>>>
> > > > >>>>> Best,
> > > > >>>>> Kurt
> > > > >>>>>
> > > > >>>>> On Tue, Oct 20, 2020 at 5:20 PM Shengkai Fang <[email protected]> wrote:
> > > > >>>>>
> > > > >>>>>> Hi devs,
> > > > >>>>>>
> > > > >>>>>> As many people are still confused about the different option
> > > > >>>>>> behaviours between the Kafka connector and the KTable
> > > > >>>>>> connector, Jark and I have listed the differences in the
> > > > >>>>>> doc [1].
> > > > >>>>>>
> > > > >>>>>> Best,
> > > > >>>>>> Shengkai
> > > > >>>>>>
> > > > >>>>>> [1]
> > > > >>>>>> https://docs.google.com/document/d/13oAWAwQez0lZLsyfV21BfTEze1fc2cz4AZKiNOyBNPk/edit
> > > > >>>>>>
> > > > >>>>>> On Tue, Oct 20, 2020 at 12:05 PM, Shengkai Fang <[email protected]> wrote:
> > > > >>>>>>
> > > > >>>>>>> Hi Konstantin,
> > > > >>>>>>>
> > > > >>>>>>> Thanks for your reply.
> > > > >>>>>>>
> > > > >>>>>>>> It uses the "kafka" connector and does not specify a primary
> > > > >>>>>>>> key.
> > > > >>>>>>> The dimensional table `users` uses the ktable connector, and
> > > > >>>>>>> we can specify the primary key on the KTable.
> > > > >>>>>>>
> > > > >>>>>>>> Will it be possible to use a "ktable" as a dimensional table
> > > > >>>>>>>> in FLIP-132?
> > > > >>>>>>> Yes. We can specify the watermark on the KTable, and it can
> > > > >>>>>>> be used as a dimension table in a temporal join.
> > > > >>>>>>>
> > > > >>>>>>>> Introduce a new connector vs. introduce a new property
> > > > >>>>>>> The main reason is that the KTable connector has almost no
> > > > >>>>>>> options in common with the Kafka connector. The options that
> > > > >>>>>>> can be reused by the KTable connector are 'topic',
> > > > >>>>>>> 'properties.bootstrap.servers' and 'value.fields-include'.
> > > > >>>>>>> We can't set a CDC format for 'key.format' and 'value.format'
> > > > >>>>>>> in the KTable connector now, which is possible in the Kafka
> > > > >>>>>>> connector. Considering the differences between the options we
> > > > >>>>>>> can use, it's more suitable to introduce another connector
> > > > >>>>>>> rather than a new property.
> > > > >>>>>>>
> > > > >>>>>>> We are also fine with using "compacted-kafka" as the name of
> > > > >>>>>>> the new connector. What do you think?
> > > > >>>>>>>
> > > > >>>>>>> Best,
> > > > >>>>>>> Shengkai
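> > > > >>>>>>>
> > > > >>>>>>> (An editorial sketch of the event-time temporal join
> > > > >>>>>>> mentioned above, assuming the FLIP-132 FOR SYSTEM_TIME AS OF
> > > > >>>>>>> syntax; the `orders` table and its columns are made up, and
> > > > >>>>>>> both tables are assumed to be declared already:)
> > > > >>>>>>>
> > > > >>>>>>> import org.apache.flink.table.api.EnvironmentSettings;
> > > > >>>>>>> import org.apache.flink.table.api.TableEnvironment;
> > > > >>>>>>>
> > > > >>>>>>> public class TemporalJoinSketch {
> > > > >>>>>>>     public static void main(String[] args) {
> > > > >>>>>>>         TableEnvironment tEnv = TableEnvironment.create(
> > > > >>>>>>>                 EnvironmentSettings.newInstance().inStreamingMode().build());
> > > > >>>>>>>         // `orders` is append-only with a watermark on order_time;
> > > > >>>>>>>         // `users` is the ktable with PRIMARY KEY (user_id) and a watermark.
> > > > >>>>>>>         tEnv.executeSql(
> > > > >>>>>>>                 "SELECT o.order_id, o.amount, u.user_level "
> > > > >>>>>>>                         + "FROM orders AS o "
> > > > >>>>>>>                         + "JOIN users FOR SYSTEM_TIME AS OF o.order_time AS u "
> > > > >>>>>>>                         + "ON o.user_id = u.user_id").print();
> > > > >>>>>>>     }
> > > > >>>>>>> }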
> > > > >>>>>>>
> > > > >>>>>>> On Mon, Oct 19, 2020 at 10:15 PM, Konstantin Knauf <[email protected]> wrote:
> > > > >>>>>>>
> > > > >>>>>>>> Hi Shengkai,
> > > > >>>>>>>>
> > > > >>>>>>>> Thank you for driving this effort. I believe this is a very
> > > > >>>>>>>> important feature for many users who use Kafka and Flink SQL
> > > > >>>>>>>> together. A few questions and thoughts:
> > > > >>>>>>>>
> > > > >>>>>>>> * Is your example "Use KTable as a reference/dimension
> > > > >>>>>>>> table" correct? It uses the "kafka" connector and does not
> > > > >>>>>>>> specify a primary key.
> > > > >>>>>>>>
> > > > >>>>>>>> * Will it be possible to use a "ktable" table directly as a
> > > > >>>>>>>> dimensional table in a temporal join (*based on event time*)
> > > > >>>>>>>> (FLIP-132)? This is not completely clear to me from the
> > > > >>>>>>>> FLIP.
> > > > >>>>>>>>
> > > > >>>>>>>> * I'd personally prefer not to introduce a new connector and
> > > > >>>>>>>> instead to extend the Kafka connector. We could add an
> > > > >>>>>>>> additional property "compacted" = "true"|"false". If it is
> > > > >>>>>>>> set to "true", we can add additional validation logic (e.g.
> > > > >>>>>>>> "scan.startup.mode" cannot be set, primary key required,
> > > > >>>>>>>> etc.). If we stick to a separate connector, I'd not call it
> > > > >>>>>>>> "ktable", but rather "compacted-kafka" or similar. KTable
> > > > >>>>>>>> seems to carry more implicit meaning than we want to imply
> > > > >>>>>>>> here.
> > > > >>>>>>>>
> > > > >>>>>>>> * I agree that this is not a bounded source. If we want to
> > > > >>>>>>>> support a bounded mode, that is an orthogonal concern that
> > > > >>>>>>>> also applies to other unbounded sources.
> > > > >>>>>>>>
> > > > >>>>>>>> Best,
> > > > >>>>>>>>
> > > > >>>>>>>> Konstantin
> > > > >>>>>>>>
> > > > >>>>>>>> On Mon, Oct 19, 2020 at 3:26 PM Jark Wu <[email protected]> wrote:
> > > > >>>>>>>>
> > > > >>>>>>>>> Hi Danny,
> > > > >>>>>>>>>
> > > > >>>>>>>>> First of all, we didn't introduce any concepts from KSQL
> > > > >>>>>>>>> (e.g. the Stream vs. Table notion).
> > > > >>>>>>>>> This new connector will produce a changelog stream, so it's
> > > > >>>>>>>>> still a dynamic table and doesn't conflict with Flink's
> > > > >>>>>>>>> core concepts.
> > > > >>>>>>>>>
> > > > >>>>>>>>> The "ktable" is just a connector name; we can also call it
> > > > >>>>>>>>> "compacted-kafka" or something else.
> > > > >>>>>>>>> Calling it "ktable" is just so that KSQL users can migrate
> > > > >>>>>>>>> to Flink SQL easily.
> > > > >>>>>>>>>
> > > > >>>>>>>>> Regarding why we introduce a new connector vs. a new
> > > > >>>>>>>>> property in the existing kafka connector:
> > > > >>>>>>>>>
> > > > >>>>>>>>> I think the main reason is that we want a clear separation
> > > > >>>>>>>>> between these two use cases, because they are very
> > > > >>>>>>>>> different. We also listed reasons in the FLIP, including:
> > > > >>>>>>>>>
> > > > >>>>>>>>> 1) It's hard to explain what the behavior is when users
> > > > >>>>>>>>> specify a start offset at a middle position (e.g. how to
> > > > >>>>>>>>> process delete events for non-existing keys). It's
> > > > >>>>>>>>> dangerous if users do that. So we don't provide the offset
> > > > >>>>>>>>> option in the new connector at the moment.
> > > > >>>>>>>>> 2) It's a different perspective/abstraction on the same
> > > > >>>>>>>>> kafka topic (append vs. upsert). It would be easier to
> > > > >>>>>>>>> understand if we separated them instead of mixing them in
> > > > >>>>>>>>> one connector. The new connector requires a hash sink
> > > > >>>>>>>>> partitioner, a declared primary key, and a regular format.
> > > > >>>>>>>>> If we mix them in one connector, it might be confusing how
> > > > >>>>>>>>> to use the options correctly.
> > > > >>>>>>>>> 3) The semantics of the KTable connector are just the same
> > > > >>>>>>>>> as KTable in Kafka Streams, so it's very handy for Kafka
> > > > >>>>>>>>> Streams and KSQL users. We have seen several questions in
> > > > >>>>>>>>> the mailing list asking how to model a KTable and how to
> > > > >>>>>>>>> join a KTable in Flink SQL.
> > > > >>>>>>>>>
> > > > >>>>>>>>> Best,
> > > > >>>>>>>>> Jark
> > > > >>>>>>>>>
> > > > >>>>>>>>> On Mon, 19 Oct 2020 at 19:53, Jark Wu <[email protected]> wrote:
> > > > >>>>>>>>>
> > > > >>>>>>>>>> Hi Jingsong,
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> As the FLIP describes, "KTable connector produces a
> > > > >>>>>>>>>> changelog stream, where each data record represents an
> > > > >>>>>>>>>> update or delete event."
> > > > >>>>>>>>>> Therefore, a ktable source is an unbounded stream source.
> > > > >>>>>>>>>> Selecting from a ktable source is similar to selecting
> > > > >>>>>>>>>> from a kafka source with the debezium-json format, in that
> > > > >>>>>>>>>> it never ends and the results are continuously updated.
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> It's possible to have a bounded ktable source in the
> > > > >>>>>>>>>> future, for example by adding an option 'bounded=true' or
> > > > >>>>>>>>>> 'end-offset=xxx'. In this way, the ktable will produce a
> > > > >>>>>>>>>> bounded changelog stream. So I think this can be added as
> > > > >>>>>>>>>> a compatible feature in the future.
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> I don't think we should associate it with KSQL-related
> > > > >>>>>>>>>> concepts. Actually, we didn't introduce any concepts from
> > > > >>>>>>>>>> KSQL (e.g. the Stream vs. Table notion).
> > > > >>>>>>>>>> The "ktable" is just a connector name; we can also call it
> > > > >>>>>>>>>> "compacted-kafka" or something else.
> > > > >>>>>>>>>> Calling it "ktable" is just so that KSQL users can migrate
> > > > >>>>>>>>>> to Flink SQL easily.
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> Regarding "value.fields-include", this is an option
> > > > >>>>>>>>>> introduced in FLIP-107 for the Kafka connector.
> > > > >>>>>>>>>> I think we should keep the same behavior as the Kafka
> > > > >>>>>>>>>> connector. I'm not sure what the default behavior of KSQL
> > > > >>>>>>>>>> is, but I guess it also stores the keys in the value,
> > > > >>>>>>>>>> judging from this example in the docs (see the
> > > > >>>>>>>>>> "users_original" table) [1].
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> Best,
> > > > >>>>>>>>>> Jark
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> [1]:
> > > > >>>>>>>>>> https://docs.confluent.io/current/ksqldb/tutorials/basics-local.html#create-a-stream-and-table
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> On Mon, 19 Oct 2020 at 18:17, Danny Chan <[email protected]> wrote:
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>> The concept seems to conflict with the Flink abstraction
> > > > >>>>>>>>>>> "dynamic table": in Flink we see both "stream" and
> > > > >>>>>>>>>>> "table" as a dynamic table.
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> I think we should first make clear how to express stream-
> > > > >>>>>>>>>>> and table-specific features on one "dynamic table".
> > > > >>>>>>>>>>> It is more natural for KSQL because KSQL takes stream and
> > > > >>>>>>>>>>> table as different abstractions for representing
> > > > >>>>>>>>>>> collections. In KSQL, only a table is mutable and can
> > > > >>>>>>>>>>> have a primary key.
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> Does this connector belong to the "table" scope or the
> > > > >>>>>>>>>>> "stream" scope?
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> Some of the concepts (such as a primary key on a stream)
> > > > >>>>>>>>>>> should be suitable for all the connectors, not just
> > > > >>>>>>>>>>> Kafka. Shouldn't this be an extension of the existing
> > > > >>>>>>>>>>> Kafka connector instead of a totally new connector? What
> > > > >>>>>>>>>>> about the other connectors?
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> Because this touches the core abstraction of Flink, we
> > > > >>>>>>>>>>> had better have a top-down overall design; following KSQL
> > > > >>>>>>>>>>> directly is not the answer.
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> P.S. for the source:
> > > > >>>>>>>>>>>> Shouldn't this be an extension of the existing Kafka
> > > > >>>>>>>>>>>> connector instead of a totally new connector?
> > > > >>>>>>>>>>> How could we achieve that (e.g. set up the parallelism
> > > > >>>>>>>>>>> correctly)?
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> Best,
> > > > >>>>>>>>>>> Danny Chan
> > > > >>>>>>>>>>> On Oct 19, 2020 at 5:17 PM +0800, Jingsong Li <[email protected]> wrote:
> > > > >>>>>>>>>>>> Thanks Shengkai for your proposal.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> +1 for this feature.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>> Future Work: Support bounded KTable source
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> I don't think it should be future work; I think it is
> > > > >>>>>>>>>>>> one of the important concepts of this FLIP. We need to
> > > > >>>>>>>>>>>> understand it now.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> Intuitively, a ktable in my opinion is a bounded table
> > > > >>>>>>>>>>>> rather than a stream, so a select should produce a
> > > > >>>>>>>>>>>> bounded table by default.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> I think we should list the Kafka-related knowledge,
> > > > >>>>>>>>>>>> because the word `ktable` is easy to associate with
> > > > >>>>>>>>>>>> KSQL-related concepts. (If possible, it's better to
> > > > >>>>>>>>>>>> unify with it.)
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> What do you think?
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>> value.fields-include
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> What about the default behavior of KSQL?
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> Best,
> > > > >>>>>>>>>>>> Jingsong
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> On Mon, Oct 19, 2020 at 4:33 PM Shengkai Fang <[email protected]> wrote:
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>> Hi, devs.
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> Jark and I want to start a new FLIP to introduce the
> > > > >>>>>>>>>>>>> KTable connector. The KTable is short for "Kafka
> > > > >>>>>>>>>>>>> Table"; it also has the same semantics as the KTable
> > > > >>>>>>>>>>>>> notion in Kafka Streams.
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> FLIP-149:
> > > > >>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-149%3A+Introduce+the+KTable+Connector
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> Currently many users have expressed their need for
> > > > >>>>>>>>>>>>> upsert Kafka support via the mailing lists and issues.
> > > > >>>>>>>>>>>>> The KTable connector has several benefits for users:
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> 1. Users are able to interpret a compacted Kafka topic
> > > > >>>>>>>>>>>>> as an upsert stream in Apache Flink, and are also able
> > > > >>>>>>>>>>>>> to write a changelog stream to Kafka (into a compacted
> > > > >>>>>>>>>>>>> topic).
> > > > >>>>>>>>>>>>> 2. As a part of a real-time pipeline, they can store
> > > > >>>>>>>>>>>>> join or aggregate results (which may contain updates)
> > > > >>>>>>>>>>>>> in a Kafka topic for further calculation.
> > > > >>>>>>>>>>>>> 3. The semantics of the KTable connector are just the
> > > > >>>>>>>>>>>>> same as KTable in Kafka Streams, so it's very handy for
> > > > >>>>>>>>>>>>> Kafka Streams and KSQL users. We have seen several
> > > > >>>>>>>>>>>>> questions in the mailing list asking how to model a
> > > > >>>>>>>>>>>>> KTable and how to join a KTable in Flink SQL.
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> We hope it can expand the usage of Flink with Kafka.
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> I'm looking forward to your feedback.
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> Best,
> > > > >>>>>>>>>>>>> Shengkai
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> --
> > > > >>>>>>>>>>>> Best, Jingsong Lee
> > > > >>>>>>>>
> > > > >>>>>>>> --
> > > > >>>>>>>> Konstantin Knauf
> > > > >>>>>>>> https://twitter.com/snntrable
> > > > >>>>>>>> https://github.com/knaufk
> > > > >>>>
> > > > >>>> --
> > > > >>>> Konstantin Knauf
> > > > >>>> https://twitter.com/snntrable
> > > > >>>> https://github.com/knaufk
> > >
> > > --
> > >
> > > Seth Wiesman | Solutions Architect
> > >
> > > +1 314 387 1463
> > >
> > > <https://www.ververica.com/>
> > >
> > > Follow us @VervericaData
> > >
> > > --
> > >
> > > Join Flink Forward <https://flink-forward.org/> - The Apache Flink Conference
> > >
> > > Stream Processing | Event Driven | Real Time