Thanks for the explanation, I am OK with `upsert`. Yes, its concept has been
accepted by many systems.
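For the record, usage would then look roughly like the sketch below (my
assumptions: the connector ends up being named `upsert-kafka`, the options
follow the DDL examples quoted later in this mail, and a `pageviews` source
table already exists):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class UpsertKafkaSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Sketch: a compacted topic interpreted as an updating table,
        // keyed on the declared (NOT ENFORCED) primary key.
        tEnv.executeSql(
                "CREATE TABLE pageviews_per_region (" +
                "  region STRING," +
                "  pv BIGINT," +
                "  PRIMARY KEY (region) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'upsert-kafka'," +
                "  'topic' = 'pageviews_per_region'," +
                "  'properties.bootstrap.servers' = '...'," +
                "  'key.format' = 'csv'," +
                "  'value.format' = 'avro')");

        // A non-windowed aggregate produces an updating result; the sink
        // absorbs it as upserts/deletes on the primary key.
        tEnv.executeSql(
                "INSERT INTO pageviews_per_region " +
                "SELECT region, COUNT(*) AS pv FROM pageviews GROUP BY region");
    }
}

Here the PRIMARY KEY both decides the Kafka message key and serves as the
upsert key of the changelog.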
Best,
Jingsong

On Fri, Oct 23, 2020 at 12:38 PM Jark Wu <imj...@gmail.com> wrote:
> Hi Timo,
>
> I have some concerns about `kafka-cdc`:
> 1) CDC is an abbreviation of Change Data Capture, which is commonly used
> for databases, not for message queues.
> 2) Usually, CDC produces the full content of the changelog, including
> UPDATE_BEFORE; however, "upsert kafka" doesn't.
> 3) `kafka-cdc` sounds like native support for the `debezium-json` format;
> however, it is not, and we don't even want "upsert kafka" to support
> "debezium-json".
>
>
> Hi Jingsong,
>
> I think the terminology of "upsert" is fine, because Kafka also uses
> "upsert" to define such behavior in their official documentation [1]:
>
> > a data record in a changelog stream is interpreted as an UPSERT aka
> > INSERT/UPDATE
>
> Materialize uses the "UPSERT" keyword to define such behavior too [2].
> Users have been requesting such a feature using "upsert kafka" terminology
> in the user mailing lists [3][4].
> Many other systems support an "UPSERT" statement natively, such as Impala
> [5], SAP [6], Phoenix [7], Oracle NoSQL [8], etc.
>
> Therefore, I think we don't need to be afraid of introducing the "upsert"
> terminology; it is widely accepted by users.
>
> Best,
> Jark
>
> [1]: https://kafka.apache.org/20/documentation/streams/developer-guide/dsl-api.html#streams_concepts_ktable
> [2]: https://materialize.io/docs/sql/create-source/text-kafka/#upsert-on-a-kafka-topic
> [3]: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/SQL-materialized-upsert-tables-td18482.html#a18503
> [4]: http://apache-flink.147419.n8.nabble.com/Kafka-Sink-AppendStreamTableSink-doesn-t-support-consuming-update-changes-td5959.html
> [5]: https://impala.apache.org/docs/build/html/topics/impala_upsert.html
> [6]: https://help.sap.com/viewer/7c78579ce9b14a669c1f3295b0d8ca16/Cloud/en-US/ea8b6773be584203bcd99da76844c5ed.html
> [7]: https://phoenix.apache.org/atomic_upsert.html
> [8]: https://docs.oracle.com/en/database/other-databases/nosql-database/18.3/sqlfornosql/adding-table-rows-using-insert-and-upsert-statements.html
>
> On Fri, 23 Oct 2020 at 10:36, Jingsong Li <jingsongl...@gmail.com> wrote:
>
> > The `kafka-cdc` looks good to me.
> > We can even give options to indicate whether to turn on compaction,
> > because compaction is just an optimization?
> >
> > - ktable: makes me think of KSQL.
> > - kafka-compacted: it is not just compacted; more than that, it still
> >   has the ability of CDC.
> > - upsert-kafka: upsert is back, and I don't really want to see it again
> >   since we have CDC.
> >
> > Best,
> > Jingsong
> >
> > On Fri, Oct 23, 2020 at 2:21 AM Timo Walther <twal...@apache.org> wrote:
> >
> > > Hi Jark,
> > >
> > > I would be fine with `connector=upsert-kafka`. Another idea would be
> > > to align the name to other available Flink connectors [1]:
> > >
> > > `connector=kafka-cdc`.
> > >
> > > Regards,
> > > Timo
> > >
> > > [1] https://github.com/ververica/flink-cdc-connectors
> > >
> > > On 22.10.20 17:17, Jark Wu wrote:
> > > > Another name is "connector=upsert-kafka"; I think this can solve
> > > > Timo's concern about the "compacted" word.
> > > >
> > > > Materialize also uses the "ENVELOPE UPSERT" [1] keyword to identify
> > > > such kafka sources.
> > > > I think "upsert" is a well-known terminology widely used in many
> > > > systems, and it matches the behavior of how we handle the kafka
> > > > messages.
> > > >
> > > > What do you think?
> > > >
> > > > Best,
> > > > Jark
> > > >
> > > > [1]: https://materialize.io/docs/sql/create-source/text-kafka/#upsert-on-a-kafka-topic
> > > >
> > > > On Thu, 22 Oct 2020 at 22:53, Kurt Young <ykt...@gmail.com> wrote:
> > > >
> > > >> Good validation messages can't solve the broken user experience,
> > > >> especially when such an update mode option implicitly makes half of
> > > >> the current kafka options invalid or meaningless.
> > > >>
> > > >> Best,
> > > >> Kurt
> > > >>
> > > >> On Thu, Oct 22, 2020 at 10:31 PM Jark Wu <imj...@gmail.com> wrote:
> > > >>
> > > >>> Hi Timo, Seth,
> > > >>>
> > > >>> The default value "inserting" for "mode" might not be suitable,
> > > >>> because "debezium-json" emits changelog messages which include
> > > >>> updates.
> > > >>>
> > > >>> On Thu, 22 Oct 2020 at 22:10, Seth Wiesman <s...@ververica.com> wrote:
> > > >>>
> > > >>>> +1 for supporting upsert results into Kafka.
> > > >>>>
> > > >>>> I have no comments on the implementation details.
> > > >>>>
> > > >>>> As far as configuration goes, I tend to favor Timo's option where
> > > >>>> we add a "mode" property to the existing Kafka table with the
> > > >>>> default value "inserting". If the mode is set to "updating", then
> > > >>>> the validation changes to the new requirements. I personally find
> > > >>>> it more intuitive than a separate connector. My fear is that
> > > >>>> users won't understand it's the same physical kafka sink under
> > > >>>> the hood, and it will lead to other confusion, like: does it
> > > >>>> offer the same persistence guarantees? I think we are capable of
> > > >>>> adding good validation messaging that solves Jark's and Kurt's
> > > >>>> concerns.
> > > >>>>
> > > >>>> On Thu, Oct 22, 2020 at 8:51 AM Timo Walther <twal...@apache.org> wrote:
> > > >>>>
> > > >>>>> Hi Jark,
> > > >>>>>
> > > >>>>> "calling it "kafka-compacted" can even remind users to enable
> > > >>>>> log compaction"
> > > >>>>>
> > > >>>>> But sometimes users like to store a lineage of changes in their
> > > >>>>> topics, independent of any ktable/kstream interpretation.
> > > >>>>>
> > > >>>>> I let the majority decide on this topic to not further block
> > > >>>>> this effort. But we might find a better name like:
> > > >>>>>
> > > >>>>> connector = kafka
> > > >>>>> mode = updating/inserting
> > > >>>>>
> > > >>>>> OR
> > > >>>>>
> > > >>>>> connector = kafka-updating
> > > >>>>>
> > > >>>>> ...
> > > >>>>>
> > > >>>>> Regards,
> > > >>>>> Timo
> > > >>>>>
> > > >>>>> On 22.10.20 15:24, Jark Wu wrote:
> > > >>>>>> Hi Timo,
> > > >>>>>>
> > > >>>>>> Thanks for your opinions.
> > > >>>>>>
> > > >>>>>> 1) Implementation
> > > >>>>>> We will have a stateful operator to generate INSERT and
> > > >>>>>> UPDATE_BEFORE. This operator is keyed (with the primary key as
> > > >>>>>> the shuffle key) after the source operator. The implementation
> > > >>>>>> of this operator is very similar to the existing
> > > >>>>>> `DeduplicateKeepLastRowFunction`. The operator will register a
> > > >>>>>> value state using the primary key fields as keys.
> > > >>>>>> When the value state is empty under the current key, we will
> > > >>>>>> emit INSERT for the input row.
> > > >>>>>> When the value state is not empty under the current key, we
> > > >>>>>> will emit UPDATE_BEFORE using the row in state, and emit
> > > >>>>>> UPDATE_AFTER using the input row. When the input row is DELETE,
> > > >>>>>> we will clear the state and emit a DELETE row. (A code sketch
> > > >>>>>> of this logic follows at the end of this mail.)
> > > >>>>>>
> > > >>>>>> 2) New option vs. new connector
> > > >>>>>>> We recently simplified the table options to a minimum amount
> > > >>>>>>> of characters to be as concise as possible in the DDL.
> > > >>>>>> I think this is the reason why we want to introduce a new
> > > >>>>>> connector: because we can simplify the options in the DDL.
> > > >>>>>> For example, if using a new option, the DDL may look like this:
> > > >>>>>>
> > > >>>>>> CREATE TABLE users (
> > > >>>>>>   user_id BIGINT,
> > > >>>>>>   user_name STRING,
> > > >>>>>>   user_level STRING,
> > > >>>>>>   region STRING,
> > > >>>>>>   PRIMARY KEY (user_id) NOT ENFORCED
> > > >>>>>> ) WITH (
> > > >>>>>>   'connector' = 'kafka',
> > > >>>>>>   'model' = 'table',
> > > >>>>>>   'topic' = 'pageviews_per_region',
> > > >>>>>>   'properties.bootstrap.servers' = '...',
> > > >>>>>>   'properties.group.id' = 'testGroup',
> > > >>>>>>   'scan.startup.mode' = 'earliest',
> > > >>>>>>   'key.format' = 'csv',
> > > >>>>>>   'key.fields' = 'user_id',
> > > >>>>>>   'value.format' = 'avro',
> > > >>>>>>   'sink.partitioner' = 'hash'
> > > >>>>>> );
> > > >>>>>>
> > > >>>>>> If using a new connector, we can have different default values
> > > >>>>>> for the options and remove unnecessary ones; the DDL can look
> > > >>>>>> like this, which is much more concise:
> > > >>>>>>
> > > >>>>>> CREATE TABLE pageviews_per_region (
> > > >>>>>>   user_id BIGINT,
> > > >>>>>>   user_name STRING,
> > > >>>>>>   user_level STRING,
> > > >>>>>>   region STRING,
> > > >>>>>>   PRIMARY KEY (user_id) NOT ENFORCED
> > > >>>>>> ) WITH (
> > > >>>>>>   'connector' = 'kafka-compacted',
> > > >>>>>>   'topic' = 'pageviews_per_region',
> > > >>>>>>   'properties.bootstrap.servers' = '...',
> > > >>>>>>   'key.format' = 'csv',
> > > >>>>>>   'value.format' = 'avro'
> > > >>>>>> );
> > > >>>>>>
> > > >>>>>>> When people read `connector=kafka-compacted` they might not
> > > >>>>>>> know that it has ktable semantics. You don't need to enable
> > > >>>>>>> log compaction in order to use a KTable as far as I know.
> > > >>>>>> We don't need to let users know it has ktable semantics; as
> > > >>>>>> Konstantin mentioned, this may carry more implicit meaning than
> > > >>>>>> we want to imply here. I agree users don't need to enable log
> > > >>>>>> compaction, but from the production perspective, log compaction
> > > >>>>>> should always be enabled if the topic is used for this purpose.
> > > >>>>>> Calling it "kafka-compacted" can even remind users to enable
> > > >>>>>> log compaction.
> > > >>>>>>
> > > >>>>>> I don't agree to introduce a "model = table/stream" option, or
> > > >>>>>> "connector=kafka-table", because this means we are introducing
> > > >>>>>> the Table vs. Stream concept from KSQL. However, we don't have
> > > >>>>>> such a top-level concept in Flink SQL now; this will further
> > > >>>>>> confuse users. In Flink SQL, everything is a STREAM; the
> > > >>>>>> differences are whether it is bounded or unbounded, and whether
> > > >>>>>> it is insert-only or a changelog.
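> > > >>>>>>
> > > >>>>>> For concreteness, a minimal sketch of that operator logic
> > > >>>>>> (illustrative only: the class name is made up and this is
> > > >>>>>> user-level code, while the real operator would live in the
> > > >>>>>> planner):
> > > >>>>>>
> > > >>>>>> import org.apache.flink.api.common.state.ValueState;
> > > >>>>>> import org.apache.flink.api.common.state.ValueStateDescriptor;
> > > >>>>>> import org.apache.flink.configuration.Configuration;
> > > >>>>>> import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
> > > >>>>>> import org.apache.flink.types.Row;
> > > >>>>>> import org.apache.flink.types.RowKind;
> > > >>>>>> import org.apache.flink.util.Collector;
> > > >>>>>>
> > > >>>>>> // Keyed by the primary key; one Row of state per key.
> > > >>>>>> public class UpsertNormalizeFunction
> > > >>>>>>         extends KeyedProcessFunction<Row, Row, Row> {
> > > >>>>>>
> > > >>>>>>     private transient ValueState<Row> previous;
> > > >>>>>>
> > > >>>>>>     @Override
> > > >>>>>>     public void open(Configuration parameters) {
> > > >>>>>>         previous = getRuntimeContext().getState(
> > > >>>>>>                 new ValueStateDescriptor<>("previous-row", Row.class));
> > > >>>>>>     }
> > > >>>>>>
> > > >>>>>>     @Override
> > > >>>>>>     public void processElement(Row input, Context ctx, Collector<Row> out)
> > > >>>>>>             throws Exception {
> > > >>>>>>         if (input.getKind() == RowKind.DELETE) {
> > > >>>>>>             // Deletion: clear the state and forward the DELETE row.
> > > >>>>>>             previous.clear();
> > > >>>>>>             out.collect(input);
> > > >>>>>>             return;
> > > >>>>>>         }
> > > >>>>>>         Row old = previous.value();
> > > >>>>>>         if (old == null) {
> > > >>>>>>             // First record for this key: emit INSERT.
> > > >>>>>>             input.setKind(RowKind.INSERT);
> > > >>>>>>         } else {
> > > >>>>>>             // Known key: emit UPDATE_BEFORE from state, then UPDATE_AFTER.
> > > >>>>>>             old.setKind(RowKind.UPDATE_BEFORE);
> > > >>>>>>             out.collect(old);
> > > >>>>>>             input.setKind(RowKind.UPDATE_AFTER);
> > > >>>>>>         }
> > > >>>>>>         previous.update(input);
> > > >>>>>>         out.collect(input);
> > > >>>>>>     }
> > > >>>>>> }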
> > > >>>>>>
> > > >>>>>> Best,
> > > >>>>>> Jark
> > > >>>>>>
> > > >>>>>> On Thu, 22 Oct 2020 at 20:39, Timo Walther <twal...@apache.org> wrote:
> > > >>>>>>
> > > >>>>>>> Hi Shengkai, Hi Jark,
> > > >>>>>>>
> > > >>>>>>> thanks for this great proposal. It is time to finally connect
> > > >>>>>>> the changelog processor with a compacted Kafka topic.
> > > >>>>>>>
> > > >>>>>>> "The operator will produce INSERT rows, or additionally
> > > >>>>>>> generate UPDATE_BEFORE rows for the previous image, or produce
> > > >>>>>>> DELETE rows with all columns filled with values."
> > > >>>>>>>
> > > >>>>>>> Could you elaborate a bit on the implementation details in the
> > > >>>>>>> FLIP? How are UPDATE_BEFOREs generated? How much state is
> > > >>>>>>> required to perform this operation?
> > > >>>>>>>
> > > >>>>>>> From a conceptual and semantical point of view, I'm fine with
> > > >>>>>>> the proposal. But I would like to share my opinion about how
> > > >>>>>>> we expose this feature:
> > > >>>>>>>
> > > >>>>>>> ktable vs kafka-compacted
> > > >>>>>>>
> > > >>>>>>> I'm against having an additional connector like `ktable` or
> > > >>>>>>> `kafka-compacted`. We recently simplified the table options to
> > > >>>>>>> a minimum amount of characters to be as concise as possible in
> > > >>>>>>> the DDL. Therefore, I would keep `connector=kafka` and
> > > >>>>>>> introduce an additional option, because a user wants to read
> > > >>>>>>> "from Kafka", and the "how" should be determined in the lower
> > > >>>>>>> options.
> > > >>>>>>>
> > > >>>>>>> When people read `connector=ktable` they might not know that
> > > >>>>>>> this is Kafka. Or they wonder where `kstream` is?
> > > >>>>>>>
> > > >>>>>>> When people read `connector=kafka-compacted` they might not
> > > >>>>>>> know that it has ktable semantics. You don't need to enable
> > > >>>>>>> log compaction in order to use a KTable as far as I know. Log
> > > >>>>>>> compaction and table semantics are orthogonal topics.
> > > >>>>>>>
> > > >>>>>>> In the end we will need 3 types of information when declaring
> > > >>>>>>> a Kafka connector:
> > > >>>>>>>
> > > >>>>>>> CREATE TABLE ... WITH (
> > > >>>>>>>   connector = kafka      -- Some information about the connector
> > > >>>>>>>   end-offset = XXXX      -- Some information about the boundedness
> > > >>>>>>>   model = table/stream   -- Some information about interpretation
> > > >>>>>>> )
> > > >>>>>>>
> > > >>>>>>> We can still apply all the constraints mentioned in the FLIP
> > > >>>>>>> when `model` is set to `table`.
> > > >>>>>>>
> > > >>>>>>> What do you think?
> > > >>>>>>>
> > > >>>>>>> Regards,
> > > >>>>>>> Timo
> > > >>>>>>>
> > > >>>>>>> On 21.10.20 14:19, Jark Wu wrote:
> > > >>>>>>>> Hi,
> > > >>>>>>>>
> > > >>>>>>>> IMO, if we are going to mix them in one connector,
> > > >>>>>>>> 1) either users need to set some options to a specific value
> > > >>>>>>>> explicitly, e.g. "scan.startup.mode=earliest",
> > > >>>>>>>> "sink.partitioner=hash", etc. This makes the connector
> > > >>>>>>>> awkward to use. Users may have to fix options one by one
> > > >>>>>>>> according to the exceptions.
> > > >>>>>>>> Besides, in the future it is still possible to use
> > > >>>>>>>> "sink.partitioner=fixed" (to reduce network cost) if users
> > > >>>>>>>> are aware of the partition routing; however, it's error-prone
> > > >>>>>>>> to have "fixed" as the default for compacted mode.
> > > >>>>>>>>
> > > >>>>>>>> 2) or make those options take different default values when
> > > >>>>>>>> "compacted=true". This would be more confusing and
> > > >>>>>>>> unpredictable if the default values of options change
> > > >>>>>>>> according to other options. What happens if we have a third
> > > >>>>>>>> mode in the future?
> > > >>>>>>>>
> > > >>>>>>>> In terms of usage and options, it's very different from the
> > > >>>>>>>> original "kafka" connector. It would be handier to use and
> > > >>>>>>>> less error-prone if we separate them into two connectors.
> > > >>>>>>>> In the implementation layer, we can reuse code as much as
> > > >>>>>>>> possible.
> > > >>>>>>>>
> > > >>>>>>>> Therefore, I'm still +1 to have a new connector.
> > > >>>>>>>> The "kafka-compacted" name sounds good to me.
> > > >>>>>>>>
> > > >>>>>>>> Best,
> > > >>>>>>>> Jark
> > > >>>>>>>>
> > > >>>>>>>> On Wed, 21 Oct 2020 at 17:58, Konstantin Knauf <kna...@apache.org> wrote:
> > > >>>>>>>>
> > > >>>>>>>>> Hi Kurt, Hi Shengkai,
> > > >>>>>>>>>
> > > >>>>>>>>> thanks for answering my questions and the additional
> > > >>>>>>>>> clarifications. I don't have a strong opinion on whether to
> > > >>>>>>>>> extend the "kafka" connector or to introduce a new
> > > >>>>>>>>> connector. So, from my perspective, feel free to go with a
> > > >>>>>>>>> separate connector. If we do introduce a new connector, I
> > > >>>>>>>>> wouldn't call it "ktable" for the aforementioned reasons (in
> > > >>>>>>>>> addition, we might suggest that there is also a "kstreams"
> > > >>>>>>>>> connector for symmetry reasons). I don't have a good
> > > >>>>>>>>> alternative name, though; maybe "kafka-compacted" or
> > > >>>>>>>>> "compacted-kafka".
> > > >>>>>>>>>
> > > >>>>>>>>> Thanks,
> > > >>>>>>>>>
> > > >>>>>>>>> Konstantin
> > > >>>>>>>>>
> > > >>>>>>>>> On Wed, Oct 21, 2020 at 4:43 AM Kurt Young <ykt...@gmail.com> wrote:
> > > >>>>>>>>>
> > > >>>>>>>>>> Hi all,
> > > >>>>>>>>>>
> > > >>>>>>>>>> I want to describe the discussion process which drove us to
> > > >>>>>>>>>> this conclusion; it might make some of the design choices
> > > >>>>>>>>>> easier to understand and keep everyone on the same page.
> > > >>>>>>>>>>
> > > >>>>>>>>>> Back to the motivation: what functionality do we want to
> > > >>>>>>>>>> provide in the first place? We got a lot of feedback and
> > > >>>>>>>>>> questions from the mailing lists that people want to write
> > > >>>>>>>>>> not-insert-only messages into kafka. They might be
> > > >>>>>>>>>> intentional or by accident, e.g. having written a
> > > >>>>>>>>>> non-windowed aggregate query or a non-windowed left outer
> > > >>>>>>>>>> join.
> > > >>>>>>>>>> And some users from the KSQL world also asked why Flink
> > > >>>>>>>>>> didn't leverage the Key concept of every kafka topic and
> > > >>>>>>>>>> treat kafka as a dynamically changing keyed table.
> > > >>>>>>>>>>
> > > >>>>>>>>>> To work with kafka better, we were thinking of extending
> > > >>>>>>>>>> the functionality of the current kafka connector by letting
> > > >>>>>>>>>> it accept updates and deletions. But due to the limitations
> > > >>>>>>>>>> of kafka, the update has to be "update by key", i.e. a
> > > >>>>>>>>>> table with a primary key.
> > > >>>>>>>>>>
> > > >>>>>>>>>> This introduces a couple of conflicts with the current
> > > >>>>>>>>>> kafka table's options:
> > > >>>>>>>>>> 1. key.fields: as said above, we need the kafka table to
> > > >>>>>>>>>> have the primary key constraint. And users can also
> > > >>>>>>>>>> configure key.fields freely; this might cause friction.
> > > >>>>>>>>>> (Sure, we can do some sanity checks on this, but that also
> > > >>>>>>>>>> creates friction.)
> > > >>>>>>>>>> 2. sink.partitioner: to make the semantics right, we need
> > > >>>>>>>>>> to make sure all the updates on the same key are written to
> > > >>>>>>>>>> the same kafka partition, so we should force a hash-by-key
> > > >>>>>>>>>> partitioner inside such a table (a sketch follows below).
> > > >>>>>>>>>> Again, this conflicts and creates friction with current
> > > >>>>>>>>>> user options.
> > > >>>>>>>>>>
> > > >>>>>>>>>> The above things are solvable, though not perfectly or in
> > > >>>>>>>>>> the most user-friendly way.
> > > >>>>>>>>>>
> > > >>>>>>>>>> Let's take a look at the reading side. The keyed kafka
> > > >>>>>>>>>> table contains two kinds of messages: upsert or deletion.
> > > >>>>>>>>>> What upsert means is "if the key doesn't exist yet, it's an
> > > >>>>>>>>>> insert record; otherwise it's an update record". For the
> > > >>>>>>>>>> sake of correctness and simplicity, the Flink SQL engine
> > > >>>>>>>>>> also needs this information. If we interpret all messages
> > > >>>>>>>>>> as "update record", some queries or operators may not work
> > > >>>>>>>>>> properly: it's weird to see an update record when you
> > > >>>>>>>>>> haven't seen the insert record before.
> > > >>>>>>>>>>
> > > >>>>>>>>>> So what Flink should do is, after reading the records from
> > > >>>>>>>>>> such a table, create state to record which keys have been
> > > >>>>>>>>>> seen and then generate the correct row kind accordingly.
> > > >>>>>>>>>> This kind of couples the state with the data of the message
> > > >>>>>>>>>> queue, and it also creates conflicts with the current kafka
> > > >>>>>>>>>> connector.
> > > >>>>>>>>>>
> > > >>>>>>>>>> Think about what happens if users suspend a running job
> > > >>>>>>>>>> (which now contains some reading state) and then change the
> > > >>>>>>>>>> start offset of the reader. By changing the reading offset,
> > > >>>>>>>>>> they actually change the whole story of which records
> > > >>>>>>>>>> should be insert messages and which records should be
> > > >>>>>>>>>> update messages. It will also make Flink deal with another
> > > >>>>>>>>>> weird situation: it might receive a deletion for a
> > > >>>>>>>>>> non-existing message.
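> > > >>>>>>>>>>
> > > >>>>>>>>>> As a sketch of the forced partitioner mentioned in point 2
> > > >>>>>>>>>> (written against the public FlinkKafkaPartitioner
> > > >>>>>>>>>> interface; the class name is made up, and a non-null
> > > >>>>>>>>>> serialized key is assumed since such a table always has a
> > > >>>>>>>>>> key):
> > > >>>>>>>>>>
> > > >>>>>>>>>> import java.util.Arrays;
> > > >>>>>>>>>> import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
> > > >>>>>>>>>>
> > > >>>>>>>>>> public class KeyHashPartitioner<T> extends FlinkKafkaPartitioner<T> {
> > > >>>>>>>>>>     @Override
> > > >>>>>>>>>>     public int partition(
> > > >>>>>>>>>>             T record, byte[] key, byte[] value,
> > > >>>>>>>>>>             String targetTopic, int[] partitions) {
> > > >>>>>>>>>>         // Hash on the serialized key so all updates and
> > > >>>>>>>>>>         // deletions for one primary key land in the same
> > > >>>>>>>>>>         // partition and therefore stay ordered.
> > > >>>>>>>>>>         int hash = Arrays.hashCode(key) & 0x7fffffff;
> > > >>>>>>>>>>         return partitions[hash % partitions.length];
> > > >>>>>>>>>>     }
> > > >>>>>>>>>> }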
> > > >>>>>>>>>>
> > > >>>>>>>>>> We were unsatisfied with all the frictions and conflicts
> > > >>>>>>>>>> that enabling "upsert & deletion" support in the current
> > > >>>>>>>>>> kafka connector would create. And later we began to realize
> > > >>>>>>>>>> that we shouldn't treat it as a normal message queue, but
> > > >>>>>>>>>> should treat it as a changing keyed table. We should be
> > > >>>>>>>>>> able to always get the whole data of such a table (by
> > > >>>>>>>>>> disabling the start offset option), and we can also read
> > > >>>>>>>>>> the changelog out of such a table. It's like an HBase table
> > > >>>>>>>>>> with binlog support but without random access capability
> > > >>>>>>>>>> (which can be fulfilled by Flink's state).
> > > >>>>>>>>>>
> > > >>>>>>>>>> So our intention was: instead of telling and persuading
> > > >>>>>>>>>> users what kind of options they should or should not use
> > > >>>>>>>>>> when extending the current kafka connector with upsert
> > > >>>>>>>>>> support, we are actually creating a whole new and different
> > > >>>>>>>>>> connector that has totally different abstractions in the
> > > >>>>>>>>>> SQL layer, and it should be treated totally differently
> > > >>>>>>>>>> from the current kafka connector.
> > > >>>>>>>>>>
> > > >>>>>>>>>> Hope this can clarify some of the concerns.
> > > >>>>>>>>>>
> > > >>>>>>>>>> Best,
> > > >>>>>>>>>> Kurt
> > > >>>>>>>>>>
> > > >>>>>>>>>> On Tue, Oct 20, 2020 at 5:20 PM Shengkai Fang <fskm...@gmail.com> wrote:
> > > >>>>>>>>>>
> > > >>>>>>>>>>> Hi devs,
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> As many people are still confused about the different
> > > >>>>>>>>>>> option behaviours between the Kafka connector and the
> > > >>>>>>>>>>> KTable connector, Jark and I listed the differences in the
> > > >>>>>>>>>>> doc [1].
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Best,
> > > >>>>>>>>>>> Shengkai
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> [1] https://docs.google.com/document/d/13oAWAwQez0lZLsyfV21BfTEze1fc2cz4AZKiNOyBNPk/edit
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> On Tue, Oct 20, 2020 at 12:05 PM, Shengkai Fang <fskm...@gmail.com> wrote:
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>> Hi Konstantin,
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> Thanks for your reply.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>> It uses the "kafka" connector and does not specify a
> > > >>>>>>>>>>>>> primary key.
> > > >>>>>>>>>>>> The dimension table `users` is a ktable connector table,
> > > >>>>>>>>>>>> and we can specify the pk on the KTable.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>> Will it be possible to use a "ktable" as a dimension
> > > >>>>>>>>>>>>> table in FLIP-132?
> > > >>>>>>>>>>>> Yes. We can specify the watermark on the KTable, and it
> > > >>>>>>>>>>>> can be used as a dimension table in a temporal join (a
> > > >>>>>>>>>>>> sketch follows at the end of this mail).
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>> Introduce a new connector vs. introduce a new property
> > > >>>>>>>>>>>> The main reason behind this is that the KTable connector
> > > >>>>>>>>>>>> has almost no common options with the Kafka connector.
> > > >>>>>>>>>>>> The options that can be reused by the KTable connector
> > > >>>>>>>>>>>> are 'topic', 'properties.bootstrap.servers' and
> > > >>>>>>>>>>>> 'value.fields-include'. We can't set a cdc format for
> > > >>>>>>>>>>>> 'key.format' and 'value.format' in the KTable connector,
> > > >>>>>>>>>>>> which is available in the Kafka connector. Considering
> > > >>>>>>>>>>>> the difference between the options we can use, it's more
> > > >>>>>>>>>>>> suitable to introduce another connector rather than a
> > > >>>>>>>>>>>> property.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> We are also fine with using "compacted-kafka" as the name
> > > >>>>>>>>>>>> of the new connector. What do you think?
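> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> To make the temporal join answer above concrete, here is
> > > >>>>>>>>>>>> a sketch (assumptions: FLIP-132 syntax, an `orders` table
> > > >>>>>>>>>>>> with an event-time attribute `order_time` already exists,
> > > >>>>>>>>>>>> and the connector/option names follow this thread):
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> import org.apache.flink.table.api.EnvironmentSettings;
> > > >>>>>>>>>>>> import org.apache.flink.table.api.TableEnvironment;
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> public class KTableTemporalJoinSketch {
> > > >>>>>>>>>>>>     public static void main(String[] args) {
> > > >>>>>>>>>>>>         TableEnvironment tEnv = TableEnvironment.create(
> > > >>>>>>>>>>>>                 EnvironmentSettings.newInstance().inStreamingMode().build());
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>         // Versioned dimension table: primary key plus watermark.
> > > >>>>>>>>>>>>         tEnv.executeSql(
> > > >>>>>>>>>>>>                 "CREATE TABLE users (" +
> > > >>>>>>>>>>>>                 "  user_id BIGINT," +
> > > >>>>>>>>>>>>                 "  user_name STRING," +
> > > >>>>>>>>>>>>                 "  update_time TIMESTAMP(3)," +
> > > >>>>>>>>>>>>                 "  WATERMARK FOR update_time AS update_time," +
> > > >>>>>>>>>>>>                 "  PRIMARY KEY (user_id) NOT ENFORCED" +
> > > >>>>>>>>>>>>                 ") WITH (" +
> > > >>>>>>>>>>>>                 "  'connector' = 'compacted-kafka'," +
> > > >>>>>>>>>>>>                 "  'topic' = 'users'," +
> > > >>>>>>>>>>>>                 "  'properties.bootstrap.servers' = '...'," +
> > > >>>>>>>>>>>>                 "  'key.format' = 'csv'," +
> > > >>>>>>>>>>>>                 "  'value.format' = 'avro')");
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>         // Event-time temporal join: each order sees the
> > > >>>>>>>>>>>>         // version of the user row as of its order_time.
> > > >>>>>>>>>>>>         tEnv.executeSql(
> > > >>>>>>>>>>>>                 "SELECT o.order_id, u.user_name " +
> > > >>>>>>>>>>>>                 "FROM orders AS o " +
> > > >>>>>>>>>>>>                 "JOIN users FOR SYSTEM_TIME AS OF o.order_time AS u " +
> > > >>>>>>>>>>>>                 "ON o.user_id = u.user_id").print();
> > > >>>>>>>>>>>>     }
> > > >>>>>>>>>>>> }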
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> Best,
> > > >>>>>>>>>>>> Shengkai
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> On Mon, Oct 19, 2020 at 10:15 PM, Konstantin Knauf <kna...@apache.org> wrote:
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>> Hi Shengkai,
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Thank you for driving this effort. I believe this is a
> > > >>>>>>>>>>>>> very important feature for many users who use Kafka and
> > > >>>>>>>>>>>>> Flink SQL together. A few questions and thoughts:
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> * Is your example "Use KTable as a reference/dimension
> > > >>>>>>>>>>>>> table" correct? It uses the "kafka" connector and does
> > > >>>>>>>>>>>>> not specify a primary key.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> * Will it be possible to use a "ktable" table directly
> > > >>>>>>>>>>>>> as a dimension table in a temporal join (*based on event
> > > >>>>>>>>>>>>> time*) (FLIP-132)? This is not completely clear to me
> > > >>>>>>>>>>>>> from the FLIP.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> * I'd personally prefer not to introduce a new connector
> > > >>>>>>>>>>>>> and instead to extend the Kafka connector. We could add
> > > >>>>>>>>>>>>> an additional property "compacted" = "true"|"false". If
> > > >>>>>>>>>>>>> it is set to "true", we can add additional validation
> > > >>>>>>>>>>>>> logic (e.g. "scan.startup.mode" can not be set, primary
> > > >>>>>>>>>>>>> key required, etc.). If we stick to a separate connector
> > > >>>>>>>>>>>>> I'd not call it "ktable", but rather "compacted-kafka"
> > > >>>>>>>>>>>>> or similar. KTable seems to carry more implicit meaning
> > > >>>>>>>>>>>>> than we want to imply here.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> * I agree that this is not a bounded source. If we want
> > > >>>>>>>>>>>>> to support a bounded mode, this is an orthogonal concern
> > > >>>>>>>>>>>>> that also applies to other unbounded sources.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Best,
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Konstantin
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> On Mon, Oct 19, 2020 at 3:26 PM Jark Wu <imj...@gmail.com> wrote:
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> Hi Danny,
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> First of all, we didn't introduce any concepts from
> > > >>>>>>>>>>>>>> KSQL (e.g. the Stream vs. Table notion). This new
> > > >>>>>>>>>>>>>> connector will produce a changelog stream, so it's
> > > >>>>>>>>>>>>>> still a dynamic table and doesn't conflict with Flink
> > > >>>>>>>>>>>>>> core concepts.
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> The "ktable" is just a connector name; we can also call
> > > >>>>>>>>>>>>>> it "compacted-kafka" or something else. Calling it
> > > >>>>>>>>>>>>>> "ktable" is just so KSQL users can migrate to Flink SQL
> > > >>>>>>>>>>>>>> easily.
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> Regarding why we introduce a new connector vs. a new
> > > >>>>>>>>>>>>>> property in the existing kafka connector:
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> I think the main reason is that we want a clear
> > > >>>>>>>>>>>>>> separation for such two use cases, because they are
> > > >>>>>>>>>>>>>> very different. We also listed reasons in the FLIP,
> > > >>>>>>>>>>>>>> including:
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> 1) It's hard to explain what the behavior is when users
> > > >>>>>>>>>>>>>> specify a start offset from a middle position (e.g. how
> > > >>>>>>>>>>>>>> to process non-existing delete events). It's dangerous
> > > >>>>>>>>>>>>>> if users do that. So we don't provide the offset option
> > > >>>>>>>>>>>>>> in the new connector at the moment.
> > > >>>>>>>>>>>>>> 2) It's a different perspective/abstraction on the same
> > > >>>>>>>>>>>>>> kafka topic (append vs. upsert). It would be easier to
> > > >>>>>>>>>>>>>> understand if we separate them instead of mixing them
> > > >>>>>>>>>>>>>> in one connector. The new connector requires a hash
> > > >>>>>>>>>>>>>> sink partitioner, a declared primary key, and a regular
> > > >>>>>>>>>>>>>> format. If we mix them in one connector, it might be
> > > >>>>>>>>>>>>>> confusing how to use the options correctly.
> > > >>>>>>>>>>>>>> 3) The semantics of the KTable connector are just the
> > > >>>>>>>>>>>>>> same as KTable in Kafka Streams, so it's very handy for
> > > >>>>>>>>>>>>>> Kafka Streams and KSQL users. We have seen several
> > > >>>>>>>>>>>>>> questions in the mailing list asking how to model a
> > > >>>>>>>>>>>>>> KTable and how to join a KTable in Flink SQL.
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> Best,
> > > >>>>>>>>>>>>>> Jark
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> On Mon, 19 Oct 2020 at 19:53, Jark Wu <imj...@gmail.com> wrote:
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> Hi Jingsong,
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> As the FLIP describes, "KTable connector produces a
> > > >>>>>>>>>>>>>>> changelog stream, where each data record represents an
> > > >>>>>>>>>>>>>>> update or delete event." Therefore, a ktable source is
> > > >>>>>>>>>>>>>>> an unbounded stream source. Selecting a ktable source
> > > >>>>>>>>>>>>>>> is similar to selecting a kafka source with the
> > > >>>>>>>>>>>>>>> debezium-json format, in that it never ends and the
> > > >>>>>>>>>>>>>>> results are continuously updated.
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> It's possible to have a bounded ktable source in the
> > > >>>>>>>>>>>>>>> future, for example by adding an option 'bounded=true'
> > > >>>>>>>>>>>>>>> or 'end-offset=xxx'. In this way, the ktable will
> > > >>>>>>>>>>>>>>> produce a bounded changelog stream. So I think this
> > > >>>>>>>>>>>>>>> can be a compatible feature in the future.
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> I don't think we should associate it with KSQL-related
> > > >>>>>>>>>>>>>>> concepts. Actually, we didn't introduce any concepts
> > > >>>>>>>>>>>>>>> from KSQL (e.g. the Stream vs. Table notion). The
> > > >>>>>>>>>>>>>>> "ktable" is just a connector name; we can also call it
> > > >>>>>>>>>>>>>>> "compacted-kafka" or something else. Calling it
> > > >>>>>>>>>>>>>>> "ktable" is just so KSQL users can migrate to Flink
> > > >>>>>>>>>>>>>>> SQL easily.
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> Regarding "value.fields-include", this is an option
> > > >>>>>>>>>>>>>>> introduced in FLIP-107 for the Kafka connector. I
> > > >>>>>>>>>>>>>>> think we should keep the same behavior as the Kafka
> > > >>>>>>>>>>>>>>> connector. I'm not sure what the default behavior of
> > > >>>>>>>>>>>>>>> KSQL is, but I guess it also stores the keys in the
> > > >>>>>>>>>>>>>>> value, judging from this example in the docs (see the
> > > >>>>>>>>>>>>>>> "users_original" table) [1].
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> Best,
> > > >>>>>>>>>>>>>>> Jark
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> [1]: https://docs.confluent.io/current/ksqldb/tutorials/basics-local.html#create-a-stream-and-table
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> On Mon, 19 Oct 2020 at 18:17, Danny Chan <yuzhao....@gmail.com> wrote:
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> The concept seems to conflict with the Flink
> > > >>>>>>>>>>>>>>>> abstraction "dynamic table": in Flink we see both
> > > >>>>>>>>>>>>>>>> "stream" and "table" as a dynamic table.
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> I think we should first make clear how to express
> > > >>>>>>>>>>>>>>>> stream- and table-specific features on one "dynamic
> > > >>>>>>>>>>>>>>>> table". It is more natural for KSQL because KSQL
> > > >>>>>>>>>>>>>>>> takes stream and table as different abstractions for
> > > >>>>>>>>>>>>>>>> representing collections; in KSQL, only a table is
> > > >>>>>>>>>>>>>>>> mutable and can have a primary key.
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> Does this connector belong to the "table" scope or
> > > >>>>>>>>>>>>>>>> the "stream" scope?
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> Some of the concepts (such as the primary key on a
> > > >>>>>>>>>>>>>>>> stream) should be suitable for all connectors, not
> > > >>>>>>>>>>>>>>>> just Kafka. Shouldn't this be an extension of the
> > > >>>>>>>>>>>>>>>> existing Kafka connector instead of a totally new
> > > >>>>>>>>>>>>>>>> connector? What about the other connectors?
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> Because this touches the core abstraction of Flink,
> > > >>>>>>>>>>>>>>>> we had better have a top-down overall design;
> > > >>>>>>>>>>>>>>>> following KSQL directly is not the answer.
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> P.S. for the source:
> > > >>>>>>>>>>>>>>>>> Shouldn't this be an extension of the existing Kafka
> > > >>>>>>>>>>>>>>>>> connector instead of a totally new connector?
> > > >>>>>>>>>>>>>>>> How could we achieve that (e.g. set up the
> > > >>>>>>>>>>>>>>>> parallelism correctly)?
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> Best,
> > > >>>>>>>>>>>>>>>> Danny Chan
> > > >>>>>>>>>>>>>>>> On Oct 19, 2020 at 5:17 PM (+0800), Jingsong Li <jingsongl...@gmail.com> wrote:
> > > >>>>>>>>>>>>>>>>> Thanks Shengkai for your proposal.
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> +1 for this feature.
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> Future Work: Support bounded KTable source
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> I don't think it should be future work; I think it
> > > >>>>>>>>>>>>>>>>> is one of the important concepts of this FLIP. We
> > > >>>>>>>>>>>>>>>>> need to understand it now.
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> Intuitively, a ktable in my opinion is a bounded
> > > >>>>>>>>>>>>>>>>> table rather than a stream, so a select should
> > > >>>>>>>>>>>>>>>>> produce a bounded table by default.
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> I think we should list the Kafka-related knowledge,
> > > >>>>>>>>>>>>>>>>> because the word `ktable` is easy to associate with
> > > >>>>>>>>>>>>>>>>> KSQL-related concepts. (If possible, it's better to
> > > >>>>>>>>>>>>>>>>> unify with it.)
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> What do you think?
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> value.fields-include
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> What about the default behavior of KSQL?
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> Best,
> > > >>>>>>>>>>>>>>>>> Jingsong
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> On Mon, Oct 19, 2020 at 4:33 PM Shengkai Fang <fskm...@gmail.com> wrote:
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> Hi, devs.
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> Jark and I want to start a new FLIP to introduce
> > > >>>>>>>>>>>>>>>>>> the KTable connector. The KTable is a shortcut for
> > > >>>>>>>>>>>>>>>>>> "Kafka Table"; it also has the same semantics as
> > > >>>>>>>>>>>>>>>>>> the KTable notion in Kafka Streams.
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> FLIP-149: https://cwiki.apache.org/confluence/display/FLINK/FLIP-149%3A+Introduce+the+KTable+Connector
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> Currently many users have expressed their need for
> > > >>>>>>>>>>>>>>>>>> upsert Kafka support on the mailing lists and in
> > > >>>>>>>>>>>>>>>>>> issues. The KTable connector has several benefits
> > > >>>>>>>>>>>>>>>>>> for users:
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> 1. Users are able to interpret a compacted Kafka
> > > >>>>>>>>>>>>>>>>>> topic as an upsert stream in Apache Flink, and also
> > > >>>>>>>>>>>>>>>>>> able to write a changelog stream to Kafka (into a
> > > >>>>>>>>>>>>>>>>>> compacted topic).
> > > >>>>>>>>>>>>>>>>>> 2. As a part of a real-time pipeline, store a join
> > > >>>>>>>>>>>>>>>>>> or aggregate result (which may contain updates) in
> > > >>>>>>>>>>>>>>>>>> a Kafka topic for further calculation.
> > > >>>>>>>>>>>>>>>>>> 3. The semantics of the KTable connector are just
> > > >>>>>>>>>>>>>>>>>> the same as KTable in Kafka Streams, so it's very
> > > >>>>>>>>>>>>>>>>>> handy for Kafka Streams and KSQL users. We have
> > > >>>>>>>>>>>>>>>>>> seen several questions in the mailing list asking
> > > >>>>>>>>>>>>>>>>>> how to model a KTable and how to join a KTable in
> > > >>>>>>>>>>>>>>>>>> Flink SQL.
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> We hope it can expand the usage of Flink with
> > > >>>>>>>>>>>>>>>>>> Kafka.
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> I'm looking forward to your feedback.
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> Best,
> > > >>>>>>>>>>>>>>>>>> Shengkai
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> --
> > > >>>>>>>>>>>>>>>>> Best, Jingsong Lee
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> --
> > > >>>>>>>>>>>>> Konstantin Knauf
> > > >>>>>>>>>>>>> https://twitter.com/snntrable
> > > >>>>>>>>>>>>> https://github.com/knaufk
> > > >>>>>>>>>
> > > >>>>>>>>> --
> > > >>>>>>>>> Konstantin Knauf
> > > >>>>>>>>> https://twitter.com/snntrable
> > > >>>>>>>>> https://github.com/knaufk
> > > >>>>
> > > >>>> --
> > > >>>> Seth Wiesman | Solutions Architect
> > > >>>> +1 314 387 1463
> > > >>>> <https://www.ververica.com/>
> > > >>>> Follow us @VervericaData
> > > >>>> --
> > > >>>> Join Flink Forward <https://flink-forward.org/> - The Apache
> > > >>>> Flink Conference
> > > >>>> Stream Processing | Event Driven | Real Time
> >
> > --
> > Best, Jingsong Lee
>
--
Best, Jingsong Lee