Add one more message, I have already updated the FLIP[1].

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-149%3A+Introduce+the+upsert-kafka+Connector

Shengkai Fang <fskm...@gmail.com> 于2020年10月23日周五 下午2:55写道:

> Hi, all.
> It seems we have reached a consensus on the FLIP. If no one has other
> objections, I would like to start the vote for FLIP-149.
>
> Best,
> Shengkai
>
> Jingsong Li <jingsongl...@gmail.com> 于2020年10月23日周五 下午2:25写道:
>
>> Thanks for explanation,
>>
>> I am OK for `upsert`. Yes, Its concept has been accepted by many systems.
>>
>> Best,
>> Jingsong
>>
>> On Fri, Oct 23, 2020 at 12:38 PM Jark Wu <imj...@gmail.com> wrote:
>>
>> > Hi Timo,
>> >
>> > I have some concerns about `kafka-cdc`,
>> > 1) cdc is an abbreviation of Change Data Capture which is commonly used
>> for
>> > databases, not for message queues.
>> > 2) usually, cdc produces full content of changelog, including
>> > UPDATE_BEFORE, however "upsert kafka" doesn't
>> > 3) `kafka-cdc` sounds like a natively support for `debezium-json`
>> format,
>> > however, it is not and even we don't want
>> >    "upsert kafka" to support "debezium-json"
>> >
>> >
>> > Hi Jingsong,
>> >
>> > I think the terminology of "upsert" is fine, because Kafka also uses
>> > "upsert" to define such behavior in their official documentation [1]:
>> >
>> > > a data record in a changelog stream is interpreted as an UPSERT aka
>> > INSERT/UPDATE
>> >
>> > Materialize uses the "UPSERT" keyword to define such behavior too [2].
>> > Users have been requesting such feature using "upsert kafka"
>> terminology in
>> > user mailing lists [3][4].
>> > Many other systems support "UPSERT" statement natively, such as impala
>> [5],
>> > SAP [6], Phoenix [7], Oracle NoSQL [8], etc..
>> >
>> > Therefore, I think we don't need to be afraid of introducing "upsert"
>> > terminology, it is widely accepted by users.
>> >
>> > Best,
>> > Jark
>> >
>> >
>> > [1]:
>> >
>> >
>> https://kafka.apache.org/20/documentation/streams/developer-guide/dsl-api.html#streams_concepts_ktable
>> > [2]:
>> >
>> >
>> https://materialize.io/docs/sql/create-source/text-kafka/#upsert-on-a-kafka-topic
>> > [3]:
>> >
>> >
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/SQL-materialized-upsert-tables-td18482.html#a18503
>> > [4]:
>> >
>> >
>> http://apache-flink.147419.n8.nabble.com/Kafka-Sink-AppendStreamTableSink-doesn-t-support-consuming-update-changes-td5959.html
>> > [5]:
>> https://impala.apache.org/docs/build/html/topics/impala_upsert.html
>> > [6]:
>> >
>> >
>> https://help.sap.com/viewer/7c78579ce9b14a669c1f3295b0d8ca16/Cloud/en-US/ea8b6773be584203bcd99da76844c5ed.html
>> > [7]: https://phoenix.apache.org/atomic_upsert.html
>> > [8]:
>> >
>> >
>> https://docs.oracle.com/en/database/other-databases/nosql-database/18.3/sqlfornosql/adding-table-rows-using-insert-and-upsert-statements.html
>> >
>> > On Fri, 23 Oct 2020 at 10:36, Jingsong Li <jingsongl...@gmail.com>
>> wrote:
>> >
>> > > The `kafka-cdc` looks good to me.
>> > > We can even give options to indicate whether to turn on compact,
>> because
>> > > compact is just an optimization?
>> > >
>> > > - ktable let me think about KSQL.
>> > > - kafka-compacted it is not just compacted, more than that, it still
>> has
>> > > the ability of CDC
>> > > - upsert-kafka , upsert is back, and I don't really want to see it
>> again
>> > > since we have CDC
>> > >
>> > > Best,
>> > > Jingsong
>> > >
>> > > On Fri, Oct 23, 2020 at 2:21 AM Timo Walther <twal...@apache.org>
>> wrote:
>> > >
>> > > > Hi Jark,
>> > > >
>> > > > I would be fine with `connector=upsert-kafka`. Another idea would
>> be to
>> > > > align the name to other available Flink connectors [1]:
>> > > >
>> > > > `connector=kafka-cdc`.
>> > > >
>> > > > Regards,
>> > > > Timo
>> > > >
>> > > > [1] https://github.com/ververica/flink-cdc-connectors
>> > > >
>> > > > On 22.10.20 17:17, Jark Wu wrote:
>> > > > > Another name is "connector=upsert-kafka', I think this can solve
>> > Timo's
>> > > > > concern on the "compacted" word.
>> > > > >
>> > > > > Materialize also uses "ENVELOPE UPSERT" [1] keyword to identify
>> such
>> > > > kafka
>> > > > > sources.
>> > > > > I think "upsert" is a well-known terminology widely used in many
>> > > systems
>> > > > > and matches the
>> > > > >   behavior of how we handle the kafka messages.
>> > > > >
>> > > > > What do you think?
>> > > > >
>> > > > > Best,
>> > > > > Jark
>> > > > >
>> > > > > [1]:
>> > > > >
>> > > >
>> > >
>> >
>> https://materialize.io/docs/sql/create-source/text-kafka/#upsert-on-a-kafka-topic
>> > > > >
>> > > > >
>> > > > >
>> > > > >
>> > > > > On Thu, 22 Oct 2020 at 22:53, Kurt Young <ykt...@gmail.com>
>> wrote:
>> > > > >
>> > > > >> Good validation messages can't solve the broken user experience,
>> > > > especially
>> > > > >> that
>> > > > >> such update mode option will implicitly make half of current
>> kafka
>> > > > options
>> > > > >> invalid or doesn't
>> > > > >> make sense.
>> > > > >>
>> > > > >> Best,
>> > > > >> Kurt
>> > > > >>
>> > > > >>
>> > > > >> On Thu, Oct 22, 2020 at 10:31 PM Jark Wu <imj...@gmail.com>
>> wrote:
>> > > > >>
>> > > > >>> Hi Timo, Seth,
>> > > > >>>
>> > > > >>> The default value "inserting" of "mode" might be not suitable,
>> > > > >>> because "debezium-json" emits changelog messages which include
>> > > updates.
>> > > > >>>
>> > > > >>> On Thu, 22 Oct 2020 at 22:10, Seth Wiesman <s...@ververica.com>
>> > > wrote:
>> > > > >>>
>> > > > >>>> +1 for supporting upsert results into Kafka.
>> > > > >>>>
>> > > > >>>> I have no comments on the implementation details.
>> > > > >>>>
>> > > > >>>> As far as configuration goes, I tend to favor Timo's option
>> where
>> > we
>> > > > >> add
>> > > > >>> a
>> > > > >>>> "mode" property to the existing Kafka table with default value
>> > > > >>> "inserting".
>> > > > >>>> If the mode is set to "updating" then the validation changes to
>> > the
>> > > > new
>> > > > >>>> requirements. I personally find it more intuitive than a
>> seperate
>> > > > >>>> connector, my fear is users won't understand its the same
>> physical
>> > > > >> kafka
>> > > > >>>> sink under the hood and it will lead to other confusion like
>> does
>> > it
>> > > > >>> offer
>> > > > >>>> the same persistence guarantees? I think we are capable of
>> adding
>> > > good
>> > > > >>>> valdiation messaging that solves Jark and Kurts concerns.
>> > > > >>>>
>> > > > >>>>
>> > > > >>>> On Thu, Oct 22, 2020 at 8:51 AM Timo Walther <
>> twal...@apache.org>
>> > > > >> wrote:
>> > > > >>>>
>> > > > >>>>> Hi Jark,
>> > > > >>>>>
>> > > > >>>>> "calling it "kafka-compacted" can even remind users to enable
>> log
>> > > > >>>>> compaction"
>> > > > >>>>>
>> > > > >>>>> But sometimes users like to store a lineage of changes in
>> their
>> > > > >> topics.
>> > > > >>>>> Indepent of any ktable/kstream interpretation.
>> > > > >>>>>
>> > > > >>>>> I let the majority decide on this topic to not further block
>> this
>> > > > >>>>> effort. But we might find a better name like:
>> > > > >>>>>
>> > > > >>>>> connector = kafka
>> > > > >>>>> mode = updating/inserting
>> > > > >>>>>
>> > > > >>>>> OR
>> > > > >>>>>
>> > > > >>>>> connector = kafka-updating
>> > > > >>>>>
>> > > > >>>>> ...
>> > > > >>>>>
>> > > > >>>>> Regards,
>> > > > >>>>> Timo
>> > > > >>>>>
>> > > > >>>>>
>> > > > >>>>>
>> > > > >>>>>
>> > > > >>>>> On 22.10.20 15:24, Jark Wu wrote:
>> > > > >>>>>> Hi Timo,
>> > > > >>>>>>
>> > > > >>>>>> Thanks for your opinions.
>> > > > >>>>>>
>> > > > >>>>>> 1) Implementation
>> > > > >>>>>> We will have an stateful operator to generate INSERT and
>> > > > >>> UPDATE_BEFORE.
>> > > > >>>>>> This operator is keyby-ed (primary key as the shuffle key)
>> after
>> > > > >> the
>> > > > >>>>> source
>> > > > >>>>>> operator.
>> > > > >>>>>> The implementation of this operator is very similar to the
>> > > existing
>> > > > >>>>>> `DeduplicateKeepLastRowFunction`.
>> > > > >>>>>> The operator will register a value state using the primary
>> key
>> > > > >> fields
>> > > > >>>> as
>> > > > >>>>>> keys.
>> > > > >>>>>> When the value state is empty under current key, we will emit
>> > > > >> INSERT
>> > > > >>>> for
>> > > > >>>>>> the input row.
>> > > > >>>>>> When the value state is not empty under current key, we will
>> > emit
>> > > > >>>>>> UPDATE_BEFORE using the row in state,
>> > > > >>>>>> and emit UPDATE_AFTER using the input row.
>> > > > >>>>>> When the input row is DELETE, we will clear state and emit
>> > DELETE
>> > > > >>> row.
>> > > > >>>>>>
>> > > > >>>>>> 2) new option vs new connector
>> > > > >>>>>>> We recently simplified the table options to a minimum
>> amount of
>> > > > >>>>>> characters to be as concise as possible in the DDL.
>> > > > >>>>>> I think this is the reason why we want to introduce a new
>> > > > >> connector,
>> > > > >>>>>> because we can simplify the options in DDL.
>> > > > >>>>>> For example, if using a new option, the DDL may look like
>> this:
>> > > > >>>>>>
>> > > > >>>>>> CREATE TABLE users (
>> > > > >>>>>>     user_id BIGINT,
>> > > > >>>>>>     user_name STRING,
>> > > > >>>>>>     user_level STRING,
>> > > > >>>>>>     region STRING,
>> > > > >>>>>>     PRIMARY KEY (user_id) NOT ENFORCED
>> > > > >>>>>> ) WITH (
>> > > > >>>>>>     'connector' = 'kafka',
>> > > > >>>>>>     'model' = 'table',
>> > > > >>>>>>     'topic' = 'pageviews_per_region',
>> > > > >>>>>>     'properties.bootstrap.servers' = '...',
>> > > > >>>>>>     'properties.group.id' = 'testGroup',
>> > > > >>>>>>     'scan.startup.mode' = 'earliest',
>> > > > >>>>>>     'key.format' = 'csv',
>> > > > >>>>>>     'key.fields' = 'user_id',
>> > > > >>>>>>     'value.format' = 'avro',
>> > > > >>>>>>     'sink.partitioner' = 'hash'
>> > > > >>>>>> );
>> > > > >>>>>>
>> > > > >>>>>> If using a new connector, we can have a different default
>> value
>> > > for
>> > > > >>> the
>> > > > >>>>>> options and remove unnecessary options,
>> > > > >>>>>> the DDL can look like this which is much more concise:
>> > > > >>>>>>
>> > > > >>>>>> CREATE TABLE pageviews_per_region (
>> > > > >>>>>>     user_id BIGINT,
>> > > > >>>>>>     user_name STRING,
>> > > > >>>>>>     user_level STRING,
>> > > > >>>>>>     region STRING,
>> > > > >>>>>>     PRIMARY KEY (user_id) NOT ENFORCED
>> > > > >>>>>> ) WITH (
>> > > > >>>>>>     'connector' = 'kafka-compacted',
>> > > > >>>>>>     'topic' = 'pageviews_per_region',
>> > > > >>>>>>     'properties.bootstrap.servers' = '...',
>> > > > >>>>>>     'key.format' = 'csv',
>> > > > >>>>>>     'value.format' = 'avro'
>> > > > >>>>>> );
>> > > > >>>>>>
>> > > > >>>>>>> When people read `connector=kafka-compacted` they might not
>> > know
>> > > > >>> that
>> > > > >>>> it
>> > > > >>>>>>> has ktable semantics. You don't need to enable log
>> compaction
>> > in
>> > > > >>> order
>> > > > >>>>>>> to use a KTable as far as I know.
>> > > > >>>>>> We don't need to let users know it has ktable semantics, as
>> > > > >>> Konstantin
>> > > > >>>>>> mentioned this may carry more implicit
>> > > > >>>>>> meaning than we want to imply here. I agree users don't need
>> to
>> > > > >>> enable
>> > > > >>>>> log
>> > > > >>>>>> compaction, but from the production perspective,
>> > > > >>>>>> log compaction should always be enabled if it is used in this
>> > > > >>> purpose.
>> > > > >>>>>> Calling it "kafka-compacted" can even remind users to enable
>> log
>> > > > >>>>> compaction.
>> > > > >>>>>>
>> > > > >>>>>> I don't agree to introduce "model = table/stream" option, or
>> > > > >>>>>> "connector=kafka-table",
>> > > > >>>>>> because this means we are introducing Table vs Stream concept
>> > from
>> > > > >>>> KSQL.
>> > > > >>>>>> However, we don't have such top-level concept in Flink SQL
>> now,
>> > > > >> this
>> > > > >>>> will
>> > > > >>>>>> further confuse users.
>> > > > >>>>>> In Flink SQL, all the things are STREAM, the differences are
>> > > > >> whether
>> > > > >>> it
>> > > > >>>>> is
>> > > > >>>>>> bounded or unbounded,
>> > > > >>>>>>    whether it is insert-only or changelog.
>> > > > >>>>>>
>> > > > >>>>>>
>> > > > >>>>>> Best,
>> > > > >>>>>> Jark
>> > > > >>>>>>
>> > > > >>>>>>
>> > > > >>>>>> On Thu, 22 Oct 2020 at 20:39, Timo Walther <
>> twal...@apache.org>
>> > > > >>> wrote:
>> > > > >>>>>>
>> > > > >>>>>>> Hi Shengkai, Hi Jark,
>> > > > >>>>>>>
>> > > > >>>>>>> thanks for this great proposal. It is time to finally
>> connect
>> > the
>> > > > >>>>>>> changelog processor with a compacted Kafka topic.
>> > > > >>>>>>>
>> > > > >>>>>>> "The operator will produce INSERT rows, or additionally
>> > generate
>> > > > >>>>>>> UPDATE_BEFORE rows for the previous image, or produce DELETE
>> > rows
>> > > > >>> with
>> > > > >>>>>>> all columns filled with values."
>> > > > >>>>>>>
>> > > > >>>>>>> Could you elaborate a bit on the implementation details in
>> the
>> > > > >> FLIP?
>> > > > >>>> How
>> > > > >>>>>>> are UPDATE_BEFOREs are generated. How much state is
>> required to
>> > > > >>>> perform
>> > > > >>>>>>> this operation.
>> > > > >>>>>>>
>> > > > >>>>>>>    From a conceptual and semantical point of view, I'm fine
>> > with
>> > > > >> the
>> > > > >>>>>>> proposal. But I would like to share my opinion about how we
>> > > expose
>> > > > >>>> this
>> > > > >>>>>>> feature:
>> > > > >>>>>>>
>> > > > >>>>>>> ktable vs kafka-compacted
>> > > > >>>>>>>
>> > > > >>>>>>> I'm against having an additional connector like `ktable` or
>> > > > >>>>>>> `kafka-compacted`. We recently simplified the table options
>> to
>> > a
>> > > > >>>> minimum
>> > > > >>>>>>> amount of characters to be as concise as possible in the
>> DDL.
>> > > > >>>> Therefore,
>> > > > >>>>>>> I would keep the `connector=kafka` and introduce an
>> additional
>> > > > >>> option.
>> > > > >>>>>>> Because a user wants to read "from Kafka". And the "how"
>> should
>> > > be
>> > > > >>>>>>> determined in the lower options.
>> > > > >>>>>>>
>> > > > >>>>>>> When people read `connector=ktable` they might not know that
>> > this
>> > > > >> is
>> > > > >>>>>>> Kafka. Or they wonder where `kstream` is?
>> > > > >>>>>>>
>> > > > >>>>>>> When people read `connector=kafka-compacted` they might not
>> > know
>> > > > >>> that
>> > > > >>>> it
>> > > > >>>>>>> has ktable semantics. You don't need to enable log
>> compaction
>> > in
>> > > > >>> order
>> > > > >>>>>>> to use a KTable as far as I know. Log compaction and table
>> > > > >> semantics
>> > > > >>>> are
>> > > > >>>>>>> orthogonal topics.
>> > > > >>>>>>>
>> > > > >>>>>>> In the end we will need 3 types of information when
>> declaring a
>> > > > >>> Kafka
>> > > > >>>>>>> connector:
>> > > > >>>>>>>
>> > > > >>>>>>> CREATE TABLE ... WITH (
>> > > > >>>>>>>      connector=kafka        -- Some information about the
>> > > connector
>> > > > >>>>>>>      end-offset = XXXX      -- Some information about the
>> > > > >> boundedness
>> > > > >>>>>>>      model = table/stream   -- Some information about
>> > > > >> interpretation
>> > > > >>>>>>> )
>> > > > >>>>>>>
>> > > > >>>>>>>
>> > > > >>>>>>> We can still apply all the constraints mentioned in the
>> FLIP.
>> > > When
>> > > > >>>>>>> `model` is set to `table`.
>> > > > >>>>>>>
>> > > > >>>>>>> What do you think?
>> > > > >>>>>>>
>> > > > >>>>>>> Regards,
>> > > > >>>>>>> Timo
>> > > > >>>>>>>
>> > > > >>>>>>>
>> > > > >>>>>>> On 21.10.20 14:19, Jark Wu wrote:
>> > > > >>>>>>>> Hi,
>> > > > >>>>>>>>
>> > > > >>>>>>>> IMO, if we are going to mix them in one connector,
>> > > > >>>>>>>> 1) either users need to set some options to a specific
>> value
>> > > > >>>>> explicitly,
>> > > > >>>>>>>> e.g. "scan.startup.mode=earliest", "sink.partitioner=hash",
>> > > etc..
>> > > > >>>>>>>> This makes the connector awkward to use. Users may face to
>> fix
>> > > > >>>> options
>> > > > >>>>>>> one
>> > > > >>>>>>>> by one according to the exception.
>> > > > >>>>>>>> Besides, in the future, it is still possible to use
>> > > > >>>>>>>> "sink.partitioner=fixed" (reduce network cost) if users are
>> > > aware
>> > > > >>> of
>> > > > >>>>>>>> the partition routing,
>> > > > >>>>>>>> however, it's error-prone to have "fixed" as default for
>> > > > >> compacted
>> > > > >>>>> mode.
>> > > > >>>>>>>>
>> > > > >>>>>>>> 2) or make those options a different default value when
>> > > > >>>>> "compacted=true".
>> > > > >>>>>>>> This would be more confusing and unpredictable if the
>> default
>> > > > >> value
>> > > > >>>> of
>> > > > >>>>>>>> options will change according to other options.
>> > > > >>>>>>>> What happens if we have a third mode in the future?
>> > > > >>>>>>>>
>> > > > >>>>>>>> In terms of usage and options, it's very different from the
>> > > > >>>>>>>> original "kafka" connector.
>> > > > >>>>>>>> It would be more handy to use and less fallible if
>> separating
>> > > > >> them
>> > > > >>>> into
>> > > > >>>>>>> two
>> > > > >>>>>>>> connectors.
>> > > > >>>>>>>> In the implementation layer, we can reuse code as much as
>> > > > >> possible.
>> > > > >>>>>>>>
>> > > > >>>>>>>> Therefore, I'm still +1 to have a new connector.
>> > > > >>>>>>>> The "kafka-compacted" name sounds good to me.
>> > > > >>>>>>>>
>> > > > >>>>>>>> Best,
>> > > > >>>>>>>> Jark
>> > > > >>>>>>>>
>> > > > >>>>>>>>
>> > > > >>>>>>>> On Wed, 21 Oct 2020 at 17:58, Konstantin Knauf <
>> > > > >> kna...@apache.org>
>> > > > >>>>>>> wrote:
>> > > > >>>>>>>>
>> > > > >>>>>>>>> Hi Kurt, Hi Shengkai,
>> > > > >>>>>>>>>
>> > > > >>>>>>>>> thanks for answering my questions and the additional
>> > > > >>>> clarifications. I
>> > > > >>>>>>>>> don't have a strong opinion on whether to extend the
>> "kafka"
>> > > > >>>> connector
>> > > > >>>>>>> or
>> > > > >>>>>>>>> to introduce a new connector. So, from my perspective feel
>> > free
>> > > > >> to
>> > > > >>>> go
>> > > > >>>>>>> with
>> > > > >>>>>>>>> a separate connector. If we do introduce a new connector I
>> > > > >>> wouldn't
>> > > > >>>>>>> call it
>> > > > >>>>>>>>> "ktable" for aforementioned reasons (In addition, we might
>> > > > >> suggest
>> > > > >>>>> that
>> > > > >>>>>>>>> there is also a "kstreams" connector for symmetry
>> reasons). I
>> > > > >>> don't
>> > > > >>>>>>> have a
>> > > > >>>>>>>>> good alternative name, though, maybe "kafka-compacted" or
>> > > > >>>>>>>>> "compacted-kafka".
>> > > > >>>>>>>>>
>> > > > >>>>>>>>> Thanks,
>> > > > >>>>>>>>>
>> > > > >>>>>>>>> Konstantin
>> > > > >>>>>>>>>
>> > > > >>>>>>>>>
>> > > > >>>>>>>>> On Wed, Oct 21, 2020 at 4:43 AM Kurt Young <
>> ykt...@gmail.com
>> > >
>> > > > >>>> wrote:
>> > > > >>>>>>>>>
>> > > > >>>>>>>>>> Hi all,
>> > > > >>>>>>>>>>
>> > > > >>>>>>>>>> I want to describe the discussion process which drove us
>> to
>> > > > >> have
>> > > > >>>> such
>> > > > >>>>>>>>>> conclusion, this might make some of
>> > > > >>>>>>>>>> the design choices easier to understand and keep
>> everyone on
>> > > > >> the
>> > > > >>>> same
>> > > > >>>>>>>>> page.
>> > > > >>>>>>>>>>
>> > > > >>>>>>>>>> Back to the motivation, what functionality do we want to
>> > > > >> provide
>> > > > >>> in
>> > > > >>>>> the
>> > > > >>>>>>>>>> first place? We got a lot of feedback and
>> > > > >>>>>>>>>> questions from mailing lists that people want to write
>> > > > >>>>> Not-Insert-Only
>> > > > >>>>>>>>>> messages into kafka. They might be
>> > > > >>>>>>>>>> intentional or by accident, e.g. wrote an non-windowed
>> > > > >> aggregate
>> > > > >>>>> query
>> > > > >>>>>>> or
>> > > > >>>>>>>>>> non-windowed left outer join. And
>> > > > >>>>>>>>>> some users from KSQL world also asked about why Flink
>> didn't
>> > > > >>>> leverage
>> > > > >>>>>>> the
>> > > > >>>>>>>>>> Key concept of every kafka topic
>> > > > >>>>>>>>>> and make kafka as a dynamic changing keyed table.
>> > > > >>>>>>>>>>
>> > > > >>>>>>>>>> To work with kafka better, we were thinking to extend the
>> > > > >>>>> functionality
>> > > > >>>>>>>>> of
>> > > > >>>>>>>>>> the current kafka connector by letting it
>> > > > >>>>>>>>>> accept updates and deletions. But due to the limitation
>> of
>> > > > >> kafka,
>> > > > >>>> the
>> > > > >>>>>>>>>> update has to be "update by key", aka a table
>> > > > >>>>>>>>>> with primary key.
>> > > > >>>>>>>>>>
>> > > > >>>>>>>>>> This introduces a couple of conflicts with current kafka
>> > > > >> table's
>> > > > >>>>>>> options:
>> > > > >>>>>>>>>> 1. key.fields: as said above, we need the kafka table to
>> > have
>> > > > >> the
>> > > > >>>>>>> primary
>> > > > >>>>>>>>>> key constraint. And users can also configure
>> > > > >>>>>>>>>> key.fields freely, this might cause friction. (Sure we
>> can
>> > do
>> > > > >>> some
>> > > > >>>>>>> sanity
>> > > > >>>>>>>>>> check on this but it also creates friction.)
>> > > > >>>>>>>>>> 2. sink.partitioner: to make the semantics right, we
>> need to
>> > > > >> make
>> > > > >>>>> sure
>> > > > >>>>>>>>> all
>> > > > >>>>>>>>>> the updates on the same key are written to
>> > > > >>>>>>>>>> the same kafka partition, such we should force to use a
>> hash
>> > > by
>> > > > >>> key
>> > > > >>>>>>>>>> partition inside such table. Again, this has conflicts
>> > > > >>>>>>>>>> and creates friction with current user options.
>> > > > >>>>>>>>>>
>> > > > >>>>>>>>>> The above things are solvable, though not perfect or most
>> > user
>> > > > >>>>>>> friendly.
>> > > > >>>>>>>>>>
>> > > > >>>>>>>>>> Let's take a look at the reading side. The keyed kafka
>> table
>> > > > >>>> contains
>> > > > >>>>>>> two
>> > > > >>>>>>>>>> kinds of messages: upsert or deletion. What upsert
>> > > > >>>>>>>>>> means is "If the key doesn't exist yet, it's an insert
>> > record.
>> > > > >>>>>>> Otherwise
>> > > > >>>>>>>>>> it's an update record". For the sake of correctness or
>> > > > >>>>>>>>>> simplicity, the Flink SQL engine also needs such
>> > information.
>> > > > >> If
>> > > > >>> we
>> > > > >>>>>>>>>> interpret all messages to "update record", some queries
>> or
>> > > > >>>>>>>>>> operators may not work properly. It's weird to see an
>> update
>> > > > >>> record
>> > > > >>>>> but
>> > > > >>>>>>>>> you
>> > > > >>>>>>>>>> haven't seen the insert record before.
>> > > > >>>>>>>>>>
>> > > > >>>>>>>>>> So what Flink should do is after reading out the records
>> > from
>> > > > >>> such
>> > > > >>>>>>> table,
>> > > > >>>>>>>>>> it needs to create a state to record which messages have
>> > > > >>>>>>>>>> been seen and then generate the correct row type
>> > > > >> correspondingly.
>> > > > >>>>> This
>> > > > >>>>>>>>> kind
>> > > > >>>>>>>>>> of couples the state and the data of the message
>> > > > >>>>>>>>>> queue, and it also creates conflicts with current kafka
>> > > > >>> connector.
>> > > > >>>>>>>>>>
>> > > > >>>>>>>>>> Think about if users suspend a running job (which
>> contains
>> > > some
>> > > > >>>>> reading
>> > > > >>>>>>>>>> state now), and then change the start offset of the
>> reader.
>> > > > >>>>>>>>>> By changing the reading offset, it actually change the
>> whole
>> > > > >>> story
>> > > > >>>> of
>> > > > >>>>>>>>>> "which records should be insert messages and which
>> records
>> > > > >>>>>>>>>> should be update messages). And it will also make Flink
>> to
>> > > deal
>> > > > >>>> with
>> > > > >>>>>>>>>> another weird situation that it might receive a deletion
>> > > > >>>>>>>>>> on a non existing message.
>> > > > >>>>>>>>>>
>> > > > >>>>>>>>>> We were unsatisfied with all the frictions and conflicts
>> it
>> > > > >> will
>> > > > >>>>> create
>> > > > >>>>>>>>> if
>> > > > >>>>>>>>>> we enable the "upsert & deletion" support to the current
>> > kafka
>> > > > >>>>>>>>>> connector. And later we begin to realize that we
>> shouldn't
>> > > > >> treat
>> > > > >>> it
>> > > > >>>>> as
>> > > > >>>>>>> a
>> > > > >>>>>>>>>> normal message queue, but should treat it as a changing
>> > keyed
>> > > > >>>>>>>>>> table. We should be able to always get the whole data of
>> > such
>> > > > >>> table
>> > > > >>>>> (by
>> > > > >>>>>>>>>> disabling the start offset option) and we can also read
>> the
>> > > > >>>>>>>>>> changelog out of such table. It's like a HBase table with
>> > > > >> binlog
>> > > > >>>>>>> support
>> > > > >>>>>>>>>> but doesn't have random access capability (which can be
>> > > > >> fulfilled
>> > > > >>>>>>>>>> by Flink's state).
>> > > > >>>>>>>>>>
>> > > > >>>>>>>>>> So our intention was instead of telling and persuading
>> users
>> > > > >> what
>> > > > >>>>> kind
>> > > > >>>>>>> of
>> > > > >>>>>>>>>> options they should or should not use by extending
>> > > > >>>>>>>>>> current kafka connector when enable upsert support, we
>> are
>> > > > >>> actually
>> > > > >>>>>>>>> create
>> > > > >>>>>>>>>> a whole new and different connector that has total
>> > > > >>>>>>>>>> different abstractions in SQL layer, and should be
>> treated
>> > > > >>> totally
>> > > > >>>>>>>>>> different with current kafka connector.
>> > > > >>>>>>>>>>
>> > > > >>>>>>>>>> Hope this can clarify some of the concerns.
>> > > > >>>>>>>>>>
>> > > > >>>>>>>>>> Best,
>> > > > >>>>>>>>>> Kurt
>> > > > >>>>>>>>>>
>> > > > >>>>>>>>>>
>> > > > >>>>>>>>>> On Tue, Oct 20, 2020 at 5:20 PM Shengkai Fang <
>> > > > >> fskm...@gmail.com
>> > > > >>>>
>> > > > >>>>>>> wrote:
>> > > > >>>>>>>>>>
>> > > > >>>>>>>>>>> Hi devs,
>> > > > >>>>>>>>>>>
>> > > > >>>>>>>>>>> As many people are still confused about the difference
>> > option
>> > > > >>>>>>>>> behaviours
>> > > > >>>>>>>>>>> between the Kafka connector and KTable connector, Jark
>> and
>> > I
>> > > > >>> list
>> > > > >>>>> the
>> > > > >>>>>>>>>>> differences in the doc[1].
>> > > > >>>>>>>>>>>
>> > > > >>>>>>>>>>> Best,
>> > > > >>>>>>>>>>> Shengkai
>> > > > >>>>>>>>>>>
>> > > > >>>>>>>>>>> [1]
>> > > > >>>>>>>>>>>
>> > > > >>>>>>>>>>>
>> > > > >>>>>>>>>>
>> > > > >>>>>>>>>
>> > > > >>>>>>>
>> > > > >>>>>
>> > > > >>>>
>> > > > >>>
>> > > > >>
>> > > >
>> > >
>> >
>> https://docs.google.com/document/d/13oAWAwQez0lZLsyfV21BfTEze1fc2cz4AZKiNOyBNPk/edit
>> > > > >>>>>>>>>>>
>> > > > >>>>>>>>>>> Shengkai Fang <fskm...@gmail.com> 于2020年10月20日周二
>> > 下午12:05写道:
>> > > > >>>>>>>>>>>
>> > > > >>>>>>>>>>>> Hi Konstantin,
>> > > > >>>>>>>>>>>>
>> > > > >>>>>>>>>>>> Thanks for your reply.
>> > > > >>>>>>>>>>>>
>> > > > >>>>>>>>>>>>> It uses the "kafka" connector and does not specify a
>> > > primary
>> > > > >>>> key.
>> > > > >>>>>>>>>>>> The dimensional table `users` is a ktable connector
>> and we
>> > > > >> can
>> > > > >>>>>>>>> specify
>> > > > >>>>>>>>>>> the
>> > > > >>>>>>>>>>>> pk on the KTable.
>> > > > >>>>>>>>>>>>
>> > > > >>>>>>>>>>>>> Will it possible to use a "ktable" as a dimensional
>> table
>> > > in
>> > > > >>>>>>>>> FLIP-132
>> > > > >>>>>>>>>>>> Yes. We can specify the watermark on the KTable and it
>> can
>> > > be
>> > > > >>>> used
>> > > > >>>>>>>>> as a
>> > > > >>>>>>>>>>>> dimension table in temporal join.
>> > > > >>>>>>>>>>>>
>> > > > >>>>>>>>>>>>> Introduce a new connector vs introduce a new property
>> > > > >>>>>>>>>>>> The main reason behind is that the KTable connector
>> almost
>> > > > >> has
>> > > > >>> no
>> > > > >>>>>>>>>> common
>> > > > >>>>>>>>>>>> options with the Kafka connector. The options that can
>> be
>> > > > >>> reused
>> > > > >>>> by
>> > > > >>>>>>>>>>> KTable
>> > > > >>>>>>>>>>>> connectors are 'topic', 'properties.bootstrap.servers'
>> and
>> > > > >>>>>>>>>>>> 'value.fields-include' . We can't set cdc format for
>> > > > >>> 'key.format'
>> > > > >>>>> and
>> > > > >>>>>>>>>>>> 'value.format' in KTable connector now, which is
>> > available
>> > > > >> in
>> > > > >>>>> Kafka
>> > > > >>>>>>>>>>>> connector. Considering the difference between the
>> options
>> > we
>> > > > >>> can
>> > > > >>>>> use,
>> > > > >>>>>>>>>>> it's
>> > > > >>>>>>>>>>>> more suitable to introduce an another connector rather
>> > than
>> > > a
>> > > > >>>>>>>>> property.
>> > > > >>>>>>>>>>>>
>> > > > >>>>>>>>>>>> We are also fine to use "compacted-kafka" as the name
>> of
>> > the
>> > > > >>> new
>> > > > >>>>>>>>>>>> connector. What do you think?
>> > > > >>>>>>>>>>>>
>> > > > >>>>>>>>>>>> Best,
>> > > > >>>>>>>>>>>> Shengkai
>> > > > >>>>>>>>>>>>
>> > > > >>>>>>>>>>>> Konstantin Knauf <kna...@apache.org> 于2020年10月19日周一
>> > > > >> 下午10:15写道:
>> > > > >>>>>>>>>>>>
>> > > > >>>>>>>>>>>>> Hi Shengkai,
>> > > > >>>>>>>>>>>>>
>> > > > >>>>>>>>>>>>> Thank you for driving this effort. I believe this a
>> very
>> > > > >>>> important
>> > > > >>>>>>>>>>> feature
>> > > > >>>>>>>>>>>>> for many users who use Kafka and Flink SQL together. A
>> > few
>> > > > >>>>> questions
>> > > > >>>>>>>>>> and
>> > > > >>>>>>>>>>>>> thoughts:
>> > > > >>>>>>>>>>>>>
>> > > > >>>>>>>>>>>>> * Is your example "Use KTable as a reference/dimension
>> > > > >> table"
>> > > > >>>>>>>>> correct?
>> > > > >>>>>>>>>>> It
>> > > > >>>>>>>>>>>>> uses the "kafka" connector and does not specify a
>> primary
>> > > > >> key.
>> > > > >>>>>>>>>>>>>
>> > > > >>>>>>>>>>>>> * Will it be possible to use a "ktable" table directly
>> > as a
>> > > > >>>>>>>>>> dimensional
>> > > > >>>>>>>>>>>>> table in temporal join (*based on event time*)
>> > (FLIP-132)?
>> > > > >>> This
>> > > > >>>> is
>> > > > >>>>>>>>> not
>> > > > >>>>>>>>>>>>> completely clear to me from the FLIP.
>> > > > >>>>>>>>>>>>>
>> > > > >>>>>>>>>>>>> * I'd personally prefer not to introduce a new
>> connector
>> > > and
>> > > > >>>>> instead
>> > > > >>>>>>>>>> to
>> > > > >>>>>>>>>>>>> extend the Kafka connector. We could add an additional
>> > > > >>> property
>> > > > >>>>>>>>>>>>> "compacted"
>> > > > >>>>>>>>>>>>> = "true"|"false". If it is set to "true", we can add
>> > > > >>> additional
>> > > > >>>>>>>>>>> validation
>> > > > >>>>>>>>>>>>> logic (e.g. "scan.startup.mode" can not be set,
>> primary
>> > key
>> > > > >>>>>>>>> required,
>> > > > >>>>>>>>>>>>> etc.). If we stick to a separate connector I'd not
>> call
>> > it
>> > > > >>>>> "ktable",
>> > > > >>>>>>>>>> but
>> > > > >>>>>>>>>>>>> rather "compacted-kafka" or similar. KTable seems to
>> > carry
>> > > > >>> more
>> > > > >>>>>>>>>> implicit
>> > > > >>>>>>>>>>>>> meaning than we want to imply here.
>> > > > >>>>>>>>>>>>>
>> > > > >>>>>>>>>>>>> * I agree that this is not a bounded source. If we
>> want
>> > to
>> > > > >>>>> support a
>> > > > >>>>>>>>>>>>> bounded mode, this is an orthogonal concern that also
>> > > > >> applies
>> > > > >>> to
>> > > > >>>>>>>>> other
>> > > > >>>>>>>>>>>>> unbounded sources.
>> > > > >>>>>>>>>>>>>
>> > > > >>>>>>>>>>>>> Best,
>> > > > >>>>>>>>>>>>>
>> > > > >>>>>>>>>>>>> Konstantin
>> > > > >>>>>>>>>>>>>
>> > > > >>>>>>>>>>>>> On Mon, Oct 19, 2020 at 3:26 PM Jark Wu <
>> > imj...@gmail.com>
>> > > > >>>> wrote:
>> > > > >>>>>>>>>>>>>
>> > > > >>>>>>>>>>>>>> Hi Danny,
>> > > > >>>>>>>>>>>>>>
>> > > > >>>>>>>>>>>>>> First of all, we didn't introduce any concepts from
>> KSQL
>> > > > >>> (e.g.
>> > > > >>>>>>>>>> Stream
>> > > > >>>>>>>>>>> vs
>> > > > >>>>>>>>>>>>>> Table notion).
>> > > > >>>>>>>>>>>>>> This new connector will produce a changelog stream,
>> so
>> > > it's
>> > > > >>>> still
>> > > > >>>>>>>>> a
>> > > > >>>>>>>>>>>>> dynamic
>> > > > >>>>>>>>>>>>>> table and doesn't conflict with Flink core concepts.
>> > > > >>>>>>>>>>>>>>
>> > > > >>>>>>>>>>>>>> The "ktable" is just a connector name, we can also
>> call
>> > it
>> > > > >>>>>>>>>>>>>> "compacted-kafka" or something else.
>> > > > >>>>>>>>>>>>>> Calling it "ktable" is just because KSQL users can
>> > migrate
>> > > > >> to
>> > > > >>>>>>>>> Flink
>> > > > >>>>>>>>>>> SQL
>> > > > >>>>>>>>>>>>>> easily.
>> > > > >>>>>>>>>>>>>>
>> > > > >>>>>>>>>>>>>> Regarding to why introducing a new connector vs a new
>> > > > >>> property
>> > > > >>>> in
>> > > > >>>>>>>>>>>>> existing
>> > > > >>>>>>>>>>>>>> kafka connector:
>> > > > >>>>>>>>>>>>>>
>> > > > >>>>>>>>>>>>>> I think the main reason is that we want to have a
>> clear
>> > > > >>>>> separation
>> > > > >>>>>>>>>> for
>> > > > >>>>>>>>>>>>> such
>> > > > >>>>>>>>>>>>>> two use cases, because they are very different.
>> > > > >>>>>>>>>>>>>> We also listed reasons in the FLIP, including:
>> > > > >>>>>>>>>>>>>>
>> > > > >>>>>>>>>>>>>> 1) It's hard to explain what's the behavior when
>> users
>> > > > >>> specify
>> > > > >>>>> the
>> > > > >>>>>>>>>>> start
>> > > > >>>>>>>>>>>>>> offset from a middle position (e.g. how to process
>> non
>> > > > >> exist
>> > > > >>>>>>>>> delete
>> > > > >>>>>>>>>>>>>> events).
>> > > > >>>>>>>>>>>>>>        It's dangerous if users do that. So we don't
>> > > provide
>> > > > >>> the
>> > > > >>>>>>>>> offset
>> > > > >>>>>>>>>>>>> option
>> > > > >>>>>>>>>>>>>> in the new connector at the moment.
>> > > > >>>>>>>>>>>>>> 2) It's a different perspective/abstraction on the
>> same
>> > > > >> kafka
>> > > > >>>>>>>>> topic
>> > > > >>>>>>>>>>>>> (append
>> > > > >>>>>>>>>>>>>> vs. upsert). It would be easier to understand if we
>> can
>> > > > >>>> separate
>> > > > >>>>>>>>>> them
>> > > > >>>>>>>>>>>>>>        instead of mixing them in one connector. The
>> new
>> > > > >>>> connector
>> > > > >>>>>>>>>>> requires
>> > > > >>>>>>>>>>>>>> hash sink partitioner, primary key declared, regular
>> > > > >> format.
>> > > > >>>>>>>>>>>>>>        If we mix them in one connector, it might be
>> > > > >> confusing
>> > > > >>>> how
>> > > > >>>>> to
>> > > > >>>>>>>>>> use
>> > > > >>>>>>>>>>>>> the
>> > > > >>>>>>>>>>>>>> options correctly.
>> > > > >>>>>>>>>>>>>> 3) The semantic of the KTable connector is just the
>> same
>> > > as
>> > > > >>>>> KTable
>> > > > >>>>>>>>>> in
>> > > > >>>>>>>>>>>>> Kafka
>> > > > >>>>>>>>>>>>>> Stream. So it's very handy for Kafka Stream and KSQL
>> > > users.
>> > > > >>>>>>>>>>>>>>        We have seen several questions in the mailing
>> > list
>> > > > >>> asking
>> > > > >>>>> how
>> > > > >>>>>>>>> to
>> > > > >>>>>>>>>>>>> model
>> > > > >>>>>>>>>>>>>> a KTable and how to join a KTable in Flink SQL.
>> > > > >>>>>>>>>>>>>>
>> > > > >>>>>>>>>>>>>> Best,
>> > > > >>>>>>>>>>>>>> Jark
>> > > > >>>>>>>>>>>>>>
>> > > > >>>>>>>>>>>>>> On Mon, 19 Oct 2020 at 19:53, Jark Wu <
>> imj...@gmail.com
>> > >
>> > > > >>>> wrote:
>> > > > >>>>>>>>>>>>>>
>> > > > >>>>>>>>>>>>>>> Hi Jingsong,
>> > > > >>>>>>>>>>>>>>>
>> > > > >>>>>>>>>>>>>>> As the FLIP describes, "KTable connector produces a
>> > > > >>> changelog
>> > > > >>>>>>>>>>> stream,
>> > > > >>>>>>>>>>>>>>> where each data record represents an update or
>> delete
>> > > > >>> event.".
>> > > > >>>>>>>>>>>>>>> Therefore, a ktable source is an unbounded stream
>> > source.
>> > > > >>>>>>>>>> Selecting
>> > > > >>>>>>>>>>> a
>> > > > >>>>>>>>>>>>>>> ktable source is similar to selecting a kafka source
>> > with
>> > > > >>>>>>>>>>>>> debezium-json
>> > > > >>>>>>>>>>>>>>> format
>> > > > >>>>>>>>>>>>>>> that it never ends and the results are continuously
>> > > > >> updated.
>> > > > >>>>>>>>>>>>>>>
>> > > > >>>>>>>>>>>>>>> It's possible to have a bounded ktable source in the
>> > > > >> future,
>> > > > >>>> for
>> > > > >>>>>>>>>>>>> example,
>> > > > >>>>>>>>>>>>>>> add an option 'bounded=true' or 'end-offset=xxx'.
>> > > > >>>>>>>>>>>>>>> In this way, the ktable will produce a bounded
>> > changelog
>> > > > >>>> stream.
>> > > > >>>>>>>>>>>>>>> So I think this can be a compatible feature in the
>> > > future.
>> > > > >>>>>>>>>>>>>>>
>> > > > >>>>>>>>>>>>>>> I don't think we should associate with ksql related
>> > > > >>> concepts.
>> > > > >>>>>>>>>>>>> Actually,
>> > > > >>>>>>>>>>>>>> we
>> > > > >>>>>>>>>>>>>>> didn't introduce any concepts from KSQL (e.g.
>> Stream vs
>> > > > >>> Table
>> > > > >>>>>>>>>>> notion).
>> > > > >>>>>>>>>>>>>>> The "ktable" is just a connector name, we can also
>> call
>> > > it
>> > > > >>>>>>>>>>>>>>> "compacted-kafka" or something else.
>> > > > >>>>>>>>>>>>>>> Calling it "ktable" is just because KSQL users can
>> > > migrate
>> > > > >>> to
>> > > > >>>>>>>>>> Flink
>> > > > >>>>>>>>>>>>> SQL
>> > > > >>>>>>>>>>>>>>> easily.
>> > > > >>>>>>>>>>>>>>>
>> > > > >>>>>>>>>>>>>>> Regarding the "value.fields-include", this is an
>> option
>> > > > >>>>>>>>> introduced
>> > > > >>>>>>>>>>> in
>> > > > >>>>>>>>>>>>>>> FLIP-107 for Kafka connector.
>> > > > >>>>>>>>>>>>>>> I think we should keep the same behavior with the
>> Kafka
>> > > > >>>>>>>>> connector.
>> > > > >>>>>>>>>>> I'm
>> > > > >>>>>>>>>>>>>> not
>> > > > >>>>>>>>>>>>>>> sure what's the default behavior of KSQL.
>> > > > >>>>>>>>>>>>>>> But I guess it also stores the keys in value from
>> this
>> > > > >>> example
>> > > > >>>>>>>>>> docs
>> > > > >>>>>>>>>>>>> (see
>> > > > >>>>>>>>>>>>>>> the "users_original" table) [1].
>> > > > >>>>>>>>>>>>>>>
>> > > > >>>>>>>>>>>>>>> Best,
>> > > > >>>>>>>>>>>>>>> Jark
>> > > > >>>>>>>>>>>>>>>
>> > > > >>>>>>>>>>>>>>> [1]:
>> > > > >>>>>>>>>>>>>>>
>> > > > >>>>>>>>>>>>>>
>> > > > >>>>>>>>>>>>>
>> > > > >>>>>>>>>>>
>> > > > >>>>>>>>>>
>> > > > >>>>>>>>>
>> > > > >>>>>>>
>> > > > >>>>>
>> > > > >>>>
>> > > > >>>
>> > > > >>
>> > > >
>> > >
>> >
>> https://docs.confluent.io/current/ksqldb/tutorials/basics-local.html#create-a-stream-and-table
>> > > > >>>>>>>>>>>>>>>
>> > > > >>>>>>>>>>>>>>>
>> > > > >>>>>>>>>>>>>>> On Mon, 19 Oct 2020 at 18:17, Danny Chan <
>> > > > >>>> yuzhao....@gmail.com>
>> > > > >>>>>>>>>>>>> wrote:
>> > > > >>>>>>>>>>>>>>>
>> > > > >>>>>>>>>>>>>>>> The concept seems conflicts with the Flink
>> abstraction
>> > > > >>>> “dynamic
>> > > > >>>>>>>>>>>>> table”,
>> > > > >>>>>>>>>>>>>>>> in Flink we see both “stream” and “table” as a
>> dynamic
>> > > > >>> table,
>> > > > >>>>>>>>>>>>>>>>
>> > > > >>>>>>>>>>>>>>>> I think we should make clear first how to express
>> > stream
>> > > > >>> and
>> > > > >>>>>>>>>> table
>> > > > >>>>>>>>>>>>>>>> specific features on one “dynamic table”,
>> > > > >>>>>>>>>>>>>>>> it is more natural for KSQL because KSQL takes
>> stream
>> > > and
>> > > > >>>> table
>> > > > >>>>>>>>>> as
>> > > > >>>>>>>>>>>>>>>> different abstractions for representing
>> collections.
>> > In
>> > > > >>> KSQL,
>> > > > >>>>>>>>>> only
>> > > > >>>>>>>>>>>>>> table is
>> > > > >>>>>>>>>>>>>>>> mutable and can have a primary key.
>> > > > >>>>>>>>>>>>>>>>
>> > > > >>>>>>>>>>>>>>>> Does this connector belongs to the “table” scope or
>> > > > >>> “stream”
>> > > > >>>>>>>>>> scope
>> > > > >>>>>>>>>>> ?
>> > > > >>>>>>>>>>>>>>>>
>> > > > >>>>>>>>>>>>>>>> Some of the concepts (such as the primary key on
>> > stream)
>> > > > >>>> should
>> > > > >>>>>>>>>> be
>> > > > >>>>>>>>>>>>>>>> suitable for all the connectors, not just Kafka,
>> > > > >> Shouldn’t
>> > > > >>>> this
>> > > > >>>>>>>>>> be
>> > > > >>>>>>>>>>> an
>> > > > >>>>>>>>>>>>>>>> extension of existing Kafka connector instead of a
>> > > > >> totally
>> > > > >>>> new
>> > > > >>>>>>>>>>>>>> connector ?
>> > > > >>>>>>>>>>>>>>>> What about the other connectors ?
>> > > > >>>>>>>>>>>>>>>>
>> > > > >>>>>>>>>>>>>>>> Because this touches the core abstraction of
>> Flink, we
>> > > > >>> better
>> > > > >>>>>>>>>> have
>> > > > >>>>>>>>>>> a
>> > > > >>>>>>>>>>>>>>>> top-down overall design, following the KSQL
>> directly
>> > is
>> > > > >> not
>> > > > >>>> the
>> > > > >>>>>>>>>>>>> answer.
>> > > > >>>>>>>>>>>>>>>>
>> > > > >>>>>>>>>>>>>>>> P.S. For the source
>> > > > >>>>>>>>>>>>>>>>> Shouldn’t this be an extension of existing Kafka
>> > > > >> connector
>> > > > >>>>>>>>>>> instead
>> > > > >>>>>>>>>>>>> of
>> > > > >>>>>>>>>>>>>> a
>> > > > >>>>>>>>>>>>>>>> totally new connector ?
>> > > > >>>>>>>>>>>>>>>>
>> > > > >>>>>>>>>>>>>>>> How could we achieve that (e.g. set up the
>> parallelism
>> > > > >>>>>>>>>> correctly) ?
>> > > > >>>>>>>>>>>>>>>>
>> > > > >>>>>>>>>>>>>>>> Best,
>> > > > >>>>>>>>>>>>>>>> Danny Chan
>> > > > >>>>>>>>>>>>>>>> 在 2020年10月19日 +0800 PM5:17,Jingsong Li <
>> > > > >>>> jingsongl...@gmail.com
>> > > > >>>>>>>>>>>> ,写道:
>> > > > >>>>>>>>>>>>>>>>> Thanks Shengkai for your proposal.
>> > > > >>>>>>>>>>>>>>>>>
>> > > > >>>>>>>>>>>>>>>>> +1 for this feature.
>> > > > >>>>>>>>>>>>>>>>>
>> > > > >>>>>>>>>>>>>>>>>> Future Work: Support bounded KTable source
>> > > > >>>>>>>>>>>>>>>>>
>> > > > >>>>>>>>>>>>>>>>> I don't think it should be a future work, I think
>> it
>> > is
>> > > > >>> one
>> > > > >>>>>>>>> of
>> > > > >>>>>>>>>>> the
>> > > > >>>>>>>>>>>>>>>>> important concepts of this FLIP. We need to
>> > understand
>> > > > >> it
>> > > > >>>>>>>>> now.
>> > > > >>>>>>>>>>>>>>>>>
>> > > > >>>>>>>>>>>>>>>>> Intuitively, a ktable in my opinion is a bounded
>> > table
>> > > > >>>> rather
>> > > > >>>>>>>>>>> than
>> > > > >>>>>>>>>>>>> a
>> > > > >>>>>>>>>>>>>>>>> stream, so select should produce a bounded table
>> by
>> > > > >>> default.
>> > > > >>>>>>>>>>>>>>>>>
>> > > > >>>>>>>>>>>>>>>>> I think we can list Kafka related knowledge,
>> because
>> > > the
>> > > > >>>> word
>> > > > >>>>>>>>>>>>> `ktable`
>> > > > >>>>>>>>>>>>>>>> is
>> > > > >>>>>>>>>>>>>>>>> easy to associate with ksql related concepts. (If
>> > > > >>> possible,
>> > > > >>>>>>>>>> it's
>> > > > >>>>>>>>>>>>>> better
>> > > > >>>>>>>>>>>>>>>> to
>> > > > >>>>>>>>>>>>>>>>> unify with it)
>> > > > >>>>>>>>>>>>>>>>>
>> > > > >>>>>>>>>>>>>>>>> What do you think?
>> > > > >>>>>>>>>>>>>>>>>
>> > > > >>>>>>>>>>>>>>>>>> value.fields-include
>> > > > >>>>>>>>>>>>>>>>>
>> > > > >>>>>>>>>>>>>>>>> What about the default behavior of KSQL?
>> > > > >>>>>>>>>>>>>>>>>
>> > > > >>>>>>>>>>>>>>>>> Best,
>> > > > >>>>>>>>>>>>>>>>> Jingsong
>> > > > >>>>>>>>>>>>>>>>>
>> > > > >>>>>>>>>>>>>>>>> On Mon, Oct 19, 2020 at 4:33 PM Shengkai Fang <
>> > > > >>>>>>>>>> fskm...@gmail.com
>> > > > >>>>>>>>>>>>
>> > > > >>>>>>>>>>>>>>>> wrote:
>> > > > >>>>>>>>>>>>>>>>>
>> > > > >>>>>>>>>>>>>>>>>> Hi, devs.
>> > > > >>>>>>>>>>>>>>>>>>
>> > > > >>>>>>>>>>>>>>>>>> Jark and I want to start a new FLIP to introduce
>> the
>> > > > >>> KTable
>> > > > >>>>>>>>>>>>>>>> connector. The
>> > > > >>>>>>>>>>>>>>>>>> KTable is a shortcut of "Kafka Table", it also
>> has
>> > the
>> > > > >>> same
>> > > > >>>>>>>>>>>>>> semantics
>> > > > >>>>>>>>>>>>>>>> with
>> > > > >>>>>>>>>>>>>>>>>> the KTable notion in Kafka Stream.
>> > > > >>>>>>>>>>>>>>>>>>
>> > > > >>>>>>>>>>>>>>>>>> FLIP-149:
>> > > > >>>>>>>>>>>>>>>>>>
>> > > > >>>>>>>>>>>>>>>>>>
>> > > > >>>>>>>>>>>>>>>>
>> > > > >>>>>>>>>>>>>>
>> > > > >>>>>>>>>>>>>
>> > > > >>>>>>>>>>>
>> > > > >>>>>>>>>>
>> > > > >>>>>>>>>
>> > > > >>>>>>>
>> > > > >>>>>
>> > > > >>>>
>> > > > >>>
>> > > > >>
>> > > >
>> > >
>> >
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-149%3A+Introduce+the+KTable+Connector
>> > > > >>>>>>>>>>>>>>>>>>
>> > > > >>>>>>>>>>>>>>>>>> Currently many users have expressed their needs
>> for
>> > > the
>> > > > >>>>>>>>>> upsert
>> > > > >>>>>>>>>>>>> Kafka
>> > > > >>>>>>>>>>>>>>>> by
>> > > > >>>>>>>>>>>>>>>>>> mail lists and issues. The KTable connector has
>> > > several
>> > > > >>>>>>>>>>> benefits
>> > > > >>>>>>>>>>>>> for
>> > > > >>>>>>>>>>>>>>>> users:
>> > > > >>>>>>>>>>>>>>>>>>
>> > > > >>>>>>>>>>>>>>>>>> 1. Users are able to interpret a compacted Kafka
>> > Topic
>> > > > >> as
>> > > > >>>>>>>>> an
>> > > > >>>>>>>>>>>>> upsert
>> > > > >>>>>>>>>>>>>>>> stream
>> > > > >>>>>>>>>>>>>>>>>> in Apache Flink. And also be able to write a
>> > changelog
>> > > > >>>>>>>>> stream
>> > > > >>>>>>>>>>> to
>> > > > >>>>>>>>>>>>>> Kafka
>> > > > >>>>>>>>>>>>>>>>>> (into a compacted topic).
>> > > > >>>>>>>>>>>>>>>>>> 2. As a part of the real time pipeline, store
>> join
>> > or
>> > > > >>>>>>>>>> aggregate
>> > > > >>>>>>>>>>>>>>>> result (may
>> > > > >>>>>>>>>>>>>>>>>> contain updates) into a Kafka topic for further
>> > > > >>>>>>>>> calculation;
>> > > > >>>>>>>>>>>>>>>>>> 3. The semantic of the KTable connector is just
>> the
>> > > > >> same
>> > > > >>> as
>> > > > >>>>>>>>>>>>> KTable
>> > > > >>>>>>>>>>>>>> in
>> > > > >>>>>>>>>>>>>>>> Kafka
>> > > > >>>>>>>>>>>>>>>>>> Stream. So it's very handy for Kafka Stream and
>> KSQL
>> > > > >>> users.
>> > > > >>>>>>>>>> We
>> > > > >>>>>>>>>>>>> have
>> > > > >>>>>>>>>>>>>>>> seen
>> > > > >>>>>>>>>>>>>>>>>> several questions in the mailing list asking how
>> to
>> > > > >>> model a
>> > > > >>>>>>>>>>>>> KTable
>> > > > >>>>>>>>>>>>>>>> and how
>> > > > >>>>>>>>>>>>>>>>>> to join a KTable in Flink SQL.
>> > > > >>>>>>>>>>>>>>>>>>
>> > > > >>>>>>>>>>>>>>>>>> We hope it can expand the usage of the Flink with
>> > > > >> Kafka.
>> > > > >>>>>>>>>>>>>>>>>>
>> > > > >>>>>>>>>>>>>>>>>> I'm looking forward to your feedback.
>> > > > >>>>>>>>>>>>>>>>>>
>> > > > >>>>>>>>>>>>>>>>>> Best,
>> > > > >>>>>>>>>>>>>>>>>> Shengkai
>> > > > >>>>>>>>>>>>>>>>>>
>> > > > >>>>>>>>>>>>>>>>>
>> > > > >>>>>>>>>>>>>>>>>
>> > > > >>>>>>>>>>>>>>>>> --
>> > > > >>>>>>>>>>>>>>>>> Best, Jingsong Lee
>> > > > >>>>>>>>>>>>>>>>
>> > > > >>>>>>>>>>>>>>>
>> > > > >>>>>>>>>>>>>>
>> > > > >>>>>>>>>>>>>
>> > > > >>>>>>>>>>>>>
>> > > > >>>>>>>>>>>>> --
>> > > > >>>>>>>>>>>>>
>> > > > >>>>>>>>>>>>> Konstantin Knauf
>> > > > >>>>>>>>>>>>>
>> > > > >>>>>>>>>>>>> https://twitter.com/snntrable
>> > > > >>>>>>>>>>>>>
>> > > > >>>>>>>>>>>>> https://github.com/knaufk
>> > > > >>>>>>>>>>>>>
>> > > > >>>>>>>>>>>>
>> > > > >>>>>>>>>>>
>> > > > >>>>>>>>>>
>> > > > >>>>>>>>>
>> > > > >>>>>>>>>
>> > > > >>>>>>>>> --
>> > > > >>>>>>>>>
>> > > > >>>>>>>>> Konstantin Knauf
>> > > > >>>>>>>>>
>> > > > >>>>>>>>> https://twitter.com/snntrable
>> > > > >>>>>>>>>
>> > > > >>>>>>>>> https://github.com/knaufk
>> > > > >>>>>>>>>
>> > > > >>>>>>>>
>> > > > >>>>>>>
>> > > > >>>>>>>
>> > > > >>>>>>
>> > > > >>>>>
>> > > > >>>>>
>> > > > >>>>
>> > > > >>>> --
>> > > > >>>>
>> > > > >>>> Seth Wiesman | Solutions Architect
>> > > > >>>>
>> > > > >>>> +1 314 387 1463
>> > > > >>>>
>> > > > >>>> <https://www.ververica.com/>
>> > > > >>>>
>> > > > >>>> Follow us @VervericaData
>> > > > >>>>
>> > > > >>>> --
>> > > > >>>>
>> > > > >>>> Join Flink Forward <https://flink-forward.org/> - The Apache
>> > Flink
>> > > > >>>> Conference
>> > > > >>>>
>> > > > >>>> Stream Processing | Event Driven | Real Time
>> > > > >>>>
>> > > > >>>
>> > > > >>
>> > > > >
>> > > >
>> > > >
>> > >
>> > > --
>> > > Best, Jingsong Lee
>> > >
>> >
>>
>>
>> --
>> Best, Jingsong Lee
>>
>

Reply via email to