I see. I understand that what you mean is avoiding the loss of historical data.

Logically, another option is to never clean up, in which case there is no need
to turn on compaction.

I am OK with the implementation. I just feel that this shouldn't be a logical
limitation.
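
To make the three cleanup options in this exchange concrete, here is a minimal sketch under a toy model. It is not Kafka code: a topic partition is modeled as a Python list of (key, value) records, `None` stands for a tombstone, and the helper names `materialize`, `compact` and `delete_oldest` are hypothetical. Real compaction also keeps tombstones for a while, which is ignored here.

```python
# Toy model, not Kafka itself: a topic partition is a list of (key, value)
# records in offset order, and value None is a tombstone (delete).

def materialize(log):
    """Replay an upsert log into the keyed table it represents."""
    table = {}
    for key, value in log:
        if value is None:
            table.pop(key, None)  # tombstone removes the key
        else:
            table[key] = value    # upsert: insert or overwrite
    return table


def compact(log):
    """Simplified cleanup.policy=compact: keep only the latest record per key."""
    last_offset = {key: offset for offset, (key, _) in enumerate(log)}
    return [rec for offset, rec in enumerate(log) if last_offset[rec[0]] == offset]


def delete_oldest(log, keep):
    """Simplified cleanup.policy=delete: retention drops the oldest records."""
    return log[-keep:]


full_log = [("u1", "bronze"), ("u2", "silver"), ("u1", "gold")]

never_cleaned = materialize(full_log)
compacted = materialize(compact(full_log))
retained = materialize(delete_oldest(full_log, keep=1))

print(never_cleaned == compacted)  # same table with or without compaction
print(retained)                    # "u2" is gone once retention drops its record
```

In this model, never cleaning up and compacting give the same table, while the "delete" policy can silently lose keys.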

Best,
Jingsong

On Fri, Oct 23, 2020 at 4:09 PM Kurt Young <ykt...@gmail.com> wrote:

> To be precise, it means the Kafka topic should set the configuration
> "cleanup.policy" to "compact", not "delete".
>
> Best,
> Kurt
>
>
> On Fri, Oct 23, 2020 at 4:01 PM Jingsong Li <jingsongl...@gmail.com> wrote:
>
> > I just noticed there is a limitation in the FLIP:
> >
> > > Generally speaking, the underlying topic of the upsert-kafka source must
> > > be compacted. Besides, the underlying topic must have all the data with the
> > > same key in the same partition, otherwise, the result will be wrong.
> >
> > According to my understanding, this is not accurate? Compaction is an
> > optimization, not a limitation. It is up to the users.
> >
> > I don't want to stop voting, just want to make it clear.
> >
> > Best,
> > Jingsong
> >
> > On Fri, Oct 23, 2020 at 3:16 PM Timo Walther <twal...@apache.org> wrote:
> >
> > > +1 for voting
> > >
> > > Regards,
> > > Timo
> > >
> > > On 23.10.20 09:07, Jark Wu wrote:
> > > > Thanks Shengkai!
> > > >
> > > > +1 to start voting.
> > > >
> > > > Best,
> > > > Jark
> > > >
> > > > On Fri, 23 Oct 2020 at 15:02, Shengkai Fang <fskm...@gmail.com> wrote:
> > > >
> > > >> One more note: I have already updated the FLIP [1].
> > > >>
> > > >> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-149%3A+Introduce+the+upsert-kafka+Connector
> > > >>
> > > >> On Fri, Oct 23, 2020 at 2:55 PM, Shengkai Fang <fskm...@gmail.com> wrote:
> > > >>
> > > >>> Hi, all.
> > > >>> It seems we have reached a consensus on the FLIP. If no one has other
> > > >>> objections, I would like to start the vote for FLIP-149.
> > > >>>
> > > >>> Best,
> > > >>> Shengkai
> > > >>>
> > > >>> On Fri, Oct 23, 2020 at 2:25 PM, Jingsong Li <jingsongl...@gmail.com> wrote:
> > > >>>
> > > >>>> Thanks for the explanation.
> > > >>>>
> > > >>>> I am OK with `upsert`. Yes, the concept has been accepted by many systems.
> > > >>>>
> > > >>>> Best,
> > > >>>> Jingsong
> > > >>>>
> > > >>>> On Fri, Oct 23, 2020 at 12:38 PM Jark Wu <imj...@gmail.com> wrote:
> > > >>>>
> > > >>>>> Hi Timo,
> > > >>>>>
> > > >>>>> I have some concerns about `kafka-cdc`:
> > > >>>>> 1) CDC is an abbreviation of Change Data Capture, which is commonly used
> > > >>>>> for databases, not for message queues.
> > > >>>>> 2) Usually, CDC produces the full content of the changelog, including
> > > >>>>> UPDATE_BEFORE; however, "upsert kafka" doesn't.
> > > >>>>> 3) `kafka-cdc` sounds like native support for the `debezium-json` format;
> > > >>>>> however, it is not, and we don't even want
> > > >>>>>     "upsert kafka" to support "debezium-json".
> > > >>>>>
> > > >>>>>
> > > >>>>> Hi Jingsong,
> > > >>>>>
> > > >>>>> I think the terminology of "upsert" is fine, because Kafka also uses
> > > >>>>> "upsert" to define such behavior in their official documentation [1]:
> > > >>>>>
> > > >>>>>> a data record in a changelog stream is interpreted as an UPSERT aka
> > > >>>>>> INSERT/UPDATE
> > > >>>>>
> > > >>>>> Materialize uses the "UPSERT" keyword to define such behavior too [2].
> > > >>>>> Users have been requesting such a feature using the "upsert kafka"
> > > >>>>> terminology in the user mailing lists [3][4].
> > > >>>>> Many other systems support the "UPSERT" statement natively, such as
> > > >>>>> Impala [5], SAP [6], Phoenix [7], Oracle NoSQL [8], etc.
> > > >>>>>
> > > >>>>> Therefore, I think we don't need to be afraid of introducing the
> > > >>>>> "upsert" terminology; it is widely accepted by users.
> > > >>>>>
> > > >>>>> Best,
> > > >>>>> Jark
> > > >>>>>
> > > >>>>>
> > > >>>>> [1]: https://kafka.apache.org/20/documentation/streams/developer-guide/dsl-api.html#streams_concepts_ktable
> > > >>>>> [2]: https://materialize.io/docs/sql/create-source/text-kafka/#upsert-on-a-kafka-topic
> > > >>>>> [3]: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/SQL-materialized-upsert-tables-td18482.html#a18503
> > > >>>>> [4]: http://apache-flink.147419.n8.nabble.com/Kafka-Sink-AppendStreamTableSink-doesn-t-support-consuming-update-changes-td5959.html
> > > >>>>> [5]: https://impala.apache.org/docs/build/html/topics/impala_upsert.html
> > > >>>>> [6]: https://help.sap.com/viewer/7c78579ce9b14a669c1f3295b0d8ca16/Cloud/en-US/ea8b6773be584203bcd99da76844c5ed.html
> > > >>>>> [7]: https://phoenix.apache.org/atomic_upsert.html
> > > >>>>> [8]: https://docs.oracle.com/en/database/other-databases/nosql-database/18.3/sqlfornosql/adding-table-rows-using-insert-and-upsert-statements.html
> > > >>>>>
> > > >>>>> On Fri, 23 Oct 2020 at 10:36, Jingsong Li <jingsongl...@gmail.com> wrote:
> > > >>>>>
> > > >>>>>> The `kafka-cdc` looks good to me.
> > > >>>>>> We can even give options to indicate whether to turn on compaction,
> > > >>>>>> because compaction is just an optimization?
> > > >>>>>>
> > > >>>>>> - ktable: it makes me think of KSQL.
> > > >>>>>> - kafka-compacted: it is not just compacted; more than that, it still has
> > > >>>>>> the ability of CDC.
> > > >>>>>> - upsert-kafka: upsert is back, and I don't really want to see it again
> > > >>>>>> since we have CDC.
> > > >>>>>>
> > > >>>>>> Best,
> > > >>>>>> Jingsong
> > > >>>>>>
> > > >>>>>> On Fri, Oct 23, 2020 at 2:21 AM Timo Walther <twal...@apache.org> wrote:
> > > >>>>>>
> > > >>>>>>> Hi Jark,
> > > >>>>>>>
> > > >>>>>>> I would be fine with `connector=upsert-kafka`. Another idea would be to
> > > >>>>>>> align the name with other available Flink connectors [1]:
> > > >>>>>>>
> > > >>>>>>> `connector=kafka-cdc`.
> > > >>>>>>>
> > > >>>>>>> Regards,
> > > >>>>>>> Timo
> > > >>>>>>>
> > > >>>>>>> [1] https://github.com/ververica/flink-cdc-connectors
> > > >>>>>>>
> > > >>>>>>> On 22.10.20 17:17, Jark Wu wrote:
> > > >>>>>>>> Another name is "connector=upsert-kafka". I think this can solve Timo's
> > > >>>>>>>> concern about the word "compacted".
> > > >>>>>>>>
> > > >>>>>>>> Materialize also uses the "ENVELOPE UPSERT" [1] keyword to identify such
> > > >>>>>>>> Kafka sources.
> > > >>>>>>>> I think "upsert" is a well-known term widely used in many systems, and it
> > > >>>>>>>> matches the behavior of how we handle the Kafka messages.
> > > >>>>>>>>
> > > >>>>>>>> What do you think?
> > > >>>>>>>>
> > > >>>>>>>> Best,
> > > >>>>>>>> Jark
> > > >>>>>>>>
> > > >>>>>>>> [1]: https://materialize.io/docs/sql/create-source/text-kafka/#upsert-on-a-kafka-topic
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>> On Thu, 22 Oct 2020 at 22:53, Kurt Young <ykt...@gmail.com> wrote:
> > > >>>>>>>>
> > > >>>>>>>>> Good validation messages can't solve the broken user experience,
> > > >>>>>>>>> especially since such an update mode option will implicitly make half of
> > > >>>>>>>>> the current Kafka options invalid or meaningless.
> > > >>>>>>>>>
> > > >>>>>>>>> Best,
> > > >>>>>>>>> Kurt
> > > >>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>> On Thu, Oct 22, 2020 at 10:31 PM Jark Wu <imj...@gmail.com> wrote:
> > > >>>>>>>>>
> > > >>>>>>>>>> Hi Timo, Seth,
> > > >>>>>>>>>>
> > > >>>>>>>>>> The default value "inserting" of "mode" might not be suitable,
> > > >>>>>>>>>> because "debezium-json" emits changelog messages which include updates.
> > > >>>>>>>>>>
> > > >>>>>>>>>> On Thu, 22 Oct 2020 at 22:10, Seth Wiesman <s...@ververica.com> wrote:
> > > >>>>>>>>>>
> > > >>>>>>>>>>> +1 for supporting upsert results into Kafka.
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> I have no comments on the implementation details.
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> As far as configuration goes, I tend to favor Timo's option where we
> > > >>>>>>>>>>> add a "mode" property to the existing Kafka table with default value
> > > >>>>>>>>>>> "inserting". If the mode is set to "updating" then the validation
> > > >>>>>>>>>>> changes to the new requirements. I personally find it more intuitive
> > > >>>>>>>>>>> than a separate connector; my fear is users won't understand it's the
> > > >>>>>>>>>>> same physical Kafka sink under the hood, and it will lead to other
> > > >>>>>>>>>>> confusion, like: does it offer the same persistence guarantees? I think
> > > >>>>>>>>>>> we are capable of adding good validation messaging that solves Jark's
> > > >>>>>>>>>>> and Kurt's concerns.
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> On Thu, Oct 22, 2020 at 8:51 AM Timo Walther <twal...@apache.org> wrote:
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>> Hi Jark,
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> "calling it "kafka-compacted" can even remind users to enable log
> > > >>>>>>>>>>>> compaction"
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> But sometimes users like to store a lineage of changes in their topics,
> > > >>>>>>>>>>>> independent of any ktable/kstream interpretation.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> I will let the majority decide on this topic so as not to further block
> > > >>>>>>>>>>>> this effort. But we might find a better name like:
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> connector = kafka
> > > >>>>>>>>>>>> mode = updating/inserting
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> OR
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> connector = kafka-updating
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> ...
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> Regards,
> > > >>>>>>>>>>>> Timo
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> On 22.10.20 15:24, Jark Wu wrote:
> > > >>>>>>>>>>>>> Hi Timo,
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Thanks for your opinions.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> 1) Implementation
> > > >>>>>>>>>>>>> We will have a stateful operator to generate INSERT and UPDATE_BEFORE.
> > > >>>>>>>>>>>>> This operator is keyby-ed (primary key as the shuffle key) after the
> > > >>>>>>>>>>>>> source operator.
> > > >>>>>>>>>>>>> The implementation of this operator is very similar to the existing
> > > >>>>>>>>>>>>> `DeduplicateKeepLastRowFunction`.
> > > >>>>>>>>>>>>> The operator will register a value state using the primary key fields
> > > >>>>>>>>>>>>> as keys.
> > > >>>>>>>>>>>>> When the value state is empty under the current key, we will emit
> > > >>>>>>>>>>>>> INSERT for the input row.
> > > >>>>>>>>>>>>> When the value state is not empty under the current key, we will emit
> > > >>>>>>>>>>>>> UPDATE_BEFORE using the row in state,
> > > >>>>>>>>>>>>> and emit UPDATE_AFTER using the input row.
> > > >>>>>>>>>>>>> When the input row is DELETE, we will clear the state and emit a
> > > >>>>>>>>>>>>> DELETE row.
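
A minimal sketch of the four rules above, written in plain Python rather than as a Flink operator. A dict stands in for the keyed value state, `UpsertToChangelog` and `process` are hypothetical names, and the handling of a DELETE for a key with no state is an assumption that the description above does not cover.

```python
from enum import Enum


class RowKind(Enum):
    INSERT = "+I"
    UPDATE_BEFORE = "-U"
    UPDATE_AFTER = "+U"
    DELETE = "-D"


class UpsertToChangelog:
    """Toy stand-in for the keyed stateful operator; a dict plays the value state."""

    def __init__(self):
        self.state = {}  # primary key -> last row seen for that key

    def process(self, key, row):
        """Return the changelog rows emitted for one input; row=None means DELETE."""
        previous = self.state.get(key)
        if row is None:
            if previous is None:
                return []  # assumption: a delete for an unknown key emits nothing
            del self.state[key]  # clear state, emit DELETE with the stored row
            return [(RowKind.DELETE, previous)]
        self.state[key] = row
        if previous is None:
            return [(RowKind.INSERT, row)]  # empty state: first row for this key
        # non-empty state: retract the old image, then emit the new one
        return [(RowKind.UPDATE_BEFORE, previous), (RowKind.UPDATE_AFTER, row)]


op = UpsertToChangelog()
events = [(1, "alice"), (1, "alicia"), (1, None)]
changelog = [change for key, row in events for change in op.process(key, row)]
for kind, row in changelog:
    print(kind.value, row)
```

The state holds one row per live key, which also gives a rough answer to the "how much state" question: it grows with the number of distinct keys, not with the number of messages.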
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> 2) new option vs new connector
> > > >>>>>>>>>>>>>> We recently simplified the table options to a minimum amount of
> > > >>>>>>>>>>>>>> characters to be as concise as possible in the DDL.
> > > >>>>>>>>>>>>> I think this is the reason why we want to introduce a new connector,
> > > >>>>>>>>>>>>> because we can simplify the options in DDL.
> > > >>>>>>>>>>>>> For example, if using a new option, the DDL may look like this:
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> CREATE TABLE users (
> > > >>>>>>>>>>>>>      user_id BIGINT,
> > > >>>>>>>>>>>>>      user_name STRING,
> > > >>>>>>>>>>>>>      user_level STRING,
> > > >>>>>>>>>>>>>      region STRING,
> > > >>>>>>>>>>>>>      PRIMARY KEY (user_id) NOT ENFORCED
> > > >>>>>>>>>>>>> ) WITH (
> > > >>>>>>>>>>>>>      'connector' = 'kafka',
> > > >>>>>>>>>>>>>      'model' = 'table',
> > > >>>>>>>>>>>>>      'topic' = 'pageviews_per_region',
> > > >>>>>>>>>>>>>      'properties.bootstrap.servers' = '...',
> > > >>>>>>>>>>>>>      'properties.group.id' = 'testGroup',
> > > >>>>>>>>>>>>>      'scan.startup.mode' = 'earliest',
> > > >>>>>>>>>>>>>      'key.format' = 'csv',
> > > >>>>>>>>>>>>>      'key.fields' = 'user_id',
> > > >>>>>>>>>>>>>      'value.format' = 'avro',
> > > >>>>>>>>>>>>>      'sink.partitioner' = 'hash'
> > > >>>>>>>>>>>>> );
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> If using a new connector, we can have different default values for the
> > > >>>>>>>>>>>>> options and remove unnecessary options;
> > > >>>>>>>>>>>>> the DDL can then look like this, which is much more concise:
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> CREATE TABLE pageviews_per_region (
> > > >>>>>>>>>>>>>      user_id BIGINT,
> > > >>>>>>>>>>>>>      user_name STRING,
> > > >>>>>>>>>>>>>      user_level STRING,
> > > >>>>>>>>>>>>>      region STRING,
> > > >>>>>>>>>>>>>      PRIMARY KEY (user_id) NOT ENFORCED
> > > >>>>>>>>>>>>> ) WITH (
> > > >>>>>>>>>>>>>      'connector' = 'kafka-compacted',
> > > >>>>>>>>>>>>>      'topic' = 'pageviews_per_region',
> > > >>>>>>>>>>>>>      'properties.bootstrap.servers' = '...',
> > > >>>>>>>>>>>>>      'key.format' = 'csv',
> > > >>>>>>>>>>>>>      'value.format' = 'avro'
> > > >>>>>>>>>>>>> );
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> When people read `connector=kafka-compacted` they might not know that it
> > > >>>>>>>>>>>>>> has ktable semantics. You don't need to enable log compaction in order
> > > >>>>>>>>>>>>>> to use a KTable as far as I know.
> > > >>>>>>>>>>>>> We don't need to let users know it has ktable semantics; as Konstantin
> > > >>>>>>>>>>>>> mentioned, this may carry more implicit
> > > >>>>>>>>>>>>> meaning than we want to imply here. I agree users don't need to enable
> > > >>>>>>>>>>>>> log compaction, but from the production perspective,
> > > >>>>>>>>>>>>> log compaction should always be enabled if it is used for this purpose.
> > > >>>>>>>>>>>>> Calling it "kafka-compacted" can even remind users to enable log
> > > >>>>>>>>>>>>> compaction.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> I don't agree with introducing a "model = table/stream" option, or
> > > >>>>>>>>>>>>> "connector=kafka-table",
> > > >>>>>>>>>>>>> because this means we are introducing the Table vs Stream concept from
> > > >>>>>>>>>>>>> KSQL.
> > > >>>>>>>>>>>>> However, we don't have such a top-level concept in Flink SQL now; this
> > > >>>>>>>>>>>>> will further confuse users.
> > > >>>>>>>>>>>>> In Flink SQL, all the things are STREAM; the differences are whether it
> > > >>>>>>>>>>>>> is bounded or unbounded,
> > > >>>>>>>>>>>>>     whether it is insert-only or changelog.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Best,
> > > >>>>>>>>>>>>> Jark
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> On Thu, 22 Oct 2020 at 20:39, Timo Walther <twal...@apache.org> wrote:
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> Hi Shengkai, Hi Jark,
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> thanks for this great proposal. It is time to finally connect the
> > > >>>>>>>>>>>>>> changelog processor with a compacted Kafka topic.
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> "The operator will produce INSERT rows, or additionally generate
> > > >>>>>>>>>>>>>> UPDATE_BEFORE rows for the previous image, or produce DELETE rows with
> > > >>>>>>>>>>>>>> all columns filled with values."
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> Could you elaborate a bit on the implementation details in the FLIP? How
> > > >>>>>>>>>>>>>> are UPDATE_BEFOREs generated? How much state is required to perform
> > > >>>>>>>>>>>>>> this operation?
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> From a conceptual and semantic point of view, I'm fine with the
> > > >>>>>>>>>>>>>> proposal. But I would like to share my opinion about how we expose this
> > > >>>>>>>>>>>>>> feature:
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> ktable vs kafka-compacted
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> I'm against having an additional connector like `ktable` or
> > > >>>>>>>>>>>>>> `kafka-compacted`. We recently simplified the table options to a minimum
> > > >>>>>>>>>>>>>> amount of characters to be as concise as possible in the DDL. Therefore,
> > > >>>>>>>>>>>>>> I would keep the `connector=kafka` and introduce an additional option.
> > > >>>>>>>>>>>>>> Because a user wants to read "from Kafka". And the "how" should be
> > > >>>>>>>>>>>>>> determined in the lower options.
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> When people read `connector=ktable` they might not know that this is
> > > >>>>>>>>>>>>>> Kafka. Or they wonder where `kstream` is?
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> When people read `connector=kafka-compacted` they might not know that it
> > > >>>>>>>>>>>>>> has ktable semantics. You don't need to enable log compaction in order
> > > >>>>>>>>>>>>>> to use a KTable as far as I know. Log compaction and table semantics are
> > > >>>>>>>>>>>>>> orthogonal topics.
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> In the end we will need 3 types of information when declaring a Kafka
> > > >>>>>>>>>>>>>> connector:
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> CREATE TABLE ... WITH (
> > > >>>>>>>>>>>>>>       connector=kafka        -- Some information about the connector
> > > >>>>>>>>>>>>>>       end-offset = XXXX      -- Some information about the boundedness
> > > >>>>>>>>>>>>>>       model = table/stream   -- Some information about interpretation
> > > >>>>>>>>>>>>>> )
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> We can still apply all the constraints mentioned in the FLIP when
> > > >>>>>>>>>>>>>> `model` is set to `table`.
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> What do you think?
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> Regards,
> > > >>>>>>>>>>>>>> Timo
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> On 21.10.20 14:19, Jark Wu wrote:
> > > >>>>>>>>>>>>>>> Hi,
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> IMO, if we are going to mix them in one connector,
> > > >>>>>>>>>>>>>>> 1) either users need to set some options to a specific value explicitly,
> > > >>>>>>>>>>>>>>> e.g. "scan.startup.mode=earliest", "sink.partitioner=hash", etc.
> > > >>>>>>>>>>>>>>> This makes the connector awkward to use. Users may have to fix options
> > > >>>>>>>>>>>>>>> one by one according to the exceptions.
> > > >>>>>>>>>>>>>>> Besides, in the future, it is still possible to use
> > > >>>>>>>>>>>>>>> "sink.partitioner=fixed" (reduce network cost) if users are aware of
> > > >>>>>>>>>>>>>>> the partition routing;
> > > >>>>>>>>>>>>>>> however, it's error-prone to have "fixed" as default for compacted mode.
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> 2) or make those options a different default value when "compacted=true".
> > > >>>>>>>>>>>>>>> This would be more confusing and unpredictable if the default value of
> > > >>>>>>>>>>>>>>> options will change according to other options.
> > > >>>>>>>>>>>>>>> What happens if we have a third mode in the future?
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> In terms of usage and options, it's very different from the
> > > >>>>>>>>>>>>>>> original "kafka" connector.
> > > >>>>>>>>>>>>>>> It would be handier to use and less error-prone if we separate them into
> > > >>>>>>>>>>>>>>> two connectors.
> > > >>>>>>>>>>>>>>> In the implementation layer, we can reuse code as much as possible.
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> Therefore, I'm still +1 to have a new connector.
> > > >>>>>>>>>>>>>>> The "kafka-compacted" name sounds good to me.
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> Best,
> > > >>>>>>>>>>>>>>> Jark
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> On Wed, 21 Oct 2020 at 17:58, Konstantin Knauf <kna...@apache.org> wrote:
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> Hi Kurt, Hi Shengkai,
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> thanks for answering my questions and the additional clarifications. I
> > > >>>>>>>>>>>>>>>> don't have a strong opinion on whether to extend the "kafka" connector or
> > > >>>>>>>>>>>>>>>> to introduce a new connector. So, from my perspective feel free to go with
> > > >>>>>>>>>>>>>>>> a separate connector. If we do introduce a new connector I wouldn't call it
> > > >>>>>>>>>>>>>>>> "ktable" for aforementioned reasons (In addition, we might suggest that
> > > >>>>>>>>>>>>>>>> there is also a "kstreams" connector for symmetry reasons). I don't have a
> > > >>>>>>>>>>>>>>>> good alternative name, though, maybe "kafka-compacted" or
> > > >>>>>>>>>>>>>>>> "compacted-kafka".
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> Thanks,
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> Konstantin
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> On Wed, Oct 21, 2020 at 4:43 AM Kurt Young <ykt...@gmail.com> wrote:
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> Hi all,
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> I want to describe the discussion process which drove us to such a
> > > >>>>>>>>>>>>>>>>> conclusion; this might make some of the design choices easier to
> > > >>>>>>>>>>>>>>>>> understand and keep everyone on the same page.
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> Back to the motivation, what functionality do we want to provide in the
> > > >>>>>>>>>>>>>>>>> first place? We got a lot of feedback and questions from mailing lists
> > > >>>>>>>>>>>>>>>>> that people want to write Not-Insert-Only messages into kafka. They
> > > >>>>>>>>>>>>>>>>> might be intentional or by accident, e.g. writing a non-windowed
> > > >>>>>>>>>>>>>>>>> aggregate query or a non-windowed left outer join. And some users from
> > > >>>>>>>>>>>>>>>>> the KSQL world also asked why Flink didn't leverage the Key concept of
> > > >>>>>>>>>>>>>>>>> every kafka topic and make kafka a dynamic changing keyed table.
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> To work with kafka better, we were thinking of extending the
> > > >>>>>>>>>>>>>>>>> functionality of the current kafka connector by letting it
> > > >>>>>>>>>>>>>>>>> accept updates and deletions. But due to the limitation of kafka, the
> > > >>>>>>>>>>>>>>>>> update has to be "update by key", aka a table
> > > >>>>>>>>>>>>>>>>> with primary key.
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> This introduces a couple of conflicts with the current kafka table's
> > > >>>>>>>>>>>>>>>>> options:
> > > >>>>>>>>>>>>>>>>> 1. key.fields: as said above, we need the kafka table to have the
> > > >>>>>>>>>>>>>>>>> primary key constraint. And users can also configure
> > > >>>>>>>>>>>>>>>>> key.fields freely, which might cause friction. (Sure, we can do some
> > > >>>>>>>>>>>>>>>>> sanity check on this but it also creates friction.)
> > > >>>>>>>>>>>>>>>>> 2. sink.partitioner: to make the semantics right, we need to make sure
> > > >>>>>>>>>>>>>>>>> all the updates on the same key are written to
> > > >>>>>>>>>>>>>>>>> the same kafka partition, so we should force the use of a hash-by-key
> > > >>>>>>>>>>>>>>>>> partitioner inside such a table. Again, this conflicts
> > > >>>>>>>>>>>>>>>>> and creates friction with current user options.
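
The partitioning requirement in point 2 can be illustrated with a small sketch. It is a toy model, not the Flink or Kafka partitioner: `zlib.crc32` is only a stand-in for a stable key hash (Kafka's own default partitioner hashes the key bytes with murmur2), and `hash_partition` and `partitions_per_key` are hypothetical helper names.

```python
from collections import defaultdict
from itertools import count
import zlib

NUM_PARTITIONS = 3


def hash_partition(key, num_partitions):
    """Stable hash-by-key routing; crc32 is a stand-in for the real hash."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def partitions_per_key(records, choose_partition):
    """Collect, for every key, the set of partitions its records land in."""
    seen = defaultdict(set)
    for key, _value in records:
        seen[key].add(choose_partition(key))
    return dict(seen)


records = [("u1", "bronze"), ("u2", "silver"), ("u1", "gold"), ("u1", None)]

by_hash = partitions_per_key(records, lambda k: hash_partition(k, NUM_PARTITIONS))

round_robin = count()  # ignores the key and just cycles through partitions
by_round_robin = partitions_per_key(
    records, lambda k: next(round_robin) % NUM_PARTITIONS
)

print(by_hash)         # every key maps to exactly one partition
print(by_round_robin)  # "u1" is spread over several partitions
```

Once the updates for one key are spread over several partitions, their relative order is no longer defined for a reader, which is why the result of an upsert interpretation can be wrong.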
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> The above things are solvable, though not perfect or most user-friendly.
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> Let's take a look at the reading side. The keyed kafka table contains
> > > >>>>>>>>>>>>>>>>> two kinds of messages: upsert or deletion. What upsert
> > > >>>>>>>>>>>>>>>>> means is "If the key doesn't exist yet, it's an insert record.
> > > >>>>>>>>>>>>>>>>> Otherwise it's an update record". For the sake of correctness or
> > > >>>>>>>>>>>>>>>>> simplicity, the Flink SQL engine also needs such information. If we
> > > >>>>>>>>>>>>>>>>> interpret all messages as "update record", some queries or
> > > >>>>>>>>>>>>>>>>> operators may not work properly. It's weird to see an update record
> > > >>>>>>>>>>>>>>>>> when you haven't seen the insert record before.
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> So what Flink should do is, after reading out the records from such a
> > > >>>>>>>>>>>>>>>>> table, create a state to record which messages have
> > > >>>>>>>>>>>>>>>>> been seen and then generate the correct row type correspondingly. This
> > > >>>>>>>>>>>>>>>>> kind of couples the state and the data of the message
> > > >>>>>>>>>>>>>>>>> queue, and it also creates conflicts with the current kafka connector.
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> Think about what happens if users suspend a running job (which contains
> > > >>>>>>>>>>>>>>>>> some reading state now), and then change the start offset of the reader.
> > > >>>>>>>>>>>>>>>>> Changing the reading offset actually changes the whole story of
> > > >>>>>>>>>>>>>>>>> "which records should be insert messages and which records
> > > >>>>>>>>>>>>>>>>> should be update messages". And it will also make Flink deal with
> > > >>>>>>>>>>>>>>>>> another weird situation: it might receive a deletion
> > > >>>>>>>>>>>>>>>>> for a non-existing message.
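
The effect of the start offset can be sketched with a toy reader. This is a simplification under stated assumptions: the topic is a Python list, each run starts with empty state (the suspended-job case with leftover state is not modeled), and `interpret` and the `DELETE_UNKNOWN_KEY` label are hypothetical names used only for illustration.

```python
def interpret(records, start_offset=0):
    """Classify each record read from start_offset as insert, update or delete."""
    seen = set()  # keys for which a value has been read so far
    kinds = []
    for key, value in records[start_offset:]:
        if value is None:
            # tombstone: either a normal delete or a delete for an unseen key
            kind = "DELETE" if key in seen else "DELETE_UNKNOWN_KEY"
            seen.discard(key)
        elif key in seen:
            kind = "UPDATE"
        else:
            kind = "INSERT"
            seen.add(key)
        kinds.append((kind, key))
    return kinds


topic = [("u1", "bronze"), ("u1", "gold"), ("u2", "silver"), ("u2", None)]

from_start = interpret(topic)
from_offset_1 = interpret(topic, start_offset=1)
from_offset_3 = interpret(topic, start_offset=3)

print(from_start)     # ("u1", "gold") is read as an UPDATE
print(from_offset_1)  # the same record is now read as an INSERT
print(from_offset_3)  # the reader only sees a deletion for a key it never saw
```

The same physical record gets a different row kind depending on where reading begins, which is the coupling between state and offsets described above.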
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> We were unsatisfied with all the friction and conflicts it would create
> > > >>>>>>>>>>>>>>>>> if we enabled the "upsert & deletion" support in the current kafka
> > > >>>>>>>>>>>>>>>>> connector. And later we began to realize that we shouldn't treat it as
> > > >>>>>>>>>>>>>>>>> a normal message queue, but should treat it as a changing keyed
> > > >>>>>>>>>>>>>>>>> table. We should be able to always get the whole data of such a table
> > > >>>>>>>>>>>>>>>>> (by disabling the start offset option) and we can also read the
> > > >>>>>>>>>>>>>>>>> changelog out of such a table. It's like an HBase table with binlog
> > > >>>>>>>>>>>>>>>>> support but without random access capability (which can be fulfilled
> > > >>>>>>>>>>>>>>>>> by Flink's state).
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> So our intention was, instead of telling and persuading users what kind
> > > >>>>>>>>>>>>>>>>> of options they should or should not use by extending
> > > >>>>>>>>>>>>>>>>> the current kafka connector when enabling upsert support, to actually
> > > >>>>>>>>>>>>>>>>> create a whole new and different connector that has totally
> > > >>>>>>>>>>>>>>>>> different abstractions in the SQL layer, and should be treated totally
> > > >>>>>>>>>>>>>>>>> differently from the current kafka connector.
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> Hope this can clarify some of the concerns.
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> Best,
> > > >>>>>>>>>>>>>>>>> Kurt
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> On Tue, Oct 20, 2020 at 5:20 PM Shengkai Fang <fskm...@gmail.com> wrote:
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> Hi devs,
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> As many people are still confused about the different option behaviours
> > > >>>>>>>>>>>>>>>>>> between the Kafka connector and KTable connector, Jark and I list the
> > > >>>>>>>>>>>>>>>>>> differences in the doc [1].
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> Best,
> > > >>>>>>>>>>>>>>>>>> Shengkai
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> [1] https://docs.google.com/document/d/13oAWAwQez0lZLsyfV21BfTEze1fc2cz4AZKiNOyBNPk/edit
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> Shengkai Fang <fskm...@gmail.com> wrote on Tue, Oct 20, 2020 at 12:05 PM:
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> Hi Konstantin,
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> Thanks for your reply.
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>> It uses the "kafka" connector and does not specify a primary key.
> > > >>>>>>>>>>>>>>>>>>> The dimension table `users` uses the ktable connector, and we
> > > >>>>>>>>>>>>>>>>>>> can specify the primary key on the KTable.
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>> Will it be possible to use a "ktable" as a dimensional table in FLIP-132
> > > >>>>>>>>>>>>>>>>>>> Yes. We can specify the watermark on the KTable, and it can be
> > > >>>>>>>>>>>>>>>>>>> used as a dimension table in a temporal join.
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>> Introduce a new connector vs introduce a new property
> > > >>>>>>>>>>>>>>>>>>> The main reason is that the KTable connector has almost no
> > > >>>>>>>>>>>>>>>>>>> options in common with the Kafka connector. The options that
> > > >>>>>>>>>>>>>>>>>>> can be reused by the KTable connector are 'topic',
> > > >>>>>>>>>>>>>>>>>>> 'properties.bootstrap.servers' and 'value.fields-include'.
> > > >>>>>>>>>>>>>>>>>>> We currently can't set a CDC format for 'key.format' and
> > > >>>>>>>>>>>>>>>>>>> 'value.format' in the KTable connector, which is possible in
> > > >>>>>>>>>>>>>>>>>>> the Kafka connector. Considering how different the usable
> > > >>>>>>>>>>>>>>>>>>> options are, it's more suitable to introduce another
> > > >>>>>>>>>>>>>>>>>>> connector rather than a property.
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> We are also fine with using "compacted-kafka" as the name of
> > > >>>>>>>>>>>>>>>>>>> the new connector. What do you think?
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> Best,
> > > >>>>>>>>>>>>>>>>>>> Shengkai
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> Konstantin Knauf <kna...@apache.org> wrote on Mon, Oct 19, 2020 at 10:15 PM:
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>> Hi Shengkai,
> > > >>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>> Thank you for driving this effort. I believe this is a very
> > > >>>>>>>>>>>>>>>>>>>> important feature for many users who use Kafka and Flink SQL
> > > >>>>>>>>>>>>>>>>>>>> together. A few questions and thoughts:
> > > >>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>> * Is your example "Use KTable as a reference/dimension table"
> > > >>>>>>>>>>>>>>>>>>>> correct? It uses the "kafka" connector and does not specify a
> > > >>>>>>>>>>>>>>>>>>>> primary key.
> > > >>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>> * Will it be possible to use a "ktable" table directly as a
> > > >>>>>>>>>>>>>>>>>>>> dimensional table in temporal join (*based on event time*)
> > > >>>>>>>>>>>>>>>>>>>> (FLIP-132)? This is not completely clear to me from the FLIP.
> > > >>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>> * I'd personally prefer not to introduce a new connector and
> > > >>>>>>>>>>>>>>>>>>>> instead to extend the Kafka connector. We could add an
> > > >>>>>>>>>>>>>>>>>>>> additional property "compacted" = "true"|"false". If it is set
> > > >>>>>>>>>>>>>>>>>>>> to "true", we can add additional validation logic (e.g.
> > > >>>>>>>>>>>>>>>>>>>> "scan.startup.mode" can not be set, primary key required,
> > > >>>>>>>>>>>>>>>>>>>> etc.). If we stick to a separate connector I'd not call it
> > > >>>>>>>>>>>>>>>>>>>> "ktable", but rather "compacted-kafka" or similar. KTable seems
> > > >>>>>>>>>>>>>>>>>>>> to carry more implicit meaning than we want to imply here.
> > > >>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>> * I agree that this is not a bounded source. If we want to
> > > >>>>>>>>>>>>>>>>>>>> support a bounded mode, this is an orthogonal concern that also
> > > >>>>>>>>>>>>>>>>>>>> applies to other unbounded sources.
> > > >>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>> Best,
> > > >>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>> Konstantin
> > > >>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>> On Mon, Oct 19, 2020 at 3:26 PM Jark Wu <imj...@gmail.com> wrote:
> > > >>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>> Hi Danny,
> > > >>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>> First of all, we didn't introduce any concepts from KSQL (e.g.
> > > >>>>>>>>>>>>>>>>>>>>> the Stream vs Table notion).
> > > >>>>>>>>>>>>>>>>>>>>> This new connector will produce a changelog stream, so it's
> > > >>>>>>>>>>>>>>>>>>>>> still a dynamic table and doesn't conflict with Flink core
> > > >>>>>>>>>>>>>>>>>>>>> concepts.
> > > >>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>> The "ktable" is just a connector name; we can also call it
> > > >>>>>>>>>>>>>>>>>>>>> "compacted-kafka" or something else.
> > > >>>>>>>>>>>>>>>>>>>>> We call it "ktable" only so that KSQL users can migrate to
> > > >>>>>>>>>>>>>>>>>>>>> Flink SQL easily.
> > > >>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>> Regarding why we introduce a new connector vs. a new property
> > > >>>>>>>>>>>>>>>>>>>>> in the existing kafka connector:
> > > >>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>> I think the main reason is that we want to have a clear
> > > >>>>>>>>>>>>>>>>>>>>> separation for these two use cases, because they are very
> > > >>>>>>>>>>>>>>>>>>>>> different.
> > > >>>>>>>>>>>>>>>>>>>>> We also listed reasons in the FLIP, including:
> > > >>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>> 1) It's hard to explain what the behavior is when users
> > > >>>>>>>>>>>>>>>>>>>>>    specify a start offset from a middle position (e.g. how to
> > > >>>>>>>>>>>>>>>>>>>>>    process delete events for keys that do not exist).
> > > >>>>>>>>>>>>>>>>>>>>>    It's dangerous if users do that. So we don't provide the
> > > >>>>>>>>>>>>>>>>>>>>>    offset option in the new connector at the moment.
> > > >>>>>>>>>>>>>>>>>>>>> 2) It's a different perspective/abstraction on the same kafka
> > > >>>>>>>>>>>>>>>>>>>>>    topic (append vs. upsert). It would be easier to understand
> > > >>>>>>>>>>>>>>>>>>>>>    if we separate them instead of mixing them in one connector.
> > > >>>>>>>>>>>>>>>>>>>>>    The new connector requires a hash sink partitioner, a
> > > >>>>>>>>>>>>>>>>>>>>>    declared primary key, and a regular format.
> > > >>>>>>>>>>>>>>>>>>>>>    If we mix them in one connector, it might be confusing how
> > > >>>>>>>>>>>>>>>>>>>>>    to use the options correctly.
> > > >>>>>>>>>>>>>>>>>>>>> 3) The semantics of the KTable connector are the same as
> > > >>>>>>>>>>>>>>>>>>>>>    KTable in Kafka Streams, so it's very handy for Kafka
> > > >>>>>>>>>>>>>>>>>>>>>    Streams and KSQL users.
> > > >>>>>>>>>>>>>>>>>>>>>    We have seen several questions on the mailing list asking
> > > >>>>>>>>>>>>>>>>>>>>>    how to model a KTable and how to join a KTable in Flink SQL.
> > > >>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>> Best,
> > > >>>>>>>>>>>>>>>>>>>>> Jark
> > > >>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>> On Mon, 19 Oct 2020 at 19:53, Jark Wu <imj...@gmail.com> wrote:
> > > >>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>> Hi Jingsong,
> > > >>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>> As the FLIP describes, "KTable connector produces a changelog
> > > >>>>>>>>>>>>>>>>>>>>>> stream, where each data record represents an update or delete
> > > >>>>>>>>>>>>>>>>>>>>>> event.".
> > > >>>>>>>>>>>>>>>>>>>>>> Therefore, a ktable source is an unbounded stream source.
> > > >>>>>>>>>>>>>>>>>>>>>> Selecting a ktable source is similar to selecting a kafka
> > > >>>>>>>>>>>>>>>>>>>>>> source with debezium-json format: it never ends and the
> > > >>>>>>>>>>>>>>>>>>>>>> results are continuously updated.
> > > >>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>> It's possible to have a bounded ktable source in the future,
> > > >>>>>>>>>>>>>>>>>>>>>> for example, by adding an option 'bounded=true' or
> > > >>>>>>>>>>>>>>>>>>>>>> 'end-offset=xxx'.
> > > >>>>>>>>>>>>>>>>>>>>>> In this way, the ktable will produce a bounded changelog
> > > >>>>>>>>>>>>>>>>>>>>>> stream.
> > > >>>>>>>>>>>>>>>>>>>>>> So I think this can be a compatible feature in the future.
> > > >>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>> I don't think we should associate it with ksql related
> > > >>>>>>>>>>>>>>>>>>>>>> concepts. Actually, we didn't introduce any concepts from KSQL
> > > >>>>>>>>>>>>>>>>>>>>>> (e.g. the Stream vs Table notion).
> > > >>>>>>>>>>>>>>>>>>>>>> The "ktable" is just a connector name; we can also call it
> > > >>>>>>>>>>>>>>>>>>>>>> "compacted-kafka" or something else.
> > > >>>>>>>>>>>>>>>>>>>>>> We call it "ktable" only so that KSQL users can migrate to
> > > >>>>>>>>>>>>>>>>>>>>>> Flink SQL easily.
> > > >>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>> Regarding "value.fields-include", this is an option
> > > >>>>>>>>>>>>>>>>>>>>>> introduced in FLIP-107 for the Kafka connector.
> > > >>>>>>>>>>>>>>>>>>>>>> I think we should keep the same behavior as the Kafka
> > > >>>>>>>>>>>>>>>>>>>>>> connector. I'm not sure what the default behavior of KSQL is.
> > > >>>>>>>>>>>>>>>>>>>>>> But judging from this example in the docs (see the
> > > >>>>>>>>>>>>>>>>>>>>>> "users_original" table) [1], I guess it also stores the keys
> > > >>>>>>>>>>>>>>>>>>>>>> in the value.
> > > >>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>> Best,
> > > >>>>>>>>>>>>>>>>>>>>>> Jark
> > > >>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>> [1]:
> > > >>>>>>>>>>>>>>>>>>>>>> https://docs.confluent.io/current/ksqldb/tutorials/basics-local.html#create-a-stream-and-table
> > > >>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>> On Mon, 19 Oct 2020 at 18:17, Danny Chan <yuzhao....@gmail.com> wrote:
> > > >>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>> The concept seems to conflict with the Flink abstraction
> > > >>>>>>>>>>>>>>>>>>>>>>> “dynamic table”; in Flink we see both “stream” and “table”
> > > >>>>>>>>>>>>>>>>>>>>>>> as a dynamic table.
> > > >>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>> I think we should first make clear how to express stream and
> > > >>>>>>>>>>>>>>>>>>>>>>> table specific features on one “dynamic table”.
> > > >>>>>>>>>>>>>>>>>>>>>>> It is more natural for KSQL because KSQL takes stream and
> > > >>>>>>>>>>>>>>>>>>>>>>> table as different abstractions for representing
> > > >>>>>>>>>>>>>>>>>>>>>>> collections. In KSQL, only a table is mutable and can have a
> > > >>>>>>>>>>>>>>>>>>>>>>> primary key.
> > > >>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>> Does this connector belong to the “table” scope or the
> > > >>>>>>>>>>>>>>>>>>>>>>> “stream” scope?
> > > >>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>> Some of the concepts (such as the primary key on a stream)
> > > >>>>>>>>>>>>>>>>>>>>>>> should be suitable for all the connectors, not just Kafka.
> > > >>>>>>>>>>>>>>>>>>>>>>> Shouldn’t this be an extension of the existing Kafka
> > > >>>>>>>>>>>>>>>>>>>>>>> connector instead of a totally new connector?
> > > >>>>>>>>>>>>>>>>>>>>>>> What about the other connectors?
> > > >>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>> Because this touches the core abstraction of Flink, we had
> > > >>>>>>>>>>>>>>>>>>>>>>> better have a top-down overall design; following KSQL
> > > >>>>>>>>>>>>>>>>>>>>>>> directly is not the answer.
> > > >>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>> P.S. For the source
> > > >>>>>>>>>>>>>>>>>>>>>>>> Shouldn’t this be an extension of existing Kafka connector
> > > >>>>>>>>>>>>>>>>>>>>>>>> instead of a totally new connector ?
> > > >>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>> How could we achieve that (e.g. set up the parallelism
> > > >>>>>>>>>>>>>>>>>>>>>>> correctly)?
> > > >>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>> Best,
> > > >>>>>>>>>>>>>>>>>>>>>>> Danny Chan
> > > >>>>>>>>>>>>>>>>>>>>>>> On Oct 19, 2020 at 5:17 PM +0800, Jingsong Li <jingsongl...@gmail.com> wrote:
> > > >>>>>>>>>>>>>>>>>>>>>>>> Thanks Shengkai for your proposal.
> > > >>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>> +1 for this feature.
> > > >>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>> Future Work: Support bounded KTable source
> > > >>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>> I don't think it should be future work; I think it is one of
> > > >>>>>>>>>>>>>>>>>>>>>>>> the important concepts of this FLIP. We need to understand it
> > > >>>>>>>>>>>>>>>>>>>>>>>> now.
> > > >>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>> Intuitively, a ktable in my opinion is a bounded table rather
> > > >>>>>>>>>>>>>>>>>>>>>>>> than a stream, so select should produce a bounded table by
> > > >>>>>>>>>>>>>>>>>>>>>>>> default.
> > > >>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>> I think we can list the related Kafka knowledge, because the
> > > >>>>>>>>>>>>>>>>>>>>>>>> word `ktable` is easy to associate with ksql related
> > > >>>>>>>>>>>>>>>>>>>>>>>> concepts. (If possible, it's better to unify with it.)
> > > >>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>> What do you think?
> > > >>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>> value.fields-include
> > > >>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>> What about the default behavior of KSQL?
> > > >>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>> Best,
> > > >>>>>>>>>>>>>>>>>>>>>>>> Jingsong
> > > >>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>> On Mon, Oct 19, 2020 at 4:33 PM Shengkai Fang <fskm...@gmail.com> wrote:
> > > >>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>> Hi, devs.
> > > >>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>> Jark and I want to start a new FLIP to introduce the KTable
> > > >>>>>>>>>>>>>>>>>>>>>>>>> connector. KTable is short for "Kafka Table"; it also has
> > > >>>>>>>>>>>>>>>>>>>>>>>>> the same semantics as the KTable notion in Kafka Streams.
> > > >>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>> FLIP-149:
> > > >>>>>>>>>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-149%3A+Introduce+the+KTable+Connector
> > > >>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>> Currently, many users have expressed their need for upsert
> > > >>>>>>>>>>>>>>>>>>>>>>>>> Kafka on the mailing lists and in issues. The KTable
> > > >>>>>>>>>>>>>>>>>>>>>>>>> connector has several benefits for users:
> > > >>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>> 1. Users are able to interpret a compacted Kafka topic as an
> > > >>>>>>>>>>>>>>>>>>>>>>>>>    upsert stream in Apache Flink, and are also able to write
> > > >>>>>>>>>>>>>>>>>>>>>>>>>    a changelog stream to Kafka (into a compacted topic).
> > > >>>>>>>>>>>>>>>>>>>>>>>>> 2. As a part of a real-time pipeline, users can store join
> > > >>>>>>>>>>>>>>>>>>>>>>>>>    or aggregate results (which may contain updates) in a
> > > >>>>>>>>>>>>>>>>>>>>>>>>>    Kafka topic for further calculation.
> > > >>>>>>>>>>>>>>>>>>>>>>>>> 3. The semantics of the KTable connector are the same as
> > > >>>>>>>>>>>>>>>>>>>>>>>>>    KTable in Kafka Streams, so it's very handy for Kafka
> > > >>>>>>>>>>>>>>>>>>>>>>>>>    Streams and KSQL users. We have seen several questions on
> > > >>>>>>>>>>>>>>>>>>>>>>>>>    the mailing list asking how to model a KTable and how to
> > > >>>>>>>>>>>>>>>>>>>>>>>>>    join a KTable in Flink SQL.
> > > >>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>> We hope it can expand the usage of Flink with Kafka.
> > > >>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>> I'm looking forward to your feedback.
> > > >>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>> Best,
> > > >>>>>>>>>>>>>>>>>>>>>>>>> Shengkai
> > > >>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>> --
> > > >>>>>>>>>>>>>>>>>>>>>>>> Best, Jingsong Lee
> > > >>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>> --
> > > >>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>> Konstantin Knauf
> > > >>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>> https://twitter.com/snntrable
> > > >>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>> https://github.com/knaufk
> > > >>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> --
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> Konstantin Knauf
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> https://twitter.com/snntrable
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> https://github.com/knaufk
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> --
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Seth Wiesman | Solutions Architect
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> +1 314 387 1463
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> <https://www.ververica.com/>
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Follow us @VervericaData
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> --
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Join Flink Forward <https://flink-forward.org/> - The
> Apache
> > > >>>>> Flink
> > > >>>>>>>>>>> Conference
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Stream Processing | Event Driven | Real Time
> > > >>>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>
> > > >>>>>> --
> > > >>>>>> Best, Jingsong Lee
> > > >>>>>>
> > > >>>>>
> > > >>>>
> > > >>>>
> > > >>>> --
> > > >>>> Best, Jingsong Lee
> > > >>>>
> > > >>>
> > > >>
> > > >
> > >
> > >
> >
> > --
> > Best, Jingsong Lee
> >
>


-- 
Best, Jingsong Lee
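
For readers skimming the thread, the upsert interpretation being debated can be sketched roughly as follows. This is a minimal illustration in plain Python, not Flink or Kafka code: the names `to_changelog` and `partition_for`, the change labels, and the sample records are all hypothetical, and CRC32 merely stands in for whatever hash a real partitioner uses. It reads a keyed log as inserts, updates and deletes (a `None` value plays the role of a delete), and it assumes, purely for illustration, that a delete for a key that was never seen is dropped.

```python
import zlib
from typing import Dict, Iterable, List, Optional, Tuple

# A record is (key, value); a value of None plays the role of a delete.
Record = Tuple[str, Optional[str]]
# A change is (kind, key, value); the kind labels are made up for this sketch.
Change = Tuple[str, str, str]


def to_changelog(records: Iterable[Record]) -> Tuple[List[Change], Dict[str, str]]:
    """Interpret a keyed log as upserts and return (changes, final table)."""
    table: Dict[str, str] = {}
    changes: List[Change] = []
    for key, value in records:
        if value is None:
            if key in table:
                # Delete of a known key: emit the last value we held for it.
                changes.append(("DELETE", key, table.pop(key)))
            # Assumption: a delete for an unknown key is silently dropped.
        elif key in table:
            changes.append(("UPDATE", key, value))
            table[key] = value
        else:
            changes.append(("INSERT", key, value))
            table[key] = value
    return changes, table


def partition_for(key: str, num_partitions: int) -> int:
    """Stable key hash so that every record of a key lands in one partition."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


log: List[Record] = [
    ("u1", "Alice"),
    ("u2", "Bob"),
    ("u1", "Alicia"),  # update of u1
    ("u2", None),      # delete of u2
]

full_changes, full_table = to_changelog(log)
# Starting from a middle position: the delete of u2 refers to a key that
# this reader has never seen.
mid_changes, mid_table = to_changelog(log[2:])
```

Reading the whole log yields an insert, an insert, an update and a delete, leaving only `u1` in the table. Reading from the third record onward turns the update of `u1` into an insert and leaves the delete of `u2` with nothing to retract, which is the kind of ambiguity raised in the thread about start offsets. The hash partitioner shows why all records of one key need to stay in one partition: per-key order is only meaningful within a partition.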
