To be precise, it means the Kafka topic should set the configuration "cleanup.policy" to "compact", not "delete".
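To illustrate what that setting buys us: a compacted topic retains (at least) the latest record per key, and a record with a null value (a tombstone) eventually deletes its key, so replaying the topic yields the current table contents. A minimal, self-contained sketch of this model (a toy simulation for intuition, not the broker's actual compaction implementation):

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CompactionSketch {

    // A (key, value) pair appended to the topic; value == null is a tombstone.
    record Record(String key, String value) {}

    // After full compaction, replaying the topic yields exactly the latest
    // surviving value per key -- i.e. the current contents of the table.
    static Map<String, String> compact(List<Record> log) {
        Map<String, String> latest = new LinkedHashMap<>();
        for (Record r : log) {
            if (r.value() == null) {
                latest.remove(r.key());   // tombstone removes the key
            } else {
                latest.put(r.key(), r.value());
            }
        }
        return latest;
    }

    public static void main(String[] args) {
        List<Record> log = List.of(
                new Record("user_1", "bronze"),
                new Record("user_2", "silver"),
                new Record("user_1", "gold"),  // supersedes "bronze"
                new Record("user_2", null));   // tombstone deletes user_2
        System.out.println(compact(log));      // {user_1=gold}
    }
}
```

This is also why the source additionally requires all records for a key to sit in the same partition: compaction and replay are per-partition, so the "latest value per key" notion only holds if a key never moves between partitions.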
Best, Kurt On Fri, Oct 23, 2020 at 4:01 PM Jingsong Li <jingsongl...@gmail.com> wrote: > I just notice there is a limitation in the FLIP: > > > Generally speaking, the underlying topic of the upsert-kafka source must > be compacted. Besides, the underlying topic must have all the data with the > same key in the same partition, otherwise, the result will be wrong. > > According to my understanding, this is not accurate? Compact is an > optimization, not a limitation. It depends on users. > > I don't want to stop voting, just want to make it clear. > > Best, > Jingsong > > On Fri, Oct 23, 2020 at 3:16 PM Timo Walther <twal...@apache.org> wrote: > > > +1 for voting > > > > Regards, > > Timo > > > > On 23.10.20 09:07, Jark Wu wrote: > > > Thanks Shengkai! > > > > > > +1 to start voting. > > > > > > Best, > > > Jark > > > > > > On Fri, 23 Oct 2020 at 15:02, Shengkai Fang <fskm...@gmail.com> wrote: > > > > > >> Add one more message, I have already updated the FLIP[1]. > > >> > > >> [1] > > >> > > >> > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-149%3A+Introduce+the+upsert-kafka+Connector > > >> > > >> Shengkai Fang <fskm...@gmail.com> 于2020年10月23日周五 下午2:55写道: > > >> > > >>> Hi, all. > > >>> It seems we have reached a consensus on the FLIP. If no one has other > > >>> objections, I would like to start the vote for FLIP-149. > > >>> > > >>> Best, > > >>> Shengkai > > >>> > > >>> Jingsong Li <jingsongl...@gmail.com> 于2020年10月23日周五 下午2:25写道: > > >>> > > >>>> Thanks for explanation, > > >>>> > > >>>> I am OK for `upsert`. Yes, Its concept has been accepted by many > > >> systems. > > >>>> > > >>>> Best, > > >>>> Jingsong > > >>>> > > >>>> On Fri, Oct 23, 2020 at 12:38 PM Jark Wu <imj...@gmail.com> wrote: > > >>>> > > >>>>> Hi Timo, > > >>>>> > > >>>>> I have some concerns about `kafka-cdc`, > > >>>>> 1) cdc is an abbreviation of Change Data Capture which is commonly > > >> used > > >>>> for > > >>>>> databases, not for message queues. 
> 2) usually, cdc produces the full content of the changelog, including UPDATE_BEFORE, however "upsert kafka" doesn't
> 3) `kafka-cdc` sounds like native support for the `debezium-json` format, however, it is not, and we don't even want "upsert kafka" to support "debezium-json"
>
> Hi Jingsong,
>
> I think the terminology of "upsert" is fine, because Kafka also uses "upsert" to define such behavior in their official documentation [1]:
>
> > a data record in a changelog stream is interpreted as an UPSERT aka INSERT/UPDATE
>
> Materialize uses the "UPSERT" keyword to define such behavior too [2]. Users have been requesting such a feature using "upsert kafka" terminology in user mailing lists [3][4]. Many other systems support an "UPSERT" statement natively, such as Impala [5], SAP [6], Phoenix [7], Oracle NoSQL [8], etc.
>
> Therefore, I think we don't need to be afraid of introducing "upsert" terminology; it is widely accepted by users.
> > >>>>> > > >>>>> Best, > > >>>>> Jark > > >>>>> > > >>>>> > > >>>>> [1]: > > >>>>> > > >>>>> > > >>>> > > >> > > > https://kafka.apache.org/20/documentation/streams/developer-guide/dsl-api.html#streams_concepts_ktable > > >>>>> [2]: > > >>>>> > > >>>>> > > >>>> > > >> > > > https://materialize.io/docs/sql/create-source/text-kafka/#upsert-on-a-kafka-topic > > >>>>> [3]: > > >>>>> > > >>>>> > > >>>> > > >> > > > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/SQL-materialized-upsert-tables-td18482.html#a18503 > > >>>>> [4]: > > >>>>> > > >>>>> > > >>>> > > >> > > > http://apache-flink.147419.n8.nabble.com/Kafka-Sink-AppendStreamTableSink-doesn-t-support-consuming-update-changes-td5959.html > > >>>>> [5]: > > >>>> https://impala.apache.org/docs/build/html/topics/impala_upsert.html > > >>>>> [6]: > > >>>>> > > >>>>> > > >>>> > > >> > > > https://help.sap.com/viewer/7c78579ce9b14a669c1f3295b0d8ca16/Cloud/en-US/ea8b6773be584203bcd99da76844c5ed.html > > >>>>> [7]: https://phoenix.apache.org/atomic_upsert.html > > >>>>> [8]: > > >>>>> > > >>>>> > > >>>> > > >> > > > https://docs.oracle.com/en/database/other-databases/nosql-database/18.3/sqlfornosql/adding-table-rows-using-insert-and-upsert-statements.html > > >>>>> > > >>>>> On Fri, 23 Oct 2020 at 10:36, Jingsong Li <jingsongl...@gmail.com> > > >>>> wrote: > > >>>>> > > >>>>>> The `kafka-cdc` looks good to me. > > >>>>>> We can even give options to indicate whether to turn on compact, > > >>>> because > > >>>>>> compact is just an optimization? > > >>>>>> > > >>>>>> - ktable let me think about KSQL. 
> > >>>>>> - kafka-compacted it is not just compacted, more than that, it > still > > >>>> has > > >>>>>> the ability of CDC > > >>>>>> - upsert-kafka , upsert is back, and I don't really want to see it > > >>>> again > > >>>>>> since we have CDC > > >>>>>> > > >>>>>> Best, > > >>>>>> Jingsong > > >>>>>> > > >>>>>> On Fri, Oct 23, 2020 at 2:21 AM Timo Walther <twal...@apache.org> > > >>>> wrote: > > >>>>>> > > >>>>>>> Hi Jark, > > >>>>>>> > > >>>>>>> I would be fine with `connector=upsert-kafka`. Another idea would > > >>>> be to > > >>>>>>> align the name to other available Flink connectors [1]: > > >>>>>>> > > >>>>>>> `connector=kafka-cdc`. > > >>>>>>> > > >>>>>>> Regards, > > >>>>>>> Timo > > >>>>>>> > > >>>>>>> [1] https://github.com/ververica/flink-cdc-connectors > > >>>>>>> > > >>>>>>> On 22.10.20 17:17, Jark Wu wrote: > > >>>>>>>> Another name is "connector=upsert-kafka', I think this can solve > > >>>>> Timo's > > >>>>>>>> concern on the "compacted" word. > > >>>>>>>> > > >>>>>>>> Materialize also uses "ENVELOPE UPSERT" [1] keyword to identify > > >>>> such > > >>>>>>> kafka > > >>>>>>>> sources. > > >>>>>>>> I think "upsert" is a well-known terminology widely used in many > > >>>>>> systems > > >>>>>>>> and matches the > > >>>>>>>> behavior of how we handle the kafka messages. > > >>>>>>>> > > >>>>>>>> What do you think? 
> > >>>>>>>> > > >>>>>>>> Best, > > >>>>>>>> Jark > > >>>>>>>> > > >>>>>>>> [1]: > > >>>>>>>> > > >>>>>>> > > >>>>>> > > >>>>> > > >>>> > > >> > > > https://materialize.io/docs/sql/create-source/text-kafka/#upsert-on-a-kafka-topic > > >>>>>>>> > > >>>>>>>> > > >>>>>>>> > > >>>>>>>> > > >>>>>>>> On Thu, 22 Oct 2020 at 22:53, Kurt Young <ykt...@gmail.com> > > >>>> wrote: > > >>>>>>>> > > >>>>>>>>> Good validation messages can't solve the broken user > > >> experience, > > >>>>>>> especially > > >>>>>>>>> that > > >>>>>>>>> such update mode option will implicitly make half of current > > >>>> kafka > > >>>>>>> options > > >>>>>>>>> invalid or doesn't > > >>>>>>>>> make sense. > > >>>>>>>>> > > >>>>>>>>> Best, > > >>>>>>>>> Kurt > > >>>>>>>>> > > >>>>>>>>> > > >>>>>>>>> On Thu, Oct 22, 2020 at 10:31 PM Jark Wu <imj...@gmail.com> > > >>>> wrote: > > >>>>>>>>> > > >>>>>>>>>> Hi Timo, Seth, > > >>>>>>>>>> > > >>>>>>>>>> The default value "inserting" of "mode" might be not suitable, > > >>>>>>>>>> because "debezium-json" emits changelog messages which include > > >>>>>> updates. > > >>>>>>>>>> > > >>>>>>>>>> On Thu, 22 Oct 2020 at 22:10, Seth Wiesman < > > >> s...@ververica.com> > > >>>>>> wrote: > > >>>>>>>>>> > > >>>>>>>>>>> +1 for supporting upsert results into Kafka. > > >>>>>>>>>>> > > >>>>>>>>>>> I have no comments on the implementation details. > > >>>>>>>>>>> > > >>>>>>>>>>> As far as configuration goes, I tend to favor Timo's option > > >>>> where > > >>>>> we > > >>>>>>>>> add > > >>>>>>>>>> a > > >>>>>>>>>>> "mode" property to the existing Kafka table with default > > >> value > > >>>>>>>>>> "inserting". > > >>>>>>>>>>> If the mode is set to "updating" then the validation changes > > >> to > > >>>>> the > > >>>>>>> new > > >>>>>>>>>>> requirements. 
I personally find it more intuitive than a separate connector; my fear is users won't understand it's the same physical kafka sink under the hood, and it will lead to other confusion, like: does it offer the same persistence guarantees? I think we are capable of adding good validation messaging that solves Jark's and Kurt's concerns.

On Thu, Oct 22, 2020 at 8:51 AM Timo Walther <twal...@apache.org> wrote:

> Hi Jark,
>
> "calling it "kafka-compacted" can even remind users to enable log compaction"
>
> But sometimes users like to store a lineage of changes in their topics. Independent of any ktable/kstream interpretation.
>
> I let the majority decide on this topic to not further block this effort. But we might find a better name like:
>
> connector = kafka
> mode = updating/inserting
>
> OR
>
> connector = kafka-updating
>
> ...
>
> Regards,
> Timo
>
> On 22.10.20 15:24, Jark Wu wrote:
>> Hi Timo,
>>
>> Thanks for your opinions.
>>
>> 1) Implementation
>> We will have a stateful operator to generate INSERT and UPDATE_BEFORE.
> > >>>>>>>>>>>>> This operator is keyby-ed (primary key as the shuffle key) > > >>>> after > > >>>>>>>>> the > > >>>>>>>>>>>> source > > >>>>>>>>>>>>> operator. > > >>>>>>>>>>>>> The implementation of this operator is very similar to the > > >>>>>> existing > > >>>>>>>>>>>>> `DeduplicateKeepLastRowFunction`. > > >>>>>>>>>>>>> The operator will register a value state using the primary > > >>>> key > > >>>>>>>>> fields > > >>>>>>>>>>> as > > >>>>>>>>>>>>> keys. > > >>>>>>>>>>>>> When the value state is empty under current key, we will > > >> emit > > >>>>>>>>> INSERT > > >>>>>>>>>>> for > > >>>>>>>>>>>>> the input row. > > >>>>>>>>>>>>> When the value state is not empty under current key, we > > >> will > > >>>>> emit > > >>>>>>>>>>>>> UPDATE_BEFORE using the row in state, > > >>>>>>>>>>>>> and emit UPDATE_AFTER using the input row. > > >>>>>>>>>>>>> When the input row is DELETE, we will clear state and emit > > >>>>> DELETE > > >>>>>>>>>> row. > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> 2) new option vs new connector > > >>>>>>>>>>>>>> We recently simplified the table options to a minimum > > >>>> amount of > > >>>>>>>>>>>>> characters to be as concise as possible in the DDL. > > >>>>>>>>>>>>> I think this is the reason why we want to introduce a new > > >>>>>>>>> connector, > > >>>>>>>>>>>>> because we can simplify the options in DDL. 
> > >>>>>>>>>>>>> For example, if using a new option, the DDL may look like > > >>>> this: > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> CREATE TABLE users ( > > >>>>>>>>>>>>> user_id BIGINT, > > >>>>>>>>>>>>> user_name STRING, > > >>>>>>>>>>>>> user_level STRING, > > >>>>>>>>>>>>> region STRING, > > >>>>>>>>>>>>> PRIMARY KEY (user_id) NOT ENFORCED > > >>>>>>>>>>>>> ) WITH ( > > >>>>>>>>>>>>> 'connector' = 'kafka', > > >>>>>>>>>>>>> 'model' = 'table', > > >>>>>>>>>>>>> 'topic' = 'pageviews_per_region', > > >>>>>>>>>>>>> 'properties.bootstrap.servers' = '...', > > >>>>>>>>>>>>> 'properties.group.id' = 'testGroup', > > >>>>>>>>>>>>> 'scan.startup.mode' = 'earliest', > > >>>>>>>>>>>>> 'key.format' = 'csv', > > >>>>>>>>>>>>> 'key.fields' = 'user_id', > > >>>>>>>>>>>>> 'value.format' = 'avro', > > >>>>>>>>>>>>> 'sink.partitioner' = 'hash' > > >>>>>>>>>>>>> ); > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> If using a new connector, we can have a different default > > >>>> value > > >>>>>> for > > >>>>>>>>>> the > > >>>>>>>>>>>>> options and remove unnecessary options, > > >>>>>>>>>>>>> the DDL can look like this which is much more concise: > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> CREATE TABLE pageviews_per_region ( > > >>>>>>>>>>>>> user_id BIGINT, > > >>>>>>>>>>>>> user_name STRING, > > >>>>>>>>>>>>> user_level STRING, > > >>>>>>>>>>>>> region STRING, > > >>>>>>>>>>>>> PRIMARY KEY (user_id) NOT ENFORCED > > >>>>>>>>>>>>> ) WITH ( > > >>>>>>>>>>>>> 'connector' = 'kafka-compacted', > > >>>>>>>>>>>>> 'topic' = 'pageviews_per_region', > > >>>>>>>>>>>>> 'properties.bootstrap.servers' = '...', > > >>>>>>>>>>>>> 'key.format' = 'csv', > > >>>>>>>>>>>>> 'value.format' = 'avro' > > >>>>>>>>>>>>> ); > > >>>>>>>>>>>>> > > >>>>>>>>>>>>>> When people read `connector=kafka-compacted` they might > > >> not > > >>>>> know > > >>>>>>>>>> that > > >>>>>>>>>>> it > > >>>>>>>>>>>>>> has ktable semantics. 
You don't need to enable log > > >>>> compaction > > >>>>> in > > >>>>>>>>>> order > > >>>>>>>>>>>>>> to use a KTable as far as I know. > > >>>>>>>>>>>>> We don't need to let users know it has ktable semantics, as > > >>>>>>>>>> Konstantin > > >>>>>>>>>>>>> mentioned this may carry more implicit > > >>>>>>>>>>>>> meaning than we want to imply here. I agree users don't > > >> need > > >>>> to > > >>>>>>>>>> enable > > >>>>>>>>>>>> log > > >>>>>>>>>>>>> compaction, but from the production perspective, > > >>>>>>>>>>>>> log compaction should always be enabled if it is used in > > >> this > > >>>>>>>>>> purpose. > > >>>>>>>>>>>>> Calling it "kafka-compacted" can even remind users to > > >> enable > > >>>> log > > >>>>>>>>>>>> compaction. > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> I don't agree to introduce "model = table/stream" option, > > >> or > > >>>>>>>>>>>>> "connector=kafka-table", > > >>>>>>>>>>>>> because this means we are introducing Table vs Stream > > >> concept > > >>>>> from > > >>>>>>>>>>> KSQL. > > >>>>>>>>>>>>> However, we don't have such top-level concept in Flink SQL > > >>>> now, > > >>>>>>>>> this > > >>>>>>>>>>> will > > >>>>>>>>>>>>> further confuse users. > > >>>>>>>>>>>>> In Flink SQL, all the things are STREAM, the differences > > >> are > > >>>>>>>>> whether > > >>>>>>>>>> it > > >>>>>>>>>>>> is > > >>>>>>>>>>>>> bounded or unbounded, > > >>>>>>>>>>>>> whether it is insert-only or changelog. > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> Best, > > >>>>>>>>>>>>> Jark > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> On Thu, 22 Oct 2020 at 20:39, Timo Walther < > > >>>> twal...@apache.org> > > >>>>>>>>>> wrote: > > >>>>>>>>>>>>> > > >>>>>>>>>>>>>> Hi Shengkai, Hi Jark, > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>> thanks for this great proposal. It is time to finally > > >>>> connect > > >>>>> the > > >>>>>>>>>>>>>> changelog processor with a compacted Kafka topic. 
> "The operator will produce INSERT rows, or additionally generate UPDATE_BEFORE rows for the previous image, or produce DELETE rows with all columns filled with values."
>
> Could you elaborate a bit on the implementation details in the FLIP? How are UPDATE_BEFOREs generated? How much state is required to perform this operation?
>
> From a conceptual and semantical point of view, I'm fine with the proposal. But I would like to share my opinion about how we expose this feature:
>
> ktable vs kafka-compacted
>
> I'm against having an additional connector like `ktable` or `kafka-compacted`. We recently simplified the table options to a minimum amount of characters to be as concise as possible in the DDL. Therefore, I would keep the `connector=kafka` and introduce an additional option, because a user wants to read "from Kafka", and the "how" should be determined in the lower options.
>
> When people read `connector=ktable` they might not know that this is Kafka. Or they wonder where `kstream` is?
>
> When people read `connector=kafka-compacted` they might not know that it has ktable semantics.
You don't need to enable log > > >>>> compaction > > >>>>> in > > >>>>>>>>>> order > > >>>>>>>>>>>>>> to use a KTable as far as I know. Log compaction and table > > >>>>>>>>> semantics > > >>>>>>>>>>> are > > >>>>>>>>>>>>>> orthogonal topics. > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>> In the end we will need 3 types of information when > > >>>> declaring a > > >>>>>>>>>> Kafka > > >>>>>>>>>>>>>> connector: > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>> CREATE TABLE ... WITH ( > > >>>>>>>>>>>>>> connector=kafka -- Some information about the > > >>>>>> connector > > >>>>>>>>>>>>>> end-offset = XXXX -- Some information about the > > >>>>>>>>> boundedness > > >>>>>>>>>>>>>> model = table/stream -- Some information about > > >>>>>>>>> interpretation > > >>>>>>>>>>>>>> ) > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>> We can still apply all the constraints mentioned in the > > >>>> FLIP. > > >>>>>> When > > >>>>>>>>>>>>>> `model` is set to `table`. > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>> What do you think? > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>> Regards, > > >>>>>>>>>>>>>> Timo > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>> On 21.10.20 14:19, Jark Wu wrote: > > >>>>>>>>>>>>>>> Hi, > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> IMO, if we are going to mix them in one connector, > > >>>>>>>>>>>>>>> 1) either users need to set some options to a specific > > >>>> value > > >>>>>>>>>>>> explicitly, > > >>>>>>>>>>>>>>> e.g. "scan.startup.mode=earliest", > > >> "sink.partitioner=hash", > > >>>>>> etc.. > > >>>>>>>>>>>>>>> This makes the connector awkward to use. Users may face > > >> to > > >>>> fix > > >>>>>>>>>>> options > > >>>>>>>>>>>>>> one > > >>>>>>>>>>>>>>> by one according to the exception. 
> > >>>>>>>>>>>>>>> Besides, in the future, it is still possible to use > > >>>>>>>>>>>>>>> "sink.partitioner=fixed" (reduce network cost) if users > > >> are > > >>>>>> aware > > >>>>>>>>>> of > > >>>>>>>>>>>>>>> the partition routing, > > >>>>>>>>>>>>>>> however, it's error-prone to have "fixed" as default for > > >>>>>>>>> compacted > > >>>>>>>>>>>> mode. > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> 2) or make those options a different default value when > > >>>>>>>>>>>> "compacted=true". > > >>>>>>>>>>>>>>> This would be more confusing and unpredictable if the > > >>>> default > > >>>>>>>>> value > > >>>>>>>>>>> of > > >>>>>>>>>>>>>>> options will change according to other options. > > >>>>>>>>>>>>>>> What happens if we have a third mode in the future? > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> In terms of usage and options, it's very different from > > >> the > > >>>>>>>>>>>>>>> original "kafka" connector. > > >>>>>>>>>>>>>>> It would be more handy to use and less fallible if > > >>>> separating > > >>>>>>>>> them > > >>>>>>>>>>> into > > >>>>>>>>>>>>>> two > > >>>>>>>>>>>>>>> connectors. > > >>>>>>>>>>>>>>> In the implementation layer, we can reuse code as much as > > >>>>>>>>> possible. > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> Therefore, I'm still +1 to have a new connector. > > >>>>>>>>>>>>>>> The "kafka-compacted" name sounds good to me. > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> Best, > > >>>>>>>>>>>>>>> Jark > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> On Wed, 21 Oct 2020 at 17:58, Konstantin Knauf < > > >>>>>>>>> kna...@apache.org> > > >>>>>>>>>>>>>> wrote: > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>> Hi Kurt, Hi Shengkai, > > >>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>> thanks for answering my questions and the additional > > >>>>>>>>>>> clarifications. I > > >>>>>>>>>>>>>>>> don't have a strong opinion on whether to extend the > > >>>> "kafka" > > >>>>>>>>>>> connector > > >>>>>>>>>>>>>> or > > >>>>>>>>>>>>>>>> to introduce a new connector. 
So, from my perspective > > >> feel > > >>>>> free > > >>>>>>>>> to > > >>>>>>>>>>> go > > >>>>>>>>>>>>>> with > > >>>>>>>>>>>>>>>> a separate connector. If we do introduce a new > > >> connector I > > >>>>>>>>>> wouldn't > > >>>>>>>>>>>>>> call it > > >>>>>>>>>>>>>>>> "ktable" for aforementioned reasons (In addition, we > > >> might > > >>>>>>>>> suggest > > >>>>>>>>>>>> that > > >>>>>>>>>>>>>>>> there is also a "kstreams" connector for symmetry > > >>>> reasons). I > > >>>>>>>>>> don't > > >>>>>>>>>>>>>> have a > > >>>>>>>>>>>>>>>> good alternative name, though, maybe "kafka-compacted" > > >> or > > >>>>>>>>>>>>>>>> "compacted-kafka". > > >>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>> Thanks, > > >>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>> Konstantin > > >>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>> On Wed, Oct 21, 2020 at 4:43 AM Kurt Young < > > >>>> ykt...@gmail.com > > >>>>>> > > >>>>>>>>>>> wrote: > > >>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> Hi all, > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> I want to describe the discussion process which drove > > >> us > > >>>> to > > >>>>>>>>> have > > >>>>>>>>>>> such > > >>>>>>>>>>>>>>>>> conclusion, this might make some of > > >>>>>>>>>>>>>>>>> the design choices easier to understand and keep > > >>>> everyone on > > >>>>>>>>> the > > >>>>>>>>>>> same > > >>>>>>>>>>>>>>>> page. > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> Back to the motivation, what functionality do we want > > >> to > > >>>>>>>>> provide > > >>>>>>>>>> in > > >>>>>>>>>>>> the > > >>>>>>>>>>>>>>>>> first place? We got a lot of feedback and > > >>>>>>>>>>>>>>>>> questions from mailing lists that people want to write > > >>>>>>>>>>>> Not-Insert-Only > > >>>>>>>>>>>>>>>>> messages into kafka. They might be > > >>>>>>>>>>>>>>>>> intentional or by accident, e.g. wrote an non-windowed > > >>>>>>>>> aggregate > > >>>>>>>>>>>> query > > >>>>>>>>>>>>>> or > > >>>>>>>>>>>>>>>>> non-windowed left outer join. 
And some users from the KSQL world also asked why Flink didn't leverage the Key concept of every kafka topic and treat kafka as a dynamically changing keyed table.

To work with kafka better, we were thinking of extending the functionality of the current kafka connector by letting it accept updates and deletions. But due to the limitation of kafka, the update has to be "update by key", aka a table with a primary key.

This introduces a couple of conflicts with the current kafka table's options:
1. key.fields: as said above, we need the kafka table to have the primary key constraint, and users can also configure key.fields freely; this might cause friction. (Sure, we can do some sanity checks on this, but that also creates friction.)
2. sink.partitioner: to make the semantics right, we need to make sure all the updates on the same key are written to the same kafka partition, so we should force a hash-by-key partitioner inside such a table. Again, this conflicts and creates friction with current user options.

The above things are solvable, though not perfectly or in the most user-friendly way.
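The sink.partitioner point can be made concrete: a deterministic hash of the serialized key guarantees that the insert, every later update, and the final deletion for one primary key all land in the same partition, preserving per-key ordering for consumers. A minimal sketch (hypothetical helper names; not Flink's actual FlinkKafkaPartitioner, and real Kafka clients hash keys with murmur2 rather than Arrays.hashCode):

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class HashByKeyPartitioner {

    // Deterministic key -> partition mapping; Math.floorMod keeps the
    // result in [0, numPartitions) even when the hash code is negative.
    static int partitionFor(byte[] serializedKey, int numPartitions) {
        return Math.floorMod(Arrays.hashCode(serializedKey), numPartitions);
    }

    public static void main(String[] args) {
        byte[] k1 = "user_42".getBytes(StandardCharsets.UTF_8);
        byte[] k2 = "user_42".getBytes(StandardCharsets.UTF_8);
        // Two independently serialized copies of the same key always map to
        // the same partition, so all changes for user_42 stay ordered there.
        System.out.println(partitionFor(k1, 8) == partitionFor(k2, 8)); // true
    }
}
```

A round-robin or "fixed" partitioner, by contrast, can interleave an UPDATE and its earlier INSERT across partitions, which is exactly the wrong-result case the thread warns about.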
Let's take a look at the reading side. The keyed kafka table contains two kinds of messages: upsert or deletion. What upsert means is "if the key doesn't exist yet, it's an insert record; otherwise it's an update record". For the sake of correctness or simplicity, the Flink SQL engine also needs such information. If we interpret all messages as "update record", some queries or operators may not work properly. It's weird to see an update record when you haven't seen the insert record before.

So what Flink should do is: after reading out the records from such a table, it needs to create a state to record which messages have been seen, and then generate the correct row type correspondingly. This kind of couples the state and the data of the message queue, and it also creates conflicts with the current kafka connector.

Think about what happens if users suspend a running job (which contains some reading state now) and then change the start offset of the reader. By changing the reading offset, it actually changes the whole story of which records should be insert messages and which records should be update messages.
And it will also make Flink deal with another weird situation, where it might receive a deletion for a non-existing message.

We were unsatisfied with all the friction and conflicts it would create if we added the "upsert & deletion" support to the current kafka connector. And later we began to realize that we shouldn't treat it as a normal message queue, but should treat it as a changing keyed table. We should be able to always get the whole data of such a table (by disabling the start offset option), and we can also read the changelog out of such a table. It's like an HBase table with binlog support but without random access capability (which can be fulfilled by Flink's state).

So our intention was: instead of telling and persuading users what kind of options they should or should not use by extending the current kafka connector when enabling upsert support, we are actually creating a whole new and different connector that has totally different abstractions in the SQL layer, and it should be treated totally differently from the current kafka connector.
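The reading-side normalization Kurt describes (and Jark's sink-side description of the stateful operator earlier in the thread) boils down to one keyed state lookup per record: first value for a key emits an insert, a later value emits a retraction of the old row plus the new row, and a tombstone emits a deletion. A simplified heap-based sketch, assuming string keys/values and the `+I/-U/+U/-D` changelog-kind notation; the real operator (similar to Flink's `DeduplicateKeepLastRowFunction`) keeps this map in Flink keyed state rather than on the heap:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class UpsertNormalizer {

    // Last seen value per key; stands in for Flink's keyed value state.
    private final Map<String, String> lastRowByKey = new HashMap<>();

    // Turns one upsert/tombstone record into full changelog rows.
    List<String> process(String key, String valueOrNull) {
        List<String> out = new ArrayList<>();
        String previous = lastRowByKey.get(key);
        if (valueOrNull == null) {            // Kafka tombstone => deletion
            if (previous != null) {
                out.add("-D(" + key + "," + previous + ")");
                lastRowByKey.remove(key);
            }                                  // unknown key: drop silently
        } else if (previous == null) {        // first value seen for this key
            out.add("+I(" + key + "," + valueOrNull + ")");
            lastRowByKey.put(key, valueOrNull);
        } else {                              // key already known => update
            out.add("-U(" + key + "," + previous + ")");
            out.add("+U(" + key + "," + valueOrNull + ")");
            lastRowByKey.put(key, valueOrNull);
        }
        return out;
    }

    public static void main(String[] args) {
        UpsertNormalizer n = new UpsertNormalizer();
        System.out.println(n.process("user_1", "bronze")); // [+I(user_1,bronze)]
        System.out.println(n.process("user_1", "gold"));   // [-U(user_1,bronze), +U(user_1,gold)]
        System.out.println(n.process("user_1", null));     // [-D(user_1,gold)]
    }
}
```

The sketch also makes the state/offset coupling visible: if you restart from a different offset with the map intact (or discarded), the same record can flip between `+I` and `-U/+U`, which is exactly the "changed story" Kurt warns about.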
> > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> Hope this can clarify some of the concerns. > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> Best, > > >>>>>>>>>>>>>>>>> Kurt > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> On Tue, Oct 20, 2020 at 5:20 PM Shengkai Fang < > > >>>>>>>>> fskm...@gmail.com > > >>>>>>>>>>> > > >>>>>>>>>>>>>> wrote: > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> Hi devs, > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> As many people are still confused about the difference > > >>>>> option > > >>>>>>>>>>>>>>>> behaviours > > >>>>>>>>>>>>>>>>>> between the Kafka connector and KTable connector, Jark > > >>>> and > > >>>>> I > > >>>>>>>>>> list > > >>>>>>>>>>>> the > > >>>>>>>>>>>>>>>>>> differences in the doc[1]. > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> Best, > > >>>>>>>>>>>>>>>>>> Shengkai > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> [1] > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>> > > >>>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>> > > >>>>>>>>> > > >>>>>>> > > >>>>>> > > >>>>> > > >>>> > > >> > > > https://docs.google.com/document/d/13oAWAwQez0lZLsyfV21BfTEze1fc2cz4AZKiNOyBNPk/edit > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> Shengkai Fang <fskm...@gmail.com> 于2020年10月20日周二 > > >>>>> 下午12:05写道: > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>> Hi Konstantin, > > >>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>> Thanks for your reply. > > >>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>> It uses the "kafka" connector and does not specify a > > >>>>>> primary > > >>>>>>>>>>> key. > > >>>>>>>>>>>>>>>>>>> The dimensional table `users` is a ktable connector > > >>>> and we > > >>>>>>>>> can > > >>>>>>>>>>>>>>>> specify > > >>>>>>>>>>>>>>>>>> the > > >>>>>>>>>>>>>>>>>>> pk on the KTable. > > >>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>> Will it possible to use a "ktable" as a dimensional > > >>>> table > > >>>>>> in > > >>>>>>>>>>>>>>>> FLIP-132 > > >>>>>>>>>>>>>>>>>>> Yes. 
We can specify the watermark on the KTable and it can be used as a dimension table in a temporal join.

> Introduce a new connector vs introduce a new property
The main reason is that the KTable connector has almost no common options with the Kafka connector. The options that can be reused by the KTable connector are 'topic', 'properties.bootstrap.servers' and 'value.fields-include'. We can't set a cdc format for 'key.format' and 'value.format' in the KTable connector now, which is available in the Kafka connector. Considering the difference between the options we can use, it's more suitable to introduce another connector rather than a property.

We are also fine with using "compacted-kafka" as the name of the new connector. What do you think?

Best,
Shengkai

Konstantin Knauf <kna...@apache.org> 于2020年10月19日周一 下午10:15写道:

Hi Shengkai,

Thank you for driving this effort.
I believe this is a very important feature for the many users who use Kafka and Flink SQL together. A few questions and thoughts:

* Is your example "Use KTable as a reference/dimension table" correct? It uses the "kafka" connector and does not specify a primary key.

* Will it be possible to use a "ktable" table directly as a dimensional table in a temporal join (*based on event time*) (FLIP-132)? This is not completely clear to me from the FLIP.

* I'd personally prefer not to introduce a new connector and instead to extend the Kafka connector. We could add an additional property "compacted" = "true"|"false". If it is set to "true", we can add additional validation logic (e.g. "scan.startup.mode" can not be set, primary key required, etc.). If we stick to a separate connector, I'd not call it "ktable" but rather "compacted-kafka" or similar. KTable seems to carry more implicit meaning than we want to imply here.
* I agree that this is not a bounded source. If we want to support a bounded mode, this is an orthogonal concern that also applies to other unbounded sources.

Best,

Konstantin

On Mon, Oct 19, 2020 at 3:26 PM Jark Wu <imj...@gmail.com> wrote:

Hi Danny,

First of all, we didn't introduce any concepts from KSQL (e.g. the Stream vs Table notion). This new connector will produce a changelog stream, so it's still a dynamic table and doesn't conflict with Flink core concepts.

The "ktable" is just a connector name; we could also call it "compacted-kafka" or something else. Calling it "ktable" is just so that KSQL users can migrate to Flink SQL easily.
Regarding why we introduce a new connector instead of a new property in the existing kafka connector:

I think the main reason is that we want a clear separation between the two use cases, because they are very different. We also listed reasons in the FLIP, including:

1) It's hard to explain what the behavior is when users specify a start offset from a middle position (e.g. how to process delete events for keys that don't exist). It's dangerous if users do that, so we don't provide the offset option in the new connector at the moment.

2) It's a different perspective/abstraction on the same kafka topic (append vs. upsert). It would be easier to understand if we separate them instead of mixing them in one connector. The new connector requires a hash sink partitioner, a declared primary key, and a regular format. If we mix them in one connector, it might be confusing how to use the options correctly.
3) The semantics of the KTable connector are just the same as KTable in Kafka Streams, so it's very handy for Kafka Streams and KSQL users. We have seen several questions on the mailing list asking how to model a KTable and how to join a KTable in Flink SQL.

Best,
Jark

On Mon, 19 Oct 2020 at 19:53, Jark Wu <imj...@gmail.com> wrote:

Hi Jingsong,

As the FLIP describes, "KTable connector produces a changelog stream, where each data record represents an update or delete event.". Therefore, a ktable source is an unbounded stream source. Selecting from a ktable source is similar to selecting from a kafka source with the debezium-json format, in that it never ends and the results are continuously updated.

It's possible to have a bounded ktable source in the future, for example by adding an option 'bounded=true' or 'end-offset=xxx'. In this way, the ktable will produce a bounded changelog stream.
So I think this can be a compatible feature in the future.

I don't think we should associate it with KSQL-related concepts. Actually, we didn't introduce any concepts from KSQL (e.g. the Stream vs Table notion). The "ktable" is just a connector name; we could also call it "compacted-kafka" or something else. Calling it "ktable" is just so that KSQL users can migrate to Flink SQL easily.

Regarding "value.fields-include": this is an option introduced in FLIP-107 for the Kafka connector. I think we should keep the same behavior as the Kafka connector. I'm not sure what the default behavior of KSQL is, but I guess it also stores the keys in the value, judging from this example in the docs (see the "users_original" table) [1].
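As a hedged illustration of the option being discussed (the DDL below is a sketch for this thread, not text from the FLIP; the connector name was still under debate, and the table schema is an assumption):

```sql
-- Sketch only: connector name ('ktable') and option values are assumptions.
-- With 'value.fields-include' = 'ALL', the key column user_id is serialized
-- into the Kafka message value as well as the key; 'EXCEPT_KEY' would write
-- only the non-key columns into the value.
CREATE TABLE users (
  user_id BIGINT,
  user_name STRING,
  region STRING,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'ktable',                       -- name under discussion
  'topic' = 'users',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'json',
  'value.format' = 'json',
  'value.fields-include' = 'ALL'
);
```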
Best,
Jark

[1]: https://docs.confluent.io/current/ksqldb/tutorials/basics-local.html#create-a-stream-and-table

On Mon, 19 Oct 2020 at 18:17, Danny Chan <yuzhao....@gmail.com> wrote:

The concept seems to conflict with the Flink abstraction "dynamic table": in Flink we see both "stream" and "table" as a dynamic table.

I think we should first make clear how to express stream-specific and table-specific features on one "dynamic table". This is more natural for KSQL because KSQL takes stream and table as different abstractions for representing collections. In KSQL, only a table is mutable and can have a primary key.

Does this connector belong to the "table" scope or the "stream" scope?
Some of the concepts (such as the primary key on a stream) should be suitable for all the connectors, not just Kafka. Shouldn't this be an extension of the existing Kafka connector instead of a totally new connector? What about the other connectors?

Because this touches the core abstraction of Flink, we had better have a top-down overall design; following KSQL directly is not the answer.

P.S. For the source:

> Shouldn't this be an extension of the existing Kafka connector instead of a totally new connector?

How could we achieve that (e.g. set up the parallelism correctly)?

Best,
Danny Chan

On Mon, Oct 19, 2020 at 5:17 PM (+0800), Jingsong Li <jingsongl...@gmail.com> wrote:

Thanks Shengkai for your proposal.

+1 for this feature.
> Future Work: Support bounded KTable source

I don't think it should be future work; I think it is one of the important concepts of this FLIP, and we need to understand it now.

Intuitively, a ktable in my opinion is a bounded table rather than a stream, so a select should produce a bounded table by default.

I think we should list the related Kafka knowledge, because the word `ktable` easily gets associated with KSQL-related concepts. (If possible, it's better to unify with it.)

What do you think?

> value.fields-include

What is the default behavior of KSQL?

Best,
Jingsong

On Mon, Oct 19, 2020 at 4:33 PM Shengkai Fang <fskm...@gmail.com> wrote:

Hi, devs.
Jark and I want to start a new FLIP to introduce the KTable connector. KTable is short for "Kafka Table"; it also has the same semantics as the KTable notion in Kafka Streams.

FLIP-149:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-149%3A+Introduce+the+KTable+Connector

Currently many users have expressed their need for upsert Kafka support on the mailing lists and in issues. The KTable connector has several benefits for users:

1. Users are able to interpret a compacted Kafka topic as an upsert stream in Apache Flink, and are also able to write a changelog stream to Kafka (into a compacted topic).
2. As a part of a real-time pipeline, it can store a join or aggregate result (which may contain updates) into a Kafka topic for further calculation.
3. The semantics of the KTable connector are just the same as KTable in Kafka Streams, so it's very handy for Kafka Streams and KSQL users. We have seen several questions on the mailing list asking how to model a KTable and how to join a KTable in Flink SQL.

We hope it can expand the usage of Flink with Kafka.

I'm looking forward to your feedback.
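To illustrate benefit 2 with a hedged sketch (the table names, schema, and exact WITH options below are assumptions for illustration, not part of the FLIP text):

```sql
-- Sketch: materialize a continuously updating aggregate into the proposed
-- connector. Each change to a GROUP BY result becomes an upsert (or delete)
-- keyed on 'region', so downstream consumers see the latest count per key.
CREATE TABLE pageviews_per_region (
  region STRING,
  view_cnt BIGINT,
  PRIMARY KEY (region) NOT ENFORCED
) WITH (
  'connector' = 'ktable',                       -- name under discussion
  'topic' = 'pageviews_per_region',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);

INSERT INTO pageviews_per_region
SELECT region, COUNT(*) AS view_cnt
FROM pageviews
GROUP BY region;
```

For long-term correctness the target topic would typically be configured with "cleanup.policy" set to "compact", so Kafka retains only the latest value per key.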
Best,
Shengkai

--
Konstantin Knauf
https://twitter.com/snntrable
https://github.com/knaufk

--
Seth Wiesman | Solutions Architect
+1 314 387 1463
<https://www.ververica.com/>
Follow us @VervericaData

Join Flink Forward <https://flink-forward.org/> - The Apache Flink Conference
Stream Processing | Event Driven | Real Time

--
Best, Jingsong Lee