I see, I understand: what you mean is avoiding the loss of historical data.
Logically, another option is to never clean up, so there is no need to turn on compaction.
I am OK with the implementation, It's that feeling shouldn't be a logical limitation Best, Jingsong On Fri, Oct 23, 2020 at 4:09 PM Kurt Young <ykt...@gmail.com> wrote: > To be precise, it means the Kakfa topic should set the configuration > "cleanup.policy" to "compact" not "delete". > > Best, > Kurt > > > On Fri, Oct 23, 2020 at 4:01 PM Jingsong Li <jingsongl...@gmail.com> > wrote: > > > I just notice there is a limitation in the FLIP: > > > > > Generally speaking, the underlying topic of the upsert-kafka source > must > > be compacted. Besides, the underlying topic must have all the data with > the > > same key in the same partition, otherwise, the result will be wrong. > > > > According to my understanding, this is not accurate? Compact is an > > optimization, not a limitation. It depends on users. > > > > I don't want to stop voting, just want to make it clear. > > > > Best, > > Jingsong > > > > On Fri, Oct 23, 2020 at 3:16 PM Timo Walther <twal...@apache.org> wrote: > > > > > +1 for voting > > > > > > Regards, > > > Timo > > > > > > On 23.10.20 09:07, Jark Wu wrote: > > > > Thanks Shengkai! > > > > > > > > +1 to start voting. > > > > > > > > Best, > > > > Jark > > > > > > > > On Fri, 23 Oct 2020 at 15:02, Shengkai Fang <fskm...@gmail.com> > wrote: > > > > > > > >> Add one more message, I have already updated the FLIP[1]. > > > >> > > > >> [1] > > > >> > > > >> > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-149%3A+Introduce+the+upsert-kafka+Connector > > > >> > > > >> Shengkai Fang <fskm...@gmail.com> 于2020年10月23日周五 下午2:55写道: > > > >> > > > >>> Hi, all. > > > >>> It seems we have reached a consensus on the FLIP. If no one has > other > > > >>> objections, I would like to start the vote for FLIP-149. > > > >>> > > > >>> Best, > > > >>> Shengkai > > > >>> > > > >>> Jingsong Li <jingsongl...@gmail.com> 于2020年10月23日周五 下午2:25写道: > > > >>> > > > >>>> Thanks for explanation, > > > >>>> > > > >>>> I am OK for `upsert`. Yes, Its concept has been accepted by many > > > >> systems. > > > >>>> > > > >>>> Best, > > > >>>> Jingsong > > > >>>> > > > >>>> On Fri, Oct 23, 2020 at 12:38 PM Jark Wu <imj...@gmail.com> > wrote: > > > >>>> > > > >>>>> Hi Timo, > > > >>>>> > > > >>>>> I have some concerns about `kafka-cdc`, > > > >>>>> 1) cdc is an abbreviation of Change Data Capture which is > commonly > > > >> used > > > >>>> for > > > >>>>> databases, not for message queues. > > > >>>>> 2) usually, cdc produces full content of changelog, including > > > >>>>> UPDATE_BEFORE, however "upsert kafka" doesn't > > > >>>>> 3) `kafka-cdc` sounds like a natively support for `debezium-json` > > > >>>> format, > > > >>>>> however, it is not and even we don't want > > > >>>>> "upsert kafka" to support "debezium-json" > > > >>>>> > > > >>>>> > > > >>>>> Hi Jingsong, > > > >>>>> > > > >>>>> I think the terminology of "upsert" is fine, because Kafka also > > uses > > > >>>>> "upsert" to define such behavior in their official documentation > > [1]: > > > >>>>> > > > >>>>>> a data record in a changelog stream is interpreted as an UPSERT > > aka > > > >>>>> INSERT/UPDATE > > > >>>>> > > > >>>>> Materialize uses the "UPSERT" keyword to define such behavior too > > > [2]. > > > >>>>> Users have been requesting such feature using "upsert kafka" > > > >>>> terminology in > > > >>>>> user mailing lists [3][4]. > > > >>>>> Many other systems support "UPSERT" statement natively, such as > > > impala > > > >>>> [5], > > > >>>>> SAP [6], Phoenix [7], Oracle NoSQL [8], etc.. 
> > > >>>>> > > > >>>>> Therefore, I think we don't need to be afraid of introducing > > "upsert" > > > >>>>> terminology, it is widely accepted by users. > > > >>>>> > > > >>>>> Best, > > > >>>>> Jark > > > >>>>> > > > >>>>> > > > >>>>> [1]: > > > >>>>> > > > >>>>> > > > >>>> > > > >> > > > > > > https://kafka.apache.org/20/documentation/streams/developer-guide/dsl-api.html#streams_concepts_ktable > > > >>>>> [2]: > > > >>>>> > > > >>>>> > > > >>>> > > > >> > > > > > > https://materialize.io/docs/sql/create-source/text-kafka/#upsert-on-a-kafka-topic > > > >>>>> [3]: > > > >>>>> > > > >>>>> > > > >>>> > > > >> > > > > > > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/SQL-materialized-upsert-tables-td18482.html#a18503 > > > >>>>> [4]: > > > >>>>> > > > >>>>> > > > >>>> > > > >> > > > > > > http://apache-flink.147419.n8.nabble.com/Kafka-Sink-AppendStreamTableSink-doesn-t-support-consuming-update-changes-td5959.html > > > >>>>> [5]: > > > >>>> > https://impala.apache.org/docs/build/html/topics/impala_upsert.html > > > >>>>> [6]: > > > >>>>> > > > >>>>> > > > >>>> > > > >> > > > > > > https://help.sap.com/viewer/7c78579ce9b14a669c1f3295b0d8ca16/Cloud/en-US/ea8b6773be584203bcd99da76844c5ed.html > > > >>>>> [7]: https://phoenix.apache.org/atomic_upsert.html > > > >>>>> [8]: > > > >>>>> > > > >>>>> > > > >>>> > > > >> > > > > > > https://docs.oracle.com/en/database/other-databases/nosql-database/18.3/sqlfornosql/adding-table-rows-using-insert-and-upsert-statements.html > > > >>>>> > > > >>>>> On Fri, 23 Oct 2020 at 10:36, Jingsong Li < > jingsongl...@gmail.com> > > > >>>> wrote: > > > >>>>> > > > >>>>>> The `kafka-cdc` looks good to me. > > > >>>>>> We can even give options to indicate whether to turn on compact, > > > >>>> because > > > >>>>>> compact is just an optimization? > > > >>>>>> > > > >>>>>> - ktable let me think about KSQL. > > > >>>>>> - kafka-compacted it is not just compacted, more than that, it > > still > > > >>>> has > > > >>>>>> the ability of CDC > > > >>>>>> - upsert-kafka , upsert is back, and I don't really want to see > it > > > >>>> again > > > >>>>>> since we have CDC > > > >>>>>> > > > >>>>>> Best, > > > >>>>>> Jingsong > > > >>>>>> > > > >>>>>> On Fri, Oct 23, 2020 at 2:21 AM Timo Walther < > twal...@apache.org> > > > >>>> wrote: > > > >>>>>> > > > >>>>>>> Hi Jark, > > > >>>>>>> > > > >>>>>>> I would be fine with `connector=upsert-kafka`. Another idea > would > > > >>>> be to > > > >>>>>>> align the name to other available Flink connectors [1]: > > > >>>>>>> > > > >>>>>>> `connector=kafka-cdc`. > > > >>>>>>> > > > >>>>>>> Regards, > > > >>>>>>> Timo > > > >>>>>>> > > > >>>>>>> [1] https://github.com/ververica/flink-cdc-connectors > > > >>>>>>> > > > >>>>>>> On 22.10.20 17:17, Jark Wu wrote: > > > >>>>>>>> Another name is "connector=upsert-kafka', I think this can > solve > > > >>>>> Timo's > > > >>>>>>>> concern on the "compacted" word. > > > >>>>>>>> > > > >>>>>>>> Materialize also uses "ENVELOPE UPSERT" [1] keyword to > identify > > > >>>> such > > > >>>>>>> kafka > > > >>>>>>>> sources. > > > >>>>>>>> I think "upsert" is a well-known terminology widely used in > many > > > >>>>>> systems > > > >>>>>>>> and matches the > > > >>>>>>>> behavior of how we handle the kafka messages. > > > >>>>>>>> > > > >>>>>>>> What do you think? 
> > > >>>>>>>> > > > >>>>>>>> Best, > > > >>>>>>>> Jark > > > >>>>>>>> > > > >>>>>>>> [1]: > > > >>>>>>>> > > > >>>>>>> > > > >>>>>> > > > >>>>> > > > >>>> > > > >> > > > > > > https://materialize.io/docs/sql/create-source/text-kafka/#upsert-on-a-kafka-topic > > > >>>>>>>> > > > >>>>>>>> > > > >>>>>>>> > > > >>>>>>>> > > > >>>>>>>> On Thu, 22 Oct 2020 at 22:53, Kurt Young <ykt...@gmail.com> > > > >>>> wrote: > > > >>>>>>>> > > > >>>>>>>>> Good validation messages can't solve the broken user > > > >> experience, > > > >>>>>>> especially > > > >>>>>>>>> that > > > >>>>>>>>> such update mode option will implicitly make half of current > > > >>>> kafka > > > >>>>>>> options > > > >>>>>>>>> invalid or doesn't > > > >>>>>>>>> make sense. > > > >>>>>>>>> > > > >>>>>>>>> Best, > > > >>>>>>>>> Kurt > > > >>>>>>>>> > > > >>>>>>>>> > > > >>>>>>>>> On Thu, Oct 22, 2020 at 10:31 PM Jark Wu <imj...@gmail.com> > > > >>>> wrote: > > > >>>>>>>>> > > > >>>>>>>>>> Hi Timo, Seth, > > > >>>>>>>>>> > > > >>>>>>>>>> The default value "inserting" of "mode" might be not > suitable, > > > >>>>>>>>>> because "debezium-json" emits changelog messages which > include > > > >>>>>> updates. > > > >>>>>>>>>> > > > >>>>>>>>>> On Thu, 22 Oct 2020 at 22:10, Seth Wiesman < > > > >> s...@ververica.com> > > > >>>>>> wrote: > > > >>>>>>>>>> > > > >>>>>>>>>>> +1 for supporting upsert results into Kafka. > > > >>>>>>>>>>> > > > >>>>>>>>>>> I have no comments on the implementation details. > > > >>>>>>>>>>> > > > >>>>>>>>>>> As far as configuration goes, I tend to favor Timo's option > > > >>>> where > > > >>>>> we > > > >>>>>>>>> add > > > >>>>>>>>>> a > > > >>>>>>>>>>> "mode" property to the existing Kafka table with default > > > >> value > > > >>>>>>>>>> "inserting". > > > >>>>>>>>>>> If the mode is set to "updating" then the validation > changes > > > >> to > > > >>>>> the > > > >>>>>>> new > > > >>>>>>>>>>> requirements. I personally find it more intuitive than a > > > >>>> seperate > > > >>>>>>>>>>> connector, my fear is users won't understand its the same > > > >>>> physical > > > >>>>>>>>> kafka > > > >>>>>>>>>>> sink under the hood and it will lead to other confusion > like > > > >>>> does > > > >>>>> it > > > >>>>>>>>>> offer > > > >>>>>>>>>>> the same persistence guarantees? I think we are capable of > > > >>>> adding > > > >>>>>> good > > > >>>>>>>>>>> valdiation messaging that solves Jark and Kurts concerns. > > > >>>>>>>>>>> > > > >>>>>>>>>>> > > > >>>>>>>>>>> On Thu, Oct 22, 2020 at 8:51 AM Timo Walther < > > > >>>> twal...@apache.org> > > > >>>>>>>>> wrote: > > > >>>>>>>>>>> > > > >>>>>>>>>>>> Hi Jark, > > > >>>>>>>>>>>> > > > >>>>>>>>>>>> "calling it "kafka-compacted" can even remind users to > > > >> enable > > > >>>> log > > > >>>>>>>>>>>> compaction" > > > >>>>>>>>>>>> > > > >>>>>>>>>>>> But sometimes users like to store a lineage of changes in > > > >>>> their > > > >>>>>>>>> topics. > > > >>>>>>>>>>>> Indepent of any ktable/kstream interpretation. > > > >>>>>>>>>>>> > > > >>>>>>>>>>>> I let the majority decide on this topic to not further > block > > > >>>> this > > > >>>>>>>>>>>> effort. But we might find a better name like: > > > >>>>>>>>>>>> > > > >>>>>>>>>>>> connector = kafka > > > >>>>>>>>>>>> mode = updating/inserting > > > >>>>>>>>>>>> > > > >>>>>>>>>>>> OR > > > >>>>>>>>>>>> > > > >>>>>>>>>>>> connector = kafka-updating > > > >>>>>>>>>>>> > > > >>>>>>>>>>>> ... 
> > > >>>>>>>>>>>> > > > >>>>>>>>>>>> Regards, > > > >>>>>>>>>>>> Timo > > > >>>>>>>>>>>> > > > >>>>>>>>>>>> > > > >>>>>>>>>>>> > > > >>>>>>>>>>>> > > > >>>>>>>>>>>> On 22.10.20 15:24, Jark Wu wrote: > > > >>>>>>>>>>>>> Hi Timo, > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> Thanks for your opinions. > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> 1) Implementation > > > >>>>>>>>>>>>> We will have an stateful operator to generate INSERT and > > > >>>>>>>>>> UPDATE_BEFORE. > > > >>>>>>>>>>>>> This operator is keyby-ed (primary key as the shuffle > key) > > > >>>> after > > > >>>>>>>>> the > > > >>>>>>>>>>>> source > > > >>>>>>>>>>>>> operator. > > > >>>>>>>>>>>>> The implementation of this operator is very similar to > the > > > >>>>>> existing > > > >>>>>>>>>>>>> `DeduplicateKeepLastRowFunction`. > > > >>>>>>>>>>>>> The operator will register a value state using the > primary > > > >>>> key > > > >>>>>>>>> fields > > > >>>>>>>>>>> as > > > >>>>>>>>>>>>> keys. > > > >>>>>>>>>>>>> When the value state is empty under current key, we will > > > >> emit > > > >>>>>>>>> INSERT > > > >>>>>>>>>>> for > > > >>>>>>>>>>>>> the input row. > > > >>>>>>>>>>>>> When the value state is not empty under current key, we > > > >> will > > > >>>>> emit > > > >>>>>>>>>>>>> UPDATE_BEFORE using the row in state, > > > >>>>>>>>>>>>> and emit UPDATE_AFTER using the input row. > > > >>>>>>>>>>>>> When the input row is DELETE, we will clear state and > emit > > > >>>>> DELETE > > > >>>>>>>>>> row. > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> 2) new option vs new connector > > > >>>>>>>>>>>>>> We recently simplified the table options to a minimum > > > >>>> amount of > > > >>>>>>>>>>>>> characters to be as concise as possible in the DDL. > > > >>>>>>>>>>>>> I think this is the reason why we want to introduce a new > > > >>>>>>>>> connector, > > > >>>>>>>>>>>>> because we can simplify the options in DDL. 
> > > >>>>>>>>>>>>> For example, if using a new option, the DDL may look like > > > >>>> this: > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> CREATE TABLE users ( > > > >>>>>>>>>>>>> user_id BIGINT, > > > >>>>>>>>>>>>> user_name STRING, > > > >>>>>>>>>>>>> user_level STRING, > > > >>>>>>>>>>>>> region STRING, > > > >>>>>>>>>>>>> PRIMARY KEY (user_id) NOT ENFORCED > > > >>>>>>>>>>>>> ) WITH ( > > > >>>>>>>>>>>>> 'connector' = 'kafka', > > > >>>>>>>>>>>>> 'model' = 'table', > > > >>>>>>>>>>>>> 'topic' = 'pageviews_per_region', > > > >>>>>>>>>>>>> 'properties.bootstrap.servers' = '...', > > > >>>>>>>>>>>>> 'properties.group.id' = 'testGroup', > > > >>>>>>>>>>>>> 'scan.startup.mode' = 'earliest', > > > >>>>>>>>>>>>> 'key.format' = 'csv', > > > >>>>>>>>>>>>> 'key.fields' = 'user_id', > > > >>>>>>>>>>>>> 'value.format' = 'avro', > > > >>>>>>>>>>>>> 'sink.partitioner' = 'hash' > > > >>>>>>>>>>>>> ); > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> If using a new connector, we can have a different default > > > >>>> value > > > >>>>>> for > > > >>>>>>>>>> the > > > >>>>>>>>>>>>> options and remove unnecessary options, > > > >>>>>>>>>>>>> the DDL can look like this which is much more concise: > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> CREATE TABLE pageviews_per_region ( > > > >>>>>>>>>>>>> user_id BIGINT, > > > >>>>>>>>>>>>> user_name STRING, > > > >>>>>>>>>>>>> user_level STRING, > > > >>>>>>>>>>>>> region STRING, > > > >>>>>>>>>>>>> PRIMARY KEY (user_id) NOT ENFORCED > > > >>>>>>>>>>>>> ) WITH ( > > > >>>>>>>>>>>>> 'connector' = 'kafka-compacted', > > > >>>>>>>>>>>>> 'topic' = 'pageviews_per_region', > > > >>>>>>>>>>>>> 'properties.bootstrap.servers' = '...', > > > >>>>>>>>>>>>> 'key.format' = 'csv', > > > >>>>>>>>>>>>> 'value.format' = 'avro' > > > >>>>>>>>>>>>> ); > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>>> When people read `connector=kafka-compacted` they might > > > >> not > > > >>>>> know > > > >>>>>>>>>> that > > > >>>>>>>>>>> it > > > >>>>>>>>>>>>>> has ktable semantics. You don't need to enable log > > > >>>> compaction > > > >>>>> in > > > >>>>>>>>>> order > > > >>>>>>>>>>>>>> to use a KTable as far as I know. > > > >>>>>>>>>>>>> We don't need to let users know it has ktable semantics, > as > > > >>>>>>>>>> Konstantin > > > >>>>>>>>>>>>> mentioned this may carry more implicit > > > >>>>>>>>>>>>> meaning than we want to imply here. I agree users don't > > > >> need > > > >>>> to > > > >>>>>>>>>> enable > > > >>>>>>>>>>>> log > > > >>>>>>>>>>>>> compaction, but from the production perspective, > > > >>>>>>>>>>>>> log compaction should always be enabled if it is used in > > > >> this > > > >>>>>>>>>> purpose. > > > >>>>>>>>>>>>> Calling it "kafka-compacted" can even remind users to > > > >> enable > > > >>>> log > > > >>>>>>>>>>>> compaction. > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> I don't agree to introduce "model = table/stream" option, > > > >> or > > > >>>>>>>>>>>>> "connector=kafka-table", > > > >>>>>>>>>>>>> because this means we are introducing Table vs Stream > > > >> concept > > > >>>>> from > > > >>>>>>>>>>> KSQL. > > > >>>>>>>>>>>>> However, we don't have such top-level concept in Flink > SQL > > > >>>> now, > > > >>>>>>>>> this > > > >>>>>>>>>>> will > > > >>>>>>>>>>>>> further confuse users. > > > >>>>>>>>>>>>> In Flink SQL, all the things are STREAM, the differences > > > >> are > > > >>>>>>>>> whether > > > >>>>>>>>>> it > > > >>>>>>>>>>>> is > > > >>>>>>>>>>>>> bounded or unbounded, > > > >>>>>>>>>>>>> whether it is insert-only or changelog. 
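For illustration, the stateful conversion described under "1) Implementation" above (keyed by the primary key, emitting INSERT / UPDATE_BEFORE / UPDATE_AFTER / DELETE based on a per-key value state) could be sketched as a Flink KeyedProcessFunction roughly as follows. This is only a sketch of the idea, not the FLIP's actual implementation (which the thread compares to DeduplicateKeepLastRowFunction); the class and state names here are made up.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;

// Sketch only: normalizes an upsert stream (keyed by the primary key) into a
// full changelog stream by remembering the last value per key in keyed state.
public class UpsertToChangelogFunction
        extends KeyedProcessFunction<RowData, RowData, RowData> {

    private transient ValueState<RowData> lastValue;

    @Override
    public void open(Configuration parameters) {
        lastValue = getRuntimeContext().getState(
                new ValueStateDescriptor<>("last-value", TypeInformation.of(RowData.class)));
    }

    @Override
    public void processElement(RowData row, Context ctx, Collector<RowData> out) throws Exception {
        if (row.getRowKind() == RowKind.DELETE) {
            // Deletion: clear the per-key state and forward the DELETE row.
            lastValue.clear();
            out.collect(row);
            return;
        }
        RowData previous = lastValue.value();
        if (previous == null) {
            // First record for this key: interpret it as an INSERT.
            row.setRowKind(RowKind.INSERT);
            out.collect(row);
        } else {
            // Key seen before: emit the previous image as UPDATE_BEFORE,
            // then the new record as UPDATE_AFTER.
            previous.setRowKind(RowKind.UPDATE_BEFORE);
            out.collect(previous);
            row.setRowKind(RowKind.UPDATE_AFTER);
            out.collect(row);
        }
        lastValue.update(row);
    }
}

In the FLIP's design such an operator sits right after the source and is keyed by the primary key fields, so all records of one key are processed by the same task.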
> > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> Best, > > > >>>>>>>>>>>>> Jark > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> On Thu, 22 Oct 2020 at 20:39, Timo Walther < > > > >>>> twal...@apache.org> > > > >>>>>>>>>> wrote: > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>>> Hi Shengkai, Hi Jark, > > > >>>>>>>>>>>>>> > > > >>>>>>>>>>>>>> thanks for this great proposal. It is time to finally > > > >>>> connect > > > >>>>> the > > > >>>>>>>>>>>>>> changelog processor with a compacted Kafka topic. > > > >>>>>>>>>>>>>> > > > >>>>>>>>>>>>>> "The operator will produce INSERT rows, or additionally > > > >>>>> generate > > > >>>>>>>>>>>>>> UPDATE_BEFORE rows for the previous image, or produce > > > >> DELETE > > > >>>>> rows > > > >>>>>>>>>> with > > > >>>>>>>>>>>>>> all columns filled with values." > > > >>>>>>>>>>>>>> > > > >>>>>>>>>>>>>> Could you elaborate a bit on the implementation details > in > > > >>>> the > > > >>>>>>>>> FLIP? > > > >>>>>>>>>>> How > > > >>>>>>>>>>>>>> are UPDATE_BEFOREs are generated. How much state is > > > >>>> required to > > > >>>>>>>>>>> perform > > > >>>>>>>>>>>>>> this operation. > > > >>>>>>>>>>>>>> > > > >>>>>>>>>>>>>> From a conceptual and semantical point of view, I'm > > > >> fine > > > >>>>> with > > > >>>>>>>>> the > > > >>>>>>>>>>>>>> proposal. But I would like to share my opinion about how > > > >> we > > > >>>>>> expose > > > >>>>>>>>>>> this > > > >>>>>>>>>>>>>> feature: > > > >>>>>>>>>>>>>> > > > >>>>>>>>>>>>>> ktable vs kafka-compacted > > > >>>>>>>>>>>>>> > > > >>>>>>>>>>>>>> I'm against having an additional connector like `ktable` > > > >> or > > > >>>>>>>>>>>>>> `kafka-compacted`. We recently simplified the table > > > >> options > > > >>>> to > > > >>>>> a > > > >>>>>>>>>>> minimum > > > >>>>>>>>>>>>>> amount of characters to be as concise as possible in the > > > >>>> DDL. > > > >>>>>>>>>>> Therefore, > > > >>>>>>>>>>>>>> I would keep the `connector=kafka` and introduce an > > > >>>> additional > > > >>>>>>>>>> option. > > > >>>>>>>>>>>>>> Because a user wants to read "from Kafka". And the "how" > > > >>>> should > > > >>>>>> be > > > >>>>>>>>>>>>>> determined in the lower options. > > > >>>>>>>>>>>>>> > > > >>>>>>>>>>>>>> When people read `connector=ktable` they might not know > > > >> that > > > >>>>> this > > > >>>>>>>>> is > > > >>>>>>>>>>>>>> Kafka. Or they wonder where `kstream` is? > > > >>>>>>>>>>>>>> > > > >>>>>>>>>>>>>> When people read `connector=kafka-compacted` they might > > > >> not > > > >>>>> know > > > >>>>>>>>>> that > > > >>>>>>>>>>> it > > > >>>>>>>>>>>>>> has ktable semantics. You don't need to enable log > > > >>>> compaction > > > >>>>> in > > > >>>>>>>>>> order > > > >>>>>>>>>>>>>> to use a KTable as far as I know. Log compaction and > table > > > >>>>>>>>> semantics > > > >>>>>>>>>>> are > > > >>>>>>>>>>>>>> orthogonal topics. > > > >>>>>>>>>>>>>> > > > >>>>>>>>>>>>>> In the end we will need 3 types of information when > > > >>>> declaring a > > > >>>>>>>>>> Kafka > > > >>>>>>>>>>>>>> connector: > > > >>>>>>>>>>>>>> > > > >>>>>>>>>>>>>> CREATE TABLE ... WITH ( > > > >>>>>>>>>>>>>> connector=kafka -- Some information about > the > > > >>>>>> connector > > > >>>>>>>>>>>>>> end-offset = XXXX -- Some information about > the > > > >>>>>>>>> boundedness > > > >>>>>>>>>>>>>> model = table/stream -- Some information about > > > >>>>>>>>> interpretation > > > >>>>>>>>>>>>>> ) > > > >>>>>>>>>>>>>> > > > >>>>>>>>>>>>>> > > > >>>>>>>>>>>>>> We can still apply all the constraints mentioned in the > > > >>>> FLIP. 
> > > >>>>>> When > > > >>>>>>>>>>>>>> `model` is set to `table`. > > > >>>>>>>>>>>>>> > > > >>>>>>>>>>>>>> What do you think? > > > >>>>>>>>>>>>>> > > > >>>>>>>>>>>>>> Regards, > > > >>>>>>>>>>>>>> Timo > > > >>>>>>>>>>>>>> > > > >>>>>>>>>>>>>> > > > >>>>>>>>>>>>>> On 21.10.20 14:19, Jark Wu wrote: > > > >>>>>>>>>>>>>>> Hi, > > > >>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>> IMO, if we are going to mix them in one connector, > > > >>>>>>>>>>>>>>> 1) either users need to set some options to a specific > > > >>>> value > > > >>>>>>>>>>>> explicitly, > > > >>>>>>>>>>>>>>> e.g. "scan.startup.mode=earliest", > > > >> "sink.partitioner=hash", > > > >>>>>> etc.. > > > >>>>>>>>>>>>>>> This makes the connector awkward to use. Users may face > > > >> to > > > >>>> fix > > > >>>>>>>>>>> options > > > >>>>>>>>>>>>>> one > > > >>>>>>>>>>>>>>> by one according to the exception. > > > >>>>>>>>>>>>>>> Besides, in the future, it is still possible to use > > > >>>>>>>>>>>>>>> "sink.partitioner=fixed" (reduce network cost) if users > > > >> are > > > >>>>>> aware > > > >>>>>>>>>> of > > > >>>>>>>>>>>>>>> the partition routing, > > > >>>>>>>>>>>>>>> however, it's error-prone to have "fixed" as default > for > > > >>>>>>>>> compacted > > > >>>>>>>>>>>> mode. > > > >>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>> 2) or make those options a different default value when > > > >>>>>>>>>>>> "compacted=true". > > > >>>>>>>>>>>>>>> This would be more confusing and unpredictable if the > > > >>>> default > > > >>>>>>>>> value > > > >>>>>>>>>>> of > > > >>>>>>>>>>>>>>> options will change according to other options. > > > >>>>>>>>>>>>>>> What happens if we have a third mode in the future? > > > >>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>> In terms of usage and options, it's very different from > > > >> the > > > >>>>>>>>>>>>>>> original "kafka" connector. > > > >>>>>>>>>>>>>>> It would be more handy to use and less fallible if > > > >>>> separating > > > >>>>>>>>> them > > > >>>>>>>>>>> into > > > >>>>>>>>>>>>>> two > > > >>>>>>>>>>>>>>> connectors. > > > >>>>>>>>>>>>>>> In the implementation layer, we can reuse code as much > as > > > >>>>>>>>> possible. > > > >>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>> Therefore, I'm still +1 to have a new connector. > > > >>>>>>>>>>>>>>> The "kafka-compacted" name sounds good to me. > > > >>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>> Best, > > > >>>>>>>>>>>>>>> Jark > > > >>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>> On Wed, 21 Oct 2020 at 17:58, Konstantin Knauf < > > > >>>>>>>>> kna...@apache.org> > > > >>>>>>>>>>>>>> wrote: > > > >>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>> Hi Kurt, Hi Shengkai, > > > >>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>> thanks for answering my questions and the additional > > > >>>>>>>>>>> clarifications. I > > > >>>>>>>>>>>>>>>> don't have a strong opinion on whether to extend the > > > >>>> "kafka" > > > >>>>>>>>>>> connector > > > >>>>>>>>>>>>>> or > > > >>>>>>>>>>>>>>>> to introduce a new connector. So, from my perspective > > > >> feel > > > >>>>> free > > > >>>>>>>>> to > > > >>>>>>>>>>> go > > > >>>>>>>>>>>>>> with > > > >>>>>>>>>>>>>>>> a separate connector. If we do introduce a new > > > >> connector I > > > >>>>>>>>>> wouldn't > > > >>>>>>>>>>>>>> call it > > > >>>>>>>>>>>>>>>> "ktable" for aforementioned reasons (In addition, we > > > >> might > > > >>>>>>>>> suggest > > > >>>>>>>>>>>> that > > > >>>>>>>>>>>>>>>> there is also a "kstreams" connector for symmetry > > > >>>> reasons). 
I > > > >>>>>>>>>> don't > > > >>>>>>>>>>>>>> have a > > > >>>>>>>>>>>>>>>> good alternative name, though, maybe "kafka-compacted" > > > >> or > > > >>>>>>>>>>>>>>>> "compacted-kafka". > > > >>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>> Thanks, > > > >>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>> Konstantin > > > >>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>> On Wed, Oct 21, 2020 at 4:43 AM Kurt Young < > > > >>>> ykt...@gmail.com > > > >>>>>> > > > >>>>>>>>>>> wrote: > > > >>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>> Hi all, > > > >>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>> I want to describe the discussion process which drove > > > >> us > > > >>>> to > > > >>>>>>>>> have > > > >>>>>>>>>>> such > > > >>>>>>>>>>>>>>>>> conclusion, this might make some of > > > >>>>>>>>>>>>>>>>> the design choices easier to understand and keep > > > >>>> everyone on > > > >>>>>>>>> the > > > >>>>>>>>>>> same > > > >>>>>>>>>>>>>>>> page. > > > >>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>> Back to the motivation, what functionality do we want > > > >> to > > > >>>>>>>>> provide > > > >>>>>>>>>> in > > > >>>>>>>>>>>> the > > > >>>>>>>>>>>>>>>>> first place? We got a lot of feedback and > > > >>>>>>>>>>>>>>>>> questions from mailing lists that people want to > write > > > >>>>>>>>>>>> Not-Insert-Only > > > >>>>>>>>>>>>>>>>> messages into kafka. They might be > > > >>>>>>>>>>>>>>>>> intentional or by accident, e.g. wrote an > non-windowed > > > >>>>>>>>> aggregate > > > >>>>>>>>>>>> query > > > >>>>>>>>>>>>>> or > > > >>>>>>>>>>>>>>>>> non-windowed left outer join. And > > > >>>>>>>>>>>>>>>>> some users from KSQL world also asked about why Flink > > > >>>> didn't > > > >>>>>>>>>>> leverage > > > >>>>>>>>>>>>>> the > > > >>>>>>>>>>>>>>>>> Key concept of every kafka topic > > > >>>>>>>>>>>>>>>>> and make kafka as a dynamic changing keyed table. > > > >>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>> To work with kafka better, we were thinking to extend > > > >> the > > > >>>>>>>>>>>> functionality > > > >>>>>>>>>>>>>>>> of > > > >>>>>>>>>>>>>>>>> the current kafka connector by letting it > > > >>>>>>>>>>>>>>>>> accept updates and deletions. But due to the > limitation > > > >>>> of > > > >>>>>>>>> kafka, > > > >>>>>>>>>>> the > > > >>>>>>>>>>>>>>>>> update has to be "update by key", aka a table > > > >>>>>>>>>>>>>>>>> with primary key. > > > >>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>> This introduces a couple of conflicts with current > > > >> kafka > > > >>>>>>>>> table's > > > >>>>>>>>>>>>>> options: > > > >>>>>>>>>>>>>>>>> 1. key.fields: as said above, we need the kafka table > > > >> to > > > >>>>> have > > > >>>>>>>>> the > > > >>>>>>>>>>>>>> primary > > > >>>>>>>>>>>>>>>>> key constraint. And users can also configure > > > >>>>>>>>>>>>>>>>> key.fields freely, this might cause friction. (Sure > we > > > >>>> can > > > >>>>> do > > > >>>>>>>>>> some > > > >>>>>>>>>>>>>> sanity > > > >>>>>>>>>>>>>>>>> check on this but it also creates friction.) > > > >>>>>>>>>>>>>>>>> 2. sink.partitioner: to make the semantics right, we > > > >>>> need to > > > >>>>>>>>> make > > > >>>>>>>>>>>> sure > > > >>>>>>>>>>>>>>>> all > > > >>>>>>>>>>>>>>>>> the updates on the same key are written to > > > >>>>>>>>>>>>>>>>> the same kafka partition, such we should force to > use a > > > >>>> hash > > > >>>>>> by > > > >>>>>>>>>> key > > > >>>>>>>>>>>>>>>>> partition inside such table. Again, this has > conflicts > > > >>>>>>>>>>>>>>>>> and creates friction with current user options. 
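To make point 2 above concrete: a hash-by-key sink partitioner of the kind Kurt describes could look roughly like the following sketch, built on Flink's FlinkKafkaPartitioner abstract class. The class name is hypothetical and this is not the connector's actual partitioner.

import java.util.Arrays;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

// Sketch only: routes every record with the same serialized key to the same
// Kafka partition, so all updates/deletes for one primary key stay ordered
// within a single partition.
public class HashByKeyPartitioner<T> extends FlinkKafkaPartitioner<T> {

    @Override
    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        // Hash the serialized key bytes and map the result onto the
        // available partitions of the target topic.
        int hash = Arrays.hashCode(key);
        return partitions[Math.abs(hash % partitions.length)];
    }
}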
> > > >>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>> The above things are solvable, though not perfect or > > > >> most > > > >>>>> user > > > >>>>>>>>>>>>>> friendly. > > > >>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>> Let's take a look at the reading side. The keyed > kafka > > > >>>> table > > > >>>>>>>>>>> contains > > > >>>>>>>>>>>>>> two > > > >>>>>>>>>>>>>>>>> kinds of messages: upsert or deletion. What upsert > > > >>>>>>>>>>>>>>>>> means is "If the key doesn't exist yet, it's an > insert > > > >>>>> record. > > > >>>>>>>>>>>>>> Otherwise > > > >>>>>>>>>>>>>>>>> it's an update record". For the sake of correctness > or > > > >>>>>>>>>>>>>>>>> simplicity, the Flink SQL engine also needs such > > > >>>>> information. > > > >>>>>>>>> If > > > >>>>>>>>>> we > > > >>>>>>>>>>>>>>>>> interpret all messages to "update record", some > queries > > > >>>> or > > > >>>>>>>>>>>>>>>>> operators may not work properly. It's weird to see an > > > >>>> update > > > >>>>>>>>>> record > > > >>>>>>>>>>>> but > > > >>>>>>>>>>>>>>>> you > > > >>>>>>>>>>>>>>>>> haven't seen the insert record before. > > > >>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>> So what Flink should do is after reading out the > > > >> records > > > >>>>> from > > > >>>>>>>>>> such > > > >>>>>>>>>>>>>> table, > > > >>>>>>>>>>>>>>>>> it needs to create a state to record which messages > > > >> have > > > >>>>>>>>>>>>>>>>> been seen and then generate the correct row type > > > >>>>>>>>> correspondingly. > > > >>>>>>>>>>>> This > > > >>>>>>>>>>>>>>>> kind > > > >>>>>>>>>>>>>>>>> of couples the state and the data of the message > > > >>>>>>>>>>>>>>>>> queue, and it also creates conflicts with current > kafka > > > >>>>>>>>>> connector. > > > >>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>> Think about if users suspend a running job (which > > > >>>> contains > > > >>>>>> some > > > >>>>>>>>>>>> reading > > > >>>>>>>>>>>>>>>>> state now), and then change the start offset of the > > > >>>> reader. > > > >>>>>>>>>>>>>>>>> By changing the reading offset, it actually change > the > > > >>>> whole > > > >>>>>>>>>> story > > > >>>>>>>>>>> of > > > >>>>>>>>>>>>>>>>> "which records should be insert messages and which > > > >>>> records > > > >>>>>>>>>>>>>>>>> should be update messages). And it will also make > Flink > > > >>>> to > > > >>>>>> deal > > > >>>>>>>>>>> with > > > >>>>>>>>>>>>>>>>> another weird situation that it might receive a > > > >> deletion > > > >>>>>>>>>>>>>>>>> on a non existing message. > > > >>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>> We were unsatisfied with all the frictions and > > > >> conflicts > > > >>>> it > > > >>>>>>>>> will > > > >>>>>>>>>>>> create > > > >>>>>>>>>>>>>>>> if > > > >>>>>>>>>>>>>>>>> we enable the "upsert & deletion" support to the > > > >> current > > > >>>>> kafka > > > >>>>>>>>>>>>>>>>> connector. And later we begin to realize that we > > > >>>> shouldn't > > > >>>>>>>>> treat > > > >>>>>>>>>> it > > > >>>>>>>>>>>> as > > > >>>>>>>>>>>>>> a > > > >>>>>>>>>>>>>>>>> normal message queue, but should treat it as a > changing > > > >>>>> keyed > > > >>>>>>>>>>>>>>>>> table. We should be able to always get the whole data > > > >> of > > > >>>>> such > > > >>>>>>>>>> table > > > >>>>>>>>>>>> (by > > > >>>>>>>>>>>>>>>>> disabling the start offset option) and we can also > read > > > >>>> the > > > >>>>>>>>>>>>>>>>> changelog out of such table. 
It's like a HBase table > > > >> with > > > >>>>>>>>> binlog > > > >>>>>>>>>>>>>> support > > > >>>>>>>>>>>>>>>>> but doesn't have random access capability (which can > be > > > >>>>>>>>> fulfilled > > > >>>>>>>>>>>>>>>>> by Flink's state). > > > >>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>> So our intention was instead of telling and > persuading > > > >>>> users > > > >>>>>>>>> what > > > >>>>>>>>>>>> kind > > > >>>>>>>>>>>>>> of > > > >>>>>>>>>>>>>>>>> options they should or should not use by extending > > > >>>>>>>>>>>>>>>>> current kafka connector when enable upsert support, > we > > > >>>> are > > > >>>>>>>>>> actually > > > >>>>>>>>>>>>>>>> create > > > >>>>>>>>>>>>>>>>> a whole new and different connector that has total > > > >>>>>>>>>>>>>>>>> different abstractions in SQL layer, and should be > > > >>>> treated > > > >>>>>>>>>> totally > > > >>>>>>>>>>>>>>>>> different with current kafka connector. > > > >>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>> Hope this can clarify some of the concerns. > > > >>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>> Best, > > > >>>>>>>>>>>>>>>>> Kurt > > > >>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>> On Tue, Oct 20, 2020 at 5:20 PM Shengkai Fang < > > > >>>>>>>>> fskm...@gmail.com > > > >>>>>>>>>>> > > > >>>>>>>>>>>>>> wrote: > > > >>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>> Hi devs, > > > >>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>> As many people are still confused about the > difference > > > >>>>> option > > > >>>>>>>>>>>>>>>> behaviours > > > >>>>>>>>>>>>>>>>>> between the Kafka connector and KTable connector, > Jark > > > >>>> and > > > >>>>> I > > > >>>>>>>>>> list > > > >>>>>>>>>>>> the > > > >>>>>>>>>>>>>>>>>> differences in the doc[1]. > > > >>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>> Best, > > > >>>>>>>>>>>>>>>>>> Shengkai > > > >>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>> [1] > > > >>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>> > > > >>>>>>>>>>>> > > > >>>>>>>>>>> > > > >>>>>>>>>> > > > >>>>>>>>> > > > >>>>>>> > > > >>>>>> > > > >>>>> > > > >>>> > > > >> > > > > > > https://docs.google.com/document/d/13oAWAwQez0lZLsyfV21BfTEze1fc2cz4AZKiNOyBNPk/edit > > > >>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>> Shengkai Fang <fskm...@gmail.com> 于2020年10月20日周二 > > > >>>>> 下午12:05写道: > > > >>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>> Hi Konstantin, > > > >>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>> Thanks for your reply. > > > >>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>> It uses the "kafka" connector and does not > specify a > > > >>>>>> primary > > > >>>>>>>>>>> key. > > > >>>>>>>>>>>>>>>>>>> The dimensional table `users` is a ktable connector > > > >>>> and we > > > >>>>>>>>> can > > > >>>>>>>>>>>>>>>> specify > > > >>>>>>>>>>>>>>>>>> the > > > >>>>>>>>>>>>>>>>>>> pk on the KTable. > > > >>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>> Will it possible to use a "ktable" as a > dimensional > > > >>>> table > > > >>>>>> in > > > >>>>>>>>>>>>>>>> FLIP-132 > > > >>>>>>>>>>>>>>>>>>> Yes. We can specify the watermark on the KTable and > > > >> it > > > >>>> can > > > >>>>>> be > > > >>>>>>>>>>> used > > > >>>>>>>>>>>>>>>> as a > > > >>>>>>>>>>>>>>>>>>> dimension table in temporal join. 
> > > >>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>> Introduce a new connector vs introduce a new > > > >> property > > > >>>>>>>>>>>>>>>>>>> The main reason behind is that the KTable connector > > > >>>> almost > > > >>>>>>>>> has > > > >>>>>>>>>> no > > > >>>>>>>>>>>>>>>>> common > > > >>>>>>>>>>>>>>>>>>> options with the Kafka connector. The options that > > > >> can > > > >>>> be > > > >>>>>>>>>> reused > > > >>>>>>>>>>> by > > > >>>>>>>>>>>>>>>>>> KTable > > > >>>>>>>>>>>>>>>>>>> connectors are 'topic', > > > >> 'properties.bootstrap.servers' > > > >>>> and > > > >>>>>>>>>>>>>>>>>>> 'value.fields-include' . We can't set cdc format > for > > > >>>>>>>>>> 'key.format' > > > >>>>>>>>>>>> and > > > >>>>>>>>>>>>>>>>>>> 'value.format' in KTable connector now, which is > > > >>>>> available > > > >>>>>>>>> in > > > >>>>>>>>>>>> Kafka > > > >>>>>>>>>>>>>>>>>>> connector. Considering the difference between the > > > >>>> options > > > >>>>> we > > > >>>>>>>>>> can > > > >>>>>>>>>>>> use, > > > >>>>>>>>>>>>>>>>>> it's > > > >>>>>>>>>>>>>>>>>>> more suitable to introduce an another connector > > > >> rather > > > >>>>> than > > > >>>>>> a > > > >>>>>>>>>>>>>>>> property. > > > >>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>> We are also fine to use "compacted-kafka" as the > name > > > >>>> of > > > >>>>> the > > > >>>>>>>>>> new > > > >>>>>>>>>>>>>>>>>>> connector. What do you think? > > > >>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>> Best, > > > >>>>>>>>>>>>>>>>>>> Shengkai > > > >>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>> Konstantin Knauf <kna...@apache.org> > 于2020年10月19日周一 > > > >>>>>>>>> 下午10:15写道: > > > >>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>> Hi Shengkai, > > > >>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>> Thank you for driving this effort. I believe this > a > > > >>>> very > > > >>>>>>>>>>> important > > > >>>>>>>>>>>>>>>>>> feature > > > >>>>>>>>>>>>>>>>>>>> for many users who use Kafka and Flink SQL > > > >> together. A > > > >>>>> few > > > >>>>>>>>>>>> questions > > > >>>>>>>>>>>>>>>>> and > > > >>>>>>>>>>>>>>>>>>>> thoughts: > > > >>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>> * Is your example "Use KTable as a > > > >> reference/dimension > > > >>>>>>>>> table" > > > >>>>>>>>>>>>>>>> correct? > > > >>>>>>>>>>>>>>>>>> It > > > >>>>>>>>>>>>>>>>>>>> uses the "kafka" connector and does not specify a > > > >>>> primary > > > >>>>>>>>> key. > > > >>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>> * Will it be possible to use a "ktable" table > > > >> directly > > > >>>>> as a > > > >>>>>>>>>>>>>>>>> dimensional > > > >>>>>>>>>>>>>>>>>>>> table in temporal join (*based on event time*) > > > >>>>> (FLIP-132)? > > > >>>>>>>>>> This > > > >>>>>>>>>>> is > > > >>>>>>>>>>>>>>>> not > > > >>>>>>>>>>>>>>>>>>>> completely clear to me from the FLIP. > > > >>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>> * I'd personally prefer not to introduce a new > > > >>>> connector > > > >>>>>> and > > > >>>>>>>>>>>> instead > > > >>>>>>>>>>>>>>>>> to > > > >>>>>>>>>>>>>>>>>>>> extend the Kafka connector. We could add an > > > >> additional > > > >>>>>>>>>> property > > > >>>>>>>>>>>>>>>>>>>> "compacted" > > > >>>>>>>>>>>>>>>>>>>> = "true"|"false". If it is set to "true", we can > add > > > >>>>>>>>>> additional > > > >>>>>>>>>>>>>>>>>> validation > > > >>>>>>>>>>>>>>>>>>>> logic (e.g. "scan.startup.mode" can not be set, > > > >>>> primary > > > >>>>> key > > > >>>>>>>>>>>>>>>> required, > > > >>>>>>>>>>>>>>>>>>>> etc.). 
If we stick to a separate connector I'd not > > > >>>> call > > > >>>>> it > > > >>>>>>>>>>>> "ktable", > > > >>>>>>>>>>>>>>>>> but > > > >>>>>>>>>>>>>>>>>>>> rather "compacted-kafka" or similar. KTable seems > to > > > >>>>> carry > > > >>>>>>>>>> more > > > >>>>>>>>>>>>>>>>> implicit > > > >>>>>>>>>>>>>>>>>>>> meaning than we want to imply here. > > > >>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>> * I agree that this is not a bounded source. If we > > > >>>> want > > > >>>>> to > > > >>>>>>>>>>>> support a > > > >>>>>>>>>>>>>>>>>>>> bounded mode, this is an orthogonal concern that > > > >> also > > > >>>>>>>>> applies > > > >>>>>>>>>> to > > > >>>>>>>>>>>>>>>> other > > > >>>>>>>>>>>>>>>>>>>> unbounded sources. > > > >>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>> Best, > > > >>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>> Konstantin > > > >>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>> On Mon, Oct 19, 2020 at 3:26 PM Jark Wu < > > > >>>>> imj...@gmail.com> > > > >>>>>>>>>>> wrote: > > > >>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>> Hi Danny, > > > >>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>> First of all, we didn't introduce any concepts > from > > > >>>> KSQL > > > >>>>>>>>>> (e.g. > > > >>>>>>>>>>>>>>>>> Stream > > > >>>>>>>>>>>>>>>>>> vs > > > >>>>>>>>>>>>>>>>>>>>> Table notion). > > > >>>>>>>>>>>>>>>>>>>>> This new connector will produce a changelog > stream, > > > >>>> so > > > >>>>>> it's > > > >>>>>>>>>>> still > > > >>>>>>>>>>>>>>>> a > > > >>>>>>>>>>>>>>>>>>>> dynamic > > > >>>>>>>>>>>>>>>>>>>>> table and doesn't conflict with Flink core > > > >> concepts. > > > >>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>> The "ktable" is just a connector name, we can > also > > > >>>> call > > > >>>>> it > > > >>>>>>>>>>>>>>>>>>>>> "compacted-kafka" or something else. > > > >>>>>>>>>>>>>>>>>>>>> Calling it "ktable" is just because KSQL users > can > > > >>>>> migrate > > > >>>>>>>>> to > > > >>>>>>>>>>>>>>>> Flink > > > >>>>>>>>>>>>>>>>>> SQL > > > >>>>>>>>>>>>>>>>>>>>> easily. > > > >>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>> Regarding to why introducing a new connector vs a > > > >> new > > > >>>>>>>>>> property > > > >>>>>>>>>>> in > > > >>>>>>>>>>>>>>>>>>>> existing > > > >>>>>>>>>>>>>>>>>>>>> kafka connector: > > > >>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>> I think the main reason is that we want to have a > > > >>>> clear > > > >>>>>>>>>>>> separation > > > >>>>>>>>>>>>>>>>> for > > > >>>>>>>>>>>>>>>>>>>> such > > > >>>>>>>>>>>>>>>>>>>>> two use cases, because they are very different. > > > >>>>>>>>>>>>>>>>>>>>> We also listed reasons in the FLIP, including: > > > >>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>> 1) It's hard to explain what's the behavior when > > > >>>> users > > > >>>>>>>>>> specify > > > >>>>>>>>>>>> the > > > >>>>>>>>>>>>>>>>>> start > > > >>>>>>>>>>>>>>>>>>>>> offset from a middle position (e.g. how to > process > > > >>>> non > > > >>>>>>>>> exist > > > >>>>>>>>>>>>>>>> delete > > > >>>>>>>>>>>>>>>>>>>>> events). > > > >>>>>>>>>>>>>>>>>>>>> It's dangerous if users do that. So we > > don't > > > >>>>>> provide > > > >>>>>>>>>> the > > > >>>>>>>>>>>>>>>> offset > > > >>>>>>>>>>>>>>>>>>>> option > > > >>>>>>>>>>>>>>>>>>>>> in the new connector at the moment. > > > >>>>>>>>>>>>>>>>>>>>> 2) It's a different perspective/abstraction on > the > > > >>>> same > > > >>>>>>>>> kafka > > > >>>>>>>>>>>>>>>> topic > > > >>>>>>>>>>>>>>>>>>>> (append > > > >>>>>>>>>>>>>>>>>>>>> vs. upsert). 
It would be easier to understand if > we > > > >>>> can > > > >>>>>>>>>>> separate > > > >>>>>>>>>>>>>>>>> them > > > >>>>>>>>>>>>>>>>>>>>> instead of mixing them in one connector. > > The > > > >>>> new > > > >>>>>>>>>>> connector > > > >>>>>>>>>>>>>>>>>> requires > > > >>>>>>>>>>>>>>>>>>>>> hash sink partitioner, primary key declared, > > > >> regular > > > >>>>>>>>> format. > > > >>>>>>>>>>>>>>>>>>>>> If we mix them in one connector, it might > > be > > > >>>>>>>>> confusing > > > >>>>>>>>>>> how > > > >>>>>>>>>>>> to > > > >>>>>>>>>>>>>>>>> use > > > >>>>>>>>>>>>>>>>>>>> the > > > >>>>>>>>>>>>>>>>>>>>> options correctly. > > > >>>>>>>>>>>>>>>>>>>>> 3) The semantic of the KTable connector is just > the > > > >>>> same > > > >>>>>> as > > > >>>>>>>>>>>> KTable > > > >>>>>>>>>>>>>>>>> in > > > >>>>>>>>>>>>>>>>>>>> Kafka > > > >>>>>>>>>>>>>>>>>>>>> Stream. So it's very handy for Kafka Stream and > > > >> KSQL > > > >>>>>> users. > > > >>>>>>>>>>>>>>>>>>>>> We have seen several questions in the > > > >> mailing > > > >>>>> list > > > >>>>>>>>>> asking > > > >>>>>>>>>>>> how > > > >>>>>>>>>>>>>>>> to > > > >>>>>>>>>>>>>>>>>>>> model > > > >>>>>>>>>>>>>>>>>>>>> a KTable and how to join a KTable in Flink SQL. > > > >>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>> Best, > > > >>>>>>>>>>>>>>>>>>>>> Jark > > > >>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>> On Mon, 19 Oct 2020 at 19:53, Jark Wu < > > > >>>> imj...@gmail.com > > > >>>>>> > > > >>>>>>>>>>> wrote: > > > >>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>>> Hi Jingsong, > > > >>>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>>> As the FLIP describes, "KTable connector > produces > > > >> a > > > >>>>>>>>>> changelog > > > >>>>>>>>>>>>>>>>>> stream, > > > >>>>>>>>>>>>>>>>>>>>>> where each data record represents an update or > > > >>>> delete > > > >>>>>>>>>> event.". > > > >>>>>>>>>>>>>>>>>>>>>> Therefore, a ktable source is an unbounded > stream > > > >>>>> source. > > > >>>>>>>>>>>>>>>>> Selecting > > > >>>>>>>>>>>>>>>>>> a > > > >>>>>>>>>>>>>>>>>>>>>> ktable source is similar to selecting a kafka > > > >> source > > > >>>>> with > > > >>>>>>>>>>>>>>>>>>>> debezium-json > > > >>>>>>>>>>>>>>>>>>>>>> format > > > >>>>>>>>>>>>>>>>>>>>>> that it never ends and the results are > > > >> continuously > > > >>>>>>>>> updated. > > > >>>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>>> It's possible to have a bounded ktable source in > > > >> the > > > >>>>>>>>> future, > > > >>>>>>>>>>> for > > > >>>>>>>>>>>>>>>>>>>> example, > > > >>>>>>>>>>>>>>>>>>>>>> add an option 'bounded=true' or > 'end-offset=xxx'. > > > >>>>>>>>>>>>>>>>>>>>>> In this way, the ktable will produce a bounded > > > >>>>> changelog > > > >>>>>>>>>>> stream. > > > >>>>>>>>>>>>>>>>>>>>>> So I think this can be a compatible feature in > the > > > >>>>>> future. > > > >>>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>>> I don't think we should associate with ksql > > > >> related > > > >>>>>>>>>> concepts. > > > >>>>>>>>>>>>>>>>>>>> Actually, > > > >>>>>>>>>>>>>>>>>>>>> we > > > >>>>>>>>>>>>>>>>>>>>>> didn't introduce any concepts from KSQL (e.g. > > > >>>> Stream vs > > > >>>>>>>>>> Table > > > >>>>>>>>>>>>>>>>>> notion). > > > >>>>>>>>>>>>>>>>>>>>>> The "ktable" is just a connector name, we can > also > > > >>>> call > > > >>>>>> it > > > >>>>>>>>>>>>>>>>>>>>>> "compacted-kafka" or something else. 
> > > >>>>>>>>>>>>>>>>>>>>>> Calling it "ktable" is just because KSQL users > can > > > >>>>>> migrate > > > >>>>>>>>>> to > > > >>>>>>>>>>>>>>>>> Flink > > > >>>>>>>>>>>>>>>>>>>> SQL > > > >>>>>>>>>>>>>>>>>>>>>> easily. > > > >>>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>>> Regarding the "value.fields-include", this is an > > > >>>> option > > > >>>>>>>>>>>>>>>> introduced > > > >>>>>>>>>>>>>>>>>> in > > > >>>>>>>>>>>>>>>>>>>>>> FLIP-107 for Kafka connector. > > > >>>>>>>>>>>>>>>>>>>>>> I think we should keep the same behavior with > the > > > >>>> Kafka > > > >>>>>>>>>>>>>>>> connector. > > > >>>>>>>>>>>>>>>>>> I'm > > > >>>>>>>>>>>>>>>>>>>>> not > > > >>>>>>>>>>>>>>>>>>>>>> sure what's the default behavior of KSQL. > > > >>>>>>>>>>>>>>>>>>>>>> But I guess it also stores the keys in value > from > > > >>>> this > > > >>>>>>>>>> example > > > >>>>>>>>>>>>>>>>> docs > > > >>>>>>>>>>>>>>>>>>>> (see > > > >>>>>>>>>>>>>>>>>>>>>> the "users_original" table) [1]. > > > >>>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>>> Best, > > > >>>>>>>>>>>>>>>>>>>>>> Jark > > > >>>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>>> [1]: > > > >>>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>> > > > >>>>>>>>>>>> > > > >>>>>>>>>>> > > > >>>>>>>>>> > > > >>>>>>>>> > > > >>>>>>> > > > >>>>>> > > > >>>>> > > > >>>> > > > >> > > > > > > https://docs.confluent.io/current/ksqldb/tutorials/basics-local.html#create-a-stream-and-table > > > >>>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>>> On Mon, 19 Oct 2020 at 18:17, Danny Chan < > > > >>>>>>>>>>> yuzhao....@gmail.com> > > > >>>>>>>>>>>>>>>>>>>> wrote: > > > >>>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>>>> The concept seems conflicts with the Flink > > > >>>> abstraction > > > >>>>>>>>>>> “dynamic > > > >>>>>>>>>>>>>>>>>>>> table”, > > > >>>>>>>>>>>>>>>>>>>>>>> in Flink we see both “stream” and “table” as a > > > >>>> dynamic > > > >>>>>>>>>> table, > > > >>>>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>>>> I think we should make clear first how to > express > > > >>>>> stream > > > >>>>>>>>>> and > > > >>>>>>>>>>>>>>>>> table > > > >>>>>>>>>>>>>>>>>>>>>>> specific features on one “dynamic table”, > > > >>>>>>>>>>>>>>>>>>>>>>> it is more natural for KSQL because KSQL takes > > > >>>> stream > > > >>>>>> and > > > >>>>>>>>>>> table > > > >>>>>>>>>>>>>>>>> as > > > >>>>>>>>>>>>>>>>>>>>>>> different abstractions for representing > > > >>>> collections. > > > >>>>> In > > > >>>>>>>>>> KSQL, > > > >>>>>>>>>>>>>>>>> only > > > >>>>>>>>>>>>>>>>>>>>> table is > > > >>>>>>>>>>>>>>>>>>>>>>> mutable and can have a primary key. > > > >>>>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>>>> Does this connector belongs to the “table” > scope > > > >> or > > > >>>>>>>>>> “stream” > > > >>>>>>>>>>>>>>>>> scope > > > >>>>>>>>>>>>>>>>>> ? > > > >>>>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>>>> Some of the concepts (such as the primary key > on > > > >>>>> stream) > > > >>>>>>>>>>> should > > > >>>>>>>>>>>>>>>>> be > > > >>>>>>>>>>>>>>>>>>>>>>> suitable for all the connectors, not just > Kafka, > > > >>>>>>>>> Shouldn’t > > > >>>>>>>>>>> this > > > >>>>>>>>>>>>>>>>> be > > > >>>>>>>>>>>>>>>>>> an > > > >>>>>>>>>>>>>>>>>>>>>>> extension of existing Kafka connector instead > of > > > >> a > > > >>>>>>>>> totally > > > >>>>>>>>>>> new > > > >>>>>>>>>>>>>>>>>>>>> connector ? 
> > > >>>>>>>>>>>>>>>>>>>>>>> What about the other connectors ? > > > >>>>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>>>> Because this touches the core abstraction of > > > >>>> Flink, we > > > >>>>>>>>>> better > > > >>>>>>>>>>>>>>>>> have > > > >>>>>>>>>>>>>>>>>> a > > > >>>>>>>>>>>>>>>>>>>>>>> top-down overall design, following the KSQL > > > >>>> directly > > > >>>>> is > > > >>>>>>>>> not > > > >>>>>>>>>>> the > > > >>>>>>>>>>>>>>>>>>>> answer. > > > >>>>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>>>> P.S. For the source > > > >>>>>>>>>>>>>>>>>>>>>>>> Shouldn’t this be an extension of existing > Kafka > > > >>>>>>>>> connector > > > >>>>>>>>>>>>>>>>>> instead > > > >>>>>>>>>>>>>>>>>>>> of > > > >>>>>>>>>>>>>>>>>>>>> a > > > >>>>>>>>>>>>>>>>>>>>>>> totally new connector ? > > > >>>>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>>>> How could we achieve that (e.g. set up the > > > >>>> parallelism > > > >>>>>>>>>>>>>>>>> correctly) ? > > > >>>>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>>>> Best, > > > >>>>>>>>>>>>>>>>>>>>>>> Danny Chan > > > >>>>>>>>>>>>>>>>>>>>>>> 在 2020年10月19日 +0800 PM5:17,Jingsong Li < > > > >>>>>>>>>>> jingsongl...@gmail.com > > > >>>>>>>>>>>>>>>>>>> ,写道: > > > >>>>>>>>>>>>>>>>>>>>>>>> Thanks Shengkai for your proposal. > > > >>>>>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>>>>> +1 for this feature. > > > >>>>>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>>>>>> Future Work: Support bounded KTable source > > > >>>>>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>>>>> I don't think it should be a future work, I > > > >> think > > > >>>> it > > > >>>>> is > > > >>>>>>>>>> one > > > >>>>>>>>>>>>>>>> of > > > >>>>>>>>>>>>>>>>>> the > > > >>>>>>>>>>>>>>>>>>>>>>>> important concepts of this FLIP. We need to > > > >>>>> understand > > > >>>>>>>>> it > > > >>>>>>>>>>>>>>>> now. > > > >>>>>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>>>>> Intuitively, a ktable in my opinion is a > bounded > > > >>>>> table > > > >>>>>>>>>>> rather > > > >>>>>>>>>>>>>>>>>> than > > > >>>>>>>>>>>>>>>>>>>> a > > > >>>>>>>>>>>>>>>>>>>>>>>> stream, so select should produce a bounded > table > > > >>>> by > > > >>>>>>>>>> default. > > > >>>>>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>>>>> I think we can list Kafka related knowledge, > > > >>>> because > > > >>>>>> the > > > >>>>>>>>>>> word > > > >>>>>>>>>>>>>>>>>>>> `ktable` > > > >>>>>>>>>>>>>>>>>>>>>>> is > > > >>>>>>>>>>>>>>>>>>>>>>>> easy to associate with ksql related concepts. > > > >> (If > > > >>>>>>>>>> possible, > > > >>>>>>>>>>>>>>>>> it's > > > >>>>>>>>>>>>>>>>>>>>> better > > > >>>>>>>>>>>>>>>>>>>>>>> to > > > >>>>>>>>>>>>>>>>>>>>>>>> unify with it) > > > >>>>>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>>>>> What do you think? > > > >>>>>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>>>>>> value.fields-include > > > >>>>>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>>>>> What about the default behavior of KSQL? > > > >>>>>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>>>>> Best, > > > >>>>>>>>>>>>>>>>>>>>>>>> Jingsong > > > >>>>>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>>>>> On Mon, Oct 19, 2020 at 4:33 PM Shengkai Fang > < > > > >>>>>>>>>>>>>>>>> fskm...@gmail.com > > > >>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>>>> wrote: > > > >>>>>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>>>>>> Hi, devs. > > > >>>>>>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>>>>>> Jark and I want to start a new FLIP to > > > >> introduce > > > >>>> the > > > >>>>>>>>>> KTable > > > >>>>>>>>>>>>>>>>>>>>>>> connector. 
The > > > >>>>>>>>>>>>>>>>>>>>>>>>> KTable is a shortcut of "Kafka Table", it > also > > > >>>> has > > > >>>>> the > > > >>>>>>>>>> same > > > >>>>>>>>>>>>>>>>>>>>> semantics > > > >>>>>>>>>>>>>>>>>>>>>>> with > > > >>>>>>>>>>>>>>>>>>>>>>>>> the KTable notion in Kafka Stream. > > > >>>>>>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>>>>>> FLIP-149: > > > >>>>>>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>> > > > >>>>>>>>>>>> > > > >>>>>>>>>>> > > > >>>>>>>>>> > > > >>>>>>>>> > > > >>>>>>> > > > >>>>>> > > > >>>>> > > > >>>> > > > >> > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-149%3A+Introduce+the+KTable+Connector > > > >>>>>>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>>>>>> Currently many users have expressed their > needs > > > >>>> for > > > >>>>>> the > > > >>>>>>>>>>>>>>>>> upsert > > > >>>>>>>>>>>>>>>>>>>> Kafka > > > >>>>>>>>>>>>>>>>>>>>>>> by > > > >>>>>>>>>>>>>>>>>>>>>>>>> mail lists and issues. The KTable connector > has > > > >>>>>> several > > > >>>>>>>>>>>>>>>>>> benefits > > > >>>>>>>>>>>>>>>>>>>> for > > > >>>>>>>>>>>>>>>>>>>>>>> users: > > > >>>>>>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>>>>>> 1. Users are able to interpret a compacted > > > >> Kafka > > > >>>>> Topic > > > >>>>>>>>> as > > > >>>>>>>>>>>>>>>> an > > > >>>>>>>>>>>>>>>>>>>> upsert > > > >>>>>>>>>>>>>>>>>>>>>>> stream > > > >>>>>>>>>>>>>>>>>>>>>>>>> in Apache Flink. And also be able to write a > > > >>>>> changelog > > > >>>>>>>>>>>>>>>> stream > > > >>>>>>>>>>>>>>>>>> to > > > >>>>>>>>>>>>>>>>>>>>> Kafka > > > >>>>>>>>>>>>>>>>>>>>>>>>> (into a compacted topic). > > > >>>>>>>>>>>>>>>>>>>>>>>>> 2. As a part of the real time pipeline, store > > > >>>> join > > > >>>>> or > > > >>>>>>>>>>>>>>>>> aggregate > > > >>>>>>>>>>>>>>>>>>>>>>> result (may > > > >>>>>>>>>>>>>>>>>>>>>>>>> contain updates) into a Kafka topic for > further > > > >>>>>>>>>>>>>>>> calculation; > > > >>>>>>>>>>>>>>>>>>>>>>>>> 3. The semantic of the KTable connector is > just > > > >>>> the > > > >>>>>>>>> same > > > >>>>>>>>>> as > > > >>>>>>>>>>>>>>>>>>>> KTable > > > >>>>>>>>>>>>>>>>>>>>> in > > > >>>>>>>>>>>>>>>>>>>>>>> Kafka > > > >>>>>>>>>>>>>>>>>>>>>>>>> Stream. So it's very handy for Kafka Stream > and > > > >>>> KSQL > > > >>>>>>>>>> users. > > > >>>>>>>>>>>>>>>>> We > > > >>>>>>>>>>>>>>>>>>>> have > > > >>>>>>>>>>>>>>>>>>>>>>> seen > > > >>>>>>>>>>>>>>>>>>>>>>>>> several questions in the mailing list asking > > > >> how > > > >>>> to > > > >>>>>>>>>> model a > > > >>>>>>>>>>>>>>>>>>>> KTable > > > >>>>>>>>>>>>>>>>>>>>>>> and how > > > >>>>>>>>>>>>>>>>>>>>>>>>> to join a KTable in Flink SQL. > > > >>>>>>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>>>>>> We hope it can expand the usage of the Flink > > > >> with > > > >>>>>>>>> Kafka. > > > >>>>>>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>>>>>> I'm looking forward to your feedback. 
> > > >>>>>>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>>>>>> Best, > > > >>>>>>>>>>>>>>>>>>>>>>>>> Shengkai > > > >>>>>>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>>>>> -- > > > >>>>>>>>>>>>>>>>>>>>>>>> Best, Jingsong Lee > > > >>>>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>> -- > > > >>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>> Konstantin Knauf > > > >>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>> https://twitter.com/snntrable > > > >>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>> https://github.com/knaufk > > > >>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>> -- > > > >>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>> Konstantin Knauf > > > >>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>> https://twitter.com/snntrable > > > >>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>> https://github.com/knaufk > > > >>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>> > > > >>>>>>>>>>>>>> > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>> > > > >>>>>>>>>>>> > > > >>>>>>>>>>> > > > >>>>>>>>>>> -- > > > >>>>>>>>>>> > > > >>>>>>>>>>> Seth Wiesman | Solutions Architect > > > >>>>>>>>>>> > > > >>>>>>>>>>> +1 314 387 1463 > > > >>>>>>>>>>> > > > >>>>>>>>>>> <https://www.ververica.com/> > > > >>>>>>>>>>> > > > >>>>>>>>>>> Follow us @VervericaData > > > >>>>>>>>>>> > > > >>>>>>>>>>> -- > > > >>>>>>>>>>> > > > >>>>>>>>>>> Join Flink Forward <https://flink-forward.org/> - The > Apache > > > >>>>> Flink > > > >>>>>>>>>>> Conference > > > >>>>>>>>>>> > > > >>>>>>>>>>> Stream Processing | Event Driven | Real Time > > > >>>>>>>>>>> > > > >>>>>>>>>> > > > >>>>>>>>> > > > >>>>>>>> > > > >>>>>>> > > > >>>>>>> > > > >>>>>> > > > >>>>>> -- > > > >>>>>> Best, Jingsong Lee > > > >>>>>> > > > >>>>> > > > >>>> > > > >>>> > > > >>>> -- > > > >>>> Best, Jingsong Lee > > > >>>> > > > >>> > > > >> > > > > > > > > > > > > > > -- > > Best, Jingsong Lee > > > -- Best, Jingsong Lee