Thanks Shengkai!

+1 to start voting.

Best,
Jark

On Fri, 23 Oct 2020 at 15:02, Shengkai Fang <fskm...@gmail.com> wrote:

> Add one more message, I have already updated the FLIP[1].
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-149%3A+Introduce+the+upsert-kafka+Connector
>
> Shengkai Fang <fskm...@gmail.com> 于2020年10月23日周五 下午2:55写道:
>
> > Hi, all.
> > It seems we have reached a consensus on the FLIP. If no one has other
> > objections, I would like to start the vote for FLIP-149.
> >
> > Best,
> > Shengkai
> >
> > Jingsong Li <jingsongl...@gmail.com> 于2020年10月23日周五 下午2:25写道:
> >
> >> Thanks for explanation,
> >>
> >> I am OK for `upsert`. Yes, Its concept has been accepted by many
> systems.
> >>
> >> Best,
> >> Jingsong
> >>
> >> On Fri, Oct 23, 2020 at 12:38 PM Jark Wu <imj...@gmail.com> wrote:
> >>
> >> > Hi Timo,
> >> >
> >> > I have some concerns about `kafka-cdc`,
> >> > 1) cdc is an abbreviation of Change Data Capture which is commonly
> used
> >> for
> >> > databases, not for message queues.
> >> > 2) usually, cdc produces full content of changelog, including
> >> > UPDATE_BEFORE, however "upsert kafka" doesn't
> >> > 3) `kafka-cdc` sounds like a natively support for `debezium-json`
> >> format,
> >> > however, it is not and even we don't want
> >> >    "upsert kafka" to support "debezium-json"
> >> >
> >> >
> >> > Hi Jingsong,
> >> >
> >> > I think the terminology of "upsert" is fine, because Kafka also uses
> >> > "upsert" to define such behavior in their official documentation [1]:
> >> >
> >> > > a data record in a changelog stream is interpreted as an UPSERT aka
> >> > INSERT/UPDATE
> >> >
> >> > Materialize uses the "UPSERT" keyword to define such behavior too [2].
> >> > Users have been requesting such feature using "upsert kafka"
> >> terminology in
> >> > user mailing lists [3][4].
> >> > Many other systems support "UPSERT" statement natively, such as impala
> >> [5],
> >> > SAP [6], Phoenix [7], Oracle NoSQL [8], etc..
> >> >
> >> > Therefore, I think we don't need to be afraid of introducing "upsert"
> >> > terminology, it is widely accepted by users.
> >> >
> >> > Best,
> >> > Jark
> >> >
> >> >
> >> > [1]:
> >> >
> >> >
> >>
> https://kafka.apache.org/20/documentation/streams/developer-guide/dsl-api.html#streams_concepts_ktable
> >> > [2]:
> >> >
> >> >
> >>
> https://materialize.io/docs/sql/create-source/text-kafka/#upsert-on-a-kafka-topic
> >> > [3]:
> >> >
> >> >
> >>
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/SQL-materialized-upsert-tables-td18482.html#a18503
> >> > [4]:
> >> >
> >> >
> >>
> http://apache-flink.147419.n8.nabble.com/Kafka-Sink-AppendStreamTableSink-doesn-t-support-consuming-update-changes-td5959.html
> >> > [5]:
> >> https://impala.apache.org/docs/build/html/topics/impala_upsert.html
> >> > [6]:
> >> >
> >> >
> >>
> https://help.sap.com/viewer/7c78579ce9b14a669c1f3295b0d8ca16/Cloud/en-US/ea8b6773be584203bcd99da76844c5ed.html
> >> > [7]: https://phoenix.apache.org/atomic_upsert.html
> >> > [8]:
> >> >
> >> >
> >>
> https://docs.oracle.com/en/database/other-databases/nosql-database/18.3/sqlfornosql/adding-table-rows-using-insert-and-upsert-statements.html
> >> >
> >> > On Fri, 23 Oct 2020 at 10:36, Jingsong Li <jingsongl...@gmail.com>
> >> wrote:
> >> >
> >> > > The `kafka-cdc` looks good to me.
> >> > > We can even give options to indicate whether to turn on compact,
> >> because
> >> > > compact is just an optimization?
> >> > >
> >> > > - ktable let me think about KSQL.
> >> > > - kafka-compacted it is not just compacted, more than that, it still
> >> has
> >> > > the ability of CDC
> >> > > - upsert-kafka , upsert is back, and I don't really want to see it
> >> again
> >> > > since we have CDC
> >> > >
> >> > > Best,
> >> > > Jingsong
> >> > >
> >> > > On Fri, Oct 23, 2020 at 2:21 AM Timo Walther <twal...@apache.org>
> >> wrote:
> >> > >
> >> > > > Hi Jark,
> >> > > >
> >> > > > I would be fine with `connector=upsert-kafka`. Another idea would
> >> be to
> >> > > > align the name to other available Flink connectors [1]:
> >> > > >
> >> > > > `connector=kafka-cdc`.
> >> > > >
> >> > > > Regards,
> >> > > > Timo
> >> > > >
> >> > > > [1] https://github.com/ververica/flink-cdc-connectors
> >> > > >
> >> > > > On 22.10.20 17:17, Jark Wu wrote:
> >> > > > > Another name is "connector=upsert-kafka', I think this can solve
> >> > Timo's
> >> > > > > concern on the "compacted" word.
> >> > > > >
> >> > > > > Materialize also uses "ENVELOPE UPSERT" [1] keyword to identify
> >> such
> >> > > > kafka
> >> > > > > sources.
> >> > > > > I think "upsert" is a well-known terminology widely used in many
> >> > > systems
> >> > > > > and matches the
> >> > > > >   behavior of how we handle the kafka messages.
> >> > > > >
> >> > > > > What do you think?
> >> > > > >
> >> > > > > Best,
> >> > > > > Jark
> >> > > > >
> >> > > > > [1]:
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> https://materialize.io/docs/sql/create-source/text-kafka/#upsert-on-a-kafka-topic
> >> > > > >
> >> > > > >
> >> > > > >
> >> > > > >
> >> > > > > On Thu, 22 Oct 2020 at 22:53, Kurt Young <ykt...@gmail.com>
> >> wrote:
> >> > > > >
> >> > > > >> Good validation messages can't solve the broken user
> experience,
> >> > > > especially
> >> > > > >> that
> >> > > > >> such update mode option will implicitly make half of current
> >> kafka
> >> > > > options
> >> > > > >> invalid or doesn't
> >> > > > >> make sense.
> >> > > > >>
> >> > > > >> Best,
> >> > > > >> Kurt
> >> > > > >>
> >> > > > >>
> >> > > > >> On Thu, Oct 22, 2020 at 10:31 PM Jark Wu <imj...@gmail.com>
> >> wrote:
> >> > > > >>
> >> > > > >>> Hi Timo, Seth,
> >> > > > >>>
> >> > > > >>> The default value "inserting" of "mode" might be not suitable,
> >> > > > >>> because "debezium-json" emits changelog messages which include
> >> > > updates.
> >> > > > >>>
> >> > > > >>> On Thu, 22 Oct 2020 at 22:10, Seth Wiesman <
> s...@ververica.com>
> >> > > wrote:
> >> > > > >>>
> >> > > > >>>> +1 for supporting upsert results into Kafka.
> >> > > > >>>>
> >> > > > >>>> I have no comments on the implementation details.
> >> > > > >>>>
> >> > > > >>>> As far as configuration goes, I tend to favor Timo's option
> >> where
> >> > we
> >> > > > >> add
> >> > > > >>> a
> >> > > > >>>> "mode" property to the existing Kafka table with default
> value
> >> > > > >>> "inserting".
> >> > > > >>>> If the mode is set to "updating" then the validation changes
> to
> >> > the
> >> > > > new
> >> > > > >>>> requirements. I personally find it more intuitive than a
> >> seperate
> >> > > > >>>> connector, my fear is users won't understand its the same
> >> physical
> >> > > > >> kafka
> >> > > > >>>> sink under the hood and it will lead to other confusion like
> >> does
> >> > it
> >> > > > >>> offer
> >> > > > >>>> the same persistence guarantees? I think we are capable of
> >> adding
> >> > > good
> >> > > > >>>> valdiation messaging that solves Jark and Kurts concerns.
> >> > > > >>>>
> >> > > > >>>>
> >> > > > >>>> On Thu, Oct 22, 2020 at 8:51 AM Timo Walther <
> >> twal...@apache.org>
> >> > > > >> wrote:
> >> > > > >>>>
> >> > > > >>>>> Hi Jark,
> >> > > > >>>>>
> >> > > > >>>>> "calling it "kafka-compacted" can even remind users to
> enable
> >> log
> >> > > > >>>>> compaction"
> >> > > > >>>>>
> >> > > > >>>>> But sometimes users like to store a lineage of changes in
> >> their
> >> > > > >> topics.
> >> > > > >>>>> Indepent of any ktable/kstream interpretation.
> >> > > > >>>>>
> >> > > > >>>>> I let the majority decide on this topic to not further block
> >> this
> >> > > > >>>>> effort. But we might find a better name like:
> >> > > > >>>>>
> >> > > > >>>>> connector = kafka
> >> > > > >>>>> mode = updating/inserting
> >> > > > >>>>>
> >> > > > >>>>> OR
> >> > > > >>>>>
> >> > > > >>>>> connector = kafka-updating
> >> > > > >>>>>
> >> > > > >>>>> ...
> >> > > > >>>>>
> >> > > > >>>>> Regards,
> >> > > > >>>>> Timo
> >> > > > >>>>>
> >> > > > >>>>>
> >> > > > >>>>>
> >> > > > >>>>>
> >> > > > >>>>> On 22.10.20 15:24, Jark Wu wrote:
> >> > > > >>>>>> Hi Timo,
> >> > > > >>>>>>
> >> > > > >>>>>> Thanks for your opinions.
> >> > > > >>>>>>
> >> > > > >>>>>> 1) Implementation
> >> > > > >>>>>> We will have an stateful operator to generate INSERT and
> >> > > > >>> UPDATE_BEFORE.
> >> > > > >>>>>> This operator is keyby-ed (primary key as the shuffle key)
> >> after
> >> > > > >> the
> >> > > > >>>>> source
> >> > > > >>>>>> operator.
> >> > > > >>>>>> The implementation of this operator is very similar to the
> >> > > existing
> >> > > > >>>>>> `DeduplicateKeepLastRowFunction`.
> >> > > > >>>>>> The operator will register a value state using the primary
> >> key
> >> > > > >> fields
> >> > > > >>>> as
> >> > > > >>>>>> keys.
> >> > > > >>>>>> When the value state is empty under current key, we will
> emit
> >> > > > >> INSERT
> >> > > > >>>> for
> >> > > > >>>>>> the input row.
> >> > > > >>>>>> When the value state is not empty under current key, we
> will
> >> > emit
> >> > > > >>>>>> UPDATE_BEFORE using the row in state,
> >> > > > >>>>>> and emit UPDATE_AFTER using the input row.
> >> > > > >>>>>> When the input row is DELETE, we will clear state and emit
> >> > DELETE
> >> > > > >>> row.
> >> > > > >>>>>>
> >> > > > >>>>>> 2) new option vs new connector
> >> > > > >>>>>>> We recently simplified the table options to a minimum
> >> amount of
> >> > > > >>>>>> characters to be as concise as possible in the DDL.
> >> > > > >>>>>> I think this is the reason why we want to introduce a new
> >> > > > >> connector,
> >> > > > >>>>>> because we can simplify the options in DDL.
> >> > > > >>>>>> For example, if using a new option, the DDL may look like
> >> this:
> >> > > > >>>>>>
> >> > > > >>>>>> CREATE TABLE users (
> >> > > > >>>>>>     user_id BIGINT,
> >> > > > >>>>>>     user_name STRING,
> >> > > > >>>>>>     user_level STRING,
> >> > > > >>>>>>     region STRING,
> >> > > > >>>>>>     PRIMARY KEY (user_id) NOT ENFORCED
> >> > > > >>>>>> ) WITH (
> >> > > > >>>>>>     'connector' = 'kafka',
> >> > > > >>>>>>     'model' = 'table',
> >> > > > >>>>>>     'topic' = 'pageviews_per_region',
> >> > > > >>>>>>     'properties.bootstrap.servers' = '...',
> >> > > > >>>>>>     'properties.group.id' = 'testGroup',
> >> > > > >>>>>>     'scan.startup.mode' = 'earliest',
> >> > > > >>>>>>     'key.format' = 'csv',
> >> > > > >>>>>>     'key.fields' = 'user_id',
> >> > > > >>>>>>     'value.format' = 'avro',
> >> > > > >>>>>>     'sink.partitioner' = 'hash'
> >> > > > >>>>>> );
> >> > > > >>>>>>
> >> > > > >>>>>> If using a new connector, we can have a different default
> >> value
> >> > > for
> >> > > > >>> the
> >> > > > >>>>>> options and remove unnecessary options,
> >> > > > >>>>>> the DDL can look like this which is much more concise:
> >> > > > >>>>>>
> >> > > > >>>>>> CREATE TABLE pageviews_per_region (
> >> > > > >>>>>>     user_id BIGINT,
> >> > > > >>>>>>     user_name STRING,
> >> > > > >>>>>>     user_level STRING,
> >> > > > >>>>>>     region STRING,
> >> > > > >>>>>>     PRIMARY KEY (user_id) NOT ENFORCED
> >> > > > >>>>>> ) WITH (
> >> > > > >>>>>>     'connector' = 'kafka-compacted',
> >> > > > >>>>>>     'topic' = 'pageviews_per_region',
> >> > > > >>>>>>     'properties.bootstrap.servers' = '...',
> >> > > > >>>>>>     'key.format' = 'csv',
> >> > > > >>>>>>     'value.format' = 'avro'
> >> > > > >>>>>> );
> >> > > > >>>>>>
> >> > > > >>>>>>> When people read `connector=kafka-compacted` they might
> not
> >> > know
> >> > > > >>> that
> >> > > > >>>> it
> >> > > > >>>>>>> has ktable semantics. You don't need to enable log
> >> compaction
> >> > in
> >> > > > >>> order
> >> > > > >>>>>>> to use a KTable as far as I know.
> >> > > > >>>>>> We don't need to let users know it has ktable semantics, as
> >> > > > >>> Konstantin
> >> > > > >>>>>> mentioned this may carry more implicit
> >> > > > >>>>>> meaning than we want to imply here. I agree users don't
> need
> >> to
> >> > > > >>> enable
> >> > > > >>>>> log
> >> > > > >>>>>> compaction, but from the production perspective,
> >> > > > >>>>>> log compaction should always be enabled if it is used in
> this
> >> > > > >>> purpose.
> >> > > > >>>>>> Calling it "kafka-compacted" can even remind users to
> enable
> >> log
> >> > > > >>>>> compaction.
> >> > > > >>>>>>
> >> > > > >>>>>> I don't agree to introduce "model = table/stream" option,
> or
> >> > > > >>>>>> "connector=kafka-table",
> >> > > > >>>>>> because this means we are introducing Table vs Stream
> concept
> >> > from
> >> > > > >>>> KSQL.
> >> > > > >>>>>> However, we don't have such top-level concept in Flink SQL
> >> now,
> >> > > > >> this
> >> > > > >>>> will
> >> > > > >>>>>> further confuse users.
> >> > > > >>>>>> In Flink SQL, all the things are STREAM, the differences
> are
> >> > > > >> whether
> >> > > > >>> it
> >> > > > >>>>> is
> >> > > > >>>>>> bounded or unbounded,
> >> > > > >>>>>>    whether it is insert-only or changelog.
> >> > > > >>>>>>
> >> > > > >>>>>>
> >> > > > >>>>>> Best,
> >> > > > >>>>>> Jark
> >> > > > >>>>>>
> >> > > > >>>>>>
> >> > > > >>>>>> On Thu, 22 Oct 2020 at 20:39, Timo Walther <
> >> twal...@apache.org>
> >> > > > >>> wrote:
> >> > > > >>>>>>
> >> > > > >>>>>>> Hi Shengkai, Hi Jark,
> >> > > > >>>>>>>
> >> > > > >>>>>>> thanks for this great proposal. It is time to finally
> >> connect
> >> > the
> >> > > > >>>>>>> changelog processor with a compacted Kafka topic.
> >> > > > >>>>>>>
> >> > > > >>>>>>> "The operator will produce INSERT rows, or additionally
> >> > generate
> >> > > > >>>>>>> UPDATE_BEFORE rows for the previous image, or produce
> DELETE
> >> > rows
> >> > > > >>> with
> >> > > > >>>>>>> all columns filled with values."
> >> > > > >>>>>>>
> >> > > > >>>>>>> Could you elaborate a bit on the implementation details in
> >> the
> >> > > > >> FLIP?
> >> > > > >>>> How
> >> > > > >>>>>>> are UPDATE_BEFOREs are generated. How much state is
> >> required to
> >> > > > >>>> perform
> >> > > > >>>>>>> this operation.
> >> > > > >>>>>>>
> >> > > > >>>>>>>    From a conceptual and semantical point of view, I'm
> fine
> >> > with
> >> > > > >> the
> >> > > > >>>>>>> proposal. But I would like to share my opinion about how
> we
> >> > > expose
> >> > > > >>>> this
> >> > > > >>>>>>> feature:
> >> > > > >>>>>>>
> >> > > > >>>>>>> ktable vs kafka-compacted
> >> > > > >>>>>>>
> >> > > > >>>>>>> I'm against having an additional connector like `ktable`
> or
> >> > > > >>>>>>> `kafka-compacted`. We recently simplified the table
> options
> >> to
> >> > a
> >> > > > >>>> minimum
> >> > > > >>>>>>> amount of characters to be as concise as possible in the
> >> DDL.
> >> > > > >>>> Therefore,
> >> > > > >>>>>>> I would keep the `connector=kafka` and introduce an
> >> additional
> >> > > > >>> option.
> >> > > > >>>>>>> Because a user wants to read "from Kafka". And the "how"
> >> should
> >> > > be
> >> > > > >>>>>>> determined in the lower options.
> >> > > > >>>>>>>
> >> > > > >>>>>>> When people read `connector=ktable` they might not know
> that
> >> > this
> >> > > > >> is
> >> > > > >>>>>>> Kafka. Or they wonder where `kstream` is?
> >> > > > >>>>>>>
> >> > > > >>>>>>> When people read `connector=kafka-compacted` they might
> not
> >> > know
> >> > > > >>> that
> >> > > > >>>> it
> >> > > > >>>>>>> has ktable semantics. You don't need to enable log
> >> compaction
> >> > in
> >> > > > >>> order
> >> > > > >>>>>>> to use a KTable as far as I know. Log compaction and table
> >> > > > >> semantics
> >> > > > >>>> are
> >> > > > >>>>>>> orthogonal topics.
> >> > > > >>>>>>>
> >> > > > >>>>>>> In the end we will need 3 types of information when
> >> declaring a
> >> > > > >>> Kafka
> >> > > > >>>>>>> connector:
> >> > > > >>>>>>>
> >> > > > >>>>>>> CREATE TABLE ... WITH (
> >> > > > >>>>>>>      connector=kafka        -- Some information about the
> >> > > connector
> >> > > > >>>>>>>      end-offset = XXXX      -- Some information about the
> >> > > > >> boundedness
> >> > > > >>>>>>>      model = table/stream   -- Some information about
> >> > > > >> interpretation
> >> > > > >>>>>>> )
> >> > > > >>>>>>>
> >> > > > >>>>>>>
> >> > > > >>>>>>> We can still apply all the constraints mentioned in the
> >> FLIP.
> >> > > When
> >> > > > >>>>>>> `model` is set to `table`.
> >> > > > >>>>>>>
> >> > > > >>>>>>> What do you think?
> >> > > > >>>>>>>
> >> > > > >>>>>>> Regards,
> >> > > > >>>>>>> Timo
> >> > > > >>>>>>>
> >> > > > >>>>>>>
> >> > > > >>>>>>> On 21.10.20 14:19, Jark Wu wrote:
> >> > > > >>>>>>>> Hi,
> >> > > > >>>>>>>>
> >> > > > >>>>>>>> IMO, if we are going to mix them in one connector,
> >> > > > >>>>>>>> 1) either users need to set some options to a specific
> >> value
> >> > > > >>>>> explicitly,
> >> > > > >>>>>>>> e.g. "scan.startup.mode=earliest",
> "sink.partitioner=hash",
> >> > > etc..
> >> > > > >>>>>>>> This makes the connector awkward to use. Users may face
> to
> >> fix
> >> > > > >>>> options
> >> > > > >>>>>>> one
> >> > > > >>>>>>>> by one according to the exception.
> >> > > > >>>>>>>> Besides, in the future, it is still possible to use
> >> > > > >>>>>>>> "sink.partitioner=fixed" (reduce network cost) if users
> are
> >> > > aware
> >> > > > >>> of
> >> > > > >>>>>>>> the partition routing,
> >> > > > >>>>>>>> however, it's error-prone to have "fixed" as default for
> >> > > > >> compacted
> >> > > > >>>>> mode.
> >> > > > >>>>>>>>
> >> > > > >>>>>>>> 2) or make those options a different default value when
> >> > > > >>>>> "compacted=true".
> >> > > > >>>>>>>> This would be more confusing and unpredictable if the
> >> default
> >> > > > >> value
> >> > > > >>>> of
> >> > > > >>>>>>>> options will change according to other options.
> >> > > > >>>>>>>> What happens if we have a third mode in the future?
> >> > > > >>>>>>>>
> >> > > > >>>>>>>> In terms of usage and options, it's very different from
> the
> >> > > > >>>>>>>> original "kafka" connector.
> >> > > > >>>>>>>> It would be more handy to use and less fallible if
> >> separating
> >> > > > >> them
> >> > > > >>>> into
> >> > > > >>>>>>> two
> >> > > > >>>>>>>> connectors.
> >> > > > >>>>>>>> In the implementation layer, we can reuse code as much as
> >> > > > >> possible.
> >> > > > >>>>>>>>
> >> > > > >>>>>>>> Therefore, I'm still +1 to have a new connector.
> >> > > > >>>>>>>> The "kafka-compacted" name sounds good to me.
> >> > > > >>>>>>>>
> >> > > > >>>>>>>> Best,
> >> > > > >>>>>>>> Jark
> >> > > > >>>>>>>>
> >> > > > >>>>>>>>
> >> > > > >>>>>>>> On Wed, 21 Oct 2020 at 17:58, Konstantin Knauf <
> >> > > > >> kna...@apache.org>
> >> > > > >>>>>>> wrote:
> >> > > > >>>>>>>>
> >> > > > >>>>>>>>> Hi Kurt, Hi Shengkai,
> >> > > > >>>>>>>>>
> >> > > > >>>>>>>>> thanks for answering my questions and the additional
> >> > > > >>>> clarifications. I
> >> > > > >>>>>>>>> don't have a strong opinion on whether to extend the
> >> "kafka"
> >> > > > >>>> connector
> >> > > > >>>>>>> or
> >> > > > >>>>>>>>> to introduce a new connector. So, from my perspective
> feel
> >> > free
> >> > > > >> to
> >> > > > >>>> go
> >> > > > >>>>>>> with
> >> > > > >>>>>>>>> a separate connector. If we do introduce a new
> connector I
> >> > > > >>> wouldn't
> >> > > > >>>>>>> call it
> >> > > > >>>>>>>>> "ktable" for aforementioned reasons (In addition, we
> might
> >> > > > >> suggest
> >> > > > >>>>> that
> >> > > > >>>>>>>>> there is also a "kstreams" connector for symmetry
> >> reasons). I
> >> > > > >>> don't
> >> > > > >>>>>>> have a
> >> > > > >>>>>>>>> good alternative name, though, maybe "kafka-compacted"
> or
> >> > > > >>>>>>>>> "compacted-kafka".
> >> > > > >>>>>>>>>
> >> > > > >>>>>>>>> Thanks,
> >> > > > >>>>>>>>>
> >> > > > >>>>>>>>> Konstantin
> >> > > > >>>>>>>>>
> >> > > > >>>>>>>>>
> >> > > > >>>>>>>>> On Wed, Oct 21, 2020 at 4:43 AM Kurt Young <
> >> ykt...@gmail.com
> >> > >
> >> > > > >>>> wrote:
> >> > > > >>>>>>>>>
> >> > > > >>>>>>>>>> Hi all,
> >> > > > >>>>>>>>>>
> >> > > > >>>>>>>>>> I want to describe the discussion process which drove
> us
> >> to
> >> > > > >> have
> >> > > > >>>> such
> >> > > > >>>>>>>>>> conclusion, this might make some of
> >> > > > >>>>>>>>>> the design choices easier to understand and keep
> >> everyone on
> >> > > > >> the
> >> > > > >>>> same
> >> > > > >>>>>>>>> page.
> >> > > > >>>>>>>>>>
> >> > > > >>>>>>>>>> Back to the motivation, what functionality do we want
> to
> >> > > > >> provide
> >> > > > >>> in
> >> > > > >>>>> the
> >> > > > >>>>>>>>>> first place? We got a lot of feedback and
> >> > > > >>>>>>>>>> questions from mailing lists that people want to write
> >> > > > >>>>> Not-Insert-Only
> >> > > > >>>>>>>>>> messages into kafka. They might be
> >> > > > >>>>>>>>>> intentional or by accident, e.g. wrote an non-windowed
> >> > > > >> aggregate
> >> > > > >>>>> query
> >> > > > >>>>>>> or
> >> > > > >>>>>>>>>> non-windowed left outer join. And
> >> > > > >>>>>>>>>> some users from KSQL world also asked about why Flink
> >> didn't
> >> > > > >>>> leverage
> >> > > > >>>>>>> the
> >> > > > >>>>>>>>>> Key concept of every kafka topic
> >> > > > >>>>>>>>>> and make kafka as a dynamic changing keyed table.
> >> > > > >>>>>>>>>>
> >> > > > >>>>>>>>>> To work with kafka better, we were thinking to extend
> the
> >> > > > >>>>> functionality
> >> > > > >>>>>>>>> of
> >> > > > >>>>>>>>>> the current kafka connector by letting it
> >> > > > >>>>>>>>>> accept updates and deletions. But due to the limitation
> >> of
> >> > > > >> kafka,
> >> > > > >>>> the
> >> > > > >>>>>>>>>> update has to be "update by key", aka a table
> >> > > > >>>>>>>>>> with primary key.
> >> > > > >>>>>>>>>>
> >> > > > >>>>>>>>>> This introduces a couple of conflicts with current
> kafka
> >> > > > >> table's
> >> > > > >>>>>>> options:
> >> > > > >>>>>>>>>> 1. key.fields: as said above, we need the kafka table
> to
> >> > have
> >> > > > >> the
> >> > > > >>>>>>> primary
> >> > > > >>>>>>>>>> key constraint. And users can also configure
> >> > > > >>>>>>>>>> key.fields freely, this might cause friction. (Sure we
> >> can
> >> > do
> >> > > > >>> some
> >> > > > >>>>>>> sanity
> >> > > > >>>>>>>>>> check on this but it also creates friction.)
> >> > > > >>>>>>>>>> 2. sink.partitioner: to make the semantics right, we
> >> need to
> >> > > > >> make
> >> > > > >>>>> sure
> >> > > > >>>>>>>>> all
> >> > > > >>>>>>>>>> the updates on the same key are written to
> >> > > > >>>>>>>>>> the same kafka partition, such we should force to use a
> >> hash
> >> > > by
> >> > > > >>> key
> >> > > > >>>>>>>>>> partition inside such table. Again, this has conflicts
> >> > > > >>>>>>>>>> and creates friction with current user options.
> >> > > > >>>>>>>>>>
> >> > > > >>>>>>>>>> The above things are solvable, though not perfect or
> most
> >> > user
> >> > > > >>>>>>> friendly.
> >> > > > >>>>>>>>>>
> >> > > > >>>>>>>>>> Let's take a look at the reading side. The keyed kafka
> >> table
> >> > > > >>>> contains
> >> > > > >>>>>>> two
> >> > > > >>>>>>>>>> kinds of messages: upsert or deletion. What upsert
> >> > > > >>>>>>>>>> means is "If the key doesn't exist yet, it's an insert
> >> > record.
> >> > > > >>>>>>> Otherwise
> >> > > > >>>>>>>>>> it's an update record". For the sake of correctness or
> >> > > > >>>>>>>>>> simplicity, the Flink SQL engine also needs such
> >> > information.
> >> > > > >> If
> >> > > > >>> we
> >> > > > >>>>>>>>>> interpret all messages to "update record", some queries
> >> or
> >> > > > >>>>>>>>>> operators may not work properly. It's weird to see an
> >> update
> >> > > > >>> record
> >> > > > >>>>> but
> >> > > > >>>>>>>>> you
> >> > > > >>>>>>>>>> haven't seen the insert record before.
> >> > > > >>>>>>>>>>
> >> > > > >>>>>>>>>> So what Flink should do is after reading out the
> records
> >> > from
> >> > > > >>> such
> >> > > > >>>>>>> table,
> >> > > > >>>>>>>>>> it needs to create a state to record which messages
> have
> >> > > > >>>>>>>>>> been seen and then generate the correct row type
> >> > > > >> correspondingly.
> >> > > > >>>>> This
> >> > > > >>>>>>>>> kind
> >> > > > >>>>>>>>>> of couples the state and the data of the message
> >> > > > >>>>>>>>>> queue, and it also creates conflicts with current kafka
> >> > > > >>> connector.
> >> > > > >>>>>>>>>>
> >> > > > >>>>>>>>>> Think about if users suspend a running job (which
> >> contains
> >> > > some
> >> > > > >>>>> reading
> >> > > > >>>>>>>>>> state now), and then change the start offset of the
> >> reader.
> >> > > > >>>>>>>>>> By changing the reading offset, it actually change the
> >> whole
> >> > > > >>> story
> >> > > > >>>> of
> >> > > > >>>>>>>>>> "which records should be insert messages and which
> >> records
> >> > > > >>>>>>>>>> should be update messages). And it will also make Flink
> >> to
> >> > > deal
> >> > > > >>>> with
> >> > > > >>>>>>>>>> another weird situation that it might receive a
> deletion
> >> > > > >>>>>>>>>> on a non existing message.
> >> > > > >>>>>>>>>>
> >> > > > >>>>>>>>>> We were unsatisfied with all the frictions and
> conflicts
> >> it
> >> > > > >> will
> >> > > > >>>>> create
> >> > > > >>>>>>>>> if
> >> > > > >>>>>>>>>> we enable the "upsert & deletion" support to the
> current
> >> > kafka
> >> > > > >>>>>>>>>> connector. And later we begin to realize that we
> >> shouldn't
> >> > > > >> treat
> >> > > > >>> it
> >> > > > >>>>> as
> >> > > > >>>>>>> a
> >> > > > >>>>>>>>>> normal message queue, but should treat it as a changing
> >> > keyed
> >> > > > >>>>>>>>>> table. We should be able to always get the whole data
> of
> >> > such
> >> > > > >>> table
> >> > > > >>>>> (by
> >> > > > >>>>>>>>>> disabling the start offset option) and we can also read
> >> the
> >> > > > >>>>>>>>>> changelog out of such table. It's like a HBase table
> with
> >> > > > >> binlog
> >> > > > >>>>>>> support
> >> > > > >>>>>>>>>> but doesn't have random access capability (which can be
> >> > > > >> fulfilled
> >> > > > >>>>>>>>>> by Flink's state).
> >> > > > >>>>>>>>>>
> >> > > > >>>>>>>>>> So our intention was instead of telling and persuading
> >> users
> >> > > > >> what
> >> > > > >>>>> kind
> >> > > > >>>>>>> of
> >> > > > >>>>>>>>>> options they should or should not use by extending
> >> > > > >>>>>>>>>> current kafka connector when enable upsert support, we
> >> are
> >> > > > >>> actually
> >> > > > >>>>>>>>> create
> >> > > > >>>>>>>>>> a whole new and different connector that has total
> >> > > > >>>>>>>>>> different abstractions in SQL layer, and should be
> >> treated
> >> > > > >>> totally
> >> > > > >>>>>>>>>> different with current kafka connector.
> >> > > > >>>>>>>>>>
> >> > > > >>>>>>>>>> Hope this can clarify some of the concerns.
> >> > > > >>>>>>>>>>
> >> > > > >>>>>>>>>> Best,
> >> > > > >>>>>>>>>> Kurt
> >> > > > >>>>>>>>>>
> >> > > > >>>>>>>>>>
> >> > > > >>>>>>>>>> On Tue, Oct 20, 2020 at 5:20 PM Shengkai Fang <
> >> > > > >> fskm...@gmail.com
> >> > > > >>>>
> >> > > > >>>>>>> wrote:
> >> > > > >>>>>>>>>>
> >> > > > >>>>>>>>>>> Hi devs,
> >> > > > >>>>>>>>>>>
> >> > > > >>>>>>>>>>> As many people are still confused about the difference
> >> > option
> >> > > > >>>>>>>>> behaviours
> >> > > > >>>>>>>>>>> between the Kafka connector and KTable connector, Jark
> >> and
> >> > I
> >> > > > >>> list
> >> > > > >>>>> the
> >> > > > >>>>>>>>>>> differences in the doc[1].
> >> > > > >>>>>>>>>>>
> >> > > > >>>>>>>>>>> Best,
> >> > > > >>>>>>>>>>> Shengkai
> >> > > > >>>>>>>>>>>
> >> > > > >>>>>>>>>>> [1]
> >> > > > >>>>>>>>>>>
> >> > > > >>>>>>>>>>>
> >> > > > >>>>>>>>>>
> >> > > > >>>>>>>>>
> >> > > > >>>>>>>
> >> > > > >>>>>
> >> > > > >>>>
> >> > > > >>>
> >> > > > >>
> >> > > >
> >> > >
> >> >
> >>
> https://docs.google.com/document/d/13oAWAwQez0lZLsyfV21BfTEze1fc2cz4AZKiNOyBNPk/edit
> >> > > > >>>>>>>>>>>
> >> > > > >>>>>>>>>>> Shengkai Fang <fskm...@gmail.com> 于2020年10月20日周二
> >> > 下午12:05写道:
> >> > > > >>>>>>>>>>>
> >> > > > >>>>>>>>>>>> Hi Konstantin,
> >> > > > >>>>>>>>>>>>
> >> > > > >>>>>>>>>>>> Thanks for your reply.
> >> > > > >>>>>>>>>>>>
> >> > > > >>>>>>>>>>>>> It uses the "kafka" connector and does not specify a
> >> > > primary
> >> > > > >>>> key.
> >> > > > >>>>>>>>>>>> The dimensional table `users` is a ktable connector
> >> and we
> >> > > > >> can
> >> > > > >>>>>>>>> specify
> >> > > > >>>>>>>>>>> the
> >> > > > >>>>>>>>>>>> pk on the KTable.
> >> > > > >>>>>>>>>>>>
> >> > > > >>>>>>>>>>>>> Will it possible to use a "ktable" as a dimensional
> >> table
> >> > > in
> >> > > > >>>>>>>>> FLIP-132
> >> > > > >>>>>>>>>>>> Yes. We can specify the watermark on the KTable and
> it
> >> can
> >> > > be
> >> > > > >>>> used
> >> > > > >>>>>>>>> as a
> >> > > > >>>>>>>>>>>> dimension table in temporal join.
> >> > > > >>>>>>>>>>>>
> >> > > > >>>>>>>>>>>>> Introduce a new connector vs introduce a new
> property
> >> > > > >>>>>>>>>>>> The main reason behind is that the KTable connector
> >> almost
> >> > > > >> has
> >> > > > >>> no
> >> > > > >>>>>>>>>> common
> >> > > > >>>>>>>>>>>> options with the Kafka connector. The options that
> can
> >> be
> >> > > > >>> reused
> >> > > > >>>> by
> >> > > > >>>>>>>>>>> KTable
> >> > > > >>>>>>>>>>>> connectors are 'topic',
> 'properties.bootstrap.servers'
> >> and
> >> > > > >>>>>>>>>>>> 'value.fields-include' . We can't set cdc format for
> >> > > > >>> 'key.format'
> >> > > > >>>>> and
> >> > > > >>>>>>>>>>>> 'value.format' in KTable connector now, which is
> >> > available
> >> > > > >> in
> >> > > > >>>>> Kafka
> >> > > > >>>>>>>>>>>> connector. Considering the difference between the
> >> options
> >> > we
> >> > > > >>> can
> >> > > > >>>>> use,
> >> > > > >>>>>>>>>>> it's
> >> > > > >>>>>>>>>>>> more suitable to introduce an another connector
> rather
> >> > than
> >> > > a
> >> > > > >>>>>>>>> property.
> >> > > > >>>>>>>>>>>>
> >> > > > >>>>>>>>>>>> We are also fine to use "compacted-kafka" as the name
> >> of
> >> > the
> >> > > > >>> new
> >> > > > >>>>>>>>>>>> connector. What do you think?
> >> > > > >>>>>>>>>>>>
> >> > > > >>>>>>>>>>>> Best,
> >> > > > >>>>>>>>>>>> Shengkai
> >> > > > >>>>>>>>>>>>
> >> > > > >>>>>>>>>>>> Konstantin Knauf <kna...@apache.org> 于2020年10月19日周一
> >> > > > >> 下午10:15写道:
> >> > > > >>>>>>>>>>>>
> >> > > > >>>>>>>>>>>>> Hi Shengkai,
> >> > > > >>>>>>>>>>>>>
> >> > > > >>>>>>>>>>>>> Thank you for driving this effort. I believe this a
> >> very
> >> > > > >>>> important
> >> > > > >>>>>>>>>>> feature
> >> > > > >>>>>>>>>>>>> for many users who use Kafka and Flink SQL
> together. A
> >> > few
> >> > > > >>>>> questions
> >> > > > >>>>>>>>>> and
> >> > > > >>>>>>>>>>>>> thoughts:
> >> > > > >>>>>>>>>>>>>
> >> > > > >>>>>>>>>>>>> * Is your example "Use KTable as a
> reference/dimension
> >> > > > >> table"
> >> > > > >>>>>>>>> correct?
> >> > > > >>>>>>>>>>> It
> >> > > > >>>>>>>>>>>>> uses the "kafka" connector and does not specify a
> >> primary
> >> > > > >> key.
> >> > > > >>>>>>>>>>>>>
> >> > > > >>>>>>>>>>>>> * Will it be possible to use a "ktable" table
> directly
> >> > as a
> >> > > > >>>>>>>>>> dimensional
> >> > > > >>>>>>>>>>>>> table in temporal join (*based on event time*)
> >> > (FLIP-132)?
> >> > > > >>> This
> >> > > > >>>> is
> >> > > > >>>>>>>>> not
> >> > > > >>>>>>>>>>>>> completely clear to me from the FLIP.
> >> > > > >>>>>>>>>>>>>
> >> > > > >>>>>>>>>>>>> * I'd personally prefer not to introduce a new
> >> connector
> >> > > and
> >> > > > >>>>> instead
> >> > > > >>>>>>>>>> to
> >> > > > >>>>>>>>>>>>> extend the Kafka connector. We could add an
> additional
> >> > > > >>> property
> >> > > > >>>>>>>>>>>>> "compacted"
> >> > > > >>>>>>>>>>>>> = "true"|"false". If it is set to "true", we can add
> >> > > > >>> additional
> >> > > > >>>>>>>>>>> validation
> >> > > > >>>>>>>>>>>>> logic (e.g. "scan.startup.mode" can not be set,
> >> primary
> >> > key
> >> > > > >>>>>>>>> required,
> >> > > > >>>>>>>>>>>>> etc.). If we stick to a separate connector I'd not
> >> call
> >> > it
> >> > > > >>>>> "ktable",
> >> > > > >>>>>>>>>> but
> >> > > > >>>>>>>>>>>>> rather "compacted-kafka" or similar. KTable seems to
> >> > carry
> >> > > > >>> more
> >> > > > >>>>>>>>>> implicit
> >> > > > >>>>>>>>>>>>> meaning than we want to imply here.
> >> > > > >>>>>>>>>>>>>
> >> > > > >>>>>>>>>>>>> * I agree that this is not a bounded source. If we
> >> want
> >> > to
> >> > > > >>>>> support a
> >> > > > >>>>>>>>>>>>> bounded mode, this is an orthogonal concern that
> also
> >> > > > >> applies
> >> > > > >>> to
> >> > > > >>>>>>>>> other
> >> > > > >>>>>>>>>>>>> unbounded sources.
> >> > > > >>>>>>>>>>>>>
> >> > > > >>>>>>>>>>>>> Best,
> >> > > > >>>>>>>>>>>>>
> >> > > > >>>>>>>>>>>>> Konstantin
> >> > > > >>>>>>>>>>>>>
> >> > > > >>>>>>>>>>>>> On Mon, Oct 19, 2020 at 3:26 PM Jark Wu <
> >> > imj...@gmail.com>
> >> > > > >>>> wrote:
> >> > > > >>>>>>>>>>>>>
> >> > > > >>>>>>>>>>>>>> Hi Danny,
> >> > > > >>>>>>>>>>>>>>
> >> > > > >>>>>>>>>>>>>> First of all, we didn't introduce any concepts from
> >> KSQL
> >> > > > >>> (e.g.
> >> > > > >>>>>>>>>> Stream
> >> > > > >>>>>>>>>>> vs
> >> > > > >>>>>>>>>>>>>> Table notion).
> >> > > > >>>>>>>>>>>>>> This new connector will produce a changelog stream,
> >> so
> >> > > it's
> >> > > > >>>> still
> >> > > > >>>>>>>>> a
> >> > > > >>>>>>>>>>>>> dynamic
> >> > > > >>>>>>>>>>>>>> table and doesn't conflict with Flink core
> concepts.
> >> > > > >>>>>>>>>>>>>>
> >> > > > >>>>>>>>>>>>>> The "ktable" is just a connector name, we can also
> >> call
> >> > it
> >> > > > >>>>>>>>>>>>>> "compacted-kafka" or something else.
> >> > > > >>>>>>>>>>>>>> Calling it "ktable" is just because KSQL users can
> >> > migrate
> >> > > > >> to
> >> > > > >>>>>>>>> Flink
> >> > > > >>>>>>>>>>> SQL
> >> > > > >>>>>>>>>>>>>> easily.
> >> > > > >>>>>>>>>>>>>>
> >> > > > >>>>>>>>>>>>>> Regarding to why introducing a new connector vs a
> new
> >> > > > >>> property
> >> > > > >>>> in
> >> > > > >>>>>>>>>>>>> existing
> >> > > > >>>>>>>>>>>>>> kafka connector:
> >> > > > >>>>>>>>>>>>>>
> >> > > > >>>>>>>>>>>>>> I think the main reason is that we want to have a
> >> clear
> >> > > > >>>>> separation
> >> > > > >>>>>>>>>> for
> >> > > > >>>>>>>>>>>>> such
> >> > > > >>>>>>>>>>>>>> two use cases, because they are very different.
> >> > > > >>>>>>>>>>>>>> We also listed reasons in the FLIP, including:
> >> > > > >>>>>>>>>>>>>>
> >> > > > >>>>>>>>>>>>>> 1) It's hard to explain what's the behavior when
> >> users
> >> > > > >>> specify
> >> > > > >>>>> the
> >> > > > >>>>>>>>>>> start
> >> > > > >>>>>>>>>>>>>> offset from a middle position (e.g. how to process
> >> non
> >> > > > >> exist
> >> > > > >>>>>>>>> delete
> >> > > > >>>>>>>>>>>>>> events).
> >> > > > >>>>>>>>>>>>>>        It's dangerous if users do that. So we don't
> >> > > provide
> >> > > > >>> the
> >> > > > >>>>>>>>> offset
> >> > > > >>>>>>>>>>>>> option
> >> > > > >>>>>>>>>>>>>> in the new connector at the moment.
> >> > > > >>>>>>>>>>>>>> 2) It's a different perspective/abstraction on the
> >> same
> >> > > > >> kafka
> >> > > > >>>>>>>>> topic
> >> > > > >>>>>>>>>>>>> (append
> >> > > > >>>>>>>>>>>>>> vs. upsert). It would be easier to understand if we
> >> can
> >> > > > >>>> separate
> >> > > > >>>>>>>>>> them
> >> > > > >>>>>>>>>>>>>>        instead of mixing them in one connector. The
> >> new
> >> > > > >>>> connector
> >> > > > >>>>>>>>>>> requires
> >> > > > >>>>>>>>>>>>>> hash sink partitioner, primary key declared,
> regular
> >> > > > >> format.
> >> > > > >>>>>>>>>>>>>>        If we mix them in one connector, it might be
> >> > > > >> confusing
> >> > > > >>>> how
> >> > > > >>>>> to
> >> > > > >>>>>>>>>> use
> >> > > > >>>>>>>>>>>>> the
> >> > > > >>>>>>>>>>>>>> options correctly.
> >> > > > >>>>>>>>>>>>>> 3) The semantic of the KTable connector is just the
> >> same
> >> > > as
> >> > > > >>>>> KTable
> >> > > > >>>>>>>>>> in
> >> > > > >>>>>>>>>>>>> Kafka
> >> > > > >>>>>>>>>>>>>> Stream. So it's very handy for Kafka Stream and
> KSQL
> >> > > users.
> >> > > > >>>>>>>>>>>>>>        We have seen several questions in the
> mailing
> >> > list
> >> > > > >>> asking
> >> > > > >>>>> how
> >> > > > >>>>>>>>> to
> >> > > > >>>>>>>>>>>>> model
> >> > > > >>>>>>>>>>>>>> a KTable and how to join a KTable in Flink SQL.
> >> > > > >>>>>>>>>>>>>>
> >> > > > >>>>>>>>>>>>>> Best,
> >> > > > >>>>>>>>>>>>>> Jark
> >> > > > >>>>>>>>>>>>>>
> >> > > > >>>>>>>>>>>>>> On Mon, 19 Oct 2020 at 19:53, Jark Wu <
> >> imj...@gmail.com
> >> > >
> >> > > > >>>> wrote:
> >> > > > >>>>>>>>>>>>>>
> >> > > > >>>>>>>>>>>>>>> Hi Jingsong,
> >> > > > >>>>>>>>>>>>>>>
> >> > > > >>>>>>>>>>>>>>> As the FLIP describes, "KTable connector produces
> a
> >> > > > >>> changelog
> >> > > > >>>>>>>>>>> stream,
> >> > > > >>>>>>>>>>>>>>> where each data record represents an update or
> >> delete
> >> > > > >>> event.".
> >> > > > >>>>>>>>>>>>>>> Therefore, a ktable source is an unbounded stream
> >> > source.
> >> > > > >>>>>>>>>> Selecting
> >> > > > >>>>>>>>>>> a
> >> > > > >>>>>>>>>>>>>>> ktable source is similar to selecting a kafka
> source
> >> > with
> >> > > > >>>>>>>>>>>>> debezium-json
> >> > > > >>>>>>>>>>>>>>> format
> >> > > > >>>>>>>>>>>>>>> that it never ends and the results are
> continuously
> >> > > > >> updated.
> >> > > > >>>>>>>>>>>>>>>
> >> > > > >>>>>>>>>>>>>>> It's possible to have a bounded ktable source in
> the
> >> > > > >> future,
> >> > > > >>>> for
> >> > > > >>>>>>>>>>>>> example,
> >> > > > >>>>>>>>>>>>>>> add an option 'bounded=true' or 'end-offset=xxx'.
> >> > > > >>>>>>>>>>>>>>> In this way, the ktable will produce a bounded
> >> > changelog
> >> > > > >>>> stream.
> >> > > > >>>>>>>>>>>>>>> So I think this can be a compatible feature in the
> >> > > future.
> >> > > > >>>>>>>>>>>>>>>
> >> > > > >>>>>>>>>>>>>>> I don't think we should associate with ksql
> related
> >> > > > >>> concepts.
> >> > > > >>>>>>>>>>>>> Actually,
> >> > > > >>>>>>>>>>>>>> we
> >> > > > >>>>>>>>>>>>>>> didn't introduce any concepts from KSQL (e.g.
> >> Stream vs
> >> > > > >>> Table
> >> > > > >>>>>>>>>>> notion).
> >> > > > >>>>>>>>>>>>>>> The "ktable" is just a connector name, we can also
> >> call
> >> > > it
> >> > > > >>>>>>>>>>>>>>> "compacted-kafka" or something else.
> >> > > > >>>>>>>>>>>>>>> Calling it "ktable" is just because KSQL users can
> >> > > migrate
> >> > > > >>> to
> >> > > > >>>>>>>>>> Flink
> >> > > > >>>>>>>>>>>>> SQL
> >> > > > >>>>>>>>>>>>>>> easily.
> >> > > > >>>>>>>>>>>>>>>
> >> > > > >>>>>>>>>>>>>>> Regarding the "value.fields-include", this is an
> >> option
> >> > > > >>>>>>>>> introduced
> >> > > > >>>>>>>>>>> in
> >> > > > >>>>>>>>>>>>>>> FLIP-107 for Kafka connector.
> >> > > > >>>>>>>>>>>>>>> I think we should keep the same behavior with the
> >> Kafka
> >> > > > >>>>>>>>> connector.
> >> > > > >>>>>>>>>>> I'm
> >> > > > >>>>>>>>>>>>>> not
> >> > > > >>>>>>>>>>>>>>> sure what's the default behavior of KSQL.
> >> > > > >>>>>>>>>>>>>>> But I guess it also stores the keys in value from
> >> this
> >> > > > >>> example
> >> > > > >>>>>>>>>> docs
> >> > > > >>>>>>>>>>>>> (see
> >> > > > >>>>>>>>>>>>>>> the "users_original" table) [1].
> >> > > > >>>>>>>>>>>>>>>
> >> > > > >>>>>>>>>>>>>>> Best,
> >> > > > >>>>>>>>>>>>>>> Jark
> >> > > > >>>>>>>>>>>>>>>
> >> > > > >>>>>>>>>>>>>>> [1]:
> >> > > > >>>>>>>>>>>>>>>
> >> > > > >>>>>>>>>>>>>>
> >> > > > >>>>>>>>>>>>>
> >> > > > >>>>>>>>>>>
> >> > > > >>>>>>>>>>
> >> > > > >>>>>>>>>
> >> > > > >>>>>>>
> >> > > > >>>>>
> >> > > > >>>>
> >> > > > >>>
> >> > > > >>
> >> > > >
> >> > >
> >> >
> >>
> https://docs.confluent.io/current/ksqldb/tutorials/basics-local.html#create-a-stream-and-table
> >> > > > >>>>>>>>>>>>>>>
> >> > > > >>>>>>>>>>>>>>>
> >> > > > >>>>>>>>>>>>>>> On Mon, 19 Oct 2020 at 18:17, Danny Chan <
> >> > > > >>>> yuzhao....@gmail.com>
> >> > > > >>>>>>>>>>>>> wrote:
> >> > > > >>>>>>>>>>>>>>>
> >> > > > >>>>>>>>>>>>>>>> The concept seems conflicts with the Flink
> >> abstraction
> >> > > > >>>> “dynamic
> >> > > > >>>>>>>>>>>>> table”,
> >> > > > >>>>>>>>>>>>>>>> in Flink we see both “stream” and “table” as a
> >> dynamic
> >> > > > >>> table,
> >> > > > >>>>>>>>>>>>>>>>
> >> > > > >>>>>>>>>>>>>>>> I think we should make clear first how to express
> >> > stream
> >> > > > >>> and
> >> > > > >>>>>>>>>> table
> >> > > > >>>>>>>>>>>>>>>> specific features on one “dynamic table”,
> >> > > > >>>>>>>>>>>>>>>> it is more natural for KSQL because KSQL takes
> >> stream
> >> > > and
> >> > > > >>>> table
> >> > > > >>>>>>>>>> as
> >> > > > >>>>>>>>>>>>>>>> different abstractions for representing
> >> collections.
> >> > In
> >> > > > >>> KSQL,
> >> > > > >>>>>>>>>> only
> >> > > > >>>>>>>>>>>>>> table is
> >> > > > >>>>>>>>>>>>>>>> mutable and can have a primary key.
> >> > > > >>>>>>>>>>>>>>>>
> >> > > > >>>>>>>>>>>>>>>> Does this connector belongs to the “table” scope
> or
> >> > > > >>> “stream”
> >> > > > >>>>>>>>>> scope
> >> > > > >>>>>>>>>>> ?
> >> > > > >>>>>>>>>>>>>>>>
> >> > > > >>>>>>>>>>>>>>>> Some of the concepts (such as the primary key on
> >> > stream)
> >> > > > >>>> should
> >> > > > >>>>>>>>>> be
> >> > > > >>>>>>>>>>>>>>>> suitable for all the connectors, not just Kafka,
> >> > > > >> Shouldn’t
> >> > > > >>>> this
> >> > > > >>>>>>>>>> be
> >> > > > >>>>>>>>>>> an
> >> > > > >>>>>>>>>>>>>>>> extension of existing Kafka connector instead of
> a
> >> > > > >> totally
> >> > > > >>>> new
> >> > > > >>>>>>>>>>>>>> connector ?
> >> > > > >>>>>>>>>>>>>>>> What about the other connectors ?
> >> > > > >>>>>>>>>>>>>>>>
> >> > > > >>>>>>>>>>>>>>>> Because this touches the core abstraction of
> >> Flink, we
> >> > > > >>> better
> >> > > > >>>>>>>>>> have
> >> > > > >>>>>>>>>>> a
> >> > > > >>>>>>>>>>>>>>>> top-down overall design, following the KSQL
> >> directly
> >> > is
> >> > > > >> not
> >> > > > >>>> the
> >> > > > >>>>>>>>>>>>> answer.
> >> > > > >>>>>>>>>>>>>>>>
> >> > > > >>>>>>>>>>>>>>>> P.S. For the source
> >> > > > >>>>>>>>>>>>>>>>> Shouldn’t this be an extension of existing Kafka
> >> > > > >> connector
> >> > > > >>>>>>>>>>> instead
> >> > > > >>>>>>>>>>>>> of
> >> > > > >>>>>>>>>>>>>> a
> >> > > > >>>>>>>>>>>>>>>> totally new connector ?
> >> > > > >>>>>>>>>>>>>>>>
> >> > > > >>>>>>>>>>>>>>>> How could we achieve that (e.g. set up the
> >> parallelism
> >> > > > >>>>>>>>>> correctly) ?
> >> > > > >>>>>>>>>>>>>>>>
> >> > > > >>>>>>>>>>>>>>>> Best,
> >> > > > >>>>>>>>>>>>>>>> Danny Chan
> >> > > > >>>>>>>>>>>>>>>> 在 2020年10月19日 +0800 PM5:17,Jingsong Li <
> >> > > > >>>> jingsongl...@gmail.com
> >> > > > >>>>>>>>>>>> ,写道:
> >> > > > >>>>>>>>>>>>>>>>> Thanks Shengkai for your proposal.
> >> > > > >>>>>>>>>>>>>>>>>
> >> > > > >>>>>>>>>>>>>>>>> +1 for this feature.
> >> > > > >>>>>>>>>>>>>>>>>
> >> > > > >>>>>>>>>>>>>>>>>> Future Work: Support bounded KTable source
> >> > > > >>>>>>>>>>>>>>>>>
> >> > > > >>>>>>>>>>>>>>>>> I don't think it should be a future work, I
> think
> >> it
> >> > is
> >> > > > >>> one
> >> > > > >>>>>>>>> of
> >> > > > >>>>>>>>>>> the
> >> > > > >>>>>>>>>>>>>>>>> important concepts of this FLIP. We need to
> >> > understand
> >> > > > >> it
> >> > > > >>>>>>>>> now.
> >> > > > >>>>>>>>>>>>>>>>>
> >> > > > >>>>>>>>>>>>>>>>> Intuitively, a ktable in my opinion is a bounded
> >> > table
> >> > > > >>>> rather
> >> > > > >>>>>>>>>>> than
> >> > > > >>>>>>>>>>>>> a
> >> > > > >>>>>>>>>>>>>>>>> stream, so select should produce a bounded table
> >> by
> >> > > > >>> default.
> >> > > > >>>>>>>>>>>>>>>>>
> >> > > > >>>>>>>>>>>>>>>>> I think we can list Kafka related knowledge,
> >> because
> >> > > the
> >> > > > >>>> word
> >> > > > >>>>>>>>>>>>> `ktable`
> >> > > > >>>>>>>>>>>>>>>> is
> >> > > > >>>>>>>>>>>>>>>>> easy to associate with ksql related concepts.
> (If
> >> > > > >>> possible,
> >> > > > >>>>>>>>>> it's
> >> > > > >>>>>>>>>>>>>> better
> >> > > > >>>>>>>>>>>>>>>> to
> >> > > > >>>>>>>>>>>>>>>>> unify with it)
> >> > > > >>>>>>>>>>>>>>>>>
> >> > > > >>>>>>>>>>>>>>>>> What do you think?
> >> > > > >>>>>>>>>>>>>>>>>
> >> > > > >>>>>>>>>>>>>>>>>> value.fields-include
> >> > > > >>>>>>>>>>>>>>>>>
> >> > > > >>>>>>>>>>>>>>>>> What about the default behavior of KSQL?
> >> > > > >>>>>>>>>>>>>>>>>
> >> > > > >>>>>>>>>>>>>>>>> Best,
> >> > > > >>>>>>>>>>>>>>>>> Jingsong
> >> > > > >>>>>>>>>>>>>>>>>
> >> > > > >>>>>>>>>>>>>>>>> On Mon, Oct 19, 2020 at 4:33 PM Shengkai Fang <
> >> > > > >>>>>>>>>> fskm...@gmail.com
> >> > > > >>>>>>>>>>>>
> >> > > > >>>>>>>>>>>>>>>> wrote:
> >> > > > >>>>>>>>>>>>>>>>>
> >> > > > >>>>>>>>>>>>>>>>>> Hi, devs.
> >> > > > >>>>>>>>>>>>>>>>>>
> >> > > > >>>>>>>>>>>>>>>>>> Jark and I want to start a new FLIP to
> introduce
> >> the
> >> > > > >>> KTable
> >> > > > >>>>>>>>>>>>>>>> connector. The
> >> > > > >>>>>>>>>>>>>>>>>> KTable is a shortcut of "Kafka Table", it also
> >> has
> >> > the
> >> > > > >>> same
> >> > > > >>>>>>>>>>>>>> semantics
> >> > > > >>>>>>>>>>>>>>>> with
> >> > > > >>>>>>>>>>>>>>>>>> the KTable notion in Kafka Stream.
> >> > > > >>>>>>>>>>>>>>>>>>
> >> > > > >>>>>>>>>>>>>>>>>> FLIP-149:
> >> > > > >>>>>>>>>>>>>>>>>>
> >> > > > >>>>>>>>>>>>>>>>>>
> >> > > > >>>>>>>>>>>>>>>>
> >> > > > >>>>>>>>>>>>>>
> >> > > > >>>>>>>>>>>>>
> >> > > > >>>>>>>>>>>
> >> > > > >>>>>>>>>>
> >> > > > >>>>>>>>>
> >> > > > >>>>>>>
> >> > > > >>>>>
> >> > > > >>>>
> >> > > > >>>
> >> > > > >>
> >> > > >
> >> > >
> >> >
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-149%3A+Introduce+the+KTable+Connector
> >> > > > >>>>>>>>>>>>>>>>>>
> >> > > > >>>>>>>>>>>>>>>>>> Currently many users have expressed their needs
> >> for
> >> > > the
> >> > > > >>>>>>>>>> upsert
> >> > > > >>>>>>>>>>>>> Kafka
> >> > > > >>>>>>>>>>>>>>>> by
> >> > > > >>>>>>>>>>>>>>>>>> mail lists and issues. The KTable connector has
> >> > > several
> >> > > > >>>>>>>>>>> benefits
> >> > > > >>>>>>>>>>>>> for
> >> > > > >>>>>>>>>>>>>>>> users:
> >> > > > >>>>>>>>>>>>>>>>>>
> >> > > > >>>>>>>>>>>>>>>>>> 1. Users are able to interpret a compacted
> Kafka
> >> > Topic
> >> > > > >> as
> >> > > > >>>>>>>>> an
> >> > > > >>>>>>>>>>>>> upsert
> >> > > > >>>>>>>>>>>>>>>> stream
> >> > > > >>>>>>>>>>>>>>>>>> in Apache Flink. And also be able to write a
> >> > changelog
> >> > > > >>>>>>>>> stream
> >> > > > >>>>>>>>>>> to
> >> > > > >>>>>>>>>>>>>> Kafka
> >> > > > >>>>>>>>>>>>>>>>>> (into a compacted topic).
> >> > > > >>>>>>>>>>>>>>>>>> 2. As a part of the real time pipeline, store
> >> join
> >> > or
> >> > > > >>>>>>>>>> aggregate
> >> > > > >>>>>>>>>>>>>>>> result (may
> >> > > > >>>>>>>>>>>>>>>>>> contain updates) into a Kafka topic for further
> >> > > > >>>>>>>>> calculation;
> >> > > > >>>>>>>>>>>>>>>>>> 3. The semantic of the KTable connector is just
> >> the
> >> > > > >> same
> >> > > > >>> as
> >> > > > >>>>>>>>>>>>> KTable
> >> > > > >>>>>>>>>>>>>> in
> >> > > > >>>>>>>>>>>>>>>> Kafka
> >> > > > >>>>>>>>>>>>>>>>>> Stream. So it's very handy for Kafka Stream and
> >> KSQL
> >> > > > >>> users.
> >> > > > >>>>>>>>>> We
> >> > > > >>>>>>>>>>>>> have
> >> > > > >>>>>>>>>>>>>>>> seen
> >> > > > >>>>>>>>>>>>>>>>>> several questions in the mailing list asking
> how
> >> to
> >> > > > >>> model a
> >> > > > >>>>>>>>>>>>> KTable
> >> > > > >>>>>>>>>>>>>>>> and how
> >> > > > >>>>>>>>>>>>>>>>>> to join a KTable in Flink SQL.
> >> > > > >>>>>>>>>>>>>>>>>>
> >> > > > >>>>>>>>>>>>>>>>>> We hope it can expand the usage of the Flink
> with
> >> > > > >> Kafka.
> >> > > > >>>>>>>>>>>>>>>>>>
> >> > > > >>>>>>>>>>>>>>>>>> I'm looking forward to your feedback.
> >> > > > >>>>>>>>>>>>>>>>>>
> >> > > > >>>>>>>>>>>>>>>>>> Best,
> >> > > > >>>>>>>>>>>>>>>>>> Shengkai
> >> > > > >>>>>>>>>>>>>>>>>>
> >> > > > >>>>>>>>>>>>>>>>>
> >> > > > >>>>>>>>>>>>>>>>>
> >> > > > >>>>>>>>>>>>>>>>> --
> >> > > > >>>>>>>>>>>>>>>>> Best, Jingsong Lee
> >> > > > >>>>>>>>>>>>>>>>
> >> > > > >>>>>>>>>>>>>>>
> >> > > > >>>>>>>>>>>>>>
> >> > > > >>>>>>>>>>>>>
> >> > > > >>>>>>>>>>>>>
> >> > > > >>>>>>>>>>>>> --
> >> > > > >>>>>>>>>>>>>
> >> > > > >>>>>>>>>>>>> Konstantin Knauf
> >> > > > >>>>>>>>>>>>>
> >> > > > >>>>>>>>>>>>> https://twitter.com/snntrable
> >> > > > >>>>>>>>>>>>>
> >> > > > >>>>>>>>>>>>> https://github.com/knaufk
> >> > > > >>>>>>>>>>>>>
> >> > > > >>>>>>>>>>>>
> >> > > > >>>>>>>>>>>
> >> > > > >>>>>>>>>>
> >> > > > >>>>>>>>>
> >> > > > >>>>>>>>>
> >> > > > >>>>>>>>> --
> >> > > > >>>>>>>>>
> >> > > > >>>>>>>>> Konstantin Knauf
> >> > > > >>>>>>>>>
> >> > > > >>>>>>>>> https://twitter.com/snntrable
> >> > > > >>>>>>>>>
> >> > > > >>>>>>>>> https://github.com/knaufk
> >> > > > >>>>>>>>>
> >> > > > >>>>>>>>
> >> > > > >>>>>>>
> >> > > > >>>>>>>
> >> > > > >>>>>>
> >> > > > >>>>>
> >> > > > >>>>>
> >> > > > >>>>
> >> > > > >>>> --
> >> > > > >>>>
> >> > > > >>>> Seth Wiesman | Solutions Architect
> >> > > > >>>>
> >> > > > >>>> +1 314 387 1463
> >> > > > >>>>
> >> > > > >>>> <https://www.ververica.com/>
> >> > > > >>>>
> >> > > > >>>> Follow us @VervericaData
> >> > > > >>>>
> >> > > > >>>> --
> >> > > > >>>>
> >> > > > >>>> Join Flink Forward <https://flink-forward.org/> - The Apache
> >> > Flink
> >> > > > >>>> Conference
> >> > > > >>>>
> >> > > > >>>> Stream Processing | Event Driven | Real Time
> >> > > > >>>>
> >> > > > >>>
> >> > > > >>
> >> > > > >
> >> > > >
> >> > > >
> >> > >
> >> > > --
> >> > > Best, Jingsong Lee
> >> > >
> >> >
> >>
> >>
> >> --
> >> Best, Jingsong Lee
> >>
> >
>

Reply via email to