Thanks Shengkai! +1 to start voting.
Best,
Jark

On Fri, 23 Oct 2020 at 15:02, Shengkai Fang <fskm...@gmail.com> wrote:

Add one more message: I have already updated the FLIP [1].

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-149%3A+Introduce+the+upsert-kafka+Connector

On Fri, Oct 23, 2020 at 2:55 PM, Shengkai Fang <fskm...@gmail.com> wrote:

Hi, all.
It seems we have reached a consensus on the FLIP. If no one has other objections, I would like to start the vote for FLIP-149.

Best,
Shengkai

On Fri, Oct 23, 2020 at 2:25 PM, Jingsong Li <jingsongl...@gmail.com> wrote:

Thanks for the explanation.

I am OK with `upsert`. Yes, its concept has been accepted by many systems.

Best,
Jingsong

On Fri, Oct 23, 2020 at 12:38 PM Jark Wu <imj...@gmail.com> wrote:

Hi Timo,

I have some concerns about `kafka-cdc`:
1) "cdc" is an abbreviation of Change Data Capture, which is commonly used for databases, not for message queues.
2) Usually, CDC produces the full content of the changelog, including UPDATE_BEFORE; however, "upsert kafka" doesn't.
3) `kafka-cdc` sounds like native support for the `debezium-json` format; however, it is not, and we don't even want "upsert kafka" to support "debezium-json".

Hi Jingsong,

I think the terminology of "upsert" is fine, because Kafka also uses "upsert" to define such behavior in their official documentation [1]:

> a data record in a changelog stream is interpreted as an UPSERT aka INSERT/UPDATE

Materialize uses the "UPSERT" keyword to define such behavior too [2]. Users have been requesting such a feature using "upsert kafka" terminology on the user mailing lists [3][4]. Many other systems support an "UPSERT" statement natively, such as Impala [5], SAP [6], Phoenix [7], Oracle NoSQL [8], etc.

Therefore, I think we don't need to be afraid of introducing the "upsert" terminology; it is widely accepted by users.

Best,
Jark

[1]: https://kafka.apache.org/20/documentation/streams/developer-guide/dsl-api.html#streams_concepts_ktable
[2]: https://materialize.io/docs/sql/create-source/text-kafka/#upsert-on-a-kafka-topic
[3]: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/SQL-materialized-upsert-tables-td18482.html#a18503
[4]: http://apache-flink.147419.n8.nabble.com/Kafka-Sink-AppendStreamTableSink-doesn-t-support-consuming-update-changes-td5959.html
[5]: https://impala.apache.org/docs/build/html/topics/impala_upsert.html
[6]: https://help.sap.com/viewer/7c78579ce9b14a669c1f3295b0d8ca16/Cloud/en-US/ea8b6773be584203bcd99da76844c5ed.html
[7]: https://phoenix.apache.org/atomic_upsert.html
[8]: https://docs.oracle.com/en/database/other-databases/nosql-database/18.3/sqlfornosql/adding-table-rows-using-insert-and-upsert-statements.html

On Fri, 23 Oct 2020 at 10:36, Jingsong Li <jingsongl...@gmail.com> wrote:

The `kafka-cdc` looks good to me.
We can even give options to indicate whether to turn on compaction, because compaction is just an optimization?

- ktable: makes me think of KSQL.
- kafka-compacted: it is not just compacted; more than that, it still has the ability of CDC.
- upsert-kafka: upsert is back, and I don't really want to see it again since we have CDC.

Best,
Jingsong

On Fri, Oct 23, 2020 at 2:21 AM Timo Walther <twal...@apache.org> wrote:

Hi Jark,

I would be fine with `connector=upsert-kafka`. Another idea would be to align the name with other available Flink connectors [1]:

`connector=kafka-cdc`

Regards,
Timo

[1] https://github.com/ververica/flink-cdc-connectors

On 22.10.20 17:17, Jark Wu wrote:

Another name is 'connector=upsert-kafka'; I think this can solve Timo's concern about the "compacted" word.

Materialize also uses the "ENVELOPE UPSERT" [1] keyword to identify such Kafka sources. I think "upsert" is a well-known terminology widely used in many systems and matches the behavior of how we handle the Kafka messages.

What do you think?

Best,
Jark

[1]: https://materialize.io/docs/sql/create-source/text-kafka/#upsert-on-a-kafka-topic

On Thu, 22 Oct 2020 at 22:53, Kurt Young <ykt...@gmail.com> wrote:

Good validation messages can't solve the broken user experience, especially since such an update-mode option would implicitly make half of the current Kafka options invalid or nonsensical.

Best,
Kurt

On Thu, Oct 22, 2020 at 10:31 PM Jark Wu <imj...@gmail.com> wrote:

Hi Timo, Seth,

The default value "inserting" for "mode" might not be suitable, because "debezium-json" emits changelog messages which include updates.

On Thu, 22 Oct 2020 at 22:10, Seth Wiesman <s...@ververica.com> wrote:

+1 for supporting upsert results into Kafka.

I have no comments on the implementation details.

As far as configuration goes, I tend to favor Timo's option where we add a "mode" property to the existing Kafka table with default value "inserting". If the mode is set to "updating" then the validation changes to the new requirements. I personally find it more intuitive than a separate connector; my fear is users won't understand it's the same physical Kafka sink under the hood, and it will lead to other confusion, like: does it offer the same persistence guarantees? I think we are capable of adding good validation messaging that solves Jark's and Kurt's concerns.

On Thu, Oct 22, 2020 at 8:51 AM Timo Walther <twal...@apache.org> wrote:

Hi Jark,

"calling it 'kafka-compacted' can even remind users to enable log compaction"

But sometimes users like to store a lineage of changes in their topics, independent of any ktable/kstream interpretation.

I'll let the majority decide on this topic so as not to further block this effort. But we might find a better name, like:

connector = kafka
mode = updating/inserting

OR

connector = kafka-updating

...

Regards,
Timo

On 22.10.20 15:24, Jark Wu wrote:

Hi Timo,

Thanks for your opinions.

1) Implementation
We will have a stateful operator to generate INSERT and UPDATE_BEFORE. This operator is keyby-ed (primary key as the shuffle key) after the source operator. The implementation of this operator is very similar to the existing `DeduplicateKeepLastRowFunction`. The operator will register a value state using the primary key fields as keys. When the value state is empty under the current key, we will emit INSERT for the input row. When the value state is not empty under the current key, we will emit UPDATE_BEFORE using the row in state, and emit UPDATE_AFTER using the input row. When the input row is a DELETE, we will clear the state and emit a DELETE row.

2) New option vs. new connector
> We recently simplified the table options to a minimum amount of characters to be as concise as possible in the DDL.
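(The stateful operator Jark describes in point 1 above can be sketched as follows. This is an illustrative Python sketch of the logic only, not Flink's actual `DeduplicateKeepLastRowFunction`; the record encoding is hypothetical.)

```python
# Sketch of the changelog-generating operator: a value state per primary
# key decides which changelog row kinds to emit for each upsert/delete.

def changelog_normalize(records):
    """records: iterable of (op, key, row), where op is 'U' (upsert)
    or 'D' (delete; row is ignored). Yields (row_kind, row) pairs."""
    state = {}  # stands in for Flink's keyed value state
    for op, key, row in records:
        if op == 'D':
            old = state.pop(key, None)
            if old is not None:
                # emit DELETE with all columns filled from the stored row
                yield ('DELETE', old)
        elif key not in state:
            state[key] = row
            yield ('INSERT', row)  # value state was empty under this key
        else:
            yield ('UPDATE_BEFORE', state[key])  # previous image from state
            state[key] = row
            yield ('UPDATE_AFTER', row)

out = list(changelog_normalize([('U', 'k1', 'a'), ('U', 'k1', 'b'), ('D', 'k1', None)]))
# [('INSERT', 'a'), ('UPDATE_BEFORE', 'a'), ('UPDATE_AFTER', 'b'), ('DELETE', 'b')]
```

Note the state holds the last row per key indefinitely, which is the state cost Timo asks about below in the thread.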
I think this is the reason why we want to introduce a new connector: because we can simplify the options in the DDL. For example, if using a new option, the DDL may look like this:

CREATE TABLE users (
  user_id BIGINT,
  user_name STRING,
  user_level STRING,
  region STRING,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'model' = 'table',
  'topic' = 'pageviews_per_region',
  'properties.bootstrap.servers' = '...',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest',
  'key.format' = 'csv',
  'key.fields' = 'user_id',
  'value.format' = 'avro',
  'sink.partitioner' = 'hash'
);

If using a new connector, we can have different default values for the options and remove unnecessary options; the DDL can look like this, which is much more concise:

CREATE TABLE pageviews_per_region (
  user_id BIGINT,
  user_name STRING,
  user_level STRING,
  region STRING,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'kafka-compacted',
  'topic' = 'pageviews_per_region',
  'properties.bootstrap.servers' = '...',
  'key.format' = 'csv',
  'value.format' = 'avro'
);

> When people read `connector=kafka-compacted` they might not know that it has ktable semantics. You don't need to enable log compaction in order to use a KTable as far as I know.

We don't need to let users know it has ktable semantics; as Konstantin mentioned, this may carry more implicit meaning than we want to imply here. I agree users don't need to enable log compaction, but from the production perspective, log compaction should always be enabled if the topic is used for this purpose. Calling it "kafka-compacted" can even remind users to enable log compaction.

I don't agree with introducing a "model = table/stream" option, or "connector=kafka-table", because this means we are introducing the Table vs. Stream concept from KSQL. However, we don't have such a top-level concept in Flink SQL now; this would further confuse users. In Flink SQL, everything is a STREAM; the differences are whether it is bounded or unbounded, and whether it is insert-only or changelog.

Best,
Jark

On Thu, 22 Oct 2020 at 20:39, Timo Walther <twal...@apache.org> wrote:

Hi Shengkai, Hi Jark,

thanks for this great proposal. It is time to finally connect the changelog processor with a compacted Kafka topic.

"The operator will produce INSERT rows, or additionally generate UPDATE_BEFORE rows for the previous image, or produce DELETE rows with all columns filled with values."

Could you elaborate a bit on the implementation details in the FLIP? How are UPDATE_BEFOREs generated? How much state is required to perform this operation?

From a conceptual and semantical point of view, I'm fine with the proposal. But I would like to share my opinion about how we expose this feature:

ktable vs. kafka-compacted

I'm against having an additional connector like `ktable` or `kafka-compacted`. We recently simplified the table options to a minimum amount of characters to be as concise as possible in the DDL. Therefore, I would keep `connector=kafka` and introduce an additional option, because a user wants to read "from Kafka", and the "how" should be determined in the lower options.

When people read `connector=ktable` they might not know that this is Kafka. Or they wonder where `kstream` is?

When people read `connector=kafka-compacted` they might not know that it has ktable semantics. You don't need to enable log compaction in order to use a KTable as far as I know. Log compaction and table semantics are orthogonal topics.

In the end we will need 3 types of information when declaring a Kafka connector:

CREATE TABLE ... WITH (
  connector = kafka      -- Some information about the connector
  end-offset = XXXX      -- Some information about the boundedness
  model = table/stream   -- Some information about interpretation
)

We can still apply all the constraints mentioned in the FLIP when `model` is set to `table`.

What do you think?

Regards,
Timo

On 21.10.20 14:19, Jark Wu wrote:

Hi,

IMO, if we are going to mix them in one connector:

1) Either users need to set some options to a specific value explicitly, e.g. "scan.startup.mode=earliest", "sink.partitioner=hash", etc. This makes the connector awkward to use: users may have to fix options one by one according to the exceptions. Besides, in the future it is still possible to use "sink.partitioner=fixed" (to reduce network cost) if users are aware of the partition routing; however, it's error-prone to have "fixed" as the default for compacted mode.

2) Or we make those options take different default values when "compacted=true". This would be more confusing and unpredictable if the default values of options change according to other options. What happens if we have a third mode in the future?

In terms of usage and options, it's very different from the original "kafka" connector. It would be handier to use and less error-prone if we separate them into two connectors. In the implementation layer, we can reuse code as much as possible.

Therefore, I'm still +1 to have a new connector. The "kafka-compacted" name sounds good to me.

Best,
Jark

On Wed, 21 Oct 2020 at 17:58, Konstantin Knauf <kna...@apache.org> wrote:

Hi Kurt, Hi Shengkai,

thanks for answering my questions and the additional clarifications. I don't have a strong opinion on whether to extend the "kafka" connector or to introduce a new connector. So, from my perspective, feel free to go with a separate connector. If we do introduce a new connector, I wouldn't call it "ktable" for the aforementioned reasons (in addition, we might suggest that there is also a "kstreams" connector, for symmetry reasons). I don't have a good alternative name, though; maybe "kafka-compacted" or "compacted-kafka".

Thanks,

Konstantin

On Wed, Oct 21, 2020 at 4:43 AM Kurt Young <ykt...@gmail.com> wrote:

Hi all,

I want to describe the discussion process which drove us to this conclusion; it might make some of the design choices easier to understand and keep everyone on the same page.

Back to the motivation: what functionality do we want to provide in the first place? We got a lot of feedback and questions from the mailing lists that people want to write not-insert-only messages into Kafka. They might do so intentionally or by accident, e.g. having written a non-windowed aggregate query or a non-windowed left outer join. And some users from the KSQL world also asked why Flink didn't leverage the Key concept of every Kafka topic and treat Kafka as a dynamically changing keyed table.

To work with Kafka better, we were thinking of extending the functionality of the current Kafka connector by letting it accept updates and deletions. But due to the limitations of Kafka, the update has to be "update by key", i.e. a table with a primary key.

This introduces a couple of conflicts with the current Kafka table's options:
1. key.fields: as said above, we need the Kafka table to have a primary key constraint, and users can also configure key.fields freely; this might cause friction. (Sure, we can do some sanity checks on this, but that also creates friction.)
2. sink.partitioner: to make the semantics right, we need to make sure all the updates on the same key are written to the same Kafka partition, so we should force a hash-by-key partitioner inside such a table. Again, this conflicts and creates friction with current user options.

The above things are solvable, though not perfect or most user friendly.
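(Kurt's point 2 above, that all updates on the same key must land in the same partition, can be illustrated with a toy partitioner. This is hypothetical illustration code, not the Kafka client; Kafka's real default partitioner uses a murmur2 hash, but any deterministic key hash gives the property needed here.)

```python
# Why the sink must hash-partition by primary key: a deterministic hash
# of the key routes every change for one key to the same partition, so
# consumers observe that key's changes in order.

def hash_partition(key: str, num_partitions: int) -> int:
    # toy deterministic stand-in for a real key hash
    return sum(key.encode()) % num_partitions

changes = [('user_1', 'v1'), ('user_2', 'v1'), ('user_1', 'v2')]
routed = [hash_partition(k, 4) for k, _ in changes]
assert routed[0] == routed[2]  # both updates to user_1 hit one partition
```

With a "fixed" or round-robin partitioner, the two updates to `user_1` could land in different partitions and be consumed out of order, breaking the upsert semantics.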
Let's take a look at the reading side. The keyed Kafka table contains two kinds of messages: upserts and deletions. What upsert means is: "If the key doesn't exist yet, it's an insert record; otherwise it's an update record." For the sake of correctness and simplicity, the Flink SQL engine also needs this information. If we interpret all messages as "update records", some queries or operators may not work properly; it's weird to see an update record when you haven't seen the insert record before.

So what Flink should do is: after reading the records from such a table, it needs to create a state recording which keys have been seen, and then generate the correct row kind accordingly. This couples the state with the data of the message queue, and it also creates conflicts with the current Kafka connector.

Think about a user suspending a running job (which now contains some reading state) and then changing the start offset of the reader. By changing the reading offset, they actually change the whole story of which records should be insert messages and which records should be update messages.
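(The start-offset hazard described above can be made concrete with a small sketch. Illustrative Python only, not Flink code; the log encoding is hypothetical.)

```python
# Whether a record reads as INSERT or UPDATE depends on everything read
# before it, so changing the start offset rewrites the interpretation
# of the whole stream.

def interpret(log, start_offset=0):
    seen, kinds = set(), []
    for op, key in log[start_offset:]:
        if op == 'D':
            # a delete for a never-seen key is the "weird situation"
            kinds.append('DELETE' if key in seen else 'DELETE(unseen key!)')
            seen.discard(key)
        else:
            kinds.append('UPDATE' if key in seen else 'INSERT')
            seen.add(key)
    return kinds

log = [('U', 'a'), ('U', 'a'), ('D', 'a')]
assert interpret(log, 0) == ['INSERT', 'UPDATE', 'DELETE']
assert interpret(log, 1) == ['INSERT', 'DELETE']       # the update now looks like an insert
assert interpret(log, 2) == ['DELETE(unseen key!)']    # a delete with no prior insert
```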
It will also force Flink to deal with another weird situation: it might receive a deletion for a non-existing message.

We were unsatisfied with all the friction and conflicts it would create if we enabled "upsert & deletion" support in the current Kafka connector. And later we began to realize that we shouldn't treat it as a normal message queue, but should treat it as a changing keyed table. We should be able to always get the whole data of such a table (by disabling the start offset option), and we can also read the changelog out of such a table. It's like an HBase table with binlog support but without random access capability (which can be fulfilled by Flink's state).

So our intention was: instead of telling and persuading users what kinds of options they should or should not use by extending the current Kafka connector with upsert support, we actually create a whole new and different connector that has totally different abstractions in the SQL layer, and it should be treated totally differently from the current Kafka connector.

Hope this can clarify some of the concerns.
> >> > > > >>>>>>>>>> > >> > > > >>>>>>>>>> Best, > >> > > > >>>>>>>>>> Kurt > >> > > > >>>>>>>>>> > >> > > > >>>>>>>>>> > >> > > > >>>>>>>>>> On Tue, Oct 20, 2020 at 5:20 PM Shengkai Fang < > >> > > > >> fskm...@gmail.com > >> > > > >>>> > >> > > > >>>>>>> wrote: > >> > > > >>>>>>>>>> > >> > > > >>>>>>>>>>> Hi devs, > >> > > > >>>>>>>>>>> > >> > > > >>>>>>>>>>> As many people are still confused about the difference > >> > option > >> > > > >>>>>>>>> behaviours > >> > > > >>>>>>>>>>> between the Kafka connector and KTable connector, Jark > >> and > >> > I > >> > > > >>> list > >> > > > >>>>> the > >> > > > >>>>>>>>>>> differences in the doc[1]. > >> > > > >>>>>>>>>>> > >> > > > >>>>>>>>>>> Best, > >> > > > >>>>>>>>>>> Shengkai > >> > > > >>>>>>>>>>> > >> > > > >>>>>>>>>>> [1] > >> > > > >>>>>>>>>>> > >> > > > >>>>>>>>>>> > >> > > > >>>>>>>>>> > >> > > > >>>>>>>>> > >> > > > >>>>>>> > >> > > > >>>>> > >> > > > >>>> > >> > > > >>> > >> > > > >> > >> > > > > >> > > > >> > > >> > https://docs.google.com/document/d/13oAWAwQez0lZLsyfV21BfTEze1fc2cz4AZKiNOyBNPk/edit > >> > > > >>>>>>>>>>> > >> > > > >>>>>>>>>>> Shengkai Fang <fskm...@gmail.com> 于2020年10月20日周二 > >> > 下午12:05写道: > >> > > > >>>>>>>>>>> > >> > > > >>>>>>>>>>>> Hi Konstantin, > >> > > > >>>>>>>>>>>> > >> > > > >>>>>>>>>>>> Thanks for your reply. > >> > > > >>>>>>>>>>>> > >> > > > >>>>>>>>>>>>> It uses the "kafka" connector and does not specify a > >> > > primary > >> > > > >>>> key. > >> > > > >>>>>>>>>>>> The dimensional table `users` is a ktable connector > >> and we > >> > > > >> can > >> > > > >>>>>>>>> specify > >> > > > >>>>>>>>>>> the > >> > > > >>>>>>>>>>>> pk on the KTable. > >> > > > >>>>>>>>>>>> > >> > > > >>>>>>>>>>>>> Will it possible to use a "ktable" as a dimensional > >> table > >> > > in > >> > > > >>>>>>>>> FLIP-132 > >> > > > >>>>>>>>>>>> Yes. 
We can specify the watermark on the KTable and > it > >> can > >> > > be > >> > > > >>>> used > >> > > > >>>>>>>>> as a > >> > > > >>>>>>>>>>>> dimension table in temporal join. > >> > > > >>>>>>>>>>>> > >> > > > >>>>>>>>>>>>> Introduce a new connector vs introduce a new > property > >> > > > >>>>>>>>>>>> The main reason behind is that the KTable connector > >> almost > >> > > > >> has > >> > > > >>> no > >> > > > >>>>>>>>>> common > >> > > > >>>>>>>>>>>> options with the Kafka connector. The options that > can > >> be > >> > > > >>> reused > >> > > > >>>> by > >> > > > >>>>>>>>>>> KTable > >> > > > >>>>>>>>>>>> connectors are 'topic', > 'properties.bootstrap.servers' > >> and > >> > > > >>>>>>>>>>>> 'value.fields-include' . We can't set cdc format for > >> > > > >>> 'key.format' > >> > > > >>>>> and > >> > > > >>>>>>>>>>>> 'value.format' in KTable connector now, which is > >> > available > >> > > > >> in > >> > > > >>>>> Kafka > >> > > > >>>>>>>>>>>> connector. Considering the difference between the > >> options > >> > we > >> > > > >>> can > >> > > > >>>>> use, > >> > > > >>>>>>>>>>> it's > >> > > > >>>>>>>>>>>> more suitable to introduce an another connector > rather > >> > than > >> > > a > >> > > > >>>>>>>>> property. > >> > > > >>>>>>>>>>>> > >> > > > >>>>>>>>>>>> We are also fine to use "compacted-kafka" as the name > >> of > >> > the > >> > > > >>> new > >> > > > >>>>>>>>>>>> connector. What do you think? > >> > > > >>>>>>>>>>>> > >> > > > >>>>>>>>>>>> Best, > >> > > > >>>>>>>>>>>> Shengkai > >> > > > >>>>>>>>>>>> > >> > > > >>>>>>>>>>>> Konstantin Knauf <kna...@apache.org> 于2020年10月19日周一 > >> > > > >> 下午10:15写道: > >> > > > >>>>>>>>>>>> > >> > > > >>>>>>>>>>>>> Hi Shengkai, > >> > > > >>>>>>>>>>>>> > >> > > > >>>>>>>>>>>>> Thank you for driving this effort. I believe this a > >> very > >> > > > >>>> important > >> > > > >>>>>>>>>>> feature > >> > > > >>>>>>>>>>>>> for many users who use Kafka and Flink SQL > together. 
A > >> > few > >> > > > >>>>> questions > >> > > > >>>>>>>>>> and > >> > > > >>>>>>>>>>>>> thoughts: > >> > > > >>>>>>>>>>>>> > >> > > > >>>>>>>>>>>>> * Is your example "Use KTable as a > reference/dimension > >> > > > >> table" > >> > > > >>>>>>>>> correct? > >> > > > >>>>>>>>>>> It > >> > > > >>>>>>>>>>>>> uses the "kafka" connector and does not specify a > >> primary > >> > > > >> key. > >> > > > >>>>>>>>>>>>> > >> > > > >>>>>>>>>>>>> * Will it be possible to use a "ktable" table > directly > >> > as a > >> > > > >>>>>>>>>> dimensional > >> > > > >>>>>>>>>>>>> table in temporal join (*based on event time*) > >> > (FLIP-132)? > >> > > > >>> This > >> > > > >>>> is > >> > > > >>>>>>>>> not > >> > > > >>>>>>>>>>>>> completely clear to me from the FLIP. > >> > > > >>>>>>>>>>>>> > >> > > > >>>>>>>>>>>>> * I'd personally prefer not to introduce a new > >> connector > >> > > and > >> > > > >>>>> instead > >> > > > >>>>>>>>>> to > >> > > > >>>>>>>>>>>>> extend the Kafka connector. We could add an > additional > >> > > > >>> property > >> > > > >>>>>>>>>>>>> "compacted" > >> > > > >>>>>>>>>>>>> = "true"|"false". If it is set to "true", we can add > >> > > > >>> additional > >> > > > >>>>>>>>>>> validation > >> > > > >>>>>>>>>>>>> logic (e.g. "scan.startup.mode" can not be set, > >> primary > >> > key > >> > > > >>>>>>>>> required, > >> > > > >>>>>>>>>>>>> etc.). If we stick to a separate connector I'd not > >> call > >> > it > >> > > > >>>>> "ktable", > >> > > > >>>>>>>>>> but > >> > > > >>>>>>>>>>>>> rather "compacted-kafka" or similar. KTable seems to > >> > carry > >> > > > >>> more > >> > > > >>>>>>>>>> implicit > >> > > > >>>>>>>>>>>>> meaning than we want to imply here. > >> > > > >>>>>>>>>>>>> > >> > > > >>>>>>>>>>>>> * I agree that this is not a bounded source. 
If we want to support a bounded mode, that is an orthogonal concern that also applies to other unbounded sources.

Best,

Konstantin

On Mon, Oct 19, 2020 at 3:26 PM Jark Wu <imj...@gmail.com> wrote:

Hi Danny,

First of all, we didn't introduce any concepts from KSQL (e.g. the Stream vs Table notion). This new connector will produce a changelog stream, so it's still a dynamic table and doesn't conflict with Flink core concepts.

The "ktable" is just a connector name; we can also call it "compacted-kafka" or something else. Calling it "ktable" is just because KSQL users can migrate to Flink SQL easily.

Regarding why we introduce a new connector vs a new property in the existing kafka connector: I think the main reason is that we want a clear separation between the two use cases, because they are very different.
We also listed reasons in the FLIP, including:

1) It's hard to explain what the behavior is when users specify a start offset at a middle position (e.g. how to process a delete event whose key does not exist). It's dangerous if users do that, so we don't provide the offset option in the new connector at the moment.
2) It's a different perspective/abstraction on the same kafka topic (append vs. upsert). It would be easier to understand if we separate them instead of mixing them in one connector. The new connector requires a hash sink partitioner, a declared primary key, and a regular format. If we mix them in one connector, it might be confusing how to use the options correctly.
3) The semantics of the KTable connector are just the same as KTable in Kafka Streams, so it's very handy for Kafka Streams and KSQL users. We have seen several questions in the mailing list asking how to model a KTable and how to join a KTable in Flink SQL.
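To make the separation concrete, a DDL for the new connector could look roughly like the following sketch. The connector name matches the `upsert-kafka` name the FLIP eventually adopted (see the vote message at the top of the thread); the table schema and option values are hypothetical illustrations, and the exact option set is whatever the FLIP finally specifies:

```sql
-- Hypothetical upsert table: primary key required, key and value
-- formats declared, and no 'scan.startup.mode' option at all, so the
-- source always reads the compacted topic from the earliest offset.
CREATE TABLE users (
  user_id BIGINT,
  user_name STRING,
  region STRING,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',  -- a separate connector, not a flag on 'kafka'
  'topic' = 'users',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);
```

Under this model, a record with a non-null value is interpreted as an upsert on its key, and a record with a null value as a delete, matching the KTable interpretation in Kafka Streams.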
Best,
Jark

On Mon, 19 Oct 2020 at 19:53, Jark Wu <imj...@gmail.com> wrote:

Hi Jingsong,

As the FLIP describes, "KTable connector produces a changelog stream, where each data record represents an update or delete event.". Therefore, a ktable source is an unbounded stream source. Selecting from a ktable source is similar to selecting from a kafka source with the debezium-json format, in that it never ends and the results are continuously updated.

It's possible to have a bounded ktable source in the future, for example by adding an option 'bounded=true' or 'end-offset=xxx'. In this way, the ktable will produce a bounded changelog stream. So I think this can be added as a compatible feature in the future.

I don't think we should associate it with KSQL-related concepts. Actually, we didn't introduce any concepts from KSQL (e.g. the Stream vs Table notion). The "ktable" is just a connector name; we can also call it "compacted-kafka" or something else.
Calling it "ktable" is just because KSQL users can migrate to Flink SQL easily.

Regarding "value.fields-include", this is an option introduced in FLIP-107 for the Kafka connector. I think we should keep the same behavior as the Kafka connector. I'm not sure what the default behavior of KSQL is, but I guess it also stores the keys in the value, judging from this example in the docs (see the "users_original" table) [1].

Best,
Jark

[1]: https://docs.confluent.io/current/ksqldb/tutorials/basics-local.html#create-a-stream-and-table

On Mon, 19 Oct 2020 at 18:17, Danny Chan <yuzhao....@gmail.com> wrote:

The concept seems to conflict with the Flink abstraction "dynamic table": in Flink we see both "stream" and "table" as a dynamic table.

I think we should first make clear how to express stream- and table-specific features on one "dynamic table". It is more natural for KSQL because KSQL takes stream and table as different abstractions for representing collections; in KSQL, only a table is mutable and can have a primary key.

Does this connector belong to the "table" scope or the "stream" scope?

Some of the concepts (such as a primary key on a stream) should be suitable for all connectors, not just Kafka. Shouldn't this be an extension of the existing Kafka connector instead of a totally new connector? What about the other connectors?

Because this touches the core abstraction of Flink, we had better have a top-down overall design; following KSQL directly is not the answer.

P.S. For the source:
> Shouldn't this be an extension of the existing Kafka connector instead of a totally new connector?
How could we achieve that (e.g. set up the parallelism correctly)?

Best,
Danny Chan

Jingsong Li <jingsongl...@gmail.com> wrote on Oct 19, 2020 at 5:17 PM (+0800):

Thanks Shengkai for your proposal.

+1 for this feature.

> Future Work: Support bounded KTable source

I don't think it should be future work; I think it is one of the important concepts of this FLIP. We need to understand it now.

Intuitively, a ktable in my opinion is a bounded table rather than a stream, so a select should produce a bounded table by default.

I think we should list the related Kafka knowledge, because the word `ktable` is easy to associate with KSQL-related concepts. (If possible, it's better to unify with it.)

What do you think?

> value.fields-include

What about the default behavior of KSQL?
Best,
Jingsong

On Mon, Oct 19, 2020 at 4:33 PM Shengkai Fang <fskm...@gmail.com> wrote:

Hi, devs.

Jark and I want to start a new FLIP to introduce the KTable connector. The KTable is a shortcut for "Kafka Table"; it also has the same semantics as the KTable notion in Kafka Streams.

FLIP-149:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-149%3A+Introduce+the+KTable+Connector

Currently many users have expressed their need for upsert Kafka on the mailing lists and in issues. The KTable connector has several benefits for users:

1. Users are able to interpret a compacted Kafka topic as an upsert stream in Apache Flink.
And they are also able to write a changelog stream to Kafka (into a compacted topic).
2. As part of a real-time pipeline, store a join or aggregate result (which may contain updates) into a Kafka topic for further calculation.
3. The semantics of the KTable connector are just the same as KTable in Kafka Streams, so it's very handy for Kafka Streams and KSQL users. We have seen several questions in the mailing list asking how to model a KTable and how to join a KTable in Flink SQL.

We hope it can expand the usage of Flink with Kafka.

I'm looking forward to your feedback.
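For the dimension-table use case raised in the thread, a query against such a table might be sketched as follows. The table and column names are hypothetical, the event-time temporal join syntax follows FLIP-132, and the sketch assumes watermarks are defined on both tables:

```sql
-- Hypothetical enrichment query: join an append-only order stream
-- against the version of each user row valid as of the order's
-- event time, rather than only the latest version.
SELECT o.order_id, o.amount, u.region
FROM orders AS o
JOIN users FOR SYSTEM_TIME AS OF o.order_time AS u
  ON o.user_id = u.user_id;
```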
Best,
Shengkai

--
Best, Jingsong Lee

--
Konstantin Knauf
https://twitter.com/snntrable
https://github.com/knaufk

--
Seth Wiesman | Solutions Architect
<https://www.ververica.com/>