Add one more message: I have already updated the FLIP [1].

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-149%3A+Introduce+the+upsert-kafka+Connector
Shengkai Fang <fskm...@gmail.com> wrote on Fri, Oct 23, 2020 at 2:55 PM:

> Hi, all.
> It seems we have reached a consensus on the FLIP. If no one has other objections, I would like to start the vote for FLIP-149.
>
> Best,
> Shengkai
>
> Jingsong Li <jingsongl...@gmail.com> wrote on Fri, Oct 23, 2020 at 2:25 PM:
>
>> Thanks for the explanation.
>>
>> I am OK with `upsert`. Yes, its concept has been accepted by many systems.
>>
>> Best,
>> Jingsong
>>
>> On Fri, Oct 23, 2020 at 12:38 PM Jark Wu <imj...@gmail.com> wrote:
>>
>>> Hi Timo,
>>>
>>> I have some concerns about `kafka-cdc`:
>>> 1) "cdc" is an abbreviation of Change Data Capture, which is commonly used for databases, not for message queues.
>>> 2) Usually, CDC produces the full content of the changelog, including UPDATE_BEFORE; however, "upsert kafka" doesn't.
>>> 3) `kafka-cdc` sounds like native support for the `debezium-json` format; however, it is not, and we don't even want "upsert kafka" to support "debezium-json".
>>>
>>> Hi Jingsong,
>>>
>>> I think the terminology of "upsert" is fine, because Kafka also uses "upsert" to define such behavior in their official documentation [1]:
>>>
>>>> a data record in a changelog stream is interpreted as an UPSERT aka INSERT/UPDATE
>>>
>>> Materialize uses the "UPSERT" keyword to define such behavior too [2]. Users have been requesting such a feature using "upsert kafka" terminology on the user mailing lists [3][4]. Many other systems support an "UPSERT" statement natively, such as Impala [5], SAP [6], Phoenix [7], Oracle NoSQL [8], etc.
>>>
>>> Therefore, I think we don't need to be afraid of introducing the "upsert" terminology; it is widely accepted by users.
>>> Best,
>>> Jark
>>>
>>> [1]: https://kafka.apache.org/20/documentation/streams/developer-guide/dsl-api.html#streams_concepts_ktable
>>> [2]: https://materialize.io/docs/sql/create-source/text-kafka/#upsert-on-a-kafka-topic
>>> [3]: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/SQL-materialized-upsert-tables-td18482.html#a18503
>>> [4]: http://apache-flink.147419.n8.nabble.com/Kafka-Sink-AppendStreamTableSink-doesn-t-support-consuming-update-changes-td5959.html
>>> [5]: https://impala.apache.org/docs/build/html/topics/impala_upsert.html
>>> [6]: https://help.sap.com/viewer/7c78579ce9b14a669c1f3295b0d8ca16/Cloud/en-US/ea8b6773be584203bcd99da76844c5ed.html
>>> [7]: https://phoenix.apache.org/atomic_upsert.html
>>> [8]: https://docs.oracle.com/en/database/other-databases/nosql-database/18.3/sqlfornosql/adding-table-rows-using-insert-and-upsert-statements.html
>>>
>>> On Fri, 23 Oct 2020 at 10:36, Jingsong Li <jingsongl...@gmail.com> wrote:
>>>
>>>> The `kafka-cdc` name looks good to me.
>>>> We could even add an option to indicate whether to turn on compaction, because compaction is just an optimization?
>>>>
>>>> - ktable: makes me think of KSQL.
>>>> - kafka-compacted: it is not just compacted; beyond that, it still has the ability of CDC.
>>>> - upsert-kafka: upsert is back, and I don't really want to see it again since we have CDC.
>>>>
>>>> Best,
>>>> Jingsong
>>>>
>>>> On Fri, Oct 23, 2020 at 2:21 AM Timo Walther <twal...@apache.org> wrote:
>>>>
>>>>> Hi Jark,
>>>>>
>>>>> I would be fine with `connector=upsert-kafka`. Another idea would be to align the name with other available Flink connectors [1]:
>>>>>
>>>>> `connector=kafka-cdc`.
>>>>> Regards,
>>>>> Timo
>>>>>
>>>>> [1] https://github.com/ververica/flink-cdc-connectors
>>>>>
>>>>> On 22.10.20 17:17, Jark Wu wrote:
>>>>>> Another name is "connector=upsert-kafka"; I think this can solve Timo's concern about the "compacted" word.
>>>>>>
>>>>>> Materialize also uses the "ENVELOPE UPSERT" [1] keyword to identify such kafka sources. I think "upsert" is a well-known terminology widely used in many systems and matches the behavior of how we handle the kafka messages.
>>>>>>
>>>>>> What do you think?
>>>>>>
>>>>>> Best,
>>>>>> Jark
>>>>>>
>>>>>> [1]: https://materialize.io/docs/sql/create-source/text-kafka/#upsert-on-a-kafka-topic
>>>>>>
>>>>>> On Thu, 22 Oct 2020 at 22:53, Kurt Young <ykt...@gmail.com> wrote:
>>>>>>
>>>>>>> Good validation messages can't fix the broken user experience, especially since such an update-mode option will implicitly make half of the current kafka options invalid or nonsensical.
>>>>>>>
>>>>>>> Best,
>>>>>>> Kurt
>>>>>>>
>>>>>>> On Thu, Oct 22, 2020 at 10:31 PM Jark Wu <imj...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Timo, Seth,
>>>>>>>>
>>>>>>>> The default value "inserting" for "mode" might not be suitable, because "debezium-json" emits changelog messages which include updates.
>>>>>>>>
>>>>>>>> On Thu, 22 Oct 2020 at 22:10, Seth Wiesman <s...@ververica.com> wrote:
>>>>>>>>
>>>>>>>>> +1 for supporting upsert results into Kafka.
>>>>>>>>>
>>>>>>>>> I have no comments on the implementation details.
>>>>>>>>> As far as configuration goes, I tend to favor Timo's option where we add a "mode" property to the existing Kafka table with default value "inserting". If the mode is set to "updating", then the validation changes to the new requirements. I personally find it more intuitive than a separate connector; my fear is users won't understand it's the same physical kafka sink under the hood, and it will lead to other confusion, such as: does it offer the same persistence guarantees? I think we are capable of adding good validation messaging that solves Jark's and Kurt's concerns.
>>>>>>>>>
>>>>>>>>> On Thu, Oct 22, 2020 at 8:51 AM Timo Walther <twal...@apache.org> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Jark,
>>>>>>>>>>
>>>>>>>>>> "calling it "kafka-compacted" can even remind users to enable log compaction"
>>>>>>>>>>
>>>>>>>>>> But sometimes users like to store a lineage of changes in their topics, independent of any ktable/kstream interpretation.
>>>>>>>>>>
>>>>>>>>>> I'll let the majority decide on this topic so as not to further block this effort. But we might find a better name, like:
>>>>>>>>>>
>>>>>>>>>> connector = kafka
>>>>>>>>>> mode = updating/inserting
>>>>>>>>>>
>>>>>>>>>> OR
>>>>>>>>>>
>>>>>>>>>> connector = kafka-updating
>>>>>>>>>>
>>>>>>>>>> ...
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>> Timo
>>>>>>>>>>
>>>>>>>>>> On 22.10.20 15:24, Jark Wu wrote:
>>>>>>>>>>> Hi Timo,
>>>>>>>>>>>
>>>>>>>>>>> Thanks for your opinions.
>>>>>>>>>>> 1) Implementation
>>>>>>>>>>> We will have a stateful operator to generate INSERT and UPDATE_BEFORE. This operator is keyed (primary key as the shuffle key) after the source operator. The implementation of this operator is very similar to the existing `DeduplicateKeepLastRowFunction`. The operator will register a value state using the primary key fields as keys.
>>>>>>>>>>> When the value state is empty under the current key, we will emit INSERT for the input row.
>>>>>>>>>>> When the value state is not empty under the current key, we will emit UPDATE_BEFORE using the row in state, and emit UPDATE_AFTER using the input row.
>>>>>>>>>>> When the input row is DELETE, we will clear the state and emit a DELETE row.
>>>>>>>>>>>
>>>>>>>>>>> 2) new option vs new connector
>>>>>>>>>>>> We recently simplified the table options to a minimum amount of characters to be as concise as possible in the DDL.
>>>>>>>>>>> I think this is the reason why we want to introduce a new connector: because we can simplify the options in DDL.
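(Editor's aside: the per-key state machine in Jark's "1) Implementation" section above can be sketched in a few lines of plain Python. This is an illustration only, with made-up record shapes and names; the real operator is a keyed Flink function similar to `DeduplicateKeepLastRowFunction`, not this code.)

```python
# Illustrative sketch of the changelog-normalizing operator described above:
# keep the last row per primary key, and turn an upsert/delete stream into a
# full changelog with INSERT / UPDATE_BEFORE / UPDATE_AFTER / DELETE events.

def normalize_changelog(records):
    """records: iterable of (kind, key, row), kind in {'UPSERT', 'DELETE'}.
    Yields (row_kind, row) changelog events."""
    state = {}  # the keyed "value state": primary key -> last seen row
    for kind, key, row in records:
        if kind == 'DELETE':
            old = state.pop(key, None)  # clear state for this key
            if old is not None:
                yield ('DELETE', old)   # DELETE with all columns filled
        elif key not in state:
            state[key] = row
            yield ('INSERT', row)       # first time this key is seen
        else:
            yield ('UPDATE_BEFORE', state[key])  # previous image from state
            state[key] = row
            yield ('UPDATE_AFTER', row)

events = list(normalize_changelog([
    ('UPSERT', 'u1', ('u1', 'gold')),
    ('UPSERT', 'u1', ('u1', 'platinum')),
    ('DELETE', 'u1', None),
]))
```

Note that the state grows with the number of distinct keys, which is exactly the "how much state is required" question Timo raises in his quoted mail below.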
>>>>>>>>>>> For example, if using a new option, the DDL may look like this:
>>>>>>>>>>>
>>>>>>>>>>> CREATE TABLE users (
>>>>>>>>>>>   user_id BIGINT,
>>>>>>>>>>>   user_name STRING,
>>>>>>>>>>>   user_level STRING,
>>>>>>>>>>>   region STRING,
>>>>>>>>>>>   PRIMARY KEY (user_id) NOT ENFORCED
>>>>>>>>>>> ) WITH (
>>>>>>>>>>>   'connector' = 'kafka',
>>>>>>>>>>>   'model' = 'table',
>>>>>>>>>>>   'topic' = 'pageviews_per_region',
>>>>>>>>>>>   'properties.bootstrap.servers' = '...',
>>>>>>>>>>>   'properties.group.id' = 'testGroup',
>>>>>>>>>>>   'scan.startup.mode' = 'earliest',
>>>>>>>>>>>   'key.format' = 'csv',
>>>>>>>>>>>   'key.fields' = 'user_id',
>>>>>>>>>>>   'value.format' = 'avro',
>>>>>>>>>>>   'sink.partitioner' = 'hash'
>>>>>>>>>>> );
>>>>>>>>>>>
>>>>>>>>>>> If using a new connector, we can have different default values for the options and remove unnecessary options; the DDL can look like this, which is much more concise:
>>>>>>>>>>>
>>>>>>>>>>> CREATE TABLE pageviews_per_region (
>>>>>>>>>>>   user_id BIGINT,
>>>>>>>>>>>   user_name STRING,
>>>>>>>>>>>   user_level STRING,
>>>>>>>>>>>   region STRING,
>>>>>>>>>>>   PRIMARY KEY (user_id) NOT ENFORCED
>>>>>>>>>>> ) WITH (
>>>>>>>>>>>   'connector' = 'kafka-compacted',
>>>>>>>>>>>   'topic' = 'pageviews_per_region',
>>>>>>>>>>>   'properties.bootstrap.servers' = '...',
>>>>>>>>>>>   'key.format' = 'csv',
>>>>>>>>>>>   'value.format' = 'avro'
>>>>>>>>>>> );
>>>>>>>>>>>
>>>>>>>>>>>> When people read `connector=kafka-compacted` they might not know that it has ktable semantics. You don't need to enable log compaction in order to use a KTable as far as I know.
>>>>>>>>>>> We don't need to let users know it has ktable semantics; as Konstantin mentioned, this may carry more implicit meaning than we want to imply here. I agree users don't need to enable log compaction, but from the production perspective, log compaction should always be enabled if the topic is used for this purpose. Calling it "kafka-compacted" can even remind users to enable log compaction.
>>>>>>>>>>>
>>>>>>>>>>> I don't agree with introducing a "model = table/stream" option, or "connector=kafka-table", because this means we are introducing the Table vs Stream concept from KSQL. However, we don't have such a top-level concept in Flink SQL now; this will further confuse users. In Flink SQL, everything is a STREAM; the differences are whether it is bounded or unbounded, and whether it is insert-only or a changelog.
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Jark
>>>>>>>>>>>
>>>>>>>>>>> On Thu, 22 Oct 2020 at 20:39, Timo Walther <twal...@apache.org> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Shengkai, Hi Jark,
>>>>>>>>>>>>
>>>>>>>>>>>> thanks for this great proposal. It is time to finally connect the changelog processor with a compacted Kafka topic.
>>>>>>>>>>>>
>>>>>>>>>>>> "The operator will produce INSERT rows, or additionally generate UPDATE_BEFORE rows for the previous image, or produce DELETE rows with all columns filled with values."
>>>>>>>>>>>>
>>>>>>>>>>>> Could you elaborate a bit on the implementation details in the FLIP?
>>>>>>>>>>>> How are the UPDATE_BEFOREs generated? How much state is required to perform this operation?
>>>>>>>>>>>>
>>>>>>>>>>>> From a conceptual and semantical point of view, I'm fine with the proposal. But I would like to share my opinion about how we expose this feature:
>>>>>>>>>>>>
>>>>>>>>>>>> ktable vs kafka-compacted
>>>>>>>>>>>>
>>>>>>>>>>>> I'm against having an additional connector like `ktable` or `kafka-compacted`. We recently simplified the table options to a minimum amount of characters to be as concise as possible in the DDL. Therefore, I would keep `connector=kafka` and introduce an additional option, because a user wants to read "from Kafka", and the "how" should be determined in the lower options.
>>>>>>>>>>>>
>>>>>>>>>>>> When people read `connector=ktable` they might not know that this is Kafka. Or they wonder where `kstream` is?
>>>>>>>>>>>>
>>>>>>>>>>>> When people read `connector=kafka-compacted` they might not know that it has ktable semantics. You don't need to enable log compaction in order to use a KTable as far as I know. Log compaction and table semantics are orthogonal topics.
>>>>>>>>>>>>
>>>>>>>>>>>> In the end we will need 3 types of information when declaring a Kafka connector:
>>>>>>>>>>>>
>>>>>>>>>>>> CREATE TABLE ...
>>>>>>>>>>>> WITH (
>>>>>>>>>>>>   connector = kafka      -- Some information about the connector
>>>>>>>>>>>>   end-offset = XXXX      -- Some information about the boundedness
>>>>>>>>>>>>   model = table/stream   -- Some information about interpretation
>>>>>>>>>>>> )
>>>>>>>>>>>>
>>>>>>>>>>>> We can still apply all the constraints mentioned in the FLIP when `model` is set to `table`.
>>>>>>>>>>>>
>>>>>>>>>>>> What do you think?
>>>>>>>>>>>>
>>>>>>>>>>>> Regards,
>>>>>>>>>>>> Timo
>>>>>>>>>>>>
>>>>>>>>>>>> On 21.10.20 14:19, Jark Wu wrote:
>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>
>>>>>>>>>>>>> IMO, if we are going to mix them in one connector:
>>>>>>>>>>>>> 1) either users need to set some options to a specific value explicitly, e.g. "scan.startup.mode=earliest", "sink.partitioner=hash", etc. This makes the connector awkward to use; users may have to fix options one by one according to the exceptions. Besides, in the future it is still possible to use "sink.partitioner=fixed" (to reduce network cost) if users are aware of the partition routing; however, it's error-prone to have "fixed" as the default for compacted mode.
>>>>>>>>>>>>> 2) or make those options take different default values when "compacted=true". This would be more confusing and unpredictable if the default values of options change according to other options. What happens if we have a third mode in the future?
>>>>>>>>>>>>> In terms of usage and options, it's very different from the original "kafka" connector. It would be more handy to use and less error-prone if we separate them into two connectors. In the implementation layer, we can reuse code as much as possible.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Therefore, I'm still +1 to have a new connector. The "kafka-compacted" name sounds good to me.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best,
>>>>>>>>>>>>> Jark
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, 21 Oct 2020 at 17:58, Konstantin Knauf <kna...@apache.org> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Kurt, Hi Shengkai,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> thanks for answering my questions and the additional clarifications. I don't have a strong opinion on whether to extend the "kafka" connector or to introduce a new connector. So, from my perspective, feel free to go with a separate connector. If we do introduce a new connector, I wouldn't call it "ktable" for the aforementioned reasons (in addition, we might suggest that there is also a "kstreams" connector for symmetry reasons). I don't have a good alternative name, though; maybe "kafka-compacted" or "compacted-kafka".
>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Konstantin
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Oct 21, 2020 at 4:43 AM Kurt Young <ykt...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I want to describe the discussion process which drove us to this conclusion; it might make some of the design choices easier to understand and keep everyone on the same page.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Back to the motivation: what functionality do we want to provide in the first place? We got a lot of feedback and questions from the mailing lists that people want to write not-insert-only messages into kafka. They might be intentional or by accident, e.g. they wrote a non-windowed aggregate query or a non-windowed left outer join. And some users from the KSQL world also asked why Flink didn't leverage the Key concept of every kafka topic and treat kafka as a dynamically changing keyed table.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> To work with kafka better, we were thinking to extend the functionality of the current kafka connector by letting it accept updates and deletions. But due to the limitations of kafka, the update has to be "update by key", aka a table with a primary key.
>>>>>>>>>>>>>>> This introduces a couple of conflicts with the current kafka table's options:
>>>>>>>>>>>>>>> 1. key.fields: as said above, we need the kafka table to have the primary key constraint. Users can also configure key.fields freely; this might cause friction. (Sure, we can do some sanity checks on this, but that also creates friction.)
>>>>>>>>>>>>>>> 2. sink.partitioner: to make the semantics right, we need to make sure all the updates on the same key are written to the same kafka partition, so we should force a hash-by-key partitioner inside such a table. Again, this conflicts and creates friction with current user options.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The above things are solvable, though not perfect or most user friendly.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Let's take a look at the reading side. The keyed kafka table contains two kinds of messages: upsert or deletion. What upsert means is: "If the key doesn't exist yet, it's an insert record. Otherwise it's an update record." For the sake of correctness or simplicity, the Flink SQL engine also needs such information. If we interpret all messages as "update record", some queries or operators may not work properly. It's weird to see an update record when you haven't seen the insert record before.
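(Editor's aside: Kurt's `sink.partitioner` point can be illustrated with a toy sketch. If writes are routed by a deterministic hash of the key, every update for one key lands in the same partition and so stays ordered for any consumer of that partition. The partition count and hash choice below are arbitrary assumptions for the example, not Kafka's actual default partitioner.)

```python
# Toy illustration of hash-by-key partitioning (not Kafka's real partitioner):
# routing every update for the same primary key to one partition is what
# preserves per-key ordering, which upsert semantics depend on.
import hashlib

NUM_PARTITIONS = 4  # arbitrary assumption for the example

def hash_partition(key: str) -> int:
    # md5 is used only because it is deterministic across processes,
    # unlike Python's salted built-in hash().
    digest = hashlib.md5(key.encode('utf-8')).digest()
    return int.from_bytes(digest[:4], 'big') % NUM_PARTITIONS

# Three successive updates to the same key always pick the same partition:
partitions = {hash_partition('user_1') for _ in range(3)}
```

A "fixed" or round-robin assignment, by contrast, could scatter one key's updates across partitions and destroy their relative order, which is why "fixed" is a dangerous default in compacted mode.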
>>>>>>>>>>>>>>> So what Flink should do is: after reading the records out of such a table, it needs to create a state that records which messages have been seen, and then generate the correct row kind correspondingly. This couples the state with the data of the message queue, and it also creates conflicts with the current kafka connector.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Think about a user suspending a running job (which now contains some reading state) and then changing the start offset of the reader. By changing the reading offset, they actually change the whole story of which records should be insert messages and which records should be update messages. And it will also make Flink deal with another weird situation: it might receive a deletion for a non-existing message.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> We were unsatisfied with all the friction and conflicts it would create if we enabled "upsert & deletion" support in the current kafka connector. And later we began to realize that we shouldn't treat it as a normal message queue, but should treat it as a changing keyed table. We should be able to always get the whole data of such a table (by disabling the start-offset option), and we can also read the changelog out of such a table.
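(Editor's aside: the start-offset problem described above can be made concrete with a tiny sketch; the log records and offsets below are invented for illustration. Reading the same upsert log from a later offset changes which records look like inserts, and can surface a delete for a key the reader has never seen.)

```python
# Illustrative only: interpret an upsert/tombstone log from a start offset.
# Records are made up; value None models a Kafka tombstone (delete-by-key).
log = [
    ('u1', 'gold'),      # offset 0: first sighting of u1 -> insert
    ('u1', 'platinum'),  # offset 1: u1 seen again        -> update
    ('u2', 'silver'),    # offset 2: first sighting of u2 -> insert
    ('u2', None),        # offset 3: tombstone            -> delete u2
]

def classify(log, start_offset=0):
    """Return each record's interpretation when reading from start_offset."""
    seen, kinds = set(), []
    for key, value in log[start_offset:]:
        if value is None:
            # A delete for a key we never saw only happens when the history
            # before start_offset was skipped -- the "weird situation".
            kinds.append('DELETE' if key in seen else 'DELETE_NONEXISTENT')
            seen.discard(key)
        else:
            kinds.append('UPDATE' if key in seen else 'INSERT')
            seen.add(key)
    return kinds

from_start = classify(log)                     # full history: clean changelog
from_offset_3 = classify(log, start_offset=3)  # skipped history: orphan delete
```

Reading from offset 0 yields a consistent INSERT/UPDATE/INSERT/DELETE sequence, while starting at offset 3 yields only an orphan delete, which is why the new connector deliberately omits the start-offset option.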
It's like a HBase table with >> > > > >> binlog >> > > > >>>>>>> support >> > > > >>>>>>>>>> but doesn't have random access capability (which can be >> > > > >> fulfilled >> > > > >>>>>>>>>> by Flink's state). >> > > > >>>>>>>>>> >> > > > >>>>>>>>>> So our intention was instead of telling and persuading >> users >> > > > >> what >> > > > >>>>> kind >> > > > >>>>>>> of >> > > > >>>>>>>>>> options they should or should not use by extending >> > > > >>>>>>>>>> current kafka connector when enable upsert support, we >> are >> > > > >>> actually >> > > > >>>>>>>>> create >> > > > >>>>>>>>>> a whole new and different connector that has total >> > > > >>>>>>>>>> different abstractions in SQL layer, and should be >> treated >> > > > >>> totally >> > > > >>>>>>>>>> different with current kafka connector. >> > > > >>>>>>>>>> >> > > > >>>>>>>>>> Hope this can clarify some of the concerns. >> > > > >>>>>>>>>> >> > > > >>>>>>>>>> Best, >> > > > >>>>>>>>>> Kurt >> > > > >>>>>>>>>> >> > > > >>>>>>>>>> >> > > > >>>>>>>>>> On Tue, Oct 20, 2020 at 5:20 PM Shengkai Fang < >> > > > >> fskm...@gmail.com >> > > > >>>> >> > > > >>>>>>> wrote: >> > > > >>>>>>>>>> >> > > > >>>>>>>>>>> Hi devs, >> > > > >>>>>>>>>>> >> > > > >>>>>>>>>>> As many people are still confused about the difference >> > option >> > > > >>>>>>>>> behaviours >> > > > >>>>>>>>>>> between the Kafka connector and KTable connector, Jark >> and >> > I >> > > > >>> list >> > > > >>>>> the >> > > > >>>>>>>>>>> differences in the doc[1]. 
>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>> Shengkai
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> [1] https://docs.google.com/document/d/13oAWAwQez0lZLsyfV21BfTEze1fc2cz4AZKiNOyBNPk/edit
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Shengkai Fang <fskm...@gmail.com> wrote on Tue, Oct 20, 2020 at 12:05 PM:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi Konstantin,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks for your reply.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> It uses the "kafka" connector and does not specify a primary key.
>>>>>>>>>>>>>>>>> The dimension table `users` uses the ktable connector, and we can specify the pk on the KTable.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Will it be possible to use a "ktable" as a dimension table in FLIP-132?
>>>>>>>>>>>>>>>>> Yes. We can specify the watermark on the KTable, and it can be used as a dimension table in a temporal join.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Introduce a new connector vs introduce a new property
>>>>>>>>>>>>>>>>> The main reason behind this is that the KTable connector has almost no common options with the Kafka connector. The options that can be reused by KTable connectors are 'topic', 'properties.bootstrap.servers', and 'value.fields-include'.
We can't set cdc format for >> > > > >>> 'key.format' >> > > > >>>>> and >> > > > >>>>>>>>>>>> 'value.format' in KTable connector now, which is >> > available >> > > > >> in >> > > > >>>>> Kafka >> > > > >>>>>>>>>>>> connector. Considering the difference between the >> options >> > we >> > > > >>> can >> > > > >>>>> use, >> > > > >>>>>>>>>>> it's >> > > > >>>>>>>>>>>> more suitable to introduce an another connector rather >> > than >> > > a >> > > > >>>>>>>>> property. >> > > > >>>>>>>>>>>> >> > > > >>>>>>>>>>>> We are also fine to use "compacted-kafka" as the name >> of >> > the >> > > > >>> new >> > > > >>>>>>>>>>>> connector. What do you think? >> > > > >>>>>>>>>>>> >> > > > >>>>>>>>>>>> Best, >> > > > >>>>>>>>>>>> Shengkai >> > > > >>>>>>>>>>>> >> > > > >>>>>>>>>>>> Konstantin Knauf <kna...@apache.org> 于2020年10月19日周一 >> > > > >> 下午10:15写道: >> > > > >>>>>>>>>>>> >> > > > >>>>>>>>>>>>> Hi Shengkai, >> > > > >>>>>>>>>>>>> >> > > > >>>>>>>>>>>>> Thank you for driving this effort. I believe this a >> very >> > > > >>>> important >> > > > >>>>>>>>>>> feature >> > > > >>>>>>>>>>>>> for many users who use Kafka and Flink SQL together. A >> > few >> > > > >>>>> questions >> > > > >>>>>>>>>> and >> > > > >>>>>>>>>>>>> thoughts: >> > > > >>>>>>>>>>>>> >> > > > >>>>>>>>>>>>> * Is your example "Use KTable as a reference/dimension >> > > > >> table" >> > > > >>>>>>>>> correct? >> > > > >>>>>>>>>>> It >> > > > >>>>>>>>>>>>> uses the "kafka" connector and does not specify a >> primary >> > > > >> key. >> > > > >>>>>>>>>>>>> >> > > > >>>>>>>>>>>>> * Will it be possible to use a "ktable" table directly >> > as a >> > > > >>>>>>>>>> dimensional >> > > > >>>>>>>>>>>>> table in temporal join (*based on event time*) >> > (FLIP-132)? >> > > > >>> This >> > > > >>>> is >> > > > >>>>>>>>> not >> > > > >>>>>>>>>>>>> completely clear to me from the FLIP. 
>>>>>>>>>>>>>>>>>> * I'd personally prefer not to introduce a new connector and instead to extend the Kafka connector. We could add an additional property "compacted" = "true"|"false". If it is set to "true", we can add additional validation logic (e.g. "scan.startup.mode" can not be set, primary key required, etc.). If we stick to a separate connector, I'd not call it "ktable" but rather "compacted-kafka" or similar. KTable seems to carry more implicit meaning than we want to imply here.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> * I agree that this is not a bounded source. If we want to support a bounded mode, this is an orthogonal concern that also applies to other unbounded sources.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Konstantin
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Mon, Oct 19, 2020 at 3:26 PM Jark Wu <imj...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Hi Danny,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> First of all, we didn't introduce any concepts from KSQL (e.g. the Stream vs Table notion). This new connector will produce a changelog stream, so it's still a dynamic table and doesn't conflict with Flink core concepts.
>>>>>>>>>>>>>>>>>>> The "ktable" is just a connector name; we could also call it "compacted-kafka" or something else. Calling it "ktable" is just so that KSQL users can migrate to Flink SQL easily.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Regarding why we introduce a new connector vs a new property in the existing kafka connector:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I think the main reason is that we want a clear separation between these two use cases, because they are very different. We also listed reasons in the FLIP, including:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> 1) It's hard to explain the behavior when users specify a start offset from a middle position (e.g. how to process non-existent delete events). It's dangerous if users do that, so we don't provide the offset option in the new connector at the moment.
>>>>>>>>>>>>>>>>>>> 2) It's a different perspective/abstraction on the same kafka topic (append vs. upsert). It would be easier to understand if we separate them instead of mixing them in one connector. The new connector requires a hash sink partitioner, a declared primary key, and a regular format.
>> > > > >>>>>>>>>>>>>> If we mix them in one connector, it might be >> > > > >> confusing >> > > > >>>> how >> > > > >>>>> to >> > > > >>>>>>>>>> use >> > > > >>>>>>>>>>>>> the >> > > > >>>>>>>>>>>>>> options correctly. >> > > > >>>>>>>>>>>>>> 3) The semantic of the KTable connector is just the >> same >> > > as >> > > > >>>>> KTable >> > > > >>>>>>>>>> in >> > > > >>>>>>>>>>>>> Kafka >> > > > >>>>>>>>>>>>>> Stream. So it's very handy for Kafka Stream and KSQL >> > > users. >> > > > >>>>>>>>>>>>>> We have seen several questions in the mailing >> > list >> > > > >>> asking >> > > > >>>>> how >> > > > >>>>>>>>> to >> > > > >>>>>>>>>>>>> model >> > > > >>>>>>>>>>>>>> a KTable and how to join a KTable in Flink SQL. >> > > > >>>>>>>>>>>>>> >> > > > >>>>>>>>>>>>>> Best, >> > > > >>>>>>>>>>>>>> Jark >> > > > >>>>>>>>>>>>>> >> > > > >>>>>>>>>>>>>> On Mon, 19 Oct 2020 at 19:53, Jark Wu < >> imj...@gmail.com >> > > >> > > > >>>> wrote: >> > > > >>>>>>>>>>>>>> >> > > > >>>>>>>>>>>>>>> Hi Jingsong, >> > > > >>>>>>>>>>>>>>> >> > > > >>>>>>>>>>>>>>> As the FLIP describes, "KTable connector produces a >> > > > >>> changelog >> > > > >>>>>>>>>>> stream, >> > > > >>>>>>>>>>>>>>> where each data record represents an update or >> delete >> > > > >>> event.". >> > > > >>>>>>>>>>>>>>> Therefore, a ktable source is an unbounded stream >> > source. >> > > > >>>>>>>>>> Selecting >> > > > >>>>>>>>>>> a >> > > > >>>>>>>>>>>>>>> ktable source is similar to selecting a kafka source >> > with >> > > > >>>>>>>>>>>>> debezium-json >> > > > >>>>>>>>>>>>>>> format >> > > > >>>>>>>>>>>>>>> that it never ends and the results are continuously >> > > > >> updated. >> > > > >>>>>>>>>>>>>>> >> > > > >>>>>>>>>>>>>>> It's possible to have a bounded ktable source in the >> > > > >> future, >> > > > >>>> for >> > > > >>>>>>>>>>>>> example, >> > > > >>>>>>>>>>>>>>> add an option 'bounded=true' or 'end-offset=xxx'. 
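For illustration, such a declaration might look like the following sketch. Note that everything here is a placeholder taken from this discussion ('ktable' as the connector name, 'end-offset' as a hypothetical bounded-read option), not a finalized API:

```sql
-- Sketch only: 'ktable' and 'end-offset' are names floated in this thread,
-- not released Flink options.
CREATE TABLE users (
  user_id BIGINT,
  user_name STRING,
  region STRING,
  PRIMARY KEY (user_id) NOT ENFORCED  -- required: defines the upsert key
) WITH (
  'connector' = 'ktable',
  'topic' = 'users',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'json',
  'value.format' = 'json'
  -- hypothetical future option for a bounded read:
  -- 'end-offset' = '...'
);
```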
In this way, the ktable would produce a bounded changelog stream, so I think this can be added as a compatible feature in the future.

I don't think we should associate it with KSQL-related concepts. Actually, we didn't introduce any concepts from KSQL (e.g. the Stream vs Table notion). The "ktable" is just a connector name; we could also call it "compacted-kafka" or something else. We chose "ktable" only so that KSQL users can migrate to Flink SQL easily.

Regarding "value.fields-include": this is an option introduced in FLIP-107 for the Kafka connector, and I think we should keep the same behavior as the Kafka connector. I'm not sure what the default behavior of KSQL is, but I guess it also stores the keys in the value, judging from this example in the docs (see the "users_original" table) [1].
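As a sketch of how the FLIP-107 option could look on such a table (the connector name and the option value are assumptions based on this discussion, not a confirmed API):

```sql
-- Sketch under FLIP-107 assumptions: with a value such as 'EXCEPT_KEY',
-- the primary key columns would be serialized only into the Kafka message
-- key, not duplicated in the message value.
CREATE TABLE users (
  user_id BIGINT,
  user_name STRING,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'ktable',          -- placeholder connector name
  'topic' = 'users',
  'key.format' = 'json',
  'value.format' = 'json',
  'value.fields-include' = 'EXCEPT_KEY'
);
```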
Best,
Jark

[1]: https://docs.confluent.io/current/ksqldb/tutorials/basics-local.html#create-a-stream-and-table

On Mon, 19 Oct 2020 at 18:17, Danny Chan <yuzhao....@gmail.com> wrote:

The concept seems to conflict with the Flink abstraction of the "dynamic table": in Flink we see both "stream" and "table" as a dynamic table.

I think we should first make clear how to express stream-specific and table-specific features on one "dynamic table". This is more natural for KSQL, because KSQL takes stream and table as different abstractions for representing collections; in KSQL, only a table is mutable and can have a primary key.

Does this connector belong to the "table" scope or the "stream" scope?

Some of the concepts (such as a primary key on a stream) should be suitable for all connectors, not just Kafka. Shouldn't this be an extension of the existing Kafka connector instead of a totally new connector? What about the other connectors?

Because this touches the core abstraction of Flink, we had better have a top-down overall design; following KSQL directly is not the answer.

P.S. For the source:
> Shouldn't this be an extension of the existing Kafka connector instead of a totally new connector?
How could we achieve that (e.g. set up the parallelism correctly)?

Best,
Danny Chan

On Oct 19, 2020 at 5:17 PM +0800, Jingsong Li <jingsongl...@gmail.com> wrote:

Thanks Shengkai for your proposal.

+1 for this feature.
> Future Work: Support bounded KTable source

I don't think it should be future work; I think it is one of the important concepts of this FLIP, and we need to understand it now.

Intuitively, a ktable in my opinion is a bounded table rather than a stream, so a select should produce a bounded table by default.

I think we should list the related Kafka knowledge, because the word `ktable` is easy to associate with KSQL-related concepts. (If possible, it's better to unify with it.)

What do you think?

> value.fields-include

What is the default behavior of KSQL?

Best,
Jingsong

On Mon, Oct 19, 2020 at 4:33 PM Shengkai Fang <fskm...@gmail.com> wrote:

Hi, devs.

Jark and I want to start a new FLIP to introduce the KTable connector. The KTable is short for "Kafka Table"; it also has the same semantics as the KTable notion in Kafka Streams.

FLIP-149:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-149%3A+Introduce+the+KTable+Connector

Currently many users have expressed their need for upsert Kafka through mailing lists and issues. The KTable connector has several benefits for users:

1. Users are able to interpret a compacted Kafka topic as an upsert stream in Apache Flink, and also to write a changelog stream to Kafka (into a compacted topic).
2. As part of a real-time pipeline, store a join or aggregate result (which may contain updates) in a Kafka topic for further calculation.
3. The semantics of the KTable connector are just the same as KTable in Kafka Streams, so it's very handy for Kafka Streams and KSQL users. We have seen several questions in the mailing list asking how to model a KTable and how to join a KTable in Flink SQL.

We hope it can expand the usage of Flink with Kafka.

I'm looking forward to your feedback.

Best,
Shengkai

--
Best, Jingsong Lee

--
Konstantin Knauf

https://twitter.com/snntrable

https://github.com/knaufk

--
Seth Wiesman | Solutions Architect

+1 314 387 1463

<https://www.ververica.com/>