The `kafka-cdc` name looks good to me. We can even provide an option to indicate whether to turn on compaction, because compaction is just an optimization?
- ktable: let me think about KSQL.
- kafka-compacted: it is not just compacted; more than that, it still has the ability of CDC.
- upsert-kafka: upsert is back, and I don't really want to see it again since we have CDC.

Best,
Jingsong

On Fri, Oct 23, 2020 at 2:21 AM Timo Walther <twal...@apache.org> wrote:

> Hi Jark,
>
> I would be fine with `connector=upsert-kafka`. Another idea would be to
> align the name to other available Flink connectors [1]:
>
> `connector=kafka-cdc`.
>
> Regards,
> Timo
>
> [1] https://github.com/ververica/flink-cdc-connectors
>
> On 22.10.20 17:17, Jark Wu wrote:
> > Another name is "connector=upsert-kafka"; I think this can solve Timo's
> > concern about the "compacted" word.
> >
> > Materialize also uses the "ENVELOPE UPSERT" [1] keyword to identify such
> > kafka sources.
> > I think "upsert" is a well-known term widely used in many systems
> > and matches the behavior of how we handle the kafka messages.
> >
> > What do you think?
> >
> > Best,
> > Jark
> >
> > [1]:
> > https://materialize.io/docs/sql/create-source/text-kafka/#upsert-on-a-kafka-topic
> >
> > On Thu, 22 Oct 2020 at 22:53, Kurt Young <ykt...@gmail.com> wrote:
> >
> >> Good validation messages can't solve the broken user experience,
> >> especially since such an update-mode option will implicitly make half
> >> of the current kafka options invalid or meaningless.
> >>
> >> Best,
> >> Kurt
> >>
> >> On Thu, Oct 22, 2020 at 10:31 PM Jark Wu <imj...@gmail.com> wrote:
> >>
> >>> Hi Timo, Seth,
> >>>
> >>> The default value "inserting" of "mode" might not be suitable,
> >>> because "debezium-json" emits changelog messages which include updates.
> >>>
> >>> On Thu, 22 Oct 2020 at 22:10, Seth Wiesman <s...@ververica.com> wrote:
> >>>
> >>>> +1 for supporting upsert results into Kafka.
> >>>>
> >>>> I have no comments on the implementation details.
> >>>>
> >>>> As far as configuration goes, I tend to favor Timo's option where we
> >>>> add a "mode" property to the existing Kafka table with default value
> >>>> "inserting". If the mode is set to "updating" then the validation
> >>>> changes to the new requirements. I personally find it more intuitive
> >>>> than a separate connector; my fear is users won't understand it's the
> >>>> same physical kafka sink under the hood, and it will lead to other
> >>>> confusion, like whether it offers the same persistence guarantees. I
> >>>> think we are capable of adding good validation messaging that solves
> >>>> Jark's and Kurt's concerns.
> >>>>
> >>>> On Thu, Oct 22, 2020 at 8:51 AM Timo Walther <twal...@apache.org> wrote:
> >>>>
> >>>>> Hi Jark,
> >>>>>
> >>>>> "calling it "kafka-compacted" can even remind users to enable log
> >>>>> compaction"
> >>>>>
> >>>>> But sometimes users like to store a lineage of changes in their
> >>>>> topics, independent of any ktable/kstream interpretation.
> >>>>>
> >>>>> I'll let the majority decide on this topic so as not to further
> >>>>> block this effort. But we might find a better name like:
> >>>>>
> >>>>> connector = kafka
> >>>>> mode = updating/inserting
> >>>>>
> >>>>> OR
> >>>>>
> >>>>> connector = kafka-updating
> >>>>>
> >>>>> ...
> >>>>>
> >>>>> Regards,
> >>>>> Timo
> >>>>>
> >>>>> On 22.10.20 15:24, Jark Wu wrote:
> >>>>>> Hi Timo,
> >>>>>>
> >>>>>> Thanks for your opinions.
> >>>>>>
> >>>>>> 1) Implementation
> >>>>>> We will have a stateful operator to generate INSERT and
> >>>>>> UPDATE_BEFORE rows. This operator is keyed by the primary key (used
> >>>>>> as the shuffle key) after the source operator.
> >>>>>> The implementation of this operator is very similar to the existing
> >>>>>> `DeduplicateKeepLastRowFunction`.
> >>>>>> The operator will register a value state using the primary key
> >>>>>> fields as keys.
> >>>>>> When the value state is empty under the current key, we will emit
> >>>>>> INSERT for the input row.
> >>>>>> When the value state is not empty under the current key, we will
> >>>>>> emit UPDATE_BEFORE using the row in state,
> >>>>>> and emit UPDATE_AFTER using the input row.
> >>>>>> When the input row is DELETE, we will clear the state and emit a
> >>>>>> DELETE row.
> >>>>>>
> >>>>>> 2) New option vs. new connector
> >>>>>>> We recently simplified the table options to a minimum amount of
> >>>>>> characters to be as concise as possible in the DDL.
> >>>>>> I think this is the reason why we want to introduce a new connector,
> >>>>>> because we can simplify the options in the DDL.
> >>>>>> For example, if using a new option, the DDL may look like this:
> >>>>>>
> >>>>>> CREATE TABLE users (
> >>>>>>   user_id BIGINT,
> >>>>>>   user_name STRING,
> >>>>>>   user_level STRING,
> >>>>>>   region STRING,
> >>>>>>   PRIMARY KEY (user_id) NOT ENFORCED
> >>>>>> ) WITH (
> >>>>>>   'connector' = 'kafka',
> >>>>>>   'model' = 'table',
> >>>>>>   'topic' = 'pageviews_per_region',
> >>>>>>   'properties.bootstrap.servers' = '...',
> >>>>>>   'properties.group.id' = 'testGroup',
> >>>>>>   'scan.startup.mode' = 'earliest-offset',
> >>>>>>   'key.format' = 'csv',
> >>>>>>   'key.fields' = 'user_id',
> >>>>>>   'value.format' = 'avro',
> >>>>>>   'sink.partitioner' = 'hash'
> >>>>>> );
> >>>>>>
> >>>>>> If using a new connector, we can have different default values for
> >>>>>> the options and remove unnecessary options;
> >>>>>> the DDL can look like this, which is much more concise:
> >>>>>>
> >>>>>> CREATE TABLE pageviews_per_region (
> >>>>>>   user_id BIGINT,
> >>>>>>   user_name STRING,
> >>>>>>   user_level STRING,
> >>>>>>   region STRING,
> >>>>>>   PRIMARY KEY (user_id) NOT ENFORCED
> >>>>>> ) WITH (
> >>>>>>   'connector' = 'kafka-compacted',
> >>>>>>   'topic' = 'pageviews_per_region',
> >>>>>>   'properties.bootstrap.servers' = '...',
> >>>>>>   'key.format' = 'csv',
> >>>>>>   'value.format' = 'avro'
> >>>>>> );
> >>>>>>
> >>>>>>> When people read `connector=kafka-compacted` they might not know
> >>>>>>> that it has ktable semantics. You don't need to enable log
> >>>>>>> compaction in order to use a KTable as far as I know.
> >>>>>> We don't need to let users know it has ktable semantics; as
> >>>>>> Konstantin mentioned, this may carry more implicit meaning than we
> >>>>>> want to imply here. I agree users don't need to enable log
> >>>>>> compaction, but from the production perspective,
> >>>>>> log compaction should always be enabled if the topic is used for
> >>>>>> this purpose.
> >>>>>> Calling it "kafka-compacted" can even remind users to enable log
> >>>>>> compaction.
> >>>>>>
> >>>>>> I don't agree with introducing a "model = table/stream" option, or
> >>>>>> "connector=kafka-table",
> >>>>>> because this means we are introducing the Table vs. Stream concept
> >>>>>> from KSQL.
> >>>>>> However, we don't have such a top-level concept in Flink SQL now;
> >>>>>> this will further confuse users.
> >>>>>> In Flink SQL, everything is a STREAM; the differences are whether it
> >>>>>> is bounded or unbounded,
> >>>>>> and whether it is insert-only or a changelog.
> >>>>>>
> >>>>>> Best,
> >>>>>> Jark
> >>>>>>
> >>>>>> On Thu, 22 Oct 2020 at 20:39, Timo Walther <twal...@apache.org> wrote:
> >>>>>>
> >>>>>>> Hi Shengkai, Hi Jark,
> >>>>>>>
> >>>>>>> thanks for this great proposal. It is time to finally connect the
> >>>>>>> changelog processor with a compacted Kafka topic.
> >>>>>>>
> >>>>>>> "The operator will produce INSERT rows, or additionally generate
> >>>>>>> UPDATE_BEFORE rows for the previous image, or produce DELETE rows
> >>>>>>> with all columns filled with values."
> >>>>>>>
> >>>>>>> Could you elaborate a bit on the implementation details in the
> >>>>>>> FLIP? How are UPDATE_BEFOREs generated?
How much state is required to > >>>> perform > >>>>>>> this operation. > >>>>>>> > >>>>>>> From a conceptual and semantical point of view, I'm fine with > >> the > >>>>>>> proposal. But I would like to share my opinion about how we expose > >>>> this > >>>>>>> feature: > >>>>>>> > >>>>>>> ktable vs kafka-compacted > >>>>>>> > >>>>>>> I'm against having an additional connector like `ktable` or > >>>>>>> `kafka-compacted`. We recently simplified the table options to a > >>>> minimum > >>>>>>> amount of characters to be as concise as possible in the DDL. > >>>> Therefore, > >>>>>>> I would keep the `connector=kafka` and introduce an additional > >>> option. > >>>>>>> Because a user wants to read "from Kafka". And the "how" should be > >>>>>>> determined in the lower options. > >>>>>>> > >>>>>>> When people read `connector=ktable` they might not know that this > >> is > >>>>>>> Kafka. Or they wonder where `kstream` is? > >>>>>>> > >>>>>>> When people read `connector=kafka-compacted` they might not know > >>> that > >>>> it > >>>>>>> has ktable semantics. You don't need to enable log compaction in > >>> order > >>>>>>> to use a KTable as far as I know. Log compaction and table > >> semantics > >>>> are > >>>>>>> orthogonal topics. > >>>>>>> > >>>>>>> In the end we will need 3 types of information when declaring a > >>> Kafka > >>>>>>> connector: > >>>>>>> > >>>>>>> CREATE TABLE ... WITH ( > >>>>>>> connector=kafka -- Some information about the connector > >>>>>>> end-offset = XXXX -- Some information about the > >> boundedness > >>>>>>> model = table/stream -- Some information about > >> interpretation > >>>>>>> ) > >>>>>>> > >>>>>>> > >>>>>>> We can still apply all the constraints mentioned in the FLIP. When > >>>>>>> `model` is set to `table`. > >>>>>>> > >>>>>>> What do you think? 
> >>>>>>> > >>>>>>> Regards, > >>>>>>> Timo > >>>>>>> > >>>>>>> > >>>>>>> On 21.10.20 14:19, Jark Wu wrote: > >>>>>>>> Hi, > >>>>>>>> > >>>>>>>> IMO, if we are going to mix them in one connector, > >>>>>>>> 1) either users need to set some options to a specific value > >>>>> explicitly, > >>>>>>>> e.g. "scan.startup.mode=earliest", "sink.partitioner=hash", etc.. > >>>>>>>> This makes the connector awkward to use. Users may face to fix > >>>> options > >>>>>>> one > >>>>>>>> by one according to the exception. > >>>>>>>> Besides, in the future, it is still possible to use > >>>>>>>> "sink.partitioner=fixed" (reduce network cost) if users are aware > >>> of > >>>>>>>> the partition routing, > >>>>>>>> however, it's error-prone to have "fixed" as default for > >> compacted > >>>>> mode. > >>>>>>>> > >>>>>>>> 2) or make those options a different default value when > >>>>> "compacted=true". > >>>>>>>> This would be more confusing and unpredictable if the default > >> value > >>>> of > >>>>>>>> options will change according to other options. > >>>>>>>> What happens if we have a third mode in the future? > >>>>>>>> > >>>>>>>> In terms of usage and options, it's very different from the > >>>>>>>> original "kafka" connector. > >>>>>>>> It would be more handy to use and less fallible if separating > >> them > >>>> into > >>>>>>> two > >>>>>>>> connectors. > >>>>>>>> In the implementation layer, we can reuse code as much as > >> possible. > >>>>>>>> > >>>>>>>> Therefore, I'm still +1 to have a new connector. > >>>>>>>> The "kafka-compacted" name sounds good to me. > >>>>>>>> > >>>>>>>> Best, > >>>>>>>> Jark > >>>>>>>> > >>>>>>>> > >>>>>>>> On Wed, 21 Oct 2020 at 17:58, Konstantin Knauf < > >> kna...@apache.org> > >>>>>>> wrote: > >>>>>>>> > >>>>>>>>> Hi Kurt, Hi Shengkai, > >>>>>>>>> > >>>>>>>>> thanks for answering my questions and the additional > >>>> clarifications. 
I > >>>>>>>>> don't have a strong opinion on whether to extend the "kafka" > >>>> connector > >>>>>>> or > >>>>>>>>> to introduce a new connector. So, from my perspective feel free > >> to > >>>> go > >>>>>>> with > >>>>>>>>> a separate connector. If we do introduce a new connector I > >>> wouldn't > >>>>>>> call it > >>>>>>>>> "ktable" for aforementioned reasons (In addition, we might > >> suggest > >>>>> that > >>>>>>>>> there is also a "kstreams" connector for symmetry reasons). I > >>> don't > >>>>>>> have a > >>>>>>>>> good alternative name, though, maybe "kafka-compacted" or > >>>>>>>>> "compacted-kafka". > >>>>>>>>> > >>>>>>>>> Thanks, > >>>>>>>>> > >>>>>>>>> Konstantin > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> On Wed, Oct 21, 2020 at 4:43 AM Kurt Young <ykt...@gmail.com> > >>>> wrote: > >>>>>>>>> > >>>>>>>>>> Hi all, > >>>>>>>>>> > >>>>>>>>>> I want to describe the discussion process which drove us to > >> have > >>>> such > >>>>>>>>>> conclusion, this might make some of > >>>>>>>>>> the design choices easier to understand and keep everyone on > >> the > >>>> same > >>>>>>>>> page. > >>>>>>>>>> > >>>>>>>>>> Back to the motivation, what functionality do we want to > >> provide > >>> in > >>>>> the > >>>>>>>>>> first place? We got a lot of feedback and > >>>>>>>>>> questions from mailing lists that people want to write > >>>>> Not-Insert-Only > >>>>>>>>>> messages into kafka. They might be > >>>>>>>>>> intentional or by accident, e.g. wrote an non-windowed > >> aggregate > >>>>> query > >>>>>>> or > >>>>>>>>>> non-windowed left outer join. And > >>>>>>>>>> some users from KSQL world also asked about why Flink didn't > >>>> leverage > >>>>>>> the > >>>>>>>>>> Key concept of every kafka topic > >>>>>>>>>> and make kafka as a dynamic changing keyed table. 
> >>>>>>>>>>
> >>>>>>>>>> To work with kafka better, we were thinking of extending the
> >>>>>>>>>> functionality of the current kafka connector by letting it
> >>>>>>>>>> accept updates and deletions. But due to the limitations of
> >>>>>>>>>> kafka, the update has to be "update by key", i.e., a table
> >>>>>>>>>> with a primary key.
> >>>>>>>>>>
> >>>>>>>>>> This introduces a couple of conflicts with the current kafka
> >>>>>>>>>> table's options:
> >>>>>>>>>> 1. key.fields: as said above, we need the kafka table to have a
> >>>>>>>>>> primary key constraint. But users can also configure key.fields
> >>>>>>>>>> freely; this might cause friction. (Sure, we can do some sanity
> >>>>>>>>>> checks on this, but that also creates friction.)
> >>>>>>>>>> 2. sink.partitioner: to make the semantics right, we need to
> >>>>>>>>>> make sure all the updates on the same key are written to the
> >>>>>>>>>> same kafka partition, so we should force a hash-by-key
> >>>>>>>>>> partitioner for such a table. Again, this conflicts and creates
> >>>>>>>>>> friction with current user options.
> >>>>>>>>>>
> >>>>>>>>>> The above things are solvable, though not perfect or most
> >>>>>>>>>> user-friendly.
> >>>>>>>>>>
> >>>>>>>>>> Let's take a look at the reading side. The keyed kafka table
> >>>>>>>>>> contains two kinds of messages: upserts or deletions. What
> >>>>>>>>>> upsert means is "if the key doesn't exist yet, it's an insert
> >>>>>>>>>> record; otherwise it's an update record". For the sake of
> >>>>>>>>>> correctness and simplicity, the Flink SQL engine also needs this
> >>>>>>>>>> information. If we interpret all messages as "update records",
> >>>>>>>>>> some queries or operators may not work properly. It's weird to
> >>>>>>>>>> see an update record when you haven't seen the insert record
> >>>>>>>>>> before.
> >>>>>>>>>>
> >>>>>>>>>> So what Flink should do is: after reading the records out of
> >>>>>>>>>> such a table, it needs to create state to record which messages
> >>>>>>>>>> have been seen, and then generate the correct row type
> >>>>>>>>>> correspondingly. This kind of couples the state with the data of
> >>>>>>>>>> the message queue, and it also creates conflicts with the
> >>>>>>>>>> current kafka connector.
> >>>>>>>>>>
> >>>>>>>>>> Think about what happens if users suspend a running job (which
> >>>>>>>>>> now contains some reading state) and then change the start
> >>>>>>>>>> offset of the reader. By changing the reading offset, they
> >>>>>>>>>> actually change the whole story of which records should be
> >>>>>>>>>> insert messages and which should be update messages. It will
> >>>>>>>>>> also make Flink deal with another weird situation: it might
> >>>>>>>>>> receive a deletion for a non-existing message.
> >>>>>>>>>>
> >>>>>>>>>> We were unsatisfied with all the friction and conflicts it would
> >>>>>>>>>> create if we enabled "upsert & deletion" support in the current
> >>>>>>>>>> kafka connector. Later we began to realize that we shouldn't
> >>>>>>>>>> treat it as a normal message queue, but as a changing keyed
> >>>>>>>>>> table. We should always be able to get the whole data of such a
> >>>>>>>>>> table (by disabling the start offset option), and we can also
> >>>>>>>>>> read the changelog out of such a table. It's like an HBase table
> >>>>>>>>>> with binlog support but without random access capability (which
> >>>>>>>>>> can be fulfilled by Flink's state).
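The state-based conversion described above (also sketched by Jark earlier in the thread as a `DeduplicateKeepLastRowFunction`-like operator) can be illustrated in a few lines. This is only a sketch: a plain dict stands in for Flink's keyed value state, and the function name and row-kind labels are made up here for illustration, not Flink API.

```python
# Illustrative sketch: turn an upsert/deletion stream into a full changelog
# by remembering the last value seen per primary key. A plain dict stands in
# for Flink's keyed value state; row-kind labels mirror the discussion
# (INSERT / UPDATE_BEFORE / UPDATE_AFTER / DELETE) but are not Flink API.

def to_changelog(records, state=None):
    """records: iterable of (key, value) pairs; value=None means a deletion."""
    state = {} if state is None else state
    out = []
    for key, value in records:
        if value is None:                    # tombstone / deletion
            old = state.pop(key, None)
            if old is not None:
                # emit DELETE with the previous image taken from state
                out.append(("DELETE", key, old))
            # a deletion for an unseen key is the "weird situation" from the
            # thread; this sketch simply drops it
        elif key not in state:               # first time we see this key
            state[key] = value
            out.append(("INSERT", key, value))
        else:                                # key seen before: an update
            old = state[key]
            state[key] = value
            out.append(("UPDATE_BEFORE", key, old))
            out.append(("UPDATE_AFTER", key, value))
    return out
```

For an input of u1→a, u1→b, then a tombstone for u1, the sketch yields INSERT, then UPDATE_BEFORE/UPDATE_AFTER, then DELETE, in that order — which is exactly why state is needed: without it, every record would have to be interpreted as an update.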
> >>>>>>>>>>
> >>>>>>>>>> So our intention was: instead of telling and persuading users
> >>>>>>>>>> which options they should or should not use after extending the
> >>>>>>>>>> current kafka connector with upsert support, we actually create
> >>>>>>>>>> a whole new and different connector that has totally different
> >>>>>>>>>> abstractions in the SQL layer, and it should be treated totally
> >>>>>>>>>> differently from the current kafka connector.
> >>>>>>>>>>
> >>>>>>>>>> Hope this can clarify some of the concerns.
> >>>>>>>>>>
> >>>>>>>>>> Best,
> >>>>>>>>>> Kurt
> >>>>>>>>>>
> >>>>>>>>>> On Tue, Oct 20, 2020 at 5:20 PM Shengkai Fang <fskm...@gmail.com> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Hi devs,
> >>>>>>>>>>>
> >>>>>>>>>>> As many people are still confused about the different option
> >>>>>>>>>>> behaviours between the Kafka connector and the KTable
> >>>>>>>>>>> connector, Jark and I list the differences in the doc [1].
> >>>>>>>>>>>
> >>>>>>>>>>> Best,
> >>>>>>>>>>> Shengkai
> >>>>>>>>>>>
> >>>>>>>>>>> [1]
> >>>>>>>>>>> https://docs.google.com/document/d/13oAWAwQez0lZLsyfV21BfTEze1fc2cz4AZKiNOyBNPk/edit
> >>>>>>>>>>>
> >>>>>>>>>>> On Tue, Oct 20, 2020 at 12:05 PM Shengkai Fang <fskm...@gmail.com> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Hi Konstantin,
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thanks for your reply.
> >>>>>>>>>>>>
> >>>>>>>>>>>>> It uses the "kafka" connector and does not specify a primary
> >>>>>>>>>>>>> key.
> >>>>>>>>>>>> The dimension table `users` uses the ktable connector, and we
> >>>>>>>>>>>> can specify the pk on the KTable.
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Will it be possible to use a "ktable" as a dimension table in
> >>>>>>>>>>>>> FLIP-132?
> >>>>>>>>>>>> Yes.
We can specify the watermark on the KTable, and it can be used
> >>>>>>>>>>>> as a dimension table in a temporal join.
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Introduce a new connector vs. introduce a new property
> >>>>>>>>>>>> The main reason behind this is that the KTable connector has
> >>>>>>>>>>>> almost no options in common with the Kafka connector. The
> >>>>>>>>>>>> options that can be reused by the KTable connector are
> >>>>>>>>>>>> 'topic', 'properties.bootstrap.servers' and
> >>>>>>>>>>>> 'value.fields-include'. We can't set a cdc format for
> >>>>>>>>>>>> 'key.format' and 'value.format' in the KTable connector now,
> >>>>>>>>>>>> which is available in the Kafka connector. Considering the
> >>>>>>>>>>>> difference between the options we can use, it's more suitable
> >>>>>>>>>>>> to introduce another connector rather than a property.
> >>>>>>>>>>>>
> >>>>>>>>>>>> We are also fine with using "compacted-kafka" as the name of
> >>>>>>>>>>>> the new connector. What do you think?
> >>>>>>>>>>>>
> >>>>>>>>>>>> Best,
> >>>>>>>>>>>> Shengkai
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Mon, Oct 19, 2020 at 10:15 PM Konstantin Knauf <kna...@apache.org> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Hi Shengkai,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thank you for driving this effort. I believe this is a very
> >>>>>>>>>>>>> important feature for many users who use Kafka and Flink SQL
> >>>>>>>>>>>>> together. A few questions and thoughts:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> * Is your example "Use KTable as a reference/dimension table"
> >>>>>>>>>>>>> correct? It uses the "kafka" connector and does not specify a
> >>>>>>>>>>>>> primary key.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> * Will it be possible to use a "ktable" table directly as a
> >>>>>>>>>>>>> dimensional table in a temporal join (*based on event time*)
> >>>>>>>>>>>>> (FLIP-132)?
> >>> This > >>>> is > >>>>>>>>> not > >>>>>>>>>>>>> completely clear to me from the FLIP. > >>>>>>>>>>>>> > >>>>>>>>>>>>> * I'd personally prefer not to introduce a new connector and > >>>>> instead > >>>>>>>>>> to > >>>>>>>>>>>>> extend the Kafka connector. We could add an additional > >>> property > >>>>>>>>>>>>> "compacted" > >>>>>>>>>>>>> = "true"|"false". If it is set to "true", we can add > >>> additional > >>>>>>>>>>> validation > >>>>>>>>>>>>> logic (e.g. "scan.startup.mode" can not be set, primary key > >>>>>>>>> required, > >>>>>>>>>>>>> etc.). If we stick to a separate connector I'd not call it > >>>>> "ktable", > >>>>>>>>>> but > >>>>>>>>>>>>> rather "compacted-kafka" or similar. KTable seems to carry > >>> more > >>>>>>>>>> implicit > >>>>>>>>>>>>> meaning than we want to imply here. > >>>>>>>>>>>>> > >>>>>>>>>>>>> * I agree that this is not a bounded source. If we want to > >>>>> support a > >>>>>>>>>>>>> bounded mode, this is an orthogonal concern that also > >> applies > >>> to > >>>>>>>>> other > >>>>>>>>>>>>> unbounded sources. > >>>>>>>>>>>>> > >>>>>>>>>>>>> Best, > >>>>>>>>>>>>> > >>>>>>>>>>>>> Konstantin > >>>>>>>>>>>>> > >>>>>>>>>>>>> On Mon, Oct 19, 2020 at 3:26 PM Jark Wu <imj...@gmail.com> > >>>> wrote: > >>>>>>>>>>>>> > >>>>>>>>>>>>>> Hi Danny, > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> First of all, we didn't introduce any concepts from KSQL > >>> (e.g. > >>>>>>>>>> Stream > >>>>>>>>>>> vs > >>>>>>>>>>>>>> Table notion). > >>>>>>>>>>>>>> This new connector will produce a changelog stream, so it's > >>>> still > >>>>>>>>> a > >>>>>>>>>>>>> dynamic > >>>>>>>>>>>>>> table and doesn't conflict with Flink core concepts. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> The "ktable" is just a connector name, we can also call it > >>>>>>>>>>>>>> "compacted-kafka" or something else. > >>>>>>>>>>>>>> Calling it "ktable" is just because KSQL users can migrate > >> to > >>>>>>>>> Flink > >>>>>>>>>>> SQL > >>>>>>>>>>>>>> easily. 
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Regarding why we introduce a new connector vs. a new
> >>>>>>>>>>>>>> property in the existing kafka connector:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I think the main reason is that we want a clear separation
> >>>>>>>>>>>>>> between these two use cases, because they are very
> >>>>>>>>>>>>>> different. We also listed reasons in the FLIP, including:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 1) It's hard to explain what the behavior is when users
> >>>>>>>>>>>>>> specify a start offset from a middle position (e.g., how to
> >>>>>>>>>>>>>> process delete events on non-existent keys). It's dangerous
> >>>>>>>>>>>>>> if users do that, so we don't provide the offset option in
> >>>>>>>>>>>>>> the new connector at the moment.
> >>>>>>>>>>>>>> 2) It's a different perspective/abstraction on the same
> >>>>>>>>>>>>>> kafka topic (append vs. upsert). It would be easier to
> >>>>>>>>>>>>>> understand if we separate them instead of mixing them in one
> >>>>>>>>>>>>>> connector. The new connector requires a hash sink
> >>>>>>>>>>>>>> partitioner, a declared primary key, and a regular format.
> >>>>>>>>>>>>>> If we mix them in one connector, it might be confusing how
> >>>>>>>>>>>>>> to use the options correctly.
> >>>>>>>>>>>>>> 3) The semantics of the KTable connector are just the same
> >>>>>>>>>>>>>> as KTable in Kafka Streams, so it's very handy for Kafka
> >>>>>>>>>>>>>> Streams and KSQL users. We have seen several questions on
> >>>>>>>>>>>>>> the mailing list asking how to model a KTable and how to
> >>>>>>>>>>>>>> join a KTable in Flink SQL.
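The "hash sink partitioner" requirement in point 2) — all updates on the same key must land in the same kafka partition — can be illustrated with a toy routing function. This is only a sketch under stated assumptions: crc32 here is a stand-in for the real partitioner's hash (Kafka's default partitioner uses murmur2 on the record key), and the function name is made up.

```python
import zlib

def partition_for(key: bytes, num_partitions: int) -> int:
    # Any deterministic hash of the record key gives the property the
    # thread argues for: every update for the same primary key is routed
    # to the same partition, preserving per-key ordering. crc32 is just an
    # illustrative stand-in for Kafka's murmur2-based default partitioner.
    return zlib.crc32(key) % num_partitions
```

With this routing, a consumer of any single partition sees all changes for a given key in order. A round-robin or "fixed" sink partitioner (mentioned earlier in the thread as a network-cost optimization) gives no such guarantee, since successive updates for one key could be interleaved across partitions — which is why the upsert connector forces the hash partitioner.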
> >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Best, > >>>>>>>>>>>>>> Jark > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> On Mon, 19 Oct 2020 at 19:53, Jark Wu <imj...@gmail.com> > >>>> wrote: > >>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Hi Jingsong, > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> As the FLIP describes, "KTable connector produces a > >>> changelog > >>>>>>>>>>> stream, > >>>>>>>>>>>>>>> where each data record represents an update or delete > >>> event.". > >>>>>>>>>>>>>>> Therefore, a ktable source is an unbounded stream source. > >>>>>>>>>> Selecting > >>>>>>>>>>> a > >>>>>>>>>>>>>>> ktable source is similar to selecting a kafka source with > >>>>>>>>>>>>> debezium-json > >>>>>>>>>>>>>>> format > >>>>>>>>>>>>>>> that it never ends and the results are continuously > >> updated. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> It's possible to have a bounded ktable source in the > >> future, > >>>> for > >>>>>>>>>>>>> example, > >>>>>>>>>>>>>>> add an option 'bounded=true' or 'end-offset=xxx'. > >>>>>>>>>>>>>>> In this way, the ktable will produce a bounded changelog > >>>> stream. > >>>>>>>>>>>>>>> So I think this can be a compatible feature in the future. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> I don't think we should associate with ksql related > >>> concepts. > >>>>>>>>>>>>> Actually, > >>>>>>>>>>>>>> we > >>>>>>>>>>>>>>> didn't introduce any concepts from KSQL (e.g. Stream vs > >>> Table > >>>>>>>>>>> notion). > >>>>>>>>>>>>>>> The "ktable" is just a connector name, we can also call it > >>>>>>>>>>>>>>> "compacted-kafka" or something else. > >>>>>>>>>>>>>>> Calling it "ktable" is just because KSQL users can migrate > >>> to > >>>>>>>>>> Flink > >>>>>>>>>>>>> SQL > >>>>>>>>>>>>>>> easily. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Regarding the "value.fields-include", this is an option > >>>>>>>>> introduced > >>>>>>>>>>> in > >>>>>>>>>>>>>>> FLIP-107 for Kafka connector. > >>>>>>>>>>>>>>> I think we should keep the same behavior with the Kafka > >>>>>>>>> connector. 
> >>>>>>>>>>> I'm > >>>>>>>>>>>>>> not > >>>>>>>>>>>>>>> sure what's the default behavior of KSQL. > >>>>>>>>>>>>>>> But I guess it also stores the keys in value from this > >>> example > >>>>>>>>>> docs > >>>>>>>>>>>>> (see > >>>>>>>>>>>>>>> the "users_original" table) [1]. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Best, > >>>>>>>>>>>>>>> Jark > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> [1]: > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>> > >>>>>>>>> > >>>>>>> > >>>>> > >>>> > >>> > >> > https://docs.confluent.io/current/ksqldb/tutorials/basics-local.html#create-a-stream-and-table > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> On Mon, 19 Oct 2020 at 18:17, Danny Chan < > >>>> yuzhao....@gmail.com> > >>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> The concept seems conflicts with the Flink abstraction > >>>> “dynamic > >>>>>>>>>>>>> table”, > >>>>>>>>>>>>>>>> in Flink we see both “stream” and “table” as a dynamic > >>> table, > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> I think we should make clear first how to express stream > >>> and > >>>>>>>>>> table > >>>>>>>>>>>>>>>> specific features on one “dynamic table”, > >>>>>>>>>>>>>>>> it is more natural for KSQL because KSQL takes stream and > >>>> table > >>>>>>>>>> as > >>>>>>>>>>>>>>>> different abstractions for representing collections. In > >>> KSQL, > >>>>>>>>>> only > >>>>>>>>>>>>>> table is > >>>>>>>>>>>>>>>> mutable and can have a primary key. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Does this connector belongs to the “table” scope or > >>> “stream” > >>>>>>>>>> scope > >>>>>>>>>>> ? > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Some of the concepts (such as the primary key on stream) > >>>> should > >>>>>>>>>> be > >>>>>>>>>>>>>>>> suitable for all the connectors, not just Kafka, > >> Shouldn’t > >>>> this > >>>>>>>>>> be > >>>>>>>>>>> an > >>>>>>>>>>>>>>>> extension of existing Kafka connector instead of a > >> totally > >>>> new > >>>>>>>>>>>>>> connector ? 
> >>>>>>>>>>>>>>>> What about the other connectors ? > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Because this touches the core abstraction of Flink, we > >>> better > >>>>>>>>>> have > >>>>>>>>>>> a > >>>>>>>>>>>>>>>> top-down overall design, following the KSQL directly is > >> not > >>>> the > >>>>>>>>>>>>> answer. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> P.S. For the source > >>>>>>>>>>>>>>>>> Shouldn’t this be an extension of existing Kafka > >> connector > >>>>>>>>>>> instead > >>>>>>>>>>>>> of > >>>>>>>>>>>>>> a > >>>>>>>>>>>>>>>> totally new connector ? > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> How could we achieve that (e.g. set up the parallelism > >>>>>>>>>> correctly) ? > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Best, > >>>>>>>>>>>>>>>> Danny Chan > >>>>>>>>>>>>>>>> 在 2020年10月19日 +0800 PM5:17,Jingsong Li < > >>>> jingsongl...@gmail.com > >>>>>>>>>>>> ,写道: > >>>>>>>>>>>>>>>>> Thanks Shengkai for your proposal. > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> +1 for this feature. > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> Future Work: Support bounded KTable source > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> I don't think it should be a future work, I think it is > >>> one > >>>>>>>>> of > >>>>>>>>>>> the > >>>>>>>>>>>>>>>>> important concepts of this FLIP. We need to understand > >> it > >>>>>>>>> now. > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Intuitively, a ktable in my opinion is a bounded table > >>>> rather > >>>>>>>>>>> than > >>>>>>>>>>>>> a > >>>>>>>>>>>>>>>>> stream, so select should produce a bounded table by > >>> default. > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> I think we can list Kafka related knowledge, because the > >>>> word > >>>>>>>>>>>>> `ktable` > >>>>>>>>>>>>>>>> is > >>>>>>>>>>>>>>>>> easy to associate with ksql related concepts. (If > >>> possible, > >>>>>>>>>> it's > >>>>>>>>>>>>>> better > >>>>>>>>>>>>>>>> to > >>>>>>>>>>>>>>>>> unify with it) > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> What do you think? 
> >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> value.fields-include > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> What about the default behavior of KSQL? > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Best, > >>>>>>>>>>>>>>>>> Jingsong > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> On Mon, Oct 19, 2020 at 4:33 PM Shengkai Fang < > >>>>>>>>>> fskm...@gmail.com > >>>>>>>>>>>> > >>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> Hi, devs. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> Jark and I want to start a new FLIP to introduce the > >>> KTable > >>>>>>>>>>>>>>>> connector. The > >>>>>>>>>>>>>>>>>> KTable is a shortcut of "Kafka Table", it also has the > >>> same > >>>>>>>>>>>>>> semantics > >>>>>>>>>>>>>>>> with > >>>>>>>>>>>>>>>>>> the KTable notion in Kafka Stream. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> FLIP-149: > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>> > >>>>>>>>> > >>>>>>> > >>>>> > >>>> > >>> > >> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-149%3A+Introduce+the+KTable+Connector > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> Currently many users have expressed their needs for the > >>>>>>>>>> upsert > >>>>>>>>>>>>> Kafka > >>>>>>>>>>>>>>>> by > >>>>>>>>>>>>>>>>>> mail lists and issues. The KTable connector has several > >>>>>>>>>>> benefits > >>>>>>>>>>>>> for > >>>>>>>>>>>>>>>> users: > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> 1. Users are able to interpret a compacted Kafka Topic > >> as > >>>>>>>>> an > >>>>>>>>>>>>> upsert > >>>>>>>>>>>>>>>> stream > >>>>>>>>>>>>>>>>>> in Apache Flink. And also be able to write a changelog > >>>>>>>>> stream > >>>>>>>>>>> to > >>>>>>>>>>>>>> Kafka > >>>>>>>>>>>>>>>>>> (into a compacted topic). > >>>>>>>>>>>>>>>>>> 2. As a part of the real time pipeline, store join or > >>>>>>>>>> aggregate > >>>>>>>>>>>>>>>> result (may > >>>>>>>>>>>>>>>>>> contain updates) into a Kafka topic for further > >>>>>>>>> calculation; > >>>>>>>>>>>>>>>>>> 3. 
The semantic of the KTable connector is just the > >> same > >>> as > >>>>>>>>>>>>> KTable > >>>>>>>>>>>>>> in > >>>>>>>>>>>>>>>> Kafka > >>>>>>>>>>>>>>>>>> Stream. So it's very handy for Kafka Stream and KSQL > >>> users. > >>>>>>>>>> We > >>>>>>>>>>>>> have > >>>>>>>>>>>>>>>> seen > >>>>>>>>>>>>>>>>>> several questions in the mailing list asking how to > >>> model a > >>>>>>>>>>>>> KTable > >>>>>>>>>>>>>>>> and how > >>>>>>>>>>>>>>>>>> to join a KTable in Flink SQL. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> We hope it can expand the usage of the Flink with > >> Kafka. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> I'm looking forward to your feedback. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> Best, > >>>>>>>>>>>>>>>>>> Shengkai > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> -- > >>>>>>>>>>>>>>>>> Best, Jingsong Lee > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> -- > >>>>>>>>>>>>> > >>>>>>>>>>>>> Konstantin Knauf > >>>>>>>>>>>>> > >>>>>>>>>>>>> https://twitter.com/snntrable > >>>>>>>>>>>>> > >>>>>>>>>>>>> https://github.com/knaufk > >>>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>> > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> -- > >>>>>>>>> > >>>>>>>>> Konstantin Knauf > >>>>>>>>> > >>>>>>>>> https://twitter.com/snntrable > >>>>>>>>> > >>>>>>>>> https://github.com/knaufk > >>>>>>>>> > >>>>>>>> > >>>>>>> > >>>>>>> > >>>>>> > >>>>> > >>>>> > >>>> > >>>> -- > >>>> > >>>> Seth Wiesman | Solutions Architect > >>>> > >>>> +1 314 387 1463 > >>>> > >>>> <https://www.ververica.com/> > >>>> > >>>> Follow us @VervericaData > >>>> > >>>> -- > >>>> > >>>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink > >>>> Conference > >>>> > >>>> Stream Processing | Event Driven | Real Time > >>>> > >>> > >> > > > > -- Best, Jingsong Lee