`kafka-cdc` looks good to me.
We could even add an option to indicate whether compaction is turned on,
since compaction is just an optimization?
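
A minimal sketch of why compaction can be seen as an optimization, assuming a
topic is modeled as a list of (key, value) pairs where a None value is a
tombstone. The helpers `materialize` and `compact` are hypothetical names, not
Kafka or Flink APIs, and real Kafka compaction is asynchronous and keeps
tombstones for a configurable time, so this is only an approximation:

```python
from typing import Dict, List, Optional, Tuple

Record = Tuple[str, Optional[str]]  # (key, value); value None = tombstone


def materialize(log: List[Record]) -> Dict[str, str]:
    """Replay an upsert log into the table it represents."""
    table: Dict[str, str] = {}
    for key, value in log:
        if value is None:
            table.pop(key, None)   # tombstone deletes the key
        else:
            table[key] = value     # upsert: insert or overwrite
    return table


def compact(log: List[Record]) -> List[Record]:
    """Simplified compaction: keep only the latest record per key, in log order."""
    last_index = {key: i for i, (key, _) in enumerate(log)}
    return [rec for i, rec in enumerate(log) if last_index[rec[0]] == i]


full_log: List[Record] = [
    ("u1", "alice"), ("u2", "bob"), ("u1", "alice-v2"), ("u2", None), ("u3", "carol"),
]
compacted_log = compact(full_log)

# Both logs describe the same table; the compacted one is just shorter.
assert materialize(full_log) == materialize(compacted_log)
print(len(full_log), len(compacted_log), materialize(compacted_log))
```

What compaction does drop is the lineage of intermediate changes (`alice`
before `alice-v2`), which is the concern Timo raises in the quoted thread.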

- ktable: it makes me think of KSQL.
- kafka-compacted: it is not just compacted; more than that, it still has
the ability of CDC.
- upsert-kafka: upsert is back, and I don't really want to see it again
since we have CDC.
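
For reference, the "ability of CDC" comes from the stateful step Jark describes
in the quoted thread: INSERT when the state for a key is empty,
UPDATE_BEFORE/UPDATE_AFTER otherwise, and DELETE clears the state. A minimal
sketch, assuming a plain dict stands in for Flink's keyed value state;
`normalize` and the row-kind strings are illustrative names, not Flink APIs.
The thread does not say how a delete for an unseen key is handled, so dropping
it here is an assumption:

```python
from typing import Dict, Iterable, Iterator, Optional, Tuple

UpsertRecord = Tuple[str, Optional[dict]]   # (primary key, row); row None = delete
ChangelogRow = Tuple[str, str, dict]        # (row kind, primary key, row)


def normalize(records: Iterable[UpsertRecord]) -> Iterator[ChangelogRow]:
    """Turn an upsert stream into a full changelog using per-key state."""
    state: Dict[str, dict] = {}             # stand-in for keyed value state
    for key, row in records:
        previous = state.get(key)
        if row is None:                      # delete message
            if previous is not None:
                del state[key]
                yield ("DELETE", key, previous)
            # Assumption: a delete for an unseen key is silently dropped.
        elif previous is None:               # first time this key is seen
            state[key] = row
            yield ("INSERT", key, row)
        else:                                # key already present: update
            state[key] = row
            yield ("UPDATE_BEFORE", key, previous)
            yield ("UPDATE_AFTER", key, row)


upserts = [
    ("u1", {"level": "gold"}),
    ("u1", {"level": "platinum"}),
    ("u1", None),
    ("u2", None),   # delete for a key that was never inserted
]
changelog = list(normalize(upserts))
for change in changelog:
    print(change)
```

The last input record is the situation Kurt raises in the quoted thread: when
reading starts from a middle offset, a deletion can arrive for a key the state
has never seen.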

Best,
Jingsong

On Fri, Oct 23, 2020 at 2:21 AM Timo Walther <twal...@apache.org> wrote:

> Hi Jark,
>
> I would be fine with `connector=upsert-kafka`. Another idea would be to
> align the name to other available Flink connectors [1]:
>
> `connector=kafka-cdc`.
>
> Regards,
> Timo
>
> [1] https://github.com/ververica/flink-cdc-connectors
>
> On 22.10.20 17:17, Jark Wu wrote:
> > Another name is "connector=upsert-kafka"; I think this can address Timo's
> > concern about the word "compacted".
> >
> > Materialize also uses the "ENVELOPE UPSERT" [1] keyword to identify such
> > Kafka sources.
> > I think "upsert" is well-known terminology that is widely used in many
> > systems and matches how we handle the Kafka messages.
> >
> > What do you think?
> >
> > Best,
> > Jark
> >
> > [1]: https://materialize.io/docs/sql/create-source/text-kafka/#upsert-on-a-kafka-topic
> >
> >
> >
> >
> > On Thu, 22 Oct 2020 at 22:53, Kurt Young <ykt...@gmail.com> wrote:
> >
> >> Good validation messages can't fix the broken user experience,
> >> especially since such an update mode option will implicitly make half of
> >> the current Kafka options invalid or meaningless.
> >>
> >> Best,
> >> Kurt
> >>
> >>
> >> On Thu, Oct 22, 2020 at 10:31 PM Jark Wu <imj...@gmail.com> wrote:
> >>
> >>> Hi Timo, Seth,
> >>>
> >>> The default value "inserting" of "mode" might not be suitable,
> >>> because "debezium-json" emits changelog messages which include updates.
> >>>
> >>> On Thu, 22 Oct 2020 at 22:10, Seth Wiesman <s...@ververica.com> wrote:
> >>>
> >>>> +1 for supporting upsert results into Kafka.
> >>>>
> >>>> I have no comments on the implementation details.
> >>>>
> >>>> As far as configuration goes, I tend to favor Timo's option where we
> >>>> add a "mode" property to the existing Kafka table with default value
> >>>> "inserting". If the mode is set to "updating", then the validation
> >>>> changes to the new requirements. I personally find it more intuitive
> >>>> than a separate connector; my fear is that users won't understand it's
> >>>> the same physical Kafka sink under the hood, and that it will lead to
> >>>> other confusion, such as whether it offers the same persistence
> >>>> guarantees. I think we are capable of adding good validation messaging
> >>>> that addresses Jark's and Kurt's concerns.
> >>>>
> >>>>
> >>>> On Thu, Oct 22, 2020 at 8:51 AM Timo Walther <twal...@apache.org>
> >>>> wrote:
> >>>>
> >>>>> Hi Jark,
> >>>>>
> >>>>> "calling it "kafka-compacted" can even remind users to enable log
> >>>>> compaction"
> >>>>>
> >>>>> But sometimes users like to store a lineage of changes in their
> >>>>> topics, independent of any ktable/kstream interpretation.
> >>>>>
> >>>>> I'll let the majority decide on this topic so as not to block this
> >>>>> effort further. But we might find a better name like:
> >>>>>
> >>>>> connector = kafka
> >>>>> mode = updating/inserting
> >>>>>
> >>>>> OR
> >>>>>
> >>>>> connector = kafka-updating
> >>>>>
> >>>>> ...
> >>>>>
> >>>>> Regards,
> >>>>> Timo
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>> On 22.10.20 15:24, Jark Wu wrote:
> >>>>>> Hi Timo,
> >>>>>>
> >>>>>> Thanks for your opinions.
> >>>>>>
> >>>>>> 1) Implementation
> >>>>>> We will have a stateful operator to generate INSERT and
> >>>>>> UPDATE_BEFORE. This operator is keyed by the primary key (used as
> >>>>>> the shuffle key) and placed after the source operator.
> >>>>>> The implementation of this operator is very similar to the existing
> >>>>>> `DeduplicateKeepLastRowFunction`.
> >>>>>> The operator will register a value state using the primary key
> >>>>>> fields as keys.
> >>>>>> When the value state is empty under the current key, we will emit
> >>>>>> INSERT for the input row.
> >>>>>> When the value state is not empty under the current key, we will
> >>>>>> emit UPDATE_BEFORE using the row in state, and emit UPDATE_AFTER
> >>>>>> using the input row.
> >>>>>> When the input row is DELETE, we will clear the state and emit a
> >>>>>> DELETE row.
> >>>>>>
> >>>>>> 2) new option vs new connector
> >>>>>>> We recently simplified the table options to a minimum amount of
> >>>>>>> characters to be as concise as possible in the DDL.
> >>>>>> I think this is the reason why we want to introduce a new
> >>>>>> connector, because we can simplify the options in DDL.
> >>>>>> For example, if using a new option, the DDL may look like this:
> >>>>>>
> >>>>>> CREATE TABLE users (
> >>>>>>     user_id BIGINT,
> >>>>>>     user_name STRING,
> >>>>>>     user_level STRING,
> >>>>>>     region STRING,
> >>>>>>     PRIMARY KEY (user_id) NOT ENFORCED
> >>>>>> ) WITH (
> >>>>>>     'connector' = 'kafka',
> >>>>>>     'model' = 'table',
> >>>>>>     'topic' = 'pageviews_per_region',
> >>>>>>     'properties.bootstrap.servers' = '...',
> >>>>>>     'properties.group.id' = 'testGroup',
> >>>>>>     'scan.startup.mode' = 'earliest',
> >>>>>>     'key.format' = 'csv',
> >>>>>>     'key.fields' = 'user_id',
> >>>>>>     'value.format' = 'avro',
> >>>>>>     'sink.partitioner' = 'hash'
> >>>>>> );
> >>>>>>
> >>>>>> If using a new connector, we can have different default values for
> >>>>>> the options and remove unnecessary options.
> >>>>>> The DDL can then look like this, which is much more concise:
> >>>>>>
> >>>>>> CREATE TABLE pageviews_per_region (
> >>>>>>     user_id BIGINT,
> >>>>>>     user_name STRING,
> >>>>>>     user_level STRING,
> >>>>>>     region STRING,
> >>>>>>     PRIMARY KEY (user_id) NOT ENFORCED
> >>>>>> ) WITH (
> >>>>>>     'connector' = 'kafka-compacted',
> >>>>>>     'topic' = 'pageviews_per_region',
> >>>>>>     'properties.bootstrap.servers' = '...',
> >>>>>>     'key.format' = 'csv',
> >>>>>>     'value.format' = 'avro'
> >>>>>> );
> >>>>>>
> >>>>>>> When people read `connector=kafka-compacted` they might not know
> >>>>>>> that it has ktable semantics. You don't need to enable log
> >>>>>>> compaction in order to use a KTable as far as I know.
> >>>>>> We don't need to let users know it has ktable semantics; as
> >>>>>> Konstantin mentioned, this may carry more implicit meaning than we
> >>>>>> want to imply here. I agree users don't need to enable log
> >>>>>> compaction, but from a production perspective, log compaction
> >>>>>> should always be enabled if the topic is used for this purpose.
> >>>>>> Calling it "kafka-compacted" can even remind users to enable log
> >>>>>> compaction.
> >>>>>>
> >>>>>> I don't agree with introducing a "model = table/stream" option, or
> >>>>>> "connector=kafka-table",
> >>>>>> because this means we are introducing the Table vs Stream concept
> >>>>>> from KSQL.
> >>>>>> However, we don't have such a top-level concept in Flink SQL now, so
> >>>>>> this would further confuse users.
> >>>>>> In Flink SQL, everything is a STREAM; the differences are whether it
> >>>>>> is bounded or unbounded, and whether it is insert-only or changelog.
> >>>>>>
> >>>>>>
> >>>>>> Best,
> >>>>>> Jark
> >>>>>>
> >>>>>>
> >>>>>> On Thu, 22 Oct 2020 at 20:39, Timo Walther <twal...@apache.org>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> Hi Shengkai, Hi Jark,
> >>>>>>>
> >>>>>>> thanks for this great proposal. It is time to finally connect the
> >>>>>>> changelog processor with a compacted Kafka topic.
> >>>>>>>
> >>>>>>> "The operator will produce INSERT rows, or additionally generate
> >>>>>>> UPDATE_BEFORE rows for the previous image, or produce DELETE rows
> >>> with
> >>>>>>> all columns filled with values."
> >>>>>>>
> >>>>>>> Could you elaborate a bit on the implementation details in the
> >>>>>>> FLIP? How are UPDATE_BEFOREs generated? How much state is required
> >>>>>>> to perform this operation?
> >>>>>>>
> >>>>>>> From a conceptual and semantic point of view, I'm fine with the
> >>>>>>> proposal. But I would like to share my opinion about how we expose
> >>>>>>> this feature:
> >>>>>>>
> >>>>>>> ktable vs kafka-compacted
> >>>>>>>
> >>>>>>> I'm against having an additional connector like `ktable` or
> >>>>>>> `kafka-compacted`. We recently simplified the table options to a
> >>>>>>> minimum number of characters to be as concise as possible in the
> >>>>>>> DDL. Therefore, I would keep `connector=kafka` and introduce an
> >>>>>>> additional option, because a user wants to read "from Kafka", and
> >>>>>>> the "how" should be determined in the lower options.
> >>>>>>>
> >>>>>>> When people read `connector=ktable` they might not know that this
> >>>>>>> is Kafka. Or they wonder where `kstream` is.
> >>>>>>>
> >>>>>>> When people read `connector=kafka-compacted` they might not know
> >>>>>>> that it has ktable semantics. You don't need to enable log
> >>>>>>> compaction in order to use a KTable as far as I know. Log
> >>>>>>> compaction and table semantics are orthogonal topics.
> >>>>>>>
> >>>>>>> In the end we will need 3 types of information when declaring a
> >>>>>>> Kafka connector:
> >>>>>>>
> >>>>>>> CREATE TABLE ... WITH (
> >>>>>>>      connector=kafka        -- Some information about the connector
> >>>>>>>      end-offset = XXXX      -- Some information about the boundedness
> >>>>>>>      model = table/stream   -- Some information about interpretation
> >>>>>>> )
> >>>>>>>
> >>>>>>>
> >>>>>>> We can still apply all the constraints mentioned in the FLIP when
> >>>>>>> `model` is set to `table`.
> >>>>>>>
> >>>>>>> What do you think?
> >>>>>>>
> >>>>>>> Regards,
> >>>>>>> Timo
> >>>>>>>
> >>>>>>>
> >>>>>>> On 21.10.20 14:19, Jark Wu wrote:
> >>>>>>>> Hi,
> >>>>>>>>
> >>>>>>>> IMO, if we are going to mix them in one connector,
> >>>>>>>> 1) either users need to set some options to a specific value
> >>>>>>>> explicitly, e.g. "scan.startup.mode=earliest",
> >>>>>>>> "sink.partitioner=hash", etc.
> >>>>>>>> This makes the connector awkward to use. Users may have to fix
> >>>>>>>> options one by one according to the exceptions.
> >>>>>>>> Besides, in the future, it is still possible to use
> >>>>>>>> "sink.partitioner=fixed" (to reduce network cost) if users are
> >>>>>>>> aware of the partition routing;
> >>>>>>>> however, it's error-prone to have "fixed" as the default for
> >>>>>>>> compacted mode.
> >>>>>>>>
> >>>>>>>> 2) or give those options a different default value when
> >>>>>>>> "compacted=true".
> >>>>>>>> It would be more confusing and unpredictable if the default value
> >>>>>>>> of an option changed according to other options.
> >>>>>>>> What happens if we have a third mode in the future?
> >>>>>>>>
> >>>>>>>> In terms of usage and options, it's very different from the
> >>>>>>>> original "kafka" connector.
> >>>>>>>> It would be handier to use and less error-prone if we separated
> >>>>>>>> them into two connectors.
> >>>>>>>> In the implementation layer, we can reuse code as much as
> >>>>>>>> possible.
> >>>>>>>>
> >>>>>>>> Therefore, I'm still +1 to have a new connector.
> >>>>>>>> The "kafka-compacted" name sounds good to me.
> >>>>>>>>
> >>>>>>>> Best,
> >>>>>>>> Jark
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Wed, 21 Oct 2020 at 17:58, Konstantin Knauf <kna...@apache.org>
> >>>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Hi Kurt, Hi Shengkai,
> >>>>>>>>>
> >>>>>>>>> thanks for answering my questions and the additional
> >>>>>>>>> clarifications. I don't have a strong opinion on whether to extend
> >>>>>>>>> the "kafka" connector or to introduce a new connector. So, from my
> >>>>>>>>> perspective, feel free to go with a separate connector. If we do
> >>>>>>>>> introduce a new connector, I wouldn't call it "ktable" for the
> >>>>>>>>> aforementioned reasons (in addition, the name might suggest that
> >>>>>>>>> there is also a "kstreams" connector for symmetry reasons). I
> >>>>>>>>> don't have a good alternative name, though; maybe
> >>>>>>>>> "kafka-compacted" or "compacted-kafka".
> >>>>>>>>>
> >>>>>>>>> Thanks,
> >>>>>>>>>
> >>>>>>>>> Konstantin
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Wed, Oct 21, 2020 at 4:43 AM Kurt Young <ykt...@gmail.com>
> >>>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hi all,
> >>>>>>>>>>
> >>>>>>>>>> I want to describe the discussion process that led us to this
> >>>>>>>>>> conclusion. This might make some of the design choices easier to
> >>>>>>>>>> understand and keep everyone on the same page.
> >>>>>>>>>>
> >>>>>>>>>> Back to the motivation: what functionality do we want to provide
> >>>>>>>>>> in the first place? We got a lot of feedback and questions on the
> >>>>>>>>>> mailing lists from people who want to write non-insert-only
> >>>>>>>>>> messages into Kafka. These might be intentional or accidental,
> >>>>>>>>>> e.g. from a non-windowed aggregate query or a non-windowed left
> >>>>>>>>>> outer join. And some users from the KSQL world also asked why
> >>>>>>>>>> Flink didn't leverage the key concept of every Kafka topic and
> >>>>>>>>>> treat Kafka as a dynamically changing keyed table.
> >>>>>>>>>>
> >>>>>>>>>> To work with Kafka better, we were thinking of extending the
> >>>>>>>>>> functionality of the current Kafka connector by letting it accept
> >>>>>>>>>> updates and deletions. But due to the limitations of Kafka, the
> >>>>>>>>>> update has to be "update by key", i.e. a table with a primary key.
> >>>>>>>>>>
> >>>>>>>>>> This introduces a couple of conflicts with the current Kafka
> >>>>>>>>>> table's options:
> >>>>>>>>>> 1. key.fields: as said above, we need the Kafka table to have the
> >>>>>>>>>> primary key constraint. But users can also configure key.fields
> >>>>>>>>>> freely, which might cause friction. (Sure, we can do some sanity
> >>>>>>>>>> checks on this, but that also creates friction.)
> >>>>>>>>>> 2. sink.partitioner: to make the semantics right, we need to make
> >>>>>>>>>> sure all the updates on the same key are written to the same Kafka
> >>>>>>>>>> partition, so we should force a hash-by-key partitioner for such
> >>>>>>>>>> a table. Again, this conflicts with and creates friction with the
> >>>>>>>>>> current user options.
> >>>>>>>>>>
> >>>>>>>>>> The above things are solvable, though the result is not perfect or
> >>>>>>>>>> the most user-friendly.
> >>>>>>>>>>
> >>>>>>>>>> Let's take a look at the reading side. The keyed Kafka table
> >>>>>>>>>> contains two kinds of messages: upsert or deletion. What upsert
> >>>>>>>>>> means is "If the key doesn't exist yet, it's an insert record.
> >>>>>>>>>> Otherwise it's an update record". For the sake of correctness or
> >>>>>>>>>> simplicity, the Flink SQL engine also needs such information. If
> >>>>>>>>>> we interpret all messages as "update records", some queries or
> >>>>>>>>>> operators may not work properly. It's weird to see an update
> >>>>>>>>>> record when you haven't seen the insert record before.
> >>>>>>>>>>
> >>>>>>>>>> So what Flink should do, after reading the records from such a
> >>>>>>>>>> table, is create a state to record which messages have been seen
> >>>>>>>>>> and then generate the correct row type accordingly. This kind of
> >>>>>>>>>> couples the state with the data of the message queue, and it also
> >>>>>>>>>> creates conflicts with the current Kafka connector.
> >>>>>>>>>>
> >>>>>>>>>> Think about what happens if users suspend a running job (which
> >>>>>>>>>> now contains some reading state) and then change the start offset
> >>>>>>>>>> of the reader. Changing the reading offset actually changes the
> >>>>>>>>>> whole story of "which records should be insert messages and which
> >>>>>>>>>> records should be update messages". It will also force Flink to
> >>>>>>>>>> deal with another weird situation: it might receive a deletion
> >>>>>>>>>> for a non-existent message.
> >>>>>>>>>>
> >>>>>>>>>> We were unsatisfied with all the friction and conflicts it would
> >>>>>>>>>> create if we added "upsert & deletion" support to the current
> >>>>>>>>>> Kafka connector. Later we began to realize that we shouldn't treat
> >>>>>>>>>> it as a normal message queue, but as a changing keyed table. We
> >>>>>>>>>> should always be able to get the whole data of such a table (by
> >>>>>>>>>> disabling the start offset option), and we can also read the
> >>>>>>>>>> changelog out of such a table. It's like an HBase table with
> >>>>>>>>>> binlog support but without random access capability (which can be
> >>>>>>>>>> fulfilled by Flink's state).
> >>>>>>>>>>
> >>>>>>>>>> So our intention was, instead of telling and persuading users
> >>>>>>>>>> which options they should or should not use by extending the
> >>>>>>>>>> current Kafka connector when upsert support is enabled, to create
> >>>>>>>>>> a whole new and different connector that has totally different
> >>>>>>>>>> abstractions in the SQL layer and should be treated totally
> >>>>>>>>>> differently from the current Kafka connector.
> >>>>>>>>>>
> >>>>>>>>>> Hope this can clarify some of the concerns.
> >>>>>>>>>>
> >>>>>>>>>> Best,
> >>>>>>>>>> Kurt
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On Tue, Oct 20, 2020 at 5:20 PM Shengkai Fang <fskm...@gmail.com>
> >>>>>>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Hi devs,
> >>>>>>>>>>>
> >>>>>>>>>>> As many people are still confused about the differences in option
> >>>>>>>>>>> behaviour between the Kafka connector and the KTable connector,
> >>>>>>>>>>> Jark and I have listed the differences in the doc [1].
> >>>>>>>>>>>
> >>>>>>>>>>> Best,
> >>>>>>>>>>> Shengkai
> >>>>>>>>>>>
> >>>>>>>>>>> [1] https://docs.google.com/document/d/13oAWAwQez0lZLsyfV21BfTEze1fc2cz4AZKiNOyBNPk/edit
> >>>>>>>>>>>
> >>>>>>>>>>> On Tue, Oct 20, 2020 at 12:05 PM, Shengkai Fang <fskm...@gmail.com> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Hi Konstantin,
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thanks for your reply.
> >>>>>>>>>>>>
> >>>>>>>>>>>>> It uses the "kafka" connector and does not specify a primary key.
> >>>>>>>>>>>> The dimensional table `users` uses the ktable connector, and we
> >>>>>>>>>>>> can specify the pk on the KTable.
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Will it be possible to use a "ktable" as a dimensional table in
> >>>>>>>>>>>>> FLIP-132?
> >>>>>>>>>>>> Yes. We can specify the watermark on the KTable, and it can be
> >>>>>>>>>>>> used as a dimension table in a temporal join.
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Introduce a new connector vs introduce a new property
> >>>>>>>>>>>> The main reason is that the KTable connector has almost no
> >>>>>>>>>>>> options in common with the Kafka connector. The options that can
> >>>>>>>>>>>> be reused by the KTable connector are 'topic',
> >>>>>>>>>>>> 'properties.bootstrap.servers' and 'value.fields-include'. We
> >>>>>>>>>>>> can't set a CDC format for 'key.format' and 'value.format' in the
> >>>>>>>>>>>> KTable connector now, which is possible in the Kafka connector.
> >>>>>>>>>>>> Considering the difference between the options we can use, it's
> >>>>>>>>>>>> more suitable to introduce another connector rather than a
> >>>>>>>>>>>> property.
> >>>>>>>>>>>>
> >>>>>>>>>>>> We are also fine with using "compacted-kafka" as the name of the
> >>>>>>>>>>>> new connector. What do you think?
> >>>>>>>>>>>>
> >>>>>>>>>>>> Best,
> >>>>>>>>>>>> Shengkai
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Mon, Oct 19, 2020 at 10:15 PM, Konstantin Knauf <kna...@apache.org> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Hi Shengkai,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thank you for driving this effort. I believe this is a very
> >>>>>>>>>>>>> important feature for many users who use Kafka and Flink SQL
> >>>>>>>>>>>>> together. A few questions and thoughts:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> * Is your example "Use KTable as a reference/dimension table"
> >>>>>>>>>>>>> correct? It uses the "kafka" connector and does not specify a
> >>>>>>>>>>>>> primary key.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> * Will it be possible to use a "ktable" table directly as a
> >>>>>>>>>>>>> dimensional table in temporal join (*based on event time*)
> >>>>>>>>>>>>> (FLIP-132)? This is not completely clear to me from the FLIP.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> * I'd personally prefer not to introduce a new connector and
> >>>>>>>>>>>>> instead to extend the Kafka connector. We could add an
> >>>>>>>>>>>>> additional property "compacted" = "true"|"false". If it is set
> >>>>>>>>>>>>> to "true", we can add additional validation logic (e.g.
> >>>>>>>>>>>>> "scan.startup.mode" can not be set, primary key required,
> >>>>>>>>>>>>> etc.). If we stick to a separate connector I'd not call it
> >>>>>>>>>>>>> "ktable", but rather "compacted-kafka" or similar. KTable seems
> >>>>>>>>>>>>> to carry more implicit meaning than we want to imply here.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> * I agree that this is not a bounded source. If we want to
> >>>>>>>>>>>>> support a bounded mode, this is an orthogonal concern that also
> >>>>>>>>>>>>> applies to other unbounded sources.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Konstantin
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Mon, Oct 19, 2020 at 3:26 PM Jark Wu <imj...@gmail.com>
> >>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> Hi Danny,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> First of all, we didn't introduce any concepts from KSQL (e.g.
> >>>>>>>>>>>>>> the Stream vs Table notion).
> >>>>>>>>>>>>>> This new connector will produce a changelog stream, so it's
> >>>>>>>>>>>>>> still a dynamic table and doesn't conflict with Flink core
> >>>>>>>>>>>>>> concepts.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> The "ktable" is just a connector name; we can also call it
> >>>>>>>>>>>>>> "compacted-kafka" or something else.
> >>>>>>>>>>>>>> We call it "ktable" only so that KSQL users can migrate to
> >>>>>>>>>>>>>> Flink SQL easily.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Regarding why we introduce a new connector vs a new property in
> >>>>>>>>>>>>>> the existing kafka connector:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I think the main reason is that we want to have a clear
> >>>>>>>>>>>>>> separation for these two use cases, because they are very
> >>>>>>>>>>>>>> different.
> >>>>>>>>>>>>>> We also listed reasons in the FLIP, including:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 1) It's hard to explain the behavior when users specify a start
> >>>>>>>>>>>>>>    offset from a middle position (e.g. how to process delete
> >>>>>>>>>>>>>>    events for keys that do not exist). It's dangerous if users
> >>>>>>>>>>>>>>    do that. So we don't provide the offset option in the new
> >>>>>>>>>>>>>>    connector at the moment.
> >>>>>>>>>>>>>> 2) It's a different perspective/abstraction on the same kafka
> >>>>>>>>>>>>>>    topic (append vs. upsert). It would be easier to understand
> >>>>>>>>>>>>>>    if we separate them instead of mixing them in one connector.
> >>>>>>>>>>>>>>    The new connector requires a hash sink partitioner, a
> >>>>>>>>>>>>>>    declared primary key, and a regular format. If we mix them
> >>>>>>>>>>>>>>    in one connector, it might be confusing how to use the
> >>>>>>>>>>>>>>    options correctly.
> >>>>>>>>>>>>>> 3) The semantics of the KTable connector are just the same as
> >>>>>>>>>>>>>>    KTable in Kafka Streams. So it's very handy for Kafka Streams
> >>>>>>>>>>>>>>    and KSQL users. We have seen several questions on the mailing
> >>>>>>>>>>>>>>    list asking how to model a KTable and how to join a KTable in
> >>>>>>>>>>>>>>    Flink SQL.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>> Jark
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Mon, 19 Oct 2020 at 19:53, Jark Wu <imj...@gmail.com>
> >>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Hi Jingsong,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> As the FLIP describes, "KTable connector produces a changelog
> >>>>>>>>>>>>>>> stream, where each data record represents an update or delete
> >>>>>>>>>>>>>>> event.".
> >>>>>>>>>>>>>>> Therefore, a ktable source is an unbounded stream source.
> >>>>>>>>>>>>>>> Selecting from a ktable source is similar to selecting from a
> >>>>>>>>>>>>>>> kafka source with debezium-json format, in that it never ends
> >>>>>>>>>>>>>>> and the results are continuously updated.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> It's possible to have a bounded ktable source in the future,
> >>>>>>>>>>>>>>> for example, by adding an option 'bounded=true' or
> >>>>>>>>>>>>>>> 'end-offset=xxx'.
> >>>>>>>>>>>>>>> In this way, the ktable will produce a bounded changelog
> >>>>>>>>>>>>>>> stream.
> >>>>>>>>>>>>>>> So I think this can be a compatible feature in the future.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I don't think we should associate it with ksql related
> >>>>>>>>>>>>>>> concepts. Actually, we didn't introduce any concepts from KSQL
> >>>>>>>>>>>>>>> (e.g. Stream vs Table notion).
> >>>>>>>>>>>>>>> The "ktable" is just a connector name; we can also call it
> >>>>>>>>>>>>>>> "compacted-kafka" or something else.
> >>>>>>>>>>>>>>> We call it "ktable" only so that KSQL users can migrate to
> >>>>>>>>>>>>>>> Flink SQL easily.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Regarding "value.fields-include", this is an option introduced
> >>>>>>>>>>>>>>> in FLIP-107 for the Kafka connector.
> >>>>>>>>>>>>>>> I think we should keep the same behavior as the Kafka
> >>>>>>>>>>>>>>> connector. I'm not sure what the default behavior of KSQL is.
> >>>>>>>>>>>>>>> But I guess it also stores the keys in the value, judging from
> >>>>>>>>>>>>>>> this example in the docs (see the "users_original" table) [1].
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>> Jark
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> [1]: https://docs.confluent.io/current/ksqldb/tutorials/basics-local.html#create-a-stream-and-table
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On Mon, 19 Oct 2020 at 18:17, Danny Chan <yuzhao....@gmail.com>
> >>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> The concept seems to conflict with the Flink abstraction
> >>>>>>>>>>>>>>>> “dynamic table”; in Flink we see both “stream” and “table” as
> >>>>>>>>>>>>>>>> a dynamic table.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I think we should first make clear how to express stream- and
> >>>>>>>>>>>>>>>> table-specific features on one “dynamic table”. This is more
> >>>>>>>>>>>>>>>> natural for KSQL because KSQL treats stream and table as
> >>>>>>>>>>>>>>>> different abstractions for representing collections. In KSQL,
> >>>>>>>>>>>>>>>> only a table is mutable and can have a primary key.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Does this connector belong to the “table” scope or the
> >>>>>>>>>>>>>>>> “stream” scope?
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Some of the concepts (such as the primary key on a stream)
> >>>>>>>>>>>>>>>> should be suitable for all connectors, not just Kafka.
> >>>>>>>>>>>>>>>> Shouldn’t this be an extension of the existing Kafka
> >>>>>>>>>>>>>>>> connector instead of a totally new connector? What about the
> >>>>>>>>>>>>>>>> other connectors?
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Because this touches the core abstraction of Flink, we had
> >>>>>>>>>>>>>>>> better have a top-down overall design; following KSQL
> >>>>>>>>>>>>>>>> directly is not the answer.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> P.S. For the source
> >>>>>>>>>>>>>>>>> Shouldn’t this be an extension of existing Kafka connector
> >>>>>>>>>>>>>>>>> instead of a totally new connector ?
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> How could we achieve that (e.g. set up the parallelism
> >>>>>>>>>>>>>>>> correctly)?
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>>> Danny Chan
> >>>>>>>>>>>>>>>> On Oct 19, 2020 at 5:17 PM +0800, Jingsong Li <jingsongl...@gmail.com> wrote:
> >>>>>>>>>>>>>>>>> Thanks Shengkai for your proposal.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> +1 for this feature.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Future Work: Support bounded KTable source
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> I don't think it should be future work; I think it is one of
> >>>>>>>>>>>>>>>>> the important concepts of this FLIP. We need to understand
> >>>>>>>>>>>>>>>>> it now.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Intuitively, a ktable in my opinion is a bounded table rather
> >>>>>>>>>>>>>>>>> than a stream, so select should produce a bounded table by
> >>>>>>>>>>>>>>>>> default.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> I think we can list Kafka related knowledge, because the word
> >>>>>>>>>>>>>>>>> `ktable` is easy to associate with ksql related concepts. (If
> >>>>>>>>>>>>>>>>> possible, it's better to unify with it)
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> What do you think?
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> value.fields-include
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> What about the default behavior of KSQL?
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>>>> Jingsong
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On Mon, Oct 19, 2020 at 4:33 PM Shengkai Fang <fskm...@gmail.com>
> >>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Hi, devs.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Jark and I want to start a new FLIP to introduce the KTable
> >>>>>>>>>>>>>>>>>> connector. KTable is short for "Kafka Table"; it also has
> >>>>>>>>>>>>>>>>>> the same semantics as the KTable notion in Kafka Streams.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> FLIP-149:
> >>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-149%3A+Introduce+the+KTable+Connector
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Currently many users have expressed their need for upsert
> >>>>>>>>>>>>>>>>>> Kafka on the mailing lists and in issues. The KTable
> >>>>>>>>>>>>>>>>>> connector has several benefits for users:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 1. Users are able to interpret a compacted Kafka topic as an
> >>>>>>>>>>>>>>>>>> upsert stream in Apache Flink, and are also able to write a
> >>>>>>>>>>>>>>>>>> changelog stream to Kafka (into a compacted topic).
> >>>>>>>>>>>>>>>>>> 2. As part of a real-time pipeline, users can store join or
> >>>>>>>>>>>>>>>>>> aggregate results (which may contain updates) in a Kafka
> >>>>>>>>>>>>>>>>>> topic for further calculation.
> >>>>>>>>>>>>>>>>>> 3. The semantics of the KTable connector are just the same
> >>>>>>>>>>>>>>>>>> as KTable in Kafka Streams. So it's very handy for Kafka
> >>>>>>>>>>>>>>>>>> Streams and KSQL users. We have seen several questions on
> >>>>>>>>>>>>>>>>>> the mailing list asking how to model a KTable and how to
> >>>>>>>>>>>>>>>>>> join a KTable in Flink SQL.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> We hope it can expand the usage of Flink with Kafka.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> I'm looking forward to your feedback.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>>>>> Shengkai
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>>>> Best, Jingsong Lee
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> --
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Konstantin Knauf
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> https://twitter.com/snntrable
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> https://github.com/knaufk
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> --
> >>>>>>>>>
> >>>>>>>>> Konstantin Knauf
> >>>>>>>>>
> >>>>>>>>> https://twitter.com/snntrable
> >>>>>>>>>
> >>>>>>>>> https://github.com/knaufk
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>>
> >>>>
> >>>> --
> >>>>
> >>>> Seth Wiesman | Solutions Architect
> >>>>
> >>>> +1 314 387 1463
> >>>>
> >>>> <https://www.ververica.com/>
> >>>>
> >>>> Follow us @VervericaData
> >>>>
> >>>> --
> >>>>
> >>>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
> >>>> Conference
> >>>>
> >>>> Stream Processing | Event Driven | Real Time
> >>>>
> >>>
> >>
> >
>
>

-- 
Best, Jingsong Lee
