Hi Timo,

I have some concerns about `kafka-cdc`:

1) CDC is an abbreviation of Change Data Capture, which is commonly used for databases, not for message queues.
2) Usually, CDC produces the full content of the changelog, including UPDATE_BEFORE; "upsert kafka" doesn't.
3) `kafka-cdc` sounds like native support for the `debezium-json` format; however, it is not, and we don't even want "upsert kafka" to support "debezium-json".
Hi Jingsong,

I think the terminology of "upsert" is fine, because Kafka also uses "upsert" to define such behavior in their official documentation [1]:

> a data record in a changelog stream is interpreted as an UPSERT aka INSERT/UPDATE

Materialize uses the "UPSERT" keyword to define such behavior too [2]. Users have been requesting such a feature using the "upsert kafka" terminology on the user mailing lists [3][4]. Many other systems support an "UPSERT" statement natively, such as Impala [5], SAP [6], Phoenix [7], Oracle NoSQL [8], etc. Therefore, I think we don't need to be afraid of introducing the "upsert" terminology; it is widely accepted by users.

Best,
Jark

[1]: https://kafka.apache.org/20/documentation/streams/developer-guide/dsl-api.html#streams_concepts_ktable
[2]: https://materialize.io/docs/sql/create-source/text-kafka/#upsert-on-a-kafka-topic
[3]: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/SQL-materialized-upsert-tables-td18482.html#a18503
[4]: http://apache-flink.147419.n8.nabble.com/Kafka-Sink-AppendStreamTableSink-doesn-t-support-consuming-update-changes-td5959.html
[5]: https://impala.apache.org/docs/build/html/topics/impala_upsert.html
[6]: https://help.sap.com/viewer/7c78579ce9b14a669c1f3295b0d8ca16/Cloud/en-US/ea8b6773be584203bcd99da76844c5ed.html
[7]: https://phoenix.apache.org/atomic_upsert.html
[8]: https://docs.oracle.com/en/database/other-databases/nosql-database/18.3/sqlfornosql/adding-table-rows-using-insert-and-upsert-statements.html

On Fri, 23 Oct 2020 at 10:36, Jingsong Li <jingsongl...@gmail.com> wrote:

> The `kafka-cdc` looks good to me.
> We could even add an option to indicate whether to turn on compaction, because compaction is just an optimization?
>
> - ktable: it makes me think of KSQL.
> - kafka-compacted: it is not just compacted; more than that, it still has the ability of CDC.
> - upsert-kafka: upsert is back, and I don't really want to see it again since we have CDC.
>
> Best,
> Jingsong
>
> On Fri, Oct 23, 2020 at 2:21 AM Timo Walther <twal...@apache.org> wrote:
>
> > Hi Jark,
> >
> > I would be fine with `connector=upsert-kafka`. Another idea would be to align the name to other available Flink connectors [1]:
> >
> > `connector=kafka-cdc`.
> >
> > Regards,
> > Timo
> >
> > [1] https://github.com/ververica/flink-cdc-connectors
> >
> > On 22.10.20 17:17, Jark Wu wrote:
> > > Another name is "connector=upsert-kafka"; I think this can solve Timo's concern about the "compacted" word.
> > >
> > > Materialize also uses the "ENVELOPE UPSERT" [1] keyword to identify such kafka sources.
> > > I think "upsert" is a well-known terminology widely used in many systems and matches the behavior of how we handle the kafka messages.
> > >
> > > What do you think?
> > >
> > > Best,
> > > Jark
> > >
> > > [1]: https://materialize.io/docs/sql/create-source/text-kafka/#upsert-on-a-kafka-topic
> > >
> > > On Thu, 22 Oct 2020 at 22:53, Kurt Young <ykt...@gmail.com> wrote:
> > >
> > >> Good validation messages can't fix the broken user experience, especially since such an update-mode option would implicitly make half of the current kafka options invalid or meaningless.
> > >>
> > >> Best,
> > >> Kurt
> > >>
> > >> On Thu, Oct 22, 2020 at 10:31 PM Jark Wu <imj...@gmail.com> wrote:
> > >>
> > >>> Hi Timo, Seth,
> > >>>
> > >>> The default value "inserting" for "mode" might not be suitable, because "debezium-json" emits changelog messages which include updates.
> > >>>
> > >>> On Thu, 22 Oct 2020 at 22:10, Seth Wiesman <s...@ververica.com> wrote:
> > >>>
> > >>>> +1 for supporting upsert results into Kafka.
> > >>>>
> > >>>> I have no comments on the implementation details.
> > >>>>
> > >>>> As far as configuration goes, I tend to favor Timo's option where we add a "mode" property to the existing Kafka table with the default value "inserting". If the mode is set to "updating", then the validation changes to the new requirements. I personally find it more intuitive than a separate connector; my fear is that users won't understand it's the same physical kafka sink under the hood, and it will lead to other confusion, like: does it offer the same persistence guarantees? I think we are capable of adding good validation messaging that solves Jark's and Kurt's concerns.
> > >>>>
> > >>>> On Thu, Oct 22, 2020 at 8:51 AM Timo Walther <twal...@apache.org> wrote:
> > >>>>
> > >>>>> Hi Jark,
> > >>>>>
> > >>>>> "calling it "kafka-compacted" can even remind users to enable log compaction"
> > >>>>>
> > >>>>> But sometimes users like to store a lineage of changes in their topics, independent of any ktable/kstream interpretation.
> > >>>>>
> > >>>>> I'll let the majority decide on this topic so as not to further block this effort. But we might find a better name, like:
> > >>>>>
> > >>>>> connector = kafka
> > >>>>> mode = updating/inserting
> > >>>>>
> > >>>>> OR
> > >>>>>
> > >>>>> connector = kafka-updating
> > >>>>>
> > >>>>> ...
> > >>>>>
> > >>>>> Regards,
> > >>>>> Timo
> > >>>>>
> > >>>>> On 22.10.20 15:24, Jark Wu wrote:
> > >>>>>> Hi Timo,
> > >>>>>>
> > >>>>>> Thanks for your opinions.
> > >>>>>>
> > >>>>>> 1) Implementation
> > >>>>>> We will have a stateful operator to generate INSERT and UPDATE_BEFORE. This operator is keyed (with the primary key as the shuffle key) after the source operator. The implementation of this operator is very similar to the existing `DeduplicateKeepLastRowFunction`. The operator will register a value state using the primary key fields as keys.
> > >>>>>> When the value state is empty under the current key, we will emit INSERT for the input row.
> > >>>>>> When the value state is not empty under the current key, we will emit UPDATE_BEFORE using the row in state, and emit UPDATE_AFTER using the input row.
> > >>>>>> When the input row is a DELETE, we will clear the state and emit a DELETE row.
> > >>>>>>
> > >>>>>> 2) New option vs. new connector
> > >>>>>>> We recently simplified the table options to a minimum amount of characters to be as concise as possible in the DDL.
> > >>>>>> I think this is exactly the reason why we want to introduce a new connector: because we can simplify the options in the DDL.
> > >>>>>> For example, if using a new option, the DDL may look like this:
> > >>>>>>
> > >>>>>> CREATE TABLE users (
> > >>>>>>   user_id BIGINT,
> > >>>>>>   user_name STRING,
> > >>>>>>   user_level STRING,
> > >>>>>>   region STRING,
> > >>>>>>   PRIMARY KEY (user_id) NOT ENFORCED
> > >>>>>> ) WITH (
> > >>>>>>   'connector' = 'kafka',
> > >>>>>>   'model' = 'table',
> > >>>>>>   'topic' = 'pageviews_per_region',
> > >>>>>>   'properties.bootstrap.servers' = '...',
> > >>>>>>   'properties.group.id' = 'testGroup',
> > >>>>>>   'scan.startup.mode' = 'earliest-offset',
> > >>>>>>   'key.format' = 'csv',
> > >>>>>>   'key.fields' = 'user_id',
> > >>>>>>   'value.format' = 'avro',
> > >>>>>>   'sink.partitioner' = 'hash'
> > >>>>>> );
> > >>>>>>
> > >>>>>> If using a new connector, we can have different default values for the options and remove unnecessary options; the DDL can look like this, which is much more concise:
> > >>>>>>
> > >>>>>> CREATE TABLE pageviews_per_region (
> > >>>>>>   user_id BIGINT,
> > >>>>>>   user_name STRING,
> > >>>>>>   user_level STRING,
> > >>>>>>   region STRING,
> > >>>>>>   PRIMARY KEY (user_id) NOT ENFORCED
> > >>>>>> ) WITH (
> > >>>>>>   'connector' = 'kafka-compacted',
> > >>>>>>   'topic' = 'pageviews_per_region',
> > >>>>>>   'properties.bootstrap.servers' = '...',
> > >>>>>>   'key.format' = 'csv',
> > >>>>>>   'value.format' = 'avro'
> > >>>>>> );
> > >>>>>>
> > >>>>>>> When people read `connector=kafka-compacted` they might not know that it has ktable semantics. You don't need to enable log compaction in order to use a KTable as far as I know.
> > >>>>>> We don't need to let users know it has ktable semantics; as Konstantin mentioned, that name may carry more implicit meaning than we want to imply here. I agree users don't need to enable log compaction, but from the production perspective, log compaction should always be enabled if the topic is used for this purpose. Calling it "kafka-compacted" can even remind users to enable log compaction.
> > >>>>>>
> > >>>>>> I don't agree with introducing a "model = table/stream" option, or "connector=kafka-table", because this means we are introducing the Table vs. Stream concept from KSQL. However, we don't have such a top-level concept in Flink SQL now; this would further confuse users. In Flink SQL, everything is a STREAM; the differences are whether it is bounded or unbounded, and whether it is insert-only or a changelog.
> > >>>>>>
> > >>>>>> Best,
> > >>>>>> Jark
> > >>>>>>
> > >>>>>> On Thu, 22 Oct 2020 at 20:39, Timo Walther <twal...@apache.org> wrote:
> > >>>>>>
> > >>>>>>> Hi Shengkai, Hi Jark,
> > >>>>>>>
> > >>>>>>> thanks for this great proposal. It is time to finally connect the changelog processor with a compacted Kafka topic.
> > >>>>>>>
> > >>>>>>> "The operator will produce INSERT rows, or additionally generate UPDATE_BEFORE rows for the previous image, or produce DELETE rows with all columns filled with values."
> > >>>>>>>
> > >>>>>>> Could you elaborate a bit on the implementation details in the FLIP? How are UPDATE_BEFOREs generated? How much state is required to perform this operation?
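[Editor's sketch: the materialization step Jark describes above, and that Timo asks about here, could look roughly like the following keyed operator. This is a hedged illustration against Flink's public KeyedProcessFunction and ValueState APIs, not the actual DeduplicateKeepLastRowFunction; the class name ChangelogNormalizeSketch is made up.]

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;

/** Keyed by the primary key; keeps the last row per key to emit a well-formed changelog. */
public class ChangelogNormalizeSketch extends KeyedProcessFunction<Row, Row, Row> {

    private transient ValueState<Row> lastRow;

    @Override
    public void open(Configuration parameters) {
        lastRow = getRuntimeContext().getState(new ValueStateDescriptor<>("last-row", Row.class));
    }

    @Override
    public void processElement(Row input, Context ctx, Collector<Row> out) throws Exception {
        Row previous = lastRow.value();
        if (input.getKind() == RowKind.DELETE) {
            // Tombstone: clear the state and emit a DELETE with all columns
            // filled from the previous image. A delete for a never-seen key
            // is silently dropped in this sketch.
            lastRow.clear();
            if (previous != null) {
                out.collect(withKind(previous, RowKind.DELETE));
            }
        } else if (previous == null) {
            // First message for this key -> INSERT.
            lastRow.update(input);
            out.collect(withKind(input, RowKind.INSERT));
        } else {
            // Key seen before -> UPDATE_BEFORE from state, UPDATE_AFTER from input.
            out.collect(withKind(previous, RowKind.UPDATE_BEFORE));
            lastRow.update(input);
            out.collect(withKind(input, RowKind.UPDATE_AFTER));
        }
    }

    private static Row withKind(Row row, RowKind kind) {
        Row copy = Row.copy(row);
        copy.setKind(kind);
        return copy;
    }
}

Note that the state here is exactly one (latest) row per primary key, which also answers the "how much state" question: it grows with the key cardinality, not with the topic length.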
> > >>>>>>>
> > >>>>>>> From a conceptual and semantical point of view, I'm fine with the proposal. But I would like to share my opinion about how we expose this feature:
> > >>>>>>>
> > >>>>>>> ktable vs kafka-compacted
> > >>>>>>>
> > >>>>>>> I'm against having an additional connector like `ktable` or `kafka-compacted`. We recently simplified the table options to a minimum amount of characters to be as concise as possible in the DDL. Therefore, I would keep `connector=kafka` and introduce an additional option, because a user wants to read "from Kafka", and the "how" should be determined in the lower options.
> > >>>>>>>
> > >>>>>>> When people read `connector=ktable` they might not know that this is Kafka. Or they wonder where `kstream` is?
> > >>>>>>>
> > >>>>>>> When people read `connector=kafka-compacted` they might not know that it has ktable semantics. You don't need to enable log compaction in order to use a KTable as far as I know. Log compaction and table semantics are orthogonal topics.
> > >>>>>>>
> > >>>>>>> In the end we will need 3 types of information when declaring a Kafka connector:
> > >>>>>>>
> > >>>>>>> CREATE TABLE ... WITH (
> > >>>>>>>   connector=kafka       -- Some information about the connector
> > >>>>>>>   end-offset = XXXX     -- Some information about the boundedness
> > >>>>>>>   model = table/stream  -- Some information about interpretation
> > >>>>>>> )
> > >>>>>>>
> > >>>>>>> We can still apply all the constraints mentioned in the FLIP when `model` is set to `table`.
> > >>>>>>>
> > >>>>>>> What do you think?
> > >>>>>>>
> > >>>>>>> Regards,
> > >>>>>>> Timo
> > >>>>>>>
> > >>>>>>> On 21.10.20 14:19, Jark Wu wrote:
> > >>>>>>>> Hi,
> > >>>>>>>>
> > >>>>>>>> IMO, if we are going to mix them in one connector:
> > >>>>>>>> 1) Either users need to set some options to specific values explicitly, e.g. "scan.startup.mode=earliest-offset", "sink.partitioner=hash", etc. This makes the connector awkward to use: users may have to fix options one by one according to the exceptions (see the validation sketch below). Besides, in the future it is still possible to use "sink.partitioner=fixed" (to reduce network cost) if users are aware of the partition routing; however, it's error-prone to have "fixed" as the default for compacted mode.
> > >>>>>>>>
> > >>>>>>>> 2) Or we make those options default to different values when "compacted=true". This would be more confusing and unpredictable, since the default values of options would change according to other options. What happens if we have a third mode in the future?
> > >>>>>>>>
> > >>>>>>>> In terms of usage and options, it's very different from the original "kafka" connector. It would be handier to use and less error-prone to separate them into two connectors. In the implementation layer, we can reuse code as much as possible.
> > >>>>>>>>
> > >>>>>>>> Therefore, I'm still +1 to have a new connector. The "kafka-compacted" name sounds good to me.
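[Editor's sketch: the "fix options one by one according to the exceptions" friction Jark describes could look like the following factory-side validation, assuming upsert behavior were toggled by an option on the existing connector. The option keys mirror the thread; the class and helper are hypothetical, not Flink's actual factory code.]

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.ValidationException;

final class UpsertModeValidation {

    // Assumed option definitions, matching the names discussed in the thread.
    static final ConfigOption<String> SCAN_STARTUP_MODE =
            ConfigOptions.key("scan.startup.mode").stringType().noDefaultValue();
    static final ConfigOption<String> SINK_PARTITIONER =
            ConfigOptions.key("sink.partitioner").stringType().noDefaultValue();

    /** Rejects, one by one, the options that stop making sense in upsert mode. */
    static void validateUpsertMode(ReadableConfig options) {
        options.getOptional(SCAN_STARTUP_MODE).ifPresent(v -> {
            throw new ValidationException(
                    "'scan.startup.mode' must not be set in upsert mode; "
                            + "the source always reads the whole (compacted) topic.");
        });
        options.getOptional(SINK_PARTITIONER).filter(v -> !"hash".equals(v)).ifPresent(v -> {
            throw new ValidationException(
                    "'sink.partitioner' must be 'hash' in upsert mode so that "
                            + "all updates for a key land in the same partition.");
        });
    }

    private UpsertModeValidation() {}
}

Each rejected option surfaces as its own exception, which is exactly the one-error-at-a-time experience Jark warns about.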
> > >>>>>>>>
> > >>>>>>>> Best,
> > >>>>>>>> Jark
> > >>>>>>>>
> > >>>>>>>> On Wed, 21 Oct 2020 at 17:58, Konstantin Knauf <kna...@apache.org> wrote:
> > >>>>>>>>
> > >>>>>>>>> Hi Kurt, Hi Shengkai,
> > >>>>>>>>>
> > >>>>>>>>> thanks for answering my questions and the additional clarifications. I don't have a strong opinion on whether to extend the "kafka" connector or to introduce a new connector. So, from my perspective, feel free to go with a separate connector. If we do introduce a new connector, I wouldn't call it "ktable" for the aforementioned reasons (in addition, the name might suggest that there is also a "kstreams" connector, for symmetry reasons). I don't have a good alternative name, though; maybe "kafka-compacted" or "compacted-kafka".
> > >>>>>>>>>
> > >>>>>>>>> Thanks,
> > >>>>>>>>> Konstantin
> > >>>>>>>>>
> > >>>>>>>>> On Wed, Oct 21, 2020 at 4:43 AM Kurt Young <ykt...@gmail.com> wrote:
> > >>>>>>>>>
> > >>>>>>>>>> Hi all,
> > >>>>>>>>>>
> > >>>>>>>>>> I want to describe the discussion process that drove us to this conclusion; it might make some of the design choices easier to understand and keep everyone on the same page.
> > >>>>>>>>>>
> > >>>>>>>>>> Back to the motivation: what functionality do we want to provide in the first place? We got a lot of feedback and questions from the mailing lists showing that people want to write not-insert-only messages into kafka. They might do so intentionally or by accident, e.g. with a non-windowed aggregate query or a non-windowed left outer join. And some users from the KSQL world also asked why Flink doesn't leverage the Key concept of every kafka topic and treat kafka as a dynamically changing keyed table.
> > >>>>>>>>>>
> > >>>>>>>>>> To work with kafka better, we were thinking of extending the functionality of the current kafka connector by letting it accept updates and deletions. But due to the limitations of kafka, the update has to be "update by key", i.e. a table with a primary key.
> > >>>>>>>>>>
> > >>>>>>>>>> This introduces a couple of conflicts with the current kafka table's options:
> > >>>>>>>>>> 1. key.fields: as said above, we need the kafka table to have the primary key constraint, and users can also configure key.fields freely; this might cause friction. (Sure, we can do some sanity checks on this, but that also creates friction.)
> > >>>>>>>>>> 2. sink.partitioner: to make the semantics right, we need to make sure all the updates on the same key are written to the same kafka partition, so we should force a hash-by-key partitioner inside such a table. Again, this has conflicts and creates friction with the current user options. (A sketch of such a partitioner follows below.)
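[Editor's sketch: the forced hash-by-key routing in point 2 could be expressed as a FlinkKafkaPartitioner along these lines. Illustrative only; a production version should hash the serialized key exactly the way the Kafka producer's default partitioner does (murmur2), so that records written by Flink and by other producers agree on the partition.]

import java.util.Arrays;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

/** Routes every update for the same primary key to the same Kafka partition. */
public class HashByKeyPartitioner<T> extends FlinkKafkaPartitioner<T> {
    @Override
    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        // Mask the sign bit so the modulo result is non-negative.
        return partitions[(Arrays.hashCode(key) & Integer.MAX_VALUE) % partitions.length];
    }
}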
> > >>>>>>>>>>
> > >>>>>>>>>> The above things are solvable, though the result is neither perfect nor the most user-friendly.
> > >>>>>>>>>>
> > >>>>>>>>>> Let's take a look at the reading side. The keyed kafka table contains two kinds of messages: upserts or deletions. What upsert means is: "If the key doesn't exist yet, it's an insert record; otherwise it's an update record." For the sake of correctness, or simplicity, the Flink SQL engine also needs this information. If we interpret all messages as "update record", some queries or operators may not work properly. It's weird to see an update record when you haven't seen the insert record before.
> > >>>>>>>>>>
> > >>>>>>>>>> So what Flink should do is: after reading the records from such a table, it needs to create state to record which keys have been seen, and then generate the correct row type correspondingly. This couples the state with the data of the message queue, and it also creates conflicts with the current kafka connector.
> > >>>>>>>>>>
> > >>>>>>>>>> Think about what happens if users suspend a running job (which now contains some reading state) and then change the start offset of the reader. By changing the reading offset, they actually change the whole story of which records should be insert messages and which records should be update messages. It will also force Flink to deal with another weird situation: it might receive a deletion for a non-existing message.
> > >>>>>>>>>>
> > >>>>>>>>>> We were unsatisfied with all the friction and conflicts it would create if we enabled "upsert & deletion" support in the current kafka connector. And later we began to realize that we shouldn't treat it as a normal message queue, but as a changing keyed table. We should always be able to get the whole data of such a table (by disabling the start offset option), and we can also read the changelog out of it. It's like an HBase table with binlog support but without random-access capability (which can be fulfilled by Flink's state).
> > >>>>>>>>>>
> > >>>>>>>>>> So our intention is, instead of telling and persuading users which options they should or should not use after extending the current kafka connector with upsert support, to create a whole new and different connector that has a totally different abstraction in the SQL layer and should be treated totally differently from the current kafka connector.
> > >>>>>>>>>>
> > >>>>>>>>>> Hope this can clarify some of the concerns.
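[Editor's sketch: the reading-side interpretation Kurt describes, keyed records where a null value is a deletion and everything else an upsert, could be reduced to the following. Illustrative; whether an upsert is really an INSERT or an UPDATE is decided later by the keyed state, as in the operator sketch further up. The key-before-value column layout is an assumption.]

import javax.annotation.Nullable;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

final class UpsertRecordInterpreter {

    /** Maps one deserialized Kafka record to a provisional changelog row. */
    static Row toChangelogRow(Row keyFields, @Nullable Row valueFields) {
        if (valueFields == null) {
            // Tombstone: only the key columns are known at this point.
            Row delete = Row.copy(keyFields);
            delete.setKind(RowKind.DELETE);
            return delete;
        }
        Row upsert = Row.join(keyFields, valueFields);
        upsert.setKind(RowKind.UPDATE_AFTER); // normalized to INSERT/UPDATE downstream
        return upsert;
    }

    private UpsertRecordInterpreter() {}
}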
> > >>>>>>>>>>
> > >>>>>>>>>> Best,
> > >>>>>>>>>> Kurt
> > >>>>>>>>>>
> > >>>>>>>>>> On Tue, Oct 20, 2020 at 5:20 PM Shengkai Fang <fskm...@gmail.com> wrote:
> > >>>>>>>>>>
> > >>>>>>>>>>> Hi devs,
> > >>>>>>>>>>>
> > >>>>>>>>>>> As many people are still confused about the differing option behaviours between the Kafka connector and the KTable connector, Jark and I have listed the differences in the doc [1].
> > >>>>>>>>>>>
> > >>>>>>>>>>> Best,
> > >>>>>>>>>>> Shengkai
> > >>>>>>>>>>>
> > >>>>>>>>>>> [1] https://docs.google.com/document/d/13oAWAwQez0lZLsyfV21BfTEze1fc2cz4AZKiNOyBNPk/edit
> > >>>>>>>>>>>
> > >>>>>>>>>>> On Tue, Oct 20, 2020 at 12:05 PM, Shengkai Fang <fskm...@gmail.com> wrote:
> > >>>>>>>>>>>
> > >>>>>>>>>>>> Hi Konstantin,
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Thanks for your reply.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>> It uses the "kafka" connector and does not specify a primary key.
> > >>>>>>>>>>>> The dimension table `users` uses the ktable connector, and we can specify the primary key on the KTable.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>> Will it be possible to use a "ktable" as a dimension table in FLIP-132?
> > >>>>>>>>>>>> Yes. We can specify the watermark on the KTable, and it can be used as a dimension table in a temporal join.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>> Introduce a new connector vs. introduce a new property
> > >>>>>>>>>>>> The main reason is that the KTable connector has almost no options in common with the Kafka connector. The only options that can be reused by the KTable connector are 'topic', 'properties.bootstrap.servers' and 'value.fields-include'. We can't set a CDC format for 'key.format' and 'value.format' in the KTable connector now, which is possible in the Kafka connector. Considering the difference between the usable options, it's more suitable to introduce another connector rather than a property.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> We are also fine with using "compacted-kafka" as the name of the new connector. What do you think?
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Best,
> > >>>>>>>>>>>> Shengkai
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> On Mon, Oct 19, 2020 at 10:15 PM, Konstantin Knauf <kna...@apache.org> wrote:
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>> Hi Shengkai,
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Thank you for driving this effort. I believe this is a very important feature for many users who use Kafka and Flink SQL together. A few questions and thoughts:
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> * Is your example "Use KTable as a reference/dimension table" correct? It uses the "kafka" connector and does not specify a primary key.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> * Will it be possible to use a "ktable" table directly as a dimension table in a temporal join (*based on event time*) (FLIP-132)?
> > >>>>>>>>>>>>> This is not completely clear to me from the FLIP.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> * I'd personally prefer not to introduce a new connector and instead extend the Kafka connector. We could add an additional property "compacted" = "true"|"false". If it is set to "true", we can add additional validation logic (e.g. "scan.startup.mode" cannot be set, a primary key is required, etc.). If we stick to a separate connector, I'd not call it "ktable" but rather "compacted-kafka" or similar. KTable seems to carry more implicit meaning than we want to imply here.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> * I agree that this is not a bounded source. If we want to support a bounded mode, this is an orthogonal concern that also applies to other unbounded sources.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Best,
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Konstantin
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> On Mon, Oct 19, 2020 at 3:26 PM Jark Wu <imj...@gmail.com> wrote:
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Hi Danny,
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> First of all, we didn't introduce any concepts from KSQL (e.g. the Stream vs. Table notion). This new connector will produce a changelog stream, so it's still a dynamic table and doesn't conflict with Flink's core concepts.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> The "ktable" is just a connector name; we could also call it "compacted-kafka" or something else. We call it "ktable" just so that KSQL users can migrate to Flink SQL easily.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Regarding why we introduce a new connector vs. a new property in the existing kafka connector:
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> I think the main reason is that we want a clear separation of these two use cases, because they are very different. We also listed the reasons in the FLIP, including:
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> 1) It's hard to explain what the behavior is when users specify a start offset at a middle position (e.g. how to process delete events for keys that have never been seen). It's dangerous if users do that, so we don't provide the offset option in the new connector at the moment.
> > >>>>>>>>>>>>>> 2) It's a different perspective/abstraction on the same kafka topic (append vs. upsert). It would be easier to understand if we separate them instead of mixing them in one connector. The new connector requires a hash sink partitioner, a declared primary key, and a regular format.
> > >>>>>>>>>>>>>> If we mixed them in one connector, it might be confusing how to use the options correctly.
> > >>>>>>>>>>>>>> 3) The semantics of the KTable connector are just the same as KTable in Kafka Streams, so it's very handy for Kafka Streams and KSQL users. We have seen several questions on the mailing list asking how to model a KTable and how to join a KTable in Flink SQL.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Best,
> > >>>>>>>>>>>>>> Jark
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> On Mon, 19 Oct 2020 at 19:53, Jark Wu <imj...@gmail.com> wrote:
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Hi Jingsong,
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> As the FLIP describes, "KTable connector produces a changelog stream, where each data record represents an update or delete event." Therefore, a ktable source is an unbounded stream source. Selecting from a ktable source is similar to selecting from a kafka source with the debezium-json format: it never ends, and the results are continuously updated.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> It's possible to have a bounded ktable source in the future, for example by adding an option 'bounded=true' or 'end-offset=xxx'. In this way, the ktable will produce a bounded changelog stream. So I think this can be a compatible feature in the future.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> I don't think we should associate it with KSQL-related concepts. Actually, we didn't introduce any concepts from KSQL (e.g. the Stream vs. Table notion). The "ktable" is just a connector name; we could also call it "compacted-kafka" or something else. We call it "ktable" just so that KSQL users can migrate to Flink SQL easily.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Regarding "value.fields-include": this is an option introduced in FLIP-107 for the Kafka connector. I think we should keep the same behavior as the Kafka connector. I'm not sure what the default behavior of KSQL is, but judging from the example docs, I guess it also stores the keys in the value (see the "users_original" table) [1].
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Best,
> > >>>>>>>>>>>>>>> Jark
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> [1]: https://docs.confluent.io/current/ksqldb/tutorials/basics-local.html#create-a-stream-and-table
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> On Mon, 19 Oct 2020 at 18:17, Danny Chan <yuzhao....@gmail.com> wrote:
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> The concept seems to conflict with the Flink abstraction of a "dynamic table"; in Flink we see both "stream" and "table" as a dynamic table.
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> I think we should first make clear how to express stream- and table-specific features on one "dynamic table". It is more natural for KSQL because KSQL takes stream and table as different abstractions for representing collections; in KSQL, only a table is mutable and can have a primary key.
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Does this connector belong to the "table" scope or the "stream" scope?
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Some of the concepts (such as the primary key on a stream) should be suitable for all the connectors, not just Kafka. Shouldn't this be an extension of the existing Kafka connector instead of a totally new connector? What about the other connectors?
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Because this touches the core abstraction of Flink, we had better have a top-down overall design; following KSQL directly is not the answer.
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> P.S. For the source:
> > >>>>>>>>>>>>>>>>> Shouldn't this be an extension of the existing Kafka connector instead of a totally new connector?
> > >>>>>>>>>>>>>>>> How could we achieve that (e.g. set up the parallelism correctly)?
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Best,
> > >>>>>>>>>>>>>>>> Danny Chan
> > >>>>>>>>>>>>>>>> On Oct 19, 2020 at 5:17 PM +0800, Jingsong Li <jingsongl...@gmail.com> wrote:
> > >>>>>>>>>>>>>>>>> Thanks Shengkai for your proposal.
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> +1 for this feature.
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> Future Work: Support bounded KTable source
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> I don't think this should be future work; it is one of the important concepts of this FLIP. We need to understand it now.
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Intuitively, a ktable in my opinion is a bounded table rather than a stream, so a SELECT should produce a bounded table by default.
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> I think we should list the Kafka-related knowledge, because the word `ktable` is easily associated with KSQL-related concepts. (If possible, it's better to unify with them.)
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> What do you think?
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> value.fields-include
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> What about the default behavior of KSQL?
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Best,
> > >>>>>>>>>>>>>>>>> Jingsong
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> On Mon, Oct 19, 2020 at 4:33 PM Shengkai Fang <fskm...@gmail.com> wrote:
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> Hi, devs.
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> Jark and I want to start a new FLIP to introduce the KTable connector. "KTable" is short for "Kafka Table"; it also has the same semantics as the KTable notion in Kafka Streams.
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> FLIP-149: https://cwiki.apache.org/confluence/display/FLINK/FLIP-149%3A+Introduce+the+KTable+Connector
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> Many users have already expressed their need for upsert Kafka support via mailing lists and issues. The KTable connector has several benefits for users:
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> 1. Users are able to interpret a compacted Kafka topic as an upsert stream in Apache Flink, and are also able to write a changelog stream to Kafka (into a compacted topic).
> > >>>>>>>>>>>>>>>>>> 2. As part of a real-time pipeline, store join or aggregate results (which may contain updates) in a Kafka topic for further calculation.
> > >>>>>>>>>>>>>>>>>> 3. The semantics of the KTable connector are just the same as KTable in Kafka Streams, so it's very handy for Kafka Streams and KSQL users. We have seen several questions on the mailing list asking how to model a KTable and how to join a KTable in Flink SQL.
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> We hope it can expand the usage of Flink with Kafka.
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> I'm looking forward to your feedback.
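[Editor's sketch: an end-to-end flavor of benefits 1 and 2, written against Flink's Table API. The connector name and options follow the FLIP discussion and are still under debate in this thread; the `pageviews` source table is assumed to exist.]

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class UpsertSinkExample {
    public static void main(String[] args) {
        TableEnvironment env =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // A keyed, compacted topic declared as a table (connector name per the FLIP draft).
        env.executeSql(
                "CREATE TABLE pageviews_per_region (" +
                "  region STRING," +
                "  cnt BIGINT," +
                "  PRIMARY KEY (region) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'upsert-kafka'," +
                "  'topic' = 'pageviews_per_region'," +
                "  'properties.bootstrap.servers' = 'localhost:9092'," +
                "  'key.format' = 'csv'," +
                "  'value.format' = 'avro'" +
                ")");

        // An unbounded aggregate emits updates; the sink absorbs them per key.
        env.executeSql(
                "INSERT INTO pageviews_per_region " +
                "SELECT region, COUNT(*) AS cnt FROM pageviews GROUP BY region");
    }
}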
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> Best,
> > >>>>>>>>>>>>>>>>>> Shengkai
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> --
> > >>>>>>>>>>>>>>>>> Best, Jingsong Lee
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> --
> > >>>>>>>>>>>>> Konstantin Knauf
> > >>>>>>>>>>>>> https://twitter.com/snntrable
> > >>>>>>>>>>>>> https://github.com/knaufk
> > >>>>
> > >>>> --
> > >>>> Seth Wiesman | Solutions Architect
> > >>>> <https://www.ververica.com/>
>
> --
> Best, Jingsong Lee