Hi all,

Thanks for the thoughtful comments!

I need more time to reflect on your thoughts, but I just wanted to offer a quick clarification about equals(). I only meant that we can't be sure whether a class's equals() implementation returns true for two semantically identical instances. That is, if a class doesn't override the default equals() implementation, then we would see behavior like:

    new MyPair("A", 1).equals(new MyPair("A", 1)) // returns false

In that case, I would still like to catch no-op updates by comparing the serialized form of the records when we happen to have it serialized anyway (such as when the operation is stateful, or when we're sending to a repartition topic and we have both the "new" and "old" value from upstream).

I didn't mean to suggest we'd try to use reflection to detect whether equals() is implemented, although that is a neat trick. I was thinking more of a belt-and-suspenders algorithm where we check for no-ops based on equals() and then _also_ check the serialized bytes for equality.

Thanks,
-John

On Wed, Feb 5, 2020, at 15:31, Ted Yu wrote:
> Thanks for the comments, Matthias.
>
> W.r.t. the requirement of an `equals()` implementation, each template type would have an equals() method. We can use the following code to know whether it is provided by the JVM or by the user:
>
>     boolean customEquals = false;
>     try {
>         // The class that declares the public equals(Object) the value actually uses
>         Class<?> cls = value.getClass().getMethod("equals", Object.class).getDeclaringClass();
>         if (!Object.class.equals(cls)) {
>             customEquals = true; // overridden somewhere below java.lang.Object
>         }
>     } catch (NoSuchMethodException nsme) {
>         // equals() is always defined, so this is never reached
>     }
>
> The next question is: what if the user doesn't provide an equals() method? Would we automatically fall back to emit-on-update?
>
> Cheers
>
> On Tue, Feb 4, 2020 at 1:37 PM Matthias J. Sax <mj...@apache.org> wrote:
>
> > First a high level comment:
> >
> > Overall, I would like to take a step back and make sure we are discussing this at the same level.
> > Originally, I understood this KIP as a proposed change of _semantics_; however, given the latest discussion, it seems it's actually not -- it's more of an _optimization_ proposal. Hence, we only need to make sure that this optimization does not break existing semantics. Is this the right way to think about it?
> >
> > If yes, then it might actually be OK to have different behavior depending on whether there is a materialized KTable or not. So far, we have never defined a public contract about our emit strategy, and it seems this KIP does not define one either.
> >
> > Hence, I no longer have as strong an opinion about sending oldValues, for example. I guess the question is really what we can implement in a reasonable way.
> >
> >
> > Other comments:
> >
> >
> > @Richard:
> >
> > Can you please add the KIP to the KIP overview table? It's missing (https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals).
> >
> >
> > @Bruno:
> >
> > You mentioned caching. I think it's irrelevant (orthogonal), and we can discuss this KIP without considering it.
> >
> >
> > @John:
> >
> > > Even in the source table, we forward the updated record with the
> > > higher of the two timestamps. So the example is more like:
> >
> > That is not correct. Currently, we forward with the smaller out-of-order timestamp (changing the timestamp would corrupt the data -- we don't know, because we don't check, whether the value is the same or a different one; hence, we must emit the out-of-order record as-is).
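Matthias's timestamp argument, here and later in the thread, reduces to a small decision rule for a materialized source table: drop a same-value update that arrives with a later timestamp, but still store and forward a same-value update that arrives out of order with an earlier timestamp, because that moves the start of the record's validity back. The following is a minimal sketch under stated assumptions: `SourceTableModel`, `StoredValue`, `put`, and `timestampOf` are hypothetical names, not Kafka Streams classes, and values are compared with `Objects.equals` purely for simplicity.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical model of a materialized source KTable; not Kafka Streams code.
class SourceTableModel {

    // A stored value plus the timestamp at which it became valid.
    static final class StoredValue {
        final String value;
        final long timestamp;

        StoredValue(String value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    private final Map<String, StoredValue> store = new HashMap<>();

    // Returns true if the update must be stored and forwarded downstream.
    boolean put(String key, String value, long timestamp) {
        StoredValue prior = store.get(key);
        boolean sameValue = prior != null && Objects.equals(prior.value, value);
        if (sameValue && timestamp >= prior.timestamp) {
            // Idempotent update with a later (or equal) timestamp: drop it and
            // keep the original timestamp, i.e. the start of validity.
            return false;
        }
        // New key, changed value, or same value arriving out of order with an
        // earlier timestamp: the stored state changes, so it must be emitted.
        store.put(key, new StoredValue(value, timestamp));
        return true;
    }

    long timestampOf(String key) {
        return store.get(key).timestamp;
    }

    public static void main(String[] args) {
        SourceTableModel table = new SourceTableModel();
        System.out.println(table.put("k", "v", 10L)); // true: first value for k
        System.out.println(table.put("k", "v", 20L)); // false: same value, later timestamp
        System.out.println(table.put("k", "v", 5L));  // true: same value, earlier timestamp
        System.out.println(table.timestampOf("k"));   // 5
        System.out.println(table.put("k", "w", 30L)); // true: value changed
    }
}
```

Keeping the stored timestamp unchanged on a dropped update is what keeps the store consistent with what has been emitted, which is the same constraint John raises further down the thread.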
> >
> > If we start to do emit-on-change, we also need to emit a new record if the timestamp changes due to out-of-order data; hence, we would still need to emit <K,V,T1>, because that gives us correct semantics: assume you have a filter() and afterward use the filtered KTable in a stream-table join -- the lower T1 timestamp must be propagated to the filtered KTable to ensure that the stream-table join computes the correct result.
> >
> >
> > Your point about requiring an `equals()` implementation is actually quite an interesting one and boils down to my statement from above about "what can we actually implement". What I don't understand is:
> >
> > > This way, we still don't have to rely on the existence of an
> > > equals() method, but if it is there, we can benefit from it.
> >
> > Your bullet point (2) says it uses `equals()` -- hence, it seems we actually do rely on it? Also, how can we detect whether there is an `equals()` method to do the comparison? Would we fail if we have neither `equals()` nor corresponding serializers to do the comparison?
> >
> >
> > > Wow, really good catch! Yes, we absolutely need metrics and logs if
> > > we're going to drop any records. And, yes, we should propose
> > > metrics and logs that are similar to the existing ones when we drop
> > > records for other reasons.
> >
> > I am not sure about this point. In fact, we already have some no-ops in Kafka Streams in our join operators and don't report any of those either. Emit-on-change is operator semantics, and I don't see why we would need a metric for it. It seems to be quite different compared to dropping late or malformed records.
> >
> >
> > -Matthias
> >
> >
> > On 2/4/20 7:13 AM, Thomas Becker wrote:
> > > Thanks John for your thoughtful reply. Some comments inline.
> > >
> > >
> > > On Mon, 2020-02-03 at 11:51 -0600, John Roesler wrote:
> > >> Hi Tommy,
> > >>
> > >> Thanks for the context. I can see the attraction of considering these use cases together.
> > >>
> > >> To answer your question, if a part of the record is not relevant to downstream consumers, I was thinking you could just use a mapValues to remove it.
> > >>
> > >> E.g., suppose you wanted to do a join between two tables:
> > >>
> > >>     employeeInfo.join(
> > >>         employeePayroll,
> > >>         (info, payroll) -> new Result(info.name(), payroll.salary())
> > >>     );
> > >>
> > >> We only care about one attribute from the Info table (name) and one from the Payroll table (salary), and these attributes change rarely. On the other hand, there might be many other attributes in these tables that change frequently. We can avoid triggering the join unnecessarily by mapping the input tables to drop the unnecessary information before the join:
> > >>
> > >>     names = employeeInfo.mapValues(info -> info.name());
> > >>     salaries = employeePayroll.mapValues(payroll -> payroll.salary());
> > >>
> > >>     names.join(
> > >>         salaries,
> > >>         (name, salary) -> new Result(name, salary)
> > >>     );
> > >
> > > Ahh yes, I see. This works, but in the case where you're using schemas as we are (e.g., Avro), it seems like this approach could lead to a proliferation of "skinny" record types that just drop various fields.
> > >
> > >>
> > >> Especially if we take Matthias's idea to drop non-changes even for stateless operations, this would be quite efficient, and it is also a very straightforward optimization to understand once you know that Streams provides emit-on-change.
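John's mapValues suggestion can be modeled without the Streams runtime: an emit-on-change operator forwards an update only when the value it sees differs from the previous one, so projecting away a frequently changing field first turns many updates into no-ops. This is a hypothetical sketch, not the Kafka Streams API: `EmployeeInfo` and its `lastLogin` field are invented for illustration, and `wouldForward` stands in for whatever check the operator performs internally.

```java
import java.util.Objects;
import java.util.function.Function;

class ProjectionBeforeCompare {

    // Hypothetical Info record: "name" rarely changes, "lastLogin" changes often.
    static final class EmployeeInfo {
        final String name;
        final String lastLogin;

        EmployeeInfo(String name, String lastLogin) {
            this.name = name;
            this.lastLogin = lastLogin;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof EmployeeInfo)) {
                return false;
            }
            EmployeeInfo other = (EmployeeInfo) o;
            return name.equals(other.name) && lastLogin.equals(other.lastLogin);
        }

        @Override
        public int hashCode() {
            return Objects.hash(name, lastLogin);
        }
    }

    // Models an emit-on-change check on the values an operator actually sees.
    static <V, R> boolean wouldForward(V oldValue, V newValue, Function<V, R> projection) {
        return !Objects.equals(projection.apply(oldValue), projection.apply(newValue));
    }

    public static void main(String[] args) {
        EmployeeInfo before = new EmployeeInfo("Alice", "09:00");
        EmployeeInfo after = new EmployeeInfo("Alice", "11:30");

        // Whole record: lastLogin differs, so the join would be re-triggered.
        System.out.println(wouldForward(before, after, Function.identity())); // true
        // Projected like mapValues(info -> info.name()): a no-op that can be dropped.
        System.out.println(wouldForward(before, after, info -> info.name));   // false
    }
}
```

The trade-off Thomas raises next is visible here too: each projection is effectively a new, narrower value type.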
> > >>
> > >> From the context that you provided, it seems like a slightly different situation, though. Reading between the lines a little, it sounds like, in contrast to the example above, in which we are filtering out extra _data_, you have some extra _metadata_ that you still wish to pass down with the data when there is a "real" update, but you don't want the metadata itself to cause an update.
> > >
> > > Despite my lack of clarity, yes, you've got it right ;) This particular processor is the first stop for this data after coming in from external users, who often simply post the same content each time, and we're trying to shield downstream consumers from unnecessary churn.
> > >
> > >>
> > >> It does seem handy to be able to plug in a custom ChangeDetector for this purpose, but I worry about the API complexity. Maybe you can help think through how to provide the same benefit while limiting user-facing complexity.
> > >>
> > >> Here's some extra context to consider:
> > >>
> > >> We currently don't impose any extra requirements on the nature of the data that you can use in Streams. For example, you don't have to implement hashCode and equals, or compareTo, etc. With the current proposal, we can do an airtight comparison based only on the serialized form of the values, and we actually don't have to deserialize the "prior" value at all for a large number of operations. Admittedly, if we extend the proposal to include no-op detection for stateless operations, we'd probably need to rely on equals() for no-op checking; otherwise, we'd wind up requiring serdes for stateless operations as well. Actually, I'd probably argue for doing exactly that:
> > >>
> > >> 1. In stateful operations, drop if the serialized byte[]s are the same.
> > >>    After deserializing, also drop if the objects are equal according to Object#equals().
> > >>
> > >> 2. In stateless operations, compare the "new" and "old" values (if "old" is available) based on Object#equals().
> > >>
> > >> 3. As a final optimization, after serializing and before sending repartition records, compare the serialized data and drop no-ops.
> > >>
> > >> This way, we still don't have to rely on the existence of an equals() method, but if it is there, we can benefit from it. Also, we don't require a serde in any new situations, but we can still leverage it when it is available.
> > >>
> > >> For clarity, in my example above, even if the employeeInfo, employeePayroll, and Result records all have serdes, the naive implementation would need the "name" field (presumably a String) and the "salary" field (presumably a Double) to have serdes as well. But if we can leverage equals(), then the "right thing" happens automatically.
> > >
> > > I still don't totally follow why the individual components (name, salary) would have to have serdes here. If Result has one, we compare bytes, and if Result additionally has an equals() method (which presumably includes equals comparisons on the constituent fields), have we not covered our bases?
> > >
> > >>
> > >> This dovetails with my primary UX concern: where would the ChangeDetector actually be registered? None of the operators in my example have names or topics or any other identifiable characteristic that could be passed to a ChangeDetector class registered via config. You could say that we make ChangeDetector an optional parameter to every operation in Streams, but this seems to carry quite a bit of mental burden with it. People will wonder what it's for and whether or not they should be using it.
> > >> There would almost certainly be a misconception that it's preferable to always implement it, which would be unfortunate. Plus, to actually implement metadata flowing through the topology as in your use case, you'd have to do two things:
> > >>
> > >> 1. Make sure that all operations actually preserve the metadata alongside the data (e.g., don't accidentally add a mapValues like I did, or you drop the metadata).
> > >> 2. Implement a ChangeDetector for every single operation in the topology, or you don't get the benefit of dropping non-changes internally.
> > >>    2b. Alternatively, you could just add the ChangeDetector to one operation toward the end of the topology. This would not drop redundant computation internally, but only redundant _outputs_. But this is just about the same as your current solution.
> > >
> > > I definitely see your point regarding configuration. I was originally thinking about this when the deduplication was going to be opt-in, and it seemed very natural to say something like:
> > >
> > >     employeeInfo.join(employeePayroll, (info, payroll) -> new Result(info.name(), payroll.salary()))
> > >         .suppress(duplicatesAccordingTo(someChangeDetector))
> > >
> > > Alternatively, you can imagine a similar method being on Materialized, though obviously this makes less sense if we don't want to require materialization. If we're now talking about changing the default behavior and not having any configuration options, it's harder to find a place for this.
> > >
> > >
> > >> A final thought: if it really is a metadata question, can we just plan to finish up the support for headers in Streams? I.e., give you a way to control the way that headers flow through the topology? Then, we could treat headers the same way we treat timestamps in the no-op checking... We completely ignore them for the sake of comparison.
> > >> Thus, neither the timestamp nor the headers would get updated in internal state or in downstream views as long as the value itself doesn't change. This seems to give us a way to support your use case without adding to the mental overhead of using Streams for simple things.
> > >
> > > Agreed, headers could be a decent fit for this particular case because it's mostly metadata, though to be honest we haven't looked at headers much (mostly because, and to your point, support seems to be lacking). I feel like there would be other cases where this feature could be valuable, but I admit I can't come up with anything right this second. Perhaps yuzhihong had an example in mind?
> > >
> > >>
> > >> I.e., simple things should be easy, and complex things should be possible.
> > >>
> > >> What are your thoughts?
> > >>
> > >> Thanks,
> > >> -John
> > >>
> > >>
> > >> On Mon, Feb 3, 2020, at 07:19, Thomas Becker wrote:
> > >>
> > >> Hi John,
> > >>
> > >> Can you describe how you'd use filtering/mapping to deduplicate records? To give some background on my suggestion, we currently have a small stream processor that exists solely to deduplicate, which we do using a process that I assume would be similar to what would be done here (with a store of keys and hash values). But the records we are deduplicating have some metadata fields (such as timestamps of when the record was posted) that we don't consider semantically meaningful for downstream consumers, and therefore we also suppress updates that only touch those fields.
> > >>
> > >> -Tommy
> > >>
> > >>
> > >> On Fri, 2020-01-31 at 19:30 -0600, John Roesler wrote:
> > >>
> > >> Hi Thomas and yuzhihong,
> > >>
> > >> That's an interesting idea.
> > >> Can you help think of a use case that isn't also served by filtering or mapping beforehand? Thanks for helping to design this feature!
> > >>
> > >> -John
> > >>
> > >> On Fri, Jan 31, 2020, at 18:56, yuzhih...@gmail.com wrote:
> > >>
> > >> I think this is a good idea.
> > >>
> > >> On Jan 31, 2020, at 4:49 PM, Thomas Becker <thomas.bec...@tivo.com> wrote:
> > >>
> > >> How do folks feel about allowing the mechanism by which no-ops are detected to be pluggable? Meaning, use something like a hash by default, but you could optionally provide an implementation of something to use instead, like a ChangeDetector. This could be useful, for example, to ignore changes to certain fields, which may not be relevant to the operation being performed.
> > >>
> > >> ________________________________
> > >> From: John Roesler <vvcep...@apache.org>
> > >> Sent: Friday, January 31, 2020 4:51 PM
> > >> To: dev@kafka.apache.org
> > >> Subject: Re: [KAFKA-557] Add emit on change support for Kafka Streams
> > >>
> > >> Hello all,
> > >>
> > >> Sorry for my silence. It seems like we are getting close to consensus. Hopefully, we can move to a vote soon!
> > >>
> > >> All of the reasoning from Matthias and Bruno around timestamps is compelling. I would be strongly in favor of stating a few things very clearly in the KIP:
> > >>
> > >> 1. Streams will drop no-op updates only for KTable operations. That is, we won't make any changes to KStream aggregations at the moment. It does seem like we can potentially revisit the time semantics of that operation in the future, but we don't need to do it now.
> > >>    On the other hand, the proposed semantics for KTable timestamps (marking the beginning of the validity of that record) make sense to me.
> > >>
> > >> 2. Streams will only drop no-op updates for _stateful_ KTable operations. We don't want to add a hard guarantee that Streams will _never_ emit a no-op table update, because it would require adding state to otherwise stateless operations. If someone is really concerned about a particular stateless operation producing a lot of no-op results, all they have to do is materialize it, and Streams would automatically drop the no-ops.
> > >>
> > >> Additionally, I'm +1 on not adding an opt-out at this time.
> > >>
> > >> Regarding the KIP itself, I would clean it up a bit before calling for a vote. There is a lot of "discussion"-type language there, which is very natural to read but makes it a bit hard to see what _exactly_ the KIP is proposing. Richard, would you mind just making the "proposed behavior change" a simple and succinct list of bullet points? I.e., please drop glue phrases like "there has been some discussion" or "possibly we could do X". The final version of the KIP should just say, "Streams will do X, Streams will do Y". Feel free to add an elaboration section to explain more about what X and Y mean, but we don't need to talk about possibilities or alternatives except in the "rejected alternatives" section. Accordingly, can you also move the options you presented in the intro to the "rejected alternatives" section and only mention the final proposal itself? This really helps reviewers to know what they are voting for, and it helps everyone after the fact when they are trying to get clarity on what exactly the proposal is, versus all the things it could have been.
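Point 2 hinges on the prior value being available: a materialized operator keeps the previous result in its store and can compare against it, while a stateless operator has nothing to compare to. Below is a minimal sketch of the serialized-bytes comparison under two assumptions: the serializer is modeled as a plain `Function<V, byte[]>` (a stand-in for a Kafka `Serde`, not its real interface), and the store is an in-memory `HashMap`. `MaterializedNoOpFilter` and `process` are hypothetical names.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical materialized operator that drops no-op updates; not Streams code.
class MaterializedNoOpFilter<K, V> {
    private final Function<V, byte[]> serializer;     // stand-in for a value serde
    private final Map<K, byte[]> store = new HashMap<>(); // prior serialized results

    MaterializedNoOpFilter(Function<V, byte[]> serializer) {
        this.serializer = serializer;
    }

    // Returns true if the update changes the stored result and should be emitted.
    boolean process(K key, V newValue) {
        byte[] newBytes = serializer.apply(newValue);
        byte[] oldBytes = store.get(key);
        if (oldBytes != null && Arrays.equals(oldBytes, newBytes)) {
            return false; // identical serialized form: a no-op, nothing to emit
        }
        store.put(key, newBytes);
        return true;
    }

    public static void main(String[] args) {
        MaterializedNoOpFilter<String, String> filter =
            new MaterializedNoOpFilter<>(s -> s.getBytes(StandardCharsets.UTF_8));
        System.out.println(filter.process("user-1", "gold"));   // true: first result
        System.out.println(filter.process("user-1", "gold"));   // false: no-op dropped
        System.out.println(filter.process("user-1", "silver")); // true: real change
    }
}
```

Comparing bytes needs no `equals()` on the value type, which is why the stateful case can be "airtight" without new requirements on user data.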
> > >>
> > >> Thanks,
> > >> -John
> > >>
> > >> On Mon, Jan 27, 2020, at 18:14, Richard Yu wrote:
> > >>
> > >> Hello to all,
> > >>
> > >> I've finished making some initial modifications to the KIP. I have decided to keep the implementation section in the KIP for record-keeping purposes. For now, we should focus only on the proposed behavior changes. See if you have any comments!
> > >>
> > >> Cheers,
> > >> Richard
> > >>
> > >> On Sat, Jan 25, 2020 at 11:12 AM Richard Yu <yohan.richard...@gmail.com> wrote:
> > >>
> > >> Hi all,
> > >>
> > >> Thanks for all the discussion!
> > >>
> > >> @John and @Bruno, I will survey other possible systems and see what I can do. Just a question: by systems, I suppose you mean the pros and cons of different reporting strategies? I'm not completely certain on this point, so it would be great if you could clarify.
> > >>
> > >> So here's what I got from all the discussion so far:
> > >>
> > >> - Since Matthias and John seem to have come to a consensus on this, we will go for an all-round behavioral change for KTables. After some thought, I decided that, for now, an opt-out config will not be added. As John has pointed out, no-op changes tend to explode further down the topology as they are forwarded to more and more processor nodes downstream.
> > >> - About using hash codes: after some explanation from John, it looks like hash codes might not be as ideal (for implementation). For now, we will omit that detail and save it for the PR.
> > >> - @Bruno, you do have valid concerns. Though, I am not completely certain whether we want to do emit-on-change only for materialized KTables. I will put it down in the KIP regardless.
> > >>
> > >> I will do my best to address all points raised so far in the discussion. Hope we can keep this going!
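The thread does not record John's explanation of why hash codes are less suitable. One well-known concern, offered here as an assumption rather than his stated reason, is collisions: equal hash codes do not imply equal values, so a hash-only no-op check can silently drop a real update. Java's `String.hashCode()` makes this easy to demonstrate, because `"Aa"` and `"BB"` share the hash code 2112. The helper names below are hypothetical.

```java
import java.util.Objects;

class HashCodeNoOpCaveat {

    // Unsafe: treats equal hash codes as "unchanged".
    static boolean isNoOpByHash(Object oldValue, Object newValue) {
        return Objects.hashCode(oldValue) == Objects.hashCode(newValue);
    }

    // Safe: only equal values count as "unchanged".
    static boolean isNoOpByEquals(Object oldValue, Object newValue) {
        return Objects.equals(oldValue, newValue);
    }

    public static void main(String[] args) {
        String oldValue = "Aa";
        String newValue = "BB"; // a different value with the same String.hashCode()
        System.out.println(oldValue.hashCode());                // 2112
        System.out.println(isNoOpByHash(oldValue, newValue));   // true: a real update would be dropped
        System.out.println(isNoOpByEquals(oldValue, newValue)); // false: the update is forwarded
    }
}
```

A wider hash (such as a cryptographic digest) makes collisions far less likely but still not impossible, whereas comparing the full serialized bytes, as proposed elsewhere in the thread, has no false positives.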
> > >>
> > >> Best,
> > >> Richard
> > >>
> > >> On Fri, Jan 24, 2020 at 6:07 PM Bruno Cadonna <br...@confluent.io> wrote:
> > >>
> > >> Thank you, Matthias, for the use cases! Looking at both use cases, I think you need to elaborate on them in the KIP, Richard.
> > >>
> > >> Emit from plain KTable: I agree with Matthias that the lower timestamp makes sense because it marks the start of the validity of the record. Idempotent records with a higher timestamp can be safely ignored. A corner case that I discussed with Matthias offline is when we do not materialize a KTable due to optimization. Then we cannot avoid the idempotent records, because we do not keep the first record with the lower timestamp to compare to.
> > >>
> > >> Emit from KTable with aggregations: If we specify that an aggregation result should have the highest timestamp of the records that participated in the aggregation, we cannot ignore any idempotent records. Admittedly, the result of an aggregation usually changes, but there are aggregations where the result may not change, like min and max, or sum when the incoming records have a value of zero. In those cases, we could benefit from emit-on-change, but only if we define the semantics of the aggregations to not use the highest timestamp of the participating records for the result. In Kafka Streams, we do not have min, max, and sum as explicit aggregations, but we need to provide an API to define what timestamp should be used for the result of an aggregation if we want to go down this path.
> > >>
> > >> None of this blocks this KIP; I just wanted to put these aspects up for discussion. The KIP can limit itself to emit from materialized KTables. However, the limits should be explicitly stated in the KIP.
> > >>
> > >> Best,
> > >> Bruno
> > >>
> > >>
> > >> On Fri, Jan 24, 2020 at 10:58 AM Matthias J. Sax <matth...@confluent.io> wrote:
> > >>
> > >> IMHO, the question about semantics depends on the use case, in particular on the origin of a KTable. If there is a changelog topic that one reads directly into a KTable, emit-on-change does actually make sense, because the timestamp indicates _when_ the update was _effective_. For this case, it is semantically sound to _not_ update the timestamp in the store, because the second update is actually idempotent, and advancing the timestamp is not ideal (one could even consider it wrong to advance the timestamp) because the "valid time" of the record pair did not change. This reasoning also applies to KTable-KTable joins.
> > >>
> > >> However, if the KTable is the result of an aggregation, I think emit-on-update is more natural, because the timestamp reflects the _last_ time (i.e., the highest timestamp) of all input records that contributed to the result. Hence, updating the timestamp and emitting a new record actually sounds correct to me. This applies to windowed and non-windowed aggregations, IMHO.
> > >>
> > >> However, considering the argument that the timestamp should not be updated in the store in the first case to begin with, both cases are actually the same, and both can be modeled as emit-on-change: if a `table()` operator does not update the timestamp when the value does not change, there is _no_ change, and thus nothing is emitted. At the same time, if an aggregation operator does update the timestamp (even if the value does not change), there _is_ a change, and we emit. Note that handling out-of-order data for aggregations would also work seamlessly with this approach -- for out-of-order records, the timestamp never changes, and thus we only emit if the result itself changes.
> > >> Therefore, I would argue that we might not even need any config, because the emit-on-change behavior is simply correct and reduces the downstream load, while our current behavior is not ideal (even if it's also correct). Thoughts?
> > >>
> > >> -Matthias
> > >>
> > >> On 1/24/20 9:37 AM, John Roesler wrote:
> > >>
> > >> Hi Bruno,
> > >>
> > >> Thanks for that idea. I hadn't considered that option before, and it does seem like that would be the right place to put it if we think it might be semantically important to control on a table-by-table basis.
> > >>
> > >> I had been thinking of it less semantically and more practically. In the context of a large topology, or more generally, a large software system that contains many topologies and other event-driven systems, each no-op result becomes an input that is destined to itself become a no-op result, and so on, all the way through the system. Thus, a single pointless processing result becomes amplified into a large number of pointless computations, cache perturbations, and network and disk I/O operations. If you also consider operations with fan-out implications, like branching or foreign-key joins, the wasted resources are amplified not just in proportion to the size of the system, but in proportion to the size of the system times the average fan-out (raised to the power of the number of fan-out operations on the path(s) through the system). In my time operating such systems, I've observed these effects to be very real, and actually, the system and use case don't have to be very large before the amplification poses an existential threat to the system as a whole.
> > >>
> > >> This is the basis of my advocating for a simple behavior change, rather than an opt-in config of any kind. It seems like Streams should "do the right thing" for the majority use case.
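John's amplification argument can be written as a back-of-the-envelope formula. Assuming a path through the system with N operators, k fan-out operations (branches, foreign-key joins), and an average fan-out f̄, one pointless upstream result costs roughly the following number of wasted downstream computations; the worked numbers are illustrative, not measurements.

```latex
W \approx N \cdot \bar{f}^{\,k}
\qquad\text{e.g. } N = 10,\ \bar{f} = 3,\ k = 2:\quad
W \approx 10 \cdot 3^{2} = 90
\quad\text{(versus } W \approx 10 \text{ with no fan-out)}
```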
> > >> My theory (which may be wrong) is that the majority use case is more like "relational queries" than "CEP queries". Even if you were doing some event-sensitive computation, wouldn't you do it as Stream operations (where this feature is inapplicable anyway)? In keeping with the "practical" perspective, I suggested the opt-out config only in the (I think unlikely) event that filtering out pointless updates actually harms performance. I'd also be perfectly fine without the opt-out config. I really think that (because of the timestamp semantics work already underway) we're already pre-fetching the prior result most of the time, so there would actually be very little extra I/O involved in implementing emit-on-change. However, we should consider whether my experience is likely to be general. Do you have some use case in mind for which you'd actually want some KTable results to be emit-on-update for semantic reasons?
> > >>
> > >> Thanks,
> > >> -John
> > >>
> > >> On Fri, Jan 24, 2020, at 11:02, Bruno Cadonna wrote:
> > >>
> > >> Hi Richard,
> > >>
> > >> Thank you for the KIP. I agree with John that we should focus on the interface and behavior change in a KIP. We can discuss the implementation later. I am also +1 for the survey.
> > >>
> > >> I had a thought about this. Couldn't we consider emit-on-change to be one config of suppress (like `untilWindowCloses`)? What you basically propose is to suppress updates if they do not change the result. Considering emit-on-change as a flavour of suppress would be more flexible, because it would specify the behavior locally for a KTable instead of globally for all KTables. Additionally, specifying the behavior in one place instead of multiple places feels more intuitive and consistent to me.
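Bruno's idea of treating emit-on-change as a per-table flavour of suppress, combined with the belt-and-suspenders check John describes at the top of this thread (first `equals()`, then the serialized bytes), might look like the following. This is a sketch under loud assumptions: `SuppressUnchanged` is a hypothetical policy object, not part of the real `Suppressed` API; the serializer is modeled as a `Function<V, byte[]>` that may be `null` when no serde is available; and `MyPair` mirrors John's example of a class that does not override `equals()`.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Objects;
import java.util.function.Function;

class SuppressUnchangedSketch {

    // Hypothetical per-table policy: only tables configured with it drop no-ops.
    static final class SuppressUnchanged<V> {
        private final Function<V, byte[]> serializer; // null if no serde is available

        SuppressUnchanged(Function<V, byte[]> serializer) {
            this.serializer = serializer;
        }

        // Belt and suspenders: a no-op if equals() says so OR the bytes match.
        boolean isNoOp(V oldValue, V newValue) {
            if (oldValue == null || newValue == null) {
                return false; // first inserts and deletes are treated as changes
            }
            if (Objects.equals(oldValue, newValue)) {
                return true;
            }
            return serializer != null
                && Arrays.equals(serializer.apply(oldValue), serializer.apply(newValue));
        }
    }

    // A value class that does NOT override equals(), like John's MyPair.
    static final class MyPair {
        final String left;
        final int right;

        MyPair(String left, int right) {
            this.left = left;
            this.right = right;
        }
    }

    public static void main(String[] args) {
        SuppressUnchanged<MyPair> policy = new SuppressUnchanged<>(
            p -> (p.left + ":" + p.right).getBytes(StandardCharsets.UTF_8));

        MyPair a1 = new MyPair("A", 1);
        MyPair a1Again = new MyPair("A", 1);
        System.out.println(a1.equals(a1Again));                     // false: identity equals
        System.out.println(policy.isNoOp(a1, a1Again));             // true: bytes match
        System.out.println(policy.isNoOp(a1, new MyPair("B", 1))); // false: real change
    }
}
```

Without a serializer the same policy falls back to `equals()` alone, which is exactly the gap Ted's question about missing `equals()` overrides points at.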
> > >>
> > >> Best,
> > >> Bruno
> > >>
> > >> On Fri, Jan 24, 2020 at 7:49 AM John Roesler <vvcep...@apache.org> wrote:
> > >>
> > >> Hi Richard,
> > >>
> > >> Thanks for picking this up! I know of at least one large community member for which this feature is absolutely essential.
> > >>
> > >> If I understand your two options, it seems like the proposal is to implement it as a behavior change regardless, and the question is whether to provide an opt-out config or not. Given that any implementation of this feature would have some performance impact under some workloads, and also that we don't know if anyone really depends on emit-on-update time semantics, it seems like we should propose to add an opt-out config. Can you update the KIP to mention the exact config key and value(s) you'd propose? Just to move the discussion forward, maybe something like:
> > >>
> > >>     emit.on := change|update
> > >>
> > >> with the new default being "change".
> > >>
> > >> Thanks for pointing out the timestamp issue in particular. I agree that if we discard the latter update as a no-op, then we also have to discard its timestamp (obviously, we don't forward the timestamp update, as that's the whole point, but we also can't update the timestamp in the store, as the store must remain consistent with what has been emitted).
> > >>
> > >> I have to confess that I disagree with your implementation proposal, but it's also not necessary to discuss implementation in the KIP. Maybe it would be less controversial if you just drop that section for now, so that the KIP discussion can focus on the behavior change and config.
> > >>
> > >> Just for reference, there is some research into this domain.
> > >> For example, see the "Report" section (3.2.3) of the SECRET paper: http://people.csail.mit.edu/tatbul/publications/maxstream_vldb10.pdf
> > >>
> > >> It might help to round out the proposal if you take a brief survey of the behaviors of other systems, along with pros and cons if any are reported.
> > >>
> > >> Thanks,
> > >> -John
> > >>
> > >> On Fri, Jan 10, 2020, at 22:27, Richard Yu wrote:
> > >>
> > >> Hi everybody!
> > >>
> > >> I'd like to propose a change that we probably should have added a long time ago. The key benefit of this KIP would be reduced traffic in Kafka Streams, since a lot of no-op results would no longer be sent downstream. Here is the KIP for reference:
> > >>
> > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams
> > >>
> > >> Currently, I seek to formalize our approach for this KIP first, before we determine concrete API additions/configurations. Some configs might be worth adding, while others are unnecessary, since adding them would only increase the complexity of Kafka Streams.
> > >>
> > >> Cheers,
> > >> Richard
> > >>
> > >> ________________________________
> > >> This email and any attachments may contain confidential and privileged material for the sole use of the intended recipient.
> > >> Any review, copying, or distribution of this email (or any attachments) by others is prohibited. If you are not the intended recipient, please contact the sender immediately and permanently delete this email and any attachments. No employee or agent of TiVo is authorized to conclude any binding agreement on behalf of TiVo by email. Binding agreements with TiVo may only be made by a signed written agreement.
> > >>
> > >> --
> > >> *Tommy Becker*
> > >> *Principal Engineer*
> > >>
> > >> *Personalized Content Discovery*
> > >>
> > >> *O* +1 919.460.4747 *tivo.com* <http://www.tivo.com/>