Thanks for the explanation,

I am OK with `upsert`. Yes, its concept has been accepted by many systems.

Best,
Jingsong

On Fri, Oct 23, 2020 at 12:38 PM Jark Wu <imj...@gmail.com> wrote:

> Hi Timo,
>
> I have some concerns about `kafka-cdc`:
> 1) CDC is an abbreviation of Change Data Capture, which is commonly used for
> databases, not for message queues.
> 2) Usually, CDC produces the full content of the changelog, including
> UPDATE_BEFORE; however, "upsert kafka" doesn't.
> 3) `kafka-cdc` sounds like native support for the `debezium-json` format;
> however, it is not, and we don't even want
>    "upsert kafka" to support "debezium-json".
>
>
> Hi Jingsong,
>
> I think the terminology of "upsert" is fine, because Kafka also uses
> "upsert" to define such behavior in their official documentation [1]:
>
> > a data record in a changelog stream is interpreted as an UPSERT aka
> > INSERT/UPDATE
>
> Materialize uses the "UPSERT" keyword to define such behavior too [2].
> Users have been requesting such a feature using "upsert kafka" terminology
> on the user mailing lists [3][4].
> Many other systems support the "UPSERT" statement natively, such as Impala
> [5], SAP [6], Phoenix [7], Oracle NoSQL [8], etc.
>
> Therefore, I think we don't need to be afraid of introducing the "upsert"
> terminology; it is widely accepted by users.
>
> Best,
> Jark
>
>
> [1]: https://kafka.apache.org/20/documentation/streams/developer-guide/dsl-api.html#streams_concepts_ktable
> [2]: https://materialize.io/docs/sql/create-source/text-kafka/#upsert-on-a-kafka-topic
> [3]: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/SQL-materialized-upsert-tables-td18482.html#a18503
> [4]: http://apache-flink.147419.n8.nabble.com/Kafka-Sink-AppendStreamTableSink-doesn-t-support-consuming-update-changes-td5959.html
> [5]: https://impala.apache.org/docs/build/html/topics/impala_upsert.html
> [6]: https://help.sap.com/viewer/7c78579ce9b14a669c1f3295b0d8ca16/Cloud/en-US/ea8b6773be584203bcd99da76844c5ed.html
> [7]: https://phoenix.apache.org/atomic_upsert.html
> [8]: https://docs.oracle.com/en/database/other-databases/nosql-database/18.3/sqlfornosql/adding-table-rows-using-insert-and-upsert-statements.html
>
> On Fri, 23 Oct 2020 at 10:36, Jingsong Li <jingsongl...@gmail.com> wrote:
>
> > The `kafka-cdc` looks good to me.
> > We can even provide an option to indicate whether to turn on compaction,
> > because compaction is just an optimization?
> >
> > - ktable: it makes me think of KSQL.
> > - kafka-compacted: it is not just compacted; more than that, it still has
> > the ability of CDC.
> > - upsert-kafka: upsert is back, and I don't really want to see it again
> > since we have CDC.
> >
> > Best,
> > Jingsong
> >
> > On Fri, Oct 23, 2020 at 2:21 AM Timo Walther <twal...@apache.org> wrote:
> >
> > > Hi Jark,
> > >
> > > I would be fine with `connector=upsert-kafka`. Another idea would be to
> > > align the name to other available Flink connectors [1]:
> > >
> > > `connector=kafka-cdc`.
> > >
> > > Regards,
> > > Timo
> > >
> > > [1] https://github.com/ververica/flink-cdc-connectors
> > >
> > > On 22.10.20 17:17, Jark Wu wrote:
> > > > Another name is "connector=upsert-kafka"; I think this can solve Timo's
> > > > concern about the word "compacted".
> > > >
> > > > Materialize also uses the "ENVELOPE UPSERT" [1] keyword to identify such
> > > > Kafka sources.
> > > > I think "upsert" is well-known terminology, widely used in many systems,
> > > > and it matches how we handle the Kafka messages.
> > > >
> > > > What do you think?
> > > >
> > > > Best,
> > > > Jark
> > > >
> > > > [1]: https://materialize.io/docs/sql/create-source/text-kafka/#upsert-on-a-kafka-topic
> > > >
> > > >
> > > >
> > > >
> > > > On Thu, 22 Oct 2020 at 22:53, Kurt Young <ykt...@gmail.com> wrote:
> > > >
> > > >> Good validation messages can't fix a broken user experience,
> > > >> especially since such an update mode option would implicitly make half
> > > >> of the current Kafka options invalid or meaningless.
> > > >>
> > > >> Best,
> > > >> Kurt
> > > >>
> > > >>
> > > >> On Thu, Oct 22, 2020 at 10:31 PM Jark Wu <imj...@gmail.com> wrote:
> > > >>
> > > >>> Hi Timo, Seth,
> > > >>>
> > > >>> The default value "inserting" of "mode" might not be suitable,
> > > >>> because "debezium-json" emits changelog messages which include
> > > >>> updates.
> > > >>>
> > > >>> On Thu, 22 Oct 2020 at 22:10, Seth Wiesman <s...@ververica.com> wrote:
> > > >>>
> > > >>>> +1 for supporting upsert results into Kafka.
> > > >>>>
> > > >>>> I have no comments on the implementation details.
> > > >>>>
> > > >>>> As far as configuration goes, I tend to favor Timo's option where we
> > > >>>> add a "mode" property to the existing Kafka table with default value
> > > >>>> "inserting". If the mode is set to "updating", then the validation
> > > >>>> changes to the new requirements. I personally find it more intuitive
> > > >>>> than a separate connector; my fear is that users won't understand it's
> > > >>>> the same physical Kafka sink under the hood, and it will lead to other
> > > >>>> confusion, such as: does it offer the same persistence guarantees? I
> > > >>>> think we are capable of adding good validation messaging that
> > > >>>> addresses Jark's and Kurt's concerns.
> > > >>>>
> > > >>>>
> > > >>>> On Thu, Oct 22, 2020 at 8:51 AM Timo Walther <twal...@apache.org> wrote:
> > > >>>>
> > > >>>>> Hi Jark,
> > > >>>>>
> > > >>>>> "calling it "kafka-compacted" can even remind users to enable log
> > > >>>>> compaction"
> > > >>>>>
> > > >>>>> But sometimes users like to store a lineage of changes in their
> > > >>>>> topics, independent of any ktable/kstream interpretation.
> > > >>>>>
> > > >>>>> I'll let the majority decide on this topic so as not to block this
> > > >>>>> effort further. But we might find a better name like:
> > > >>>>>
> > > >>>>> connector = kafka
> > > >>>>> mode = updating/inserting
> > > >>>>>
> > > >>>>> OR
> > > >>>>>
> > > >>>>> connector = kafka-updating
> > > >>>>>
> > > >>>>> ...
> > > >>>>>
> > > >>>>> Regards,
> > > >>>>> Timo
> > > >>>>>
> > > >>>>>
> > > >>>>>
> > > >>>>>
> > > >>>>> On 22.10.20 15:24, Jark Wu wrote:
> > > >>>>>> Hi Timo,
> > > >>>>>>
> > > >>>>>> Thanks for your opinions.
> > > >>>>>>
> > > >>>>>> 1) Implementation
> > > >>>>>> We will have a stateful operator to generate INSERT and
> > > >>>>>> UPDATE_BEFORE.
> > > >>>>>> This operator is keyed by the primary key (used as the shuffle key)
> > > >>>>>> and placed after the source operator.
> > > >>>>>> The implementation of this operator is very similar to the existing
> > > >>>>>> `DeduplicateKeepLastRowFunction`.
> > > >>>>>> The operator will register a value state using the primary key
> > > >>>>>> fields as keys.
> > > >>>>>> When the value state is empty under the current key, we will emit
> > > >>>>>> INSERT for the input row.
> > > >>>>>> When the value state is not empty under the current key, we will
> > > >>>>>> emit UPDATE_BEFORE using the row in state,
> > > >>>>>> and emit UPDATE_AFTER using the input row.
> > > >>>>>> When the input row is DELETE, we will clear the state and emit a
> > > >>>>>> DELETE row.
> > > >>>>>>
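The four rules in 1) above can be sketched in a few lines of Python. This is only an illustration, not Flink code: the name `normalize_upserts`, the use of a plain dict as the keyed value state, and the convention that a `None` value marks a deletion are all assumptions of this sketch. It also assumes that a deletion for a key with no state is silently dropped, which the thread does not specify.

```python
from typing import Dict, Iterable, List, Optional, Tuple

# Short labels for the row kinds named in the thread.
INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE = "+I", "-U", "+U", "-D"

Row = Dict[str, object]


def normalize_upserts(
    records: Iterable[Tuple[object, Optional[Row]]],
) -> List[Tuple[str, Row]]:
    """Turn (key, value) upsert records into a full changelog.

    A value of None stands for a deletion (an assumption of this sketch).
    The dict `state` plays the role of the per-key value state.
    """
    state: Dict[object, Row] = {}
    out: List[Tuple[str, Row]] = []
    for key, value in records:
        previous = state.get(key)
        if value is None:
            # DELETE: clear the state and emit the last known row, if any.
            if previous is not None:
                del state[key]
                out.append((DELETE, previous))
            continue
        if previous is None:
            # Empty state under this key: the record is an INSERT.
            out.append((INSERT, value))
        else:
            # Non-empty state: retract the old row, then emit the new one.
            out.append((UPDATE_BEFORE, previous))
            out.append((UPDATE_AFTER, value))
        state[key] = value
    return out


changes = normalize_upserts([
    (1, {"user_id": 1, "region": "EU"}),
    (1, {"user_id": 1, "region": "US"}),
    (1, None),
])
```

The state holds one row per live key, which is why the amount of state grows with the number of distinct primary keys rather than with the number of messages.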
> > > >>>>>> 2) new option vs new connector
> > > >>>>>>> We recently simplified the table options to a minimum amount of
> > > >>>>>>> characters to be as concise as possible in the DDL.
> > > >>>>>> I think this is the reason why we want to introduce a new
> > > >>>>>> connector: we can simplify the options in the DDL.
> > > >>>>>> For example, if using a new option, the DDL may look like this:
> > > >>>>>>
> > > >>>>>> CREATE TABLE users (
> > > >>>>>>     user_id BIGINT,
> > > >>>>>>     user_name STRING,
> > > >>>>>>     user_level STRING,
> > > >>>>>>     region STRING,
> > > >>>>>>     PRIMARY KEY (user_id) NOT ENFORCED
> > > >>>>>> ) WITH (
> > > >>>>>>     'connector' = 'kafka',
> > > >>>>>>     'model' = 'table',
> > > >>>>>>     'topic' = 'pageviews_per_region',
> > > >>>>>>     'properties.bootstrap.servers' = '...',
> > > >>>>>>     'properties.group.id' = 'testGroup',
> > > >>>>>>     'scan.startup.mode' = 'earliest',
> > > >>>>>>     'key.format' = 'csv',
> > > >>>>>>     'key.fields' = 'user_id',
> > > >>>>>>     'value.format' = 'avro',
> > > >>>>>>     'sink.partitioner' = 'hash'
> > > >>>>>> );
> > > >>>>>>
> > > >>>>>> If using a new connector, we can have different default values for
> > > >>>>>> the options and remove unnecessary options.
> > > >>>>>> The DDL can then look like this, which is much more concise:
> > > >>>>>>
> > > >>>>>> CREATE TABLE pageviews_per_region (
> > > >>>>>>     user_id BIGINT,
> > > >>>>>>     user_name STRING,
> > > >>>>>>     user_level STRING,
> > > >>>>>>     region STRING,
> > > >>>>>>     PRIMARY KEY (user_id) NOT ENFORCED
> > > >>>>>> ) WITH (
> > > >>>>>>     'connector' = 'kafka-compacted',
> > > >>>>>>     'topic' = 'pageviews_per_region',
> > > >>>>>>     'properties.bootstrap.servers' = '...',
> > > >>>>>>     'key.format' = 'csv',
> > > >>>>>>     'value.format' = 'avro'
> > > >>>>>> );
> > > >>>>>>
> > > >>>>>>> When people read `connector=kafka-compacted` they might not know
> > > >>>>>>> that it has ktable semantics. You don't need to enable log
> > > >>>>>>> compaction in order to use a KTable as far as I know.
> > > >>>>>> We don't need to let users know it has ktable semantics; as
> > > >>>>>> Konstantin mentioned, this may carry more implicit
> > > >>>>>> meaning than we want to imply here. I agree users don't need to
> > > >>>>>> enable log compaction, but from the production perspective,
> > > >>>>>> log compaction should always be enabled if it is used for this
> > > >>>>>> purpose.
> > > >>>>>> Calling it "kafka-compacted" can even remind users to enable log
> > > >>>>>> compaction.
> > > >>>>>>
> > > >>>>>> I don't agree with introducing a "model = table/stream" option or
> > > >>>>>> "connector=kafka-table",
> > > >>>>>> because this means we are introducing the Table vs. Stream concept
> > > >>>>>> from KSQL.
> > > >>>>>> However, we don't have such a top-level concept in Flink SQL now;
> > > >>>>>> this would further confuse users.
> > > >>>>>> In Flink SQL, everything is a STREAM; the differences are whether it
> > > >>>>>> is bounded or unbounded,
> > > >>>>>> and whether it is insert-only or a changelog.
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> Best,
> > > >>>>>> Jark
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> On Thu, 22 Oct 2020 at 20:39, Timo Walther <twal...@apache.org> wrote:
> > > >>>>>>
> > > >>>>>>> Hi Shengkai, Hi Jark,
> > > >>>>>>>
> > > >>>>>>> thanks for this great proposal. It is time to finally connect the
> > > >>>>>>> changelog processor with a compacted Kafka topic.
> > > >>>>>>>
> > > >>>>>>> "The operator will produce INSERT rows, or additionally generate
> > > >>>>>>> UPDATE_BEFORE rows for the previous image, or produce DELETE rows
> > > >>>>>>> with all columns filled with values."
> > > >>>>>>>
> > > >>>>>>> Could you elaborate a bit on the implementation details in the
> > > >>>>>>> FLIP? How are UPDATE_BEFOREs generated? How much state is required
> > > >>>>>>> to perform this operation?
> > > >>>>>>>
> > > >>>>>>> From a conceptual and semantic point of view, I'm fine with the
> > > >>>>>>> proposal. But I would like to share my opinion about how we expose
> > > >>>>>>> this feature:
> > > >>>>>>>
> > > >>>>>>> ktable vs kafka-compacted
> > > >>>>>>>
> > > >>>>>>> I'm against having an additional connector like `ktable` or
> > > >>>>>>> `kafka-compacted`. We recently simplified the table options to a
> > > >>>>>>> minimum amount of characters to be as concise as possible in the
> > > >>>>>>> DDL. Therefore, I would keep the `connector=kafka` and introduce an
> > > >>>>>>> additional option, because a user wants to read "from Kafka", and
> > > >>>>>>> the "how" should be determined in the lower options.
> > > >>>>>>>
> > > >>>>>>> When people read `connector=ktable` they might not know that this
> > > >>>>>>> is Kafka. Or they wonder where `kstream` is.
> > > >>>>>>>
> > > >>>>>>> When people read `connector=kafka-compacted` they might not know
> > > >>>>>>> that it has ktable semantics. You don't need to enable log
> > > >>>>>>> compaction in order to use a KTable as far as I know. Log
> > > >>>>>>> compaction and table semantics are orthogonal topics.
> > > >>>>>>>
> > > >>>>>>> In the end we will need 3 types of information when declaring a
> > > >>>>>>> Kafka connector:
> > > >>>>>>>
> > > >>>>>>> CREATE TABLE ... WITH (
> > > >>>>>>>      connector=kafka        -- Some information about the connector
> > > >>>>>>>      end-offset = XXXX      -- Some information about the boundedness
> > > >>>>>>>      model = table/stream   -- Some information about interpretation
> > > >>>>>>> )
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>> We can still apply all the constraints mentioned in the FLIP when
> > > >>>>>>> `model` is set to `table`.
> > > >>>>>>>
> > > >>>>>>> What do you think?
> > > >>>>>>>
> > > >>>>>>> Regards,
> > > >>>>>>> Timo
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>> On 21.10.20 14:19, Jark Wu wrote:
> > > >>>>>>>> Hi,
> > > >>>>>>>>
> > > >>>>>>>> IMO, if we are going to mix them in one connector,
> > > >>>>>>>> 1) either users need to set some options to a specific value
> > > >>>>>>>> explicitly, e.g. "scan.startup.mode=earliest",
> > > >>>>>>>> "sink.partitioner=hash", etc.
> > > >>>>>>>> This makes the connector awkward to use. Users may have to fix
> > > >>>>>>>> options one by one according to the exceptions.
> > > >>>>>>>> Besides, in the future, it is still possible to use
> > > >>>>>>>> "sink.partitioner=fixed" (to reduce network cost) if users are
> > > >>>>>>>> aware of the partition routing;
> > > >>>>>>>> however, it's error-prone to have "fixed" as the default for
> > > >>>>>>>> compacted mode.
> > > >>>>>>>>
> > > >>>>>>>> 2) or we give those options different default values when
> > > >>>>>>>> "compacted=true".
> > > >>>>>>>> It would be more confusing and unpredictable if the default values
> > > >>>>>>>> of options changed according to other options.
> > > >>>>>>>> What happens if we have a third mode in the future?
> > > >>>>>>>>
> > > >>>>>>>> In terms of usage and options, it's very different from the
> > > >>>>>>>> original "kafka" connector.
> > > >>>>>>>> It would be handier to use and less error-prone if we separate them
> > > >>>>>>>> into two connectors.
> > > >>>>>>>> In the implementation layer, we can reuse code as much as possible.
> > > >>>>>>>>
> > > >>>>>>>> Therefore, I'm still +1 to have a new connector.
> > > >>>>>>>> The "kafka-compacted" name sounds good to me.
> > > >>>>>>>>
> > > >>>>>>>> Best,
> > > >>>>>>>> Jark
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>> On Wed, 21 Oct 2020 at 17:58, Konstantin Knauf <kna...@apache.org> wrote:
> > > >>>>>>>>
> > > >>>>>>>>> Hi Kurt, Hi Shengkai,
> > > >>>>>>>>>
> > > >>>>>>>>> thanks for answering my questions and the additional
> > > >>>>>>>>> clarifications. I don't have a strong opinion on whether to extend
> > > >>>>>>>>> the "kafka" connector or to introduce a new connector. So, from my
> > > >>>>>>>>> perspective, feel free to go with a separate connector. If we do
> > > >>>>>>>>> introduce a new connector, I wouldn't call it "ktable" for the
> > > >>>>>>>>> aforementioned reasons (in addition, the name might suggest that
> > > >>>>>>>>> there is also a "kstreams" connector for symmetry reasons). I don't
> > > >>>>>>>>> have a good alternative name, though; maybe "kafka-compacted" or
> > > >>>>>>>>> "compacted-kafka".
> > > >>>>>>>>>
> > > >>>>>>>>> Thanks,
> > > >>>>>>>>>
> > > >>>>>>>>> Konstantin
> > > >>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>> On Wed, Oct 21, 2020 at 4:43 AM Kurt Young <ykt...@gmail.com> wrote:
> > > >>>>>>>>>
> > > >>>>>>>>>> Hi all,
> > > >>>>>>>>>>
> > > >>>>>>>>>> I want to describe the discussion process which drove us to this
> > > >>>>>>>>>> conclusion; this might make some of the design choices easier to
> > > >>>>>>>>>> understand and keep everyone on the same page.
> > > >>>>>>>>>>
> > > >>>>>>>>>> Back to the motivation: what functionality do we want to provide
> > > >>>>>>>>>> in the first place? We got a lot of feedback and questions from
> > > >>>>>>>>>> the mailing lists that people want to write non-insert-only
> > > >>>>>>>>>> messages into Kafka. These might be intentional or accidental,
> > > >>>>>>>>>> e.g. from writing a non-windowed aggregate query or a
> > > >>>>>>>>>> non-windowed left outer join. And some users from the KSQL world
> > > >>>>>>>>>> also asked why Flink didn't leverage the key concept of every
> > > >>>>>>>>>> Kafka topic and treat Kafka as a dynamically changing keyed
> > > >>>>>>>>>> table.
> > > >>>>>>>>>>
> > > >>>>>>>>>> To work with Kafka better, we were thinking of extending the
> > > >>>>>>>>>> functionality of the current Kafka connector by letting it
> > > >>>>>>>>>> accept updates and deletions. But due to the limitations of
> > > >>>>>>>>>> Kafka, the update has to be "update by key", i.e. a table
> > > >>>>>>>>>> with a primary key.
> > > >>>>>>>>>>
> > > >>>>>>>>>> This introduces a couple of conflicts with the current Kafka
> > > >>>>>>>>>> table's options:
> > > >>>>>>>>>> 1. key.fields: as said above, we need the Kafka table to have a
> > > >>>>>>>>>> primary key constraint. But users can also configure
> > > >>>>>>>>>> key.fields freely, which might cause friction. (Sure, we can do
> > > >>>>>>>>>> some sanity checks on this, but that also creates friction.)
> > > >>>>>>>>>> 2. sink.partitioner: to make the semantics right, we need to make
> > > >>>>>>>>>> sure all the updates on the same key are written to
> > > >>>>>>>>>> the same Kafka partition, so we should force a hash-by-key
> > > >>>>>>>>>> partitioner for such a table. Again, this conflicts
> > > >>>>>>>>>> and creates friction with the current user options.
> > > >>>>>>>>>>
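The point in item 2 above, that every update for a key must land in the same partition, can be illustrated with a small Python sketch. The function name `partition_for` is hypothetical, and CRC32 is used only as a stand-in for a stable hash; it is not the hash function used by Kafka's own partitioner.

```python
import zlib


def partition_for(key: bytes, num_partitions: int) -> int:
    """Pick a partition from the serialized key alone.

    CRC32 is only a stand-in for a stable hash here; it is not the
    hash function Kafka's own partitioner uses.
    """
    if num_partitions <= 0:
        raise ValueError("num_partitions must be positive")
    return zlib.crc32(key) % num_partitions


# Two updates for user-1 and one for user-2, routed across 4 partitions.
updates = [(b"user-1", "v1"), (b"user-2", "v1"), (b"user-1", "v2")]
placed = [(partition_for(k, 4), k, v) for k, v in updates]
```

Because the partition depends only on the key, both `user-1` records go to the same partition and keep their relative order there. A partitioner that ignores the key gives no such guarantee, which is the source of the conflict with a freely configurable `sink.partitioner`.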
> > > >>>>>>>>>> The above things are solvable, though not perfectly or in the
> > > >>>>>>>>>> most user-friendly way.
> > > >>>>>>>>>>
> > > >>>>>>>>>> Let's take a look at the reading side. The keyed Kafka table
> > > >>>>>>>>>> contains two kinds of messages: upsert or deletion. What upsert
> > > >>>>>>>>>> means is "If the key doesn't exist yet, it's an insert record.
> > > >>>>>>>>>> Otherwise it's an update record". For the sake of correctness or
> > > >>>>>>>>>> simplicity, the Flink SQL engine also needs such information. If
> > > >>>>>>>>>> we interpret all messages as "update record", some queries or
> > > >>>>>>>>>> operators may not work properly. It's weird to see an update
> > > >>>>>>>>>> record when you haven't seen the insert record before.
> > > >>>>>>>>>>
> > > >>>>>>>>>> So what Flink should do, after reading the records from such a
> > > >>>>>>>>>> table, is create a state to record which messages have
> > > >>>>>>>>>> been seen and then generate the correct row type accordingly.
> > > >>>>>>>>>> This kind of couples the state with the data of the message
> > > >>>>>>>>>> queue, and it also creates conflicts with the current Kafka
> > > >>>>>>>>>> connector.
> > > >>>>>>>>>>
> > > >>>>>>>>>> Think about what happens if users suspend a running job (which
> > > >>>>>>>>>> now contains some reading state) and then change the start offset
> > > >>>>>>>>>> of the reader.
> > > >>>>>>>>>> Changing the reading offset actually changes the whole story
> > > >>>>>>>>>> of "which records should be insert messages and which records
> > > >>>>>>>>>> should be update messages". It will also make Flink deal
> > > >>>>>>>>>> with another weird situation: it might receive a deletion
> > > >>>>>>>>>> for a non-existing message.
> > > >>>>>>>>>>
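A toy Python sketch can make the start-offset problem concrete. Everything here is illustrative: `classify`, the list-based log, and the label strings are assumptions of this sketch, and a `None` value is taken to mean a deletion.

```python
from typing import List, Optional, Tuple

Record = Tuple[str, Optional[str]]  # (key, value); None marks a deletion


def classify(log: List[Record], start_offset: int = 0) -> List[str]:
    """Label each record read from `start_offset` onward.

    `seen` is the reader's state: the keys it currently believes exist.
    """
    seen = set()
    labels = []
    for key, value in log[start_offset:]:
        if value is None:
            labels.append("DELETE" if key in seen else "DELETE of unknown key")
            seen.discard(key)
        elif key in seen:
            labels.append("UPDATE")
        else:
            labels.append("INSERT")
            seen.add(key)
    return labels


log = [("a", "1"), ("b", "1"), ("a", "2"), ("b", None)]
from_start = classify(log)      # reader sees the full history
from_middle = classify(log, 2)  # reader starts at offset 2
```

Read from the beginning, the third record is an update and the fourth a regular deletion. Read from offset 2, the same two records are labelled as an insert and a deletion of a key the reader has never seen.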
> > > >>>>>>>>>> We were unsatisfied with all the friction and conflicts it would
> > > >>>>>>>>>> create if we added "upsert & deletion" support to the current
> > > >>>>>>>>>> Kafka connector. Later we began to realize that we shouldn't
> > > >>>>>>>>>> treat it as a normal message queue, but as a changing keyed
> > > >>>>>>>>>> table. We should always be able to get the whole data of such a
> > > >>>>>>>>>> table (by disabling the start offset option), and we can also
> > > >>>>>>>>>> read the changelog out of such a table. It's like an HBase table
> > > >>>>>>>>>> with binlog support but without random access capability (which
> > > >>>>>>>>>> can be fulfilled by Flink's state).
> > > >>>>>>>>>>
> > > >>>>>>>>>> So our intention was, instead of telling and persuading users
> > > >>>>>>>>>> which options they should or should not use when upsert support
> > > >>>>>>>>>> is enabled on an extended Kafka connector, to actually create
> > > >>>>>>>>>> a whole new and different connector that has totally
> > > >>>>>>>>>> different abstractions in the SQL layer and should be treated
> > > >>>>>>>>>> totally differently from the current Kafka connector.
> > > >>>>>>>>>>
> > > >>>>>>>>>> Hope this can clarify some of the concerns.
> > > >>>>>>>>>>
> > > >>>>>>>>>> Best,
> > > >>>>>>>>>> Kurt
> > > >>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>>>> On Tue, Oct 20, 2020 at 5:20 PM Shengkai Fang <fskm...@gmail.com> wrote:
> > > >>>>>>>>>>
> > > >>>>>>>>>>> Hi devs,
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> As many people are still confused about the differences in
> > > >>>>>>>>>>> option behaviours between the Kafka connector and the KTable
> > > >>>>>>>>>>> connector, Jark and I have listed the differences in the doc [1].
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Best,
> > > >>>>>>>>>>> Shengkai
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> [1] https://docs.google.com/document/d/13oAWAwQez0lZLsyfV21BfTEze1fc2cz4AZKiNOyBNPk/edit
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> On Tue, Oct 20, 2020 at 12:05 PM Shengkai Fang <fskm...@gmail.com> wrote:
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>> Hi Konstantin,
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> Thanks for your reply.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>> It uses the "kafka" connector and does not specify a primary
> > > >>>>>>>>>>>>> key.
> > > >>>>>>>>>>>> The dimension table `users` uses the ktable connector, and we
> > > >>>>>>>>>>>> can specify the PK on the KTable.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>> Will it be possible to use a "ktable" as a dimensional table
> > > >>>>>>>>>>>>> in FLIP-132?
> > > >>>>>>>>>>>> Yes. We can specify the watermark on the KTable, and it can be
> > > >>>>>>>>>>>> used as a dimension table in a temporal join.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>> Introduce a new connector vs introduce a new property
> > > >>>>>>>>>>>> The main reason is that the KTable connector has almost no
> > > >>>>>>>>>>>> options in common with the Kafka connector. The options that
> > > >>>>>>>>>>>> can be reused by the KTable connector are 'topic',
> > > >>>>>>>>>>>> 'properties.bootstrap.servers' and 'value.fields-include'. We
> > > >>>>>>>>>>>> can't set a CDC format for 'key.format' and 'value.format' in
> > > >>>>>>>>>>>> the KTable connector now, which is possible in the Kafka
> > > >>>>>>>>>>>> connector. Considering the difference between the options we
> > > >>>>>>>>>>>> can use, it's more suitable to introduce another connector
> > > >>>>>>>>>>>> rather than a property.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> We are also fine with using "compacted-kafka" as the name of
> > > >>>>>>>>>>>> the new connector. What do you think?
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> Best,
> > > >>>>>>>>>>>> Shengkai
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> On Mon, Oct 19, 2020 at 10:15 PM Konstantin Knauf <kna...@apache.org> wrote:
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>> Hi Shengkai,
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Thank you for driving this effort. I believe this is a very
> > > >>>>>>>>>>>>> important feature for many users who use Kafka and Flink SQL
> > > >>>>>>>>>>>>> together. A few questions and thoughts:
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> * Is your example "Use KTable as a reference/dimension table"
> > > >>>>>>>>>>>>> correct? It uses the "kafka" connector and does not specify a
> > > >>>>>>>>>>>>> primary key.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> * Will it be possible to use a "ktable" table directly as a
> > > >>>>>>>>>>>>> dimensional table in a temporal join (*based on event time*)
> > > >>>>>>>>>>>>> (FLIP-132)? This is not completely clear to me from the FLIP.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> * I'd personally prefer not to introduce a new connector and
> > > >>>>>>>>>>>>> instead to extend the Kafka connector. We could add an
> > > >>>>>>>>>>>>> additional property "compacted" = "true"|"false". If it is set
> > > >>>>>>>>>>>>> to "true", we can add additional validation logic (e.g.
> > > >>>>>>>>>>>>> "scan.startup.mode" cannot be set, primary key required,
> > > >>>>>>>>>>>>> etc.). If we stick to a separate connector I'd not call it
> > > >>>>>>>>>>>>> "ktable", but rather "compacted-kafka" or similar. KTable seems
> > > >>>>>>>>>>>>> to carry more implicit meaning than we want to imply here.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> * I agree that this is not a bounded source. If we want to
> > > >>>>>>>>>>>>> support a bounded mode, this is an orthogonal concern that also
> > > >>>>>>>>>>>>> applies to other unbounded sources.
> > > >>>>>>>>>>>>>
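The extra validation mentioned in the third bullet above could look roughly like the following Python sketch. It is purely illustrative: `validate_compacted_options` and the `has_primary_key` flag are hypothetical names, and the two rules are just the examples named in the message ("scan.startup.mode" must not be set, a primary key is required).

```python
from typing import Dict, List


def validate_compacted_options(
    options: Dict[str, str], has_primary_key: bool
) -> List[str]:
    """Return human-readable problems; an empty list means the options pass."""
    problems: List[str] = []
    if options.get("compacted", "false") != "true":
        return problems  # the extra rules only apply in compacted mode
    if "scan.startup.mode" in options:
        problems.append(
            "'scan.startup.mode' cannot be set when 'compacted' = 'true'"
        )
    if not has_primary_key:
        problems.append("a PRIMARY KEY is required when 'compacted' = 'true'")
    return problems


bad = validate_compacted_options(
    {"compacted": "true", "scan.startup.mode": "earliest"},
    has_primary_key=False,
)
good = validate_compacted_options({"compacted": "true"}, has_primary_key=True)
```

Collecting all problems at once, rather than failing on the first, avoids making users fix options one by one.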
> > > >>>>>>>>>>>>> Best,
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Konstantin
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> On Mon, Oct 19, 2020 at 3:26 PM Jark Wu <imj...@gmail.com> wrote:
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> Hi Danny,
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> First of all, we didn't introduce any concepts from KSQL
> > > >>>>>>>>>>>>>> (e.g. the Stream vs. Table notion).
> > > >>>>>>>>>>>>>> This new connector will produce a changelog stream, so it's
> > > >>>>>>>>>>>>>> still a dynamic table and doesn't conflict with Flink core
> > > >>>>>>>>>>>>>> concepts.
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> The "ktable" is just a connector name; we can also call it
> > > >>>>>>>>>>>>>> "compacted-kafka" or something else.
> > > >>>>>>>>>>>>>> We call it "ktable" only so that KSQL users can migrate to
> > > >>>>>>>>>>>>>> Flink SQL easily.
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> Regarding why we introduce a new connector vs. a new property
> > > >>>>>>>>>>>>>> in the existing Kafka connector:
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> I think the main reason is that we want to have a clear
> > > >>>>>>>>>>>>>> separation for these two use cases, because they are very
> > > >>>>>>>>>>>>>> different.
> > > >>>>>>>>>>>>>> We also listed reasons in the FLIP, including:
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> 1) It's hard to explain the behavior when users specify a
> > > >>>>>>>>>>>>>> start offset from a middle position (e.g. how to process
> > > >>>>>>>>>>>>>> delete events for keys that do not exist).
> > > >>>>>>>>>>>>>>        It's dangerous if users do that. So we don't provide
> > > >>>>>>>>>>>>>> the offset option in the new connector at the moment.
> > > >>>>>>>>>>>>>> 2) It's a different perspective/abstraction on the same Kafka
> > > >>>>>>>>>>>>>> topic (append vs. upsert). It would be easier to understand
> > > >>>>>>>>>>>>>> if we separate them
> > > >>>>>>>>>>>>>>        instead of mixing them in one connector. The new
> > > >>>>>>>>>>>>>> connector requires a hash sink partitioner, a declared
> > > >>>>>>>>>>>>>> primary key, and a regular format.
> > > >>>>>>>>>>>>>>        If we mix them in one connector, it might be confusing
> > > >>>>>>>>>>>>>> how to use the options correctly.
> > > >>>>>>>>>>>>>> 3) The semantics of the KTable connector are just the same as
> > > >>>>>>>>>>>>>> KTable in Kafka Streams. So it's very handy for Kafka Streams
> > > >>>>>>>>>>>>>> and KSQL users.
> > > >>>>>>>>>>>>>>        We have seen several questions on the mailing list
> > > >>>>>>>>>>>>>> asking how to model a KTable and how to join a KTable in
> > > >>>>>>>>>>>>>> Flink SQL.
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> Best,
> > > >>>>>>>>>>>>>> Jark
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> On Mon, 19 Oct 2020 at 19:53, Jark Wu <imj...@gmail.com> wrote:
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> Hi Jingsong,
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> As the FLIP describes, "KTable connector produces a
> > > >>>>>>>>>>>>>>> changelog stream, where each data record represents an
> > > >>>>>>>>>>>>>>> update or delete event.".
> > > >>>>>>>>>>>>>>> Therefore, a ktable source is an unbounded stream source.
> > > >>>>>>>>>>>>>>> Selecting from a ktable source is similar to selecting from
> > > >>>>>>>>>>>>>>> a Kafka source with the debezium-json format,
> > > >>>>>>>>>>>>>>> in that it never ends and the results are continuously
> > > >>>>>>>>>>>>>>> updated.
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> It's possible to have a bounded ktable source in the future,
> > > >>>>>>>>>>>>>>> for example, by adding an option 'bounded=true' or
> > > >>>>>>>>>>>>>>> 'end-offset=xxx'.
> > > >>>>>>>>>>>>>>> In this way, the ktable will produce a bounded changelog
> > > >>>>>>>>>>>>>>> stream.
> > > >>>>>>>>>>>>>>> So I think this can be a compatible feature in the future.
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> I don't think we should associate with ksql related
> > > >>> concepts.
> > > >>>>>>>>>>>>> Actually,
> > > >>>>>>>>>>>>>> we
> > > >>>>>>>>>>>>>>> didn't introduce any concepts from KSQL (e.g. Stream vs
> > > >>> Table
> > > >>>>>>>>>>> notion).
> > > >>>>>>>>>>>>>>> The "ktable" is just a connector name, we can also call
> > it
> > > >>>>>>>>>>>>>>> "compacted-kafka" or something else.
> > > >>>>>>>>>>>>>>> Calling it "ktable" is just because KSQL users can
> > migrate
> > > >>> to
> > > >>>>>>>>>> Flink
> > > >>>>>>>>>>>>> SQL
> > > >>>>>>>>>>>>>>> easily.
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> Regarding the "value.fields-include", this is an option
> > > >>>>>>>>> introduced
> > > >>>>>>>>>>> in
> > > >>>>>>>>>>>>>>> FLIP-107 for Kafka connector.
> > > >>>>>>>>>>>>>>> I think we should keep the same behavior with the Kafka
> > > >>>>>>>>> connector.
> > > >>>>>>>>>>> I'm
> > > >>>>>>>>>>>>>> not
> > > >>>>>>>>>>>>>>> sure what's the default behavior of KSQL.
> > > >>>>>>>>>>>>>>> But I guess it also stores the keys in value from this
> > > >>> example
> > > >>>>>>>>>> docs
> > > >>>>>>>>>>>>> (see
> > > >>>>>>>>>>>>>>> the "users_original" table) [1].
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> Best,
> > > >>>>>>>>>>>>>>> Jark
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> [1]:
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>
> > > >>>>>
> > > >>>>
> > > >>>
> > > >>
> > >
> >
> https://docs.confluent.io/current/ksqldb/tutorials/basics-local.html#create-a-stream-and-table
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> On Mon, 19 Oct 2020 at 18:17, Danny Chan <
> > > >>>> yuzhao....@gmail.com>
> > > >>>>>>>>>>>>> wrote:
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> The concept seems conflicts with the Flink abstraction
> > > >>>> “dynamic
> > > >>>>>>>>>>>>> table”,
> > > >>>>>>>>>>>>>>>> in Flink we see both “stream” and “table” as a dynamic
> > > >>> table,
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> I think we should make clear first how to express
> stream
> > > >>> and
> > > >>>>>>>>>> table
> > > >>>>>>>>>>>>>>>> specific features on one “dynamic table”,
> > > >>>>>>>>>>>>>>>> it is more natural for KSQL because KSQL takes stream
> > and
> > > >>>> table
> > > >>>>>>>>>> as
> > > >>>>>>>>>>>>>>>> different abstractions for representing collections.
> In
> > > >>> KSQL,
> > > >>>>>>>>>> only
> > > >>>>>>>>>>>>>> table is
> > > >>>>>>>>>>>>>>>> mutable and can have a primary key.
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> Does this connector belongs to the “table” scope or
> > > >>> “stream”
> > > >>>>>>>>>> scope
> > > >>>>>>>>>>> ?
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> Some of the concepts (such as the primary key on
> stream)
> > > >>>> should
> > > >>>>>>>>>> be
> > > >>>>>>>>>>>>>>>> suitable for all the connectors, not just Kafka,
> > > >> Shouldn’t
> > > >>>> this
> > > >>>>>>>>>> be
> > > >>>>>>>>>>> an
> > > >>>>>>>>>>>>>>>> extension of existing Kafka connector instead of a
> > > >> totally
> > > >>>> new
> > > >>>>>>>>>>>>>> connector ?
> > > >>>>>>>>>>>>>>>> What about the other connectors ?
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> Because this touches the core abstraction of Flink, we
> > > >>> better
> > > >>>>>>>>>> have
> > > >>>>>>>>>>> a
> > > >>>>>>>>>>>>>>>> top-down overall design, following the KSQL directly
> is
> > > >> not
> > > >>>> the
> > > >>>>>>>>>>>>> answer.
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> P.S. For the source
> > > >>>>>>>>>>>>>>>>> Shouldn’t this be an extension of existing Kafka
> > > >> connector
> > > >>>>>>>>>>> instead
> > > >>>>>>>>>>>>> of
> > > >>>>>>>>>>>>>> a
> > > >>>>>>>>>>>>>>>> totally new connector ?
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> How could we achieve that (e.g. set up the parallelism
> > > >>>>>>>>>> correctly) ?
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> Best,
> > > >>>>>>>>>>>>>>>> Danny Chan
> > > >>>>>>>>>>>>>>>> 在 2020年10月19日 +0800 PM5:17,Jingsong Li <
> > > >>>> jingsongl...@gmail.com
> > > >>>>>>>>>>>> ,写道:
> > > >>>>>>>>>>>>>>>>> Thanks Shengkai for your proposal.
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> +1 for this feature.
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> Future Work: Support bounded KTable source
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> I don't think it should be a future work, I think it
> is
> > > >>> one
> > > >>>>>>>>> of
> > > >>>>>>>>>>> the
> > > >>>>>>>>>>>>>>>>> important concepts of this FLIP. We need to
> understand
> > > >> it
> > > >>>>>>>>> now.
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> Intuitively, a ktable in my opinion is a bounded
> table
> > > >>>> rather
> > > >>>>>>>>>>> than
> > > >>>>>>>>>>>>> a
> > > >>>>>>>>>>>>>>>>> stream, so select should produce a bounded table by
> > > >>> default.
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> I think we can list Kafka related knowledge, because
> > the
> > > >>>> word
> > > >>>>>>>>>>>>> `ktable`
> > > >>>>>>>>>>>>>>>> is
> > > >>>>>>>>>>>>>>>>> easy to associate with ksql related concepts. (If
> > > >>> possible,
> > > >>>>>>>>>> it's
> > > >>>>>>>>>>>>>> better
> > > >>>>>>>>>>>>>>>> to
> > > >>>>>>>>>>>>>>>>> unify with it)
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> What do you think?
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> value.fields-include
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> What about the default behavior of KSQL?
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> Best,
> > > >>>>>>>>>>>>>>>>> Jingsong
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> On Mon, Oct 19, 2020 at 4:33 PM Shengkai Fang <
> > > >>>>>>>>>> fskm...@gmail.com
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> wrote:
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> Hi, devs.
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> Jark and I want to start a new FLIP to introduce the
> > > >>> KTable
> > > >>>>>>>>>>>>>>>> connector. The
> > > >>>>>>>>>>>>>>>>>> KTable is a shortcut of "Kafka Table", it also has
> the
> > > >>> same
> > > >>>>>>>>>>>>>> semantics
> > > >>>>>>>>>>>>>>>> with
> > > >>>>>>>>>>>>>>>>>> the KTable notion in Kafka Stream.
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> FLIP-149:
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>
> > > >>>>>
> > > >>>>
> > > >>>
> > > >>
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-149%3A+Introduce+the+KTable+Connector
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> Currently many users have expressed their needs for
> > the
> > > >>>>>>>>>> upsert
> > > >>>>>>>>>>>>> Kafka
> > > >>>>>>>>>>>>>>>> by
> > > >>>>>>>>>>>>>>>>>> mail lists and issues. The KTable connector has
> > several
> > > >>>>>>>>>>> benefits
> > > >>>>>>>>>>>>> for
> > > >>>>>>>>>>>>>>>> users:
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> 1. Users are able to interpret a compacted Kafka
> Topic
> > > >> as
> > > >>>>>>>>> an
> > > >>>>>>>>>>>>> upsert
> > > >>>>>>>>>>>>>>>> stream
> > > >>>>>>>>>>>>>>>>>> in Apache Flink. And also be able to write a
> changelog
> > > >>>>>>>>> stream
> > > >>>>>>>>>>> to
> > > >>>>>>>>>>>>>> Kafka
> > > >>>>>>>>>>>>>>>>>> (into a compacted topic).
> > > >>>>>>>>>>>>>>>>>> 2. As a part of the real time pipeline, store join
> or
> > > >>>>>>>>>> aggregate
> > > >>>>>>>>>>>>>>>> result (may
> > > >>>>>>>>>>>>>>>>>> contain updates) into a Kafka topic for further
> > > >>>>>>>>> calculation;
> > > >>>>>>>>>>>>>>>>>> 3. The semantic of the KTable connector is just the
> > > >> same
> > > >>> as
> > > >>>>>>>>>>>>> KTable
> > > >>>>>>>>>>>>>> in
> > > >>>>>>>>>>>>>>>> Kafka
> > > >>>>>>>>>>>>>>>>>> Stream. So it's very handy for Kafka Stream and KSQL
> > > >>> users.
> > > >>>>>>>>>> We
> > > >>>>>>>>>>>>> have
> > > >>>>>>>>>>>>>>>> seen
> > > >>>>>>>>>>>>>>>>>> several questions in the mailing list asking how to
> > > >>> model a
> > > >>>>>>>>>>>>> KTable
> > > >>>>>>>>>>>>>>>> and how
> > > >>>>>>>>>>>>>>>>>> to join a KTable in Flink SQL.
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> We hope it can expand the usage of the Flink with
> > > >> Kafka.
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> I'm looking forward to your feedback.
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> Best,
> > > >>>>>>>>>>>>>>>>>> Shengkai
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> --
> > > >>>>>>>>>>>>>>>>> Best, Jingsong Lee
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> --
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Konstantin Knauf
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> https://twitter.com/snntrable
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> https://github.com/knaufk
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>> --
> > > >>>>>>>>>
> > > >>>>>>>>> Konstantin Knauf
> > > >>>>>>>>>
> > > >>>>>>>>> https://twitter.com/snntrable
> > > >>>>>>>>>
> > > >>>>>>>>> https://github.com/knaufk
> > > >>>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>
> > > >>>>>
> > > >>>>>
> > > >>>>
> > > >>>> --
> > > >>>>
> > > >>>> Seth Wiesman | Solutions Architect
> > > >>>>
> > > >>>> +1 314 387 1463
> > > >>>>
> > > >>>> <https://www.ververica.com/>
> > > >>>>
> > > >>>> Follow us @VervericaData
> > > >>>>
> > > >>>> --
> > > >>>>
> > > >>>> Join Flink Forward <https://flink-forward.org/> - The Apache
> Flink
> > > >>>> Conference
> > > >>>>
> > > >>>> Stream Processing | Event Driven | Real Time
> > > >>>>
> > > >>>
> > > >>
> > > >
> > >
> > >
> >
> > --
> > Best, Jingsong Lee
> >
>
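
As a rough illustration of the upsert semantics discussed in the quoted thread (each record is an update or delete for its key, and no UPDATE_BEFORE is produced), here is a minimal Python sketch. It is not Flink or Kafka code: the function names `to_upsert_changelog` and `materialize`, the `(key, value)` record shape, and the use of `None` as a tombstone are assumptions made only for illustration.

```python
from typing import Dict, Iterable, Iterator, Optional, Tuple

# Hypothetical record shape: (key, value). A value of None models a tombstone,
# i.e. a delete for that key in a compacted topic.
Record = Tuple[str, Optional[dict]]
# Hypothetical change shape: (kind, key, row).
Change = Tuple[str, str, dict]


def to_upsert_changelog(records: Iterable[Record]) -> Iterator[Change]:
    """Interpret keyed records as INSERT / UPDATE_AFTER / DELETE changes."""
    state: Dict[str, dict] = {}  # last known row per key
    for key, value in records:
        if value is None:
            # Tombstone: emit DELETE only if the key is currently present.
            old = state.pop(key, None)
            if old is not None:
                yield ("DELETE", key, old)
        elif key in state:
            # Known key: emit the new row only, without an UPDATE_BEFORE.
            state[key] = value
            yield ("UPDATE_AFTER", key, value)
        else:
            state[key] = value
            yield ("INSERT", key, value)


def materialize(changes: Iterable[Change]) -> Dict[str, dict]:
    """Apply a changelog to an empty table and return the final rows by key."""
    table: Dict[str, dict] = {}
    for kind, key, row in changes:
        if kind == "DELETE":
            table.pop(key, None)
        else:
            table[key] = row
    return table


if __name__ == "__main__":
    sample = [("u1", {"region": "EU"}), ("u1", {"region": "APAC"}), ("u1", None)]
    for change in to_upsert_changelog(sample):
        print(change)
```

Reading the same records up to a fixed end position would give a bounded changelog and therefore a final table, which is the case the 'bounded=true' / 'end-offset=xxx' idea in the thread refers to; the sketch itself makes no claim about how Flink would implement that.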


-- 
Best, Jingsong Lee
