Hi Thomas and yuzhihong,

That’s an interesting idea. Can you help think of a use case that isn’t also served by filtering or mapping beforehand?

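To make the question concrete, here is a rough sketch of a table that drops no-op updates, with the comparison made pluggable. It is plain Java, not Kafka Streams code: `ChangeDetector`, `EmitOnChangeTable`, and `Profile` are all names invented for illustration, and nothing here is a proposed API. It only shows the default whole-value comparison next to a detector that ignores one field.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Illustrative only: none of these types exist in Kafka Streams.
final class EmitOnChangeSketch {

    /** Hypothetical plug-in point: true means "forward this update". */
    interface ChangeDetector<V> {
        boolean isChange(V oldValue, V newValue);
    }

    /** Toy stand-in for a materialized table that forwards only real changes. */
    static final class EmitOnChangeTable<K, V> {
        private final Map<K, V> store = new HashMap<>();
        private final List<V> emitted = new ArrayList<>();
        private final ChangeDetector<V> detector;

        EmitOnChangeTable(ChangeDetector<V> detector) {
            this.detector = detector;
        }

        /** Default detector: whole-value equality. */
        static <K, V> EmitOnChangeTable<K, V> withEquality() {
            return new EmitOnChangeTable<K, V>((oldV, newV) -> !Objects.equals(oldV, newV));
        }

        void put(K key, V value) {
            // The first value for a key always counts as a change.
            if (!store.containsKey(key) || detector.isChange(store.get(key), value)) {
                store.put(key, value);
                emitted.add(value);
            }
            // Otherwise it is a no-op: keep the stored value and emit nothing.
        }

        V get(K key) { return store.get(key); }

        List<V> emitted() { return emitted; }
    }

    /** Example value: email matters downstream, lastSeenMs does not. */
    static final class Profile {
        final String email;
        final long lastSeenMs;

        Profile(String email, long lastSeenMs) {
            this.email = email;
            this.lastSeenMs = lastSeenMs;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Profile)) return false;
            Profile p = (Profile) o;
            return email.equals(p.email) && lastSeenMs == p.lastSeenMs;
        }

        @Override
        public int hashCode() {
            return Objects.hash(email, lastSeenMs);
        }
    }

    public static void main(String[] args) {
        EmitOnChangeTable<String, Profile> byEquality = EmitOnChangeTable.withEquality();
        // Custom detector: only a different email counts as a change.
        EmitOnChangeTable<String, Profile> byEmail =
            new EmitOnChangeTable<String, Profile>((oldV, newV) -> !oldV.email.equals(newV.email));

        Profile[] updates = {
            new Profile("a@example.com", 1L),
            new Profile("a@example.com", 2L), // only the irrelevant field moved
            new Profile("b@example.com", 3L),
        };
        for (Profile p : updates) {
            byEquality.put("user-1", p);
            byEmail.put("user-1", p);
        }
        System.out.println("equality emitted: " + byEquality.emitted().size());
        System.out.println("email-only emitted: " + byEmail.emitted().size());
    }
}
```

One difference the sketch makes visible: with the email-only detector the table keeps the first `lastSeenMs` it saw for as long as the email is unchanged, whereas mapping the value down to just the email beforehand removes that field for everything downstream.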
Thanks for helping to design this feature!
-John

On Fri, Jan 31, 2020, at 18:56, yuzhih...@gmail.com wrote:
> I think this is a good idea.
>
> > On Jan 31, 2020, at 4:49 PM, Thomas Becker <thomas.bec...@tivo.com> wrote:
> >
> > How do folks feel about allowing the mechanism by which no-ops are
> > detected to be pluggable? Meaning use something like a hash by default, but
> > you could optionally provide an implementation of something to use instead,
> > like a ChangeDetector. This could be useful for example to ignore changes
> > to certain fields, which may not be relevant to the operation being
> > performed.
> > ________________________________
> > From: John Roesler <vvcep...@apache.org>
> > Sent: Friday, January 31, 2020 4:51 PM
> > To: dev@kafka.apache.org <dev@kafka.apache.org>
> > Subject: Re: [KAFKA-557] Add emit on change support for Kafka Streams
> >
> > Hello all,
> >
> > Sorry for my silence. It seems like we are getting close to consensus.
> > Hopefully, we could move to a vote soon!
> >
> > All of the reasoning from Matthias and Bruno around timestamp is
> > compelling. I would be strongly in favor of stating a few things very
> > clearly in the KIP:
> > 1. Streams will drop no-op updates only for KTable operations.
> >
> > That is, we won't make any changes to KStream aggregations at the moment. It
> > does seem like we can potentially revisit the time semantics of that operation
> > in the future, but we don't need to do it now.
> >
> > On the other hand, the proposed semantics for KTable timestamps (marking the
> > beginning of the validity of that record) makes sense to me.
> >
> > 2. Streams will only drop no-op updates for _stateful_ KTable operations.
> >
> > We don't want to add a hard guarantee that Streams will _never_ emit a no-op
> > table update because it would require adding state to otherwise stateless
> > operations. If someone is really concerned about a particular stateless
> > operation producing a lot of no-op results, all they have to do is
> > materialize it, and Streams would automatically drop the no-ops.
> >
> > Additionally, I'm +1 on not adding an opt-out at this time.
> >
> > Regarding the KIP itself, I would clean it up a bit before calling for a vote.
> > There is a lot of "discussion"-type language there, which is very natural to
> > read, but makes it a bit hard to see what _exactly_ the KIP is proposing.
> >
> > Richard, would you mind just making the "proposed behavior change" a simple and
> > succinct list of bullet points? I.e., please drop glue phrases like "there has
> > been some discussion" or "possibly we could do X". For the final version of the
> > KIP, it should just say, "Streams will do X, Streams will do Y". Feel free to
> > add an elaboration section to explain more about what X and Y mean, but we don't
> > need to talk about possibilities or alternatives except in the "rejected
> > alternatives" section.
> >
> > Accordingly, can you also move the options you presented in the intro to the
> > "rejected alternatives" section and only mention the final proposal itself?
> >
> > This just really helps reviewers to know what they are voting for, and it helps
> > everyone after the fact when they are trying to get clarity on what exactly the
> > proposal is, versus all the things it could have been.
> >
> > Thanks,
> > -John
> >
> >
> >> On Mon, Jan 27, 2020, at 18:14, Richard Yu wrote:
> >> Hello to all,
> >>
> >> I've finished making some initial modifications to the KIP.
> >> I have decided to keep the implementation section in the KIP for
> >> record-keeping purposes.
> >>
> >> For now, we should focus on only the proposed behavior changes instead.
> >>
> >> See if you have any comments!
> >>
> >> Cheers,
> >> Richard
> >>
> >> On Sat, Jan 25, 2020 at 11:12 AM Richard Yu <yohan.richard...@gmail.com>
> >> wrote:
> >>
> >>> Hi all,
> >>>
> >>> Thanks for all the discussion!
> >>>
> >>> @John and @Bruno I will survey other possible systems and see what I can do.
> >>> Just a question, by systems, I suppose you would mean the pros and cons of
> >>> different reporting strategies?
> >>>
> >>> I'm not completely certain on this point, so it would be great if you can
> >>> clarify on that.
> >>>
> >>> So here's what I got from all the discussion so far:
> >>>
> >>> - Since both Matthias and John seem to have come to a consensus on
> >>>   this, we will go for an all-round behavioral change for KTables. After
> >>>   some thought, I decided that for now, an opt-out config will not be added.
> >>>   As John has pointed out, no-op changes tend to explode further down the
> >>>   topology as they are forwarded to more and more processor nodes downstream.
> >>> - About using hash codes, after some explanation from John, it looks
> >>>   like hash codes might not be as ideal (for implementation). For now, we
> >>>   will omit that detail, and save it for the PR.
> >>> - @Bruno You do have valid concerns. Though, I am not completely
> >>>   certain if we want to do emit-on-change only for materialized KTables. I
> >>>   will put it down in the KIP regardless.
> >>>
> >>> I will do my best to address all points raised so far on the discussion.
> >>> Hope we could keep this going!
> >>>
> >>> Best,
> >>> Richard
> >>>
> >>> On Fri, Jan 24, 2020 at 6:07 PM Bruno Cadonna <br...@confluent.io> wrote:
> >>>
> >>>> Thank you Matthias for the use cases!
> >>>>
> >>>> Looking at both use cases, I think you need to elaborate on them in
> >>>> the KIP, Richard.
> >>>>
> >>>> Emit from plain KTable:
> >>>> I agree with Matthias that the lower timestamp makes sense because it
> >>>> marks the start of the validity of the record. Idempotent records with
> >>>> a higher timestamp can be safely ignored. A corner case that I
> >>>> discussed with Matthias offline is when we do not materialize a KTable
> >>>> due to optimization. Then we cannot avoid the idempotent records
> >>>> because we do not keep the first record with the lower timestamp to
> >>>> compare to.
> >>>>
> >>>> Emit from KTable with aggregations:
> >>>> If we specify that an aggregation result should have the highest
> >>>> timestamp of the records that participated in the aggregation, we
> >>>> cannot ignore any idempotent records. Admittedly, the result of an
> >>>> aggregation usually changes, but there are aggregations where the
> >>>> result may not change like min and max, or sum when the incoming
> >>>> records have a value of zero. In those cases, we could benefit from
> >>>> emit-on-change, but only if we define the semantics of the
> >>>> aggregations to not use the highest timestamp of the participating
> >>>> records for the result. In Kafka Streams, we do not have min, max, and
> >>>> sum as explicit aggregations, but we need to provide an API to define
> >>>> what timestamp should be used for the result of an aggregation if we
> >>>> want to go down this path.
> >>>>
> >>>> All of this does not block this KIP and I just wanted to put these
> >>>> aspects up for discussion. The KIP can limit itself to emit from
> >>>> materialized KTables. However, the limits should be explicitly stated
> >>>> in the KIP.
> >>>>
> >>>> Best,
> >>>> Bruno
> >>>>
> >>>>
> >>>>
> >>>> On Fri, Jan 24, 2020 at 10:58 AM Matthias J. Sax <matth...@confluent.io>
> >>>> wrote:
> >>>>>
> >>>>> IMHO, the question about semantics depends on the use case, in
> >>>>> particular on the origin of a KTable.
> >>>>>
> >>>>> If there is a changelog topic that one reads directly into a KTable,
> >>>>> emit-on-change does actually make sense, because the timestamp indicates
> >>>>> _when_ the update was _effective_. For this case, it is semantically
> >>>>> sound to _not_ update the timestamp in the store, because the second
> >>>>> update is actually idempotent and advancing the timestamp is not ideal
> >>>>> (one could even consider it to be wrong to advance the timestamp)
> >>>>> because the "valid time" of the record pair did not change.
> >>>>>
> >>>>> This reasoning also applies to KTable-KTable joins.
> >>>>>
> >>>>> However, if the KTable is the result of an aggregation, I think
> >>>>> emit-on-update is more natural, because the timestamp reflects the
> >>>>> _last_ time (i.e., highest timestamp) of all input records that contributed
> >>>>> to the result. Hence, updating the timestamp and emitting a new record
> >>>>> actually sounds correct to me. This applies to windowed and non-windowed
> >>>>> aggregations IMHO.
> >>>>>
> >>>>> However, considering the argument that the timestamp should not be
> >>>>> updated in the first case in the store to begin with, both cases are
> >>>>> actually the same, and both can be modeled as emit-on-change: if a
> >>>>> `table()` operator does not update the timestamp if the value does not
> >>>>> change, there is _no_ change and thus nothing is emitted. At the same
> >>>>> time, if an aggregation operator does update the timestamp (even if the
> >>>>> value does not change) there _is_ a change and we emit.
> >>>>>
> >>>>> Note that handling out-of-order data for aggregations would also work
> >>>>> seamlessly with this approach -- for out-of-order records, the timestamp
> >>>>> never changes, and thus, we only emit if the result itself changes.
> >>>>>
> >>>>> Therefore, I would argue that we might not even need any config, because
> >>>>> the emit-on-change behavior is just correct and reduces the downstream
> >>>>> load, while our current behavior is not ideal (even if it's also correct).
> >>>>>
> >>>>> Thoughts?
> >>>>>
> >>>>> -Matthias
> >>>>>
> >>>>> On 1/24/20 9:37 AM, John Roesler wrote:
> >>>>>> Hi Bruno,
> >>>>>>
> >>>>>> Thanks for that idea. I hadn't considered that
> >>>>>> option before, and it does seem like that would be
> >>>>>> the right place to put it if we think it might be
> >>>>>> semantically important to control on a
> >>>>>> table-by-table basis.
> >>>>>>
> >>>>>> I had been thinking of it less semantically and
> >>>>>> more practically. In the context of a large
> >>>>>> topology, or more generally, a large software
> >>>>>> system that contains many topologies and other
> >>>>>> event-driven systems, each no-op result becomes an
> >>>>>> input that is destined to itself become a no-op
> >>>>>> result, and so on, all the way through the system.
> >>>>>> Thus, a single pointless processing result becomes
> >>>>>> amplified into a large number of pointless
> >>>>>> computations, cache perturbations, and network
> >>>>>> and disk I/O operations. If you also consider
> >>>>>> operations with fan-out implications, like
> >>>>>> branching or foreign-key joins, the wasted
> >>>>>> resources are amplified not just in proportion to
> >>>>>> the size of the system, but the size of the system
> >>>>>> times the average fan-out (to the power of the
> >>>>>> number of fan-out operations on the path(s)
> >>>>>> through the system).
> >>>>>>
> >>>>>> In my time operating such systems, I've observed
> >>>>>> these effects to be very real, and actually, the
> >>>>>> system and use case don't have to be very large
> >>>>>> before the amplification poses an existential
> >>>>>> threat to the system as a whole.
> >>>>>>
> >>>>>> This is the basis of my advocating for a simple
> >>>>>> behavior change, rather than an opt-in config of
> >>>>>> any kind. It seems like Streams should "do the
> >>>>>> right thing" for the majority use case. My theory
> >>>>>> (which may be wrong) is that the majority use case
> >>>>>> is more like "relational queries" than "CEP
> >>>>>> queries". Even if you were doing some
> >>>>>> event-sensitive computation, wouldn't you do them
> >>>>>> as Stream operations (where this feature is
> >>>>>> inapplicable anyway)?
> >>>>>>
> >>>>>> In keeping with the "practical" perspective, I
> >>>>>> suggested the opt-out config only in the (I think
> >>>>>> unlikely) event that filtering out pointless
> >>>>>> updates actually harms performance. I'd also be
> >>>>>> perfectly fine without the opt-out config. I
> >>>>>> really think that (because of the timestamp
> >>>>>> semantics work already underway), we're already
> >>>>>> pre-fetching the prior result most of the time, so
> >>>>>> there would actually be very little extra I/O
> >>>>>> involved in implementing emit-on-change.
> >>>>>>
> >>>>>> However, we should consider whether my experience
> >>>>>> is likely to be general. Do you have some use
> >>>>>> case in mind for which you'd actually want some
> >>>>>> KTable results to be emit-on-update for semantic
> >>>>>> reasons?
> >>>>>>
> >>>>>> Thanks,
> >>>>>> -John
> >>>>>>
> >>>>>>
> >>>>>> On Fri, Jan 24, 2020, at 11:02, Bruno Cadonna wrote:
> >>>>>>> Hi Richard,
> >>>>>>>
> >>>>>>> Thank you for the KIP.
> >>>>>>>
> >>>>>>> I agree with John that we should focus on the interface and behavior
> >>>>>>> change in a KIP. We can discuss the implementation later.
> >>>>>>>
> >>>>>>> I am also +1 for the survey.
> >>>>>>>
> >>>>>>> I had a thought about this. Couldn't we consider emit-on-change to be
> >>>>>>> one config of suppress (like `untilWindowCloses`)? What you basically
> >>>>>>> propose is to suppress updates if they do not change the result.
> >>>>>>> Considering emit on change as a flavour of suppress would be more
> >>>>>>> flexible because it would specify the behavior locally for a KTable
> >>>>>>> instead of globally for all KTables. Additionally, specifying the
> >>>>>>> behavior in one place instead of multiple places feels more intuitive
> >>>>>>> and consistent to me.
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Bruno
> >>>>>>>
> >>>>>>> On Fri, Jan 24, 2020 at 7:49 AM John Roesler <vvcep...@apache.org> wrote:
> >>>>>>>>
> >>>>>>>> Hi Richard,
> >>>>>>>>
> >>>>>>>> Thanks for picking this up! I know of at least one large community member
> >>>>>>>> for which this feature is absolutely essential.
> >>>>>>>>
> >>>>>>>> If I understand your two options, it seems like the proposal is to implement
> >>>>>>>> it as a behavior change regardless, and the question is whether to provide
> >>>>>>>> an opt-out config or not.
> >>>>>>>>
> >>>>>>>> Given that any implementation of this feature would have some performance
> >>>>>>>> impact under some workloads, and also that we don't know if anyone really
> >>>>>>>> depends on emit-on-update time semantics, it seems like we should propose
> >>>>>>>> to add an opt-out config. Can you update the KIP to mention the exact
> >>>>>>>> config key and value(s) you'd propose?
> >>>>>>>>
> >>>>>>>> Just to move the discussion forward, maybe something like:
> >>>>>>>>   emit.on := change|update
> >>>>>>>> with the new default being "change"
> >>>>>>>>
> >>>>>>>> Thanks for pointing out the timestamp issue in particular. I agree that if
> >>>>>>>> we discard the latter update as a no-op, then we also have to discard its
> >>>>>>>> timestamp (obviously, we don't forward the timestamp update, as that's
> >>>>>>>> the whole point, but we also can't update the timestamp in the store, as
> >>>>>>>> the store must remain consistent with what has been emitted).
> >>>>>>>>
> >>>>>>>> I have to confess that I disagree with your implementation proposal, but
> >>>>>>>> it's also not necessary to discuss implementation in the KIP. Maybe it would
> >>>>>>>> be less controversial if you just drop that section for now, so that the KIP
> >>>>>>>> discussion can focus on the behavior change and config.
> >>>>>>>>
> >>>>>>>> Just for reference, there is some research into this domain. For example,
> >>>>>>>> see the "Report" section (3.2.3) of the SECRET paper:
> >>>>>>>> http://people.csail.mit.edu/tatbul/publications/maxstream_vldb10.pdf
> >>>>>>>>
> >>>>>>>> It might help to round out the proposal if you take a brief survey of the
> >>>>>>>> behaviors of other systems, along with pros and cons if any are reported.
> >>>>>>>>
> >>>>>>>> Thanks,
> >>>>>>>> -John
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Fri, Jan 10, 2020, at 22:27, Richard Yu wrote:
> >>>>>>>>> Hi everybody!
> >>>>>>>>>
> >>>>>>>>> I'd like to propose a change that we probably should've added for a long
> >>>>>>>>> time now.
> >>>>>>>>>
> >>>>>>>>> The key benefit of this KIP would be reduced traffic in Kafka Streams since
> >>>>>>>>> a lot of no-op results would no longer be sent downstream.
> >>>>>>>>> Here is the KIP for reference.
> >>>>>>>>>
> >>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams
> >>>>>>>>>
> >>>>>>>>> Currently, I seek to formalize our approach for this KIP first before we
> >>>>>>>>> determine concrete API additions / configurations.
> >>>>>>>>> Some configs might warrant adding, while others are not necessary since
> >>>>>>>>> adding them would only increase complexity of Kafka Streams.
> >>>>>>>>>
> >>>>>>>>> Cheers,
> >>>>>>>>> Richard