Hi all,

Bumping this. If you feel that this KIP is not too urgent, then let me know. :)
Cheers,
Richard

On Thu, Feb 6, 2020 at 4:55 PM Richard Yu <yohan.richard...@gmail.com> wrote:

Hi all,

I've had a few thoughts regarding the forwarding of <key, change<old_value, new_value>>. As Matthias already mentioned, there are two separate priorities by which we can judge this KIP:

1. Optimization perspective: in this case, the user would prefer the impact of this KIP to be as small as possible. By that logic, performing stateless operations twice could be unacceptable, since those operations can be expensive.

2. Semantic-correctness perspective: unlike the optimization approach, here we care that all KTable operations obey the same emission policy, i.e., emit on change. A discrepancy would not be tolerated, even though it incurs an extra performance cost. Therefore, we would follow Matthias's approach and perform the operation once on the old value and once on the new value.

I think the issue here is more black and white than in between. The second option would be favorable for users with inexpensive stateless operations, while the first option is probably for users with more expensive ones. So the simplest solution is probably to let the user choose one of the two behaviors, with a config that switches between them.

It's the simplest compromise I can come up with at the moment, but if you have a better plan that balances the trade-offs, please let us know. :)

Best,
Richard

On Wed, Feb 5, 2020 at 5:12 PM John Roesler <vvcep...@apache.org> wrote:

Hi all,

Thanks for the thoughtful comments!

I need more time to reflect on your thoughts, but I just wanted to offer a quick clarification about equals().

I only meant that we can't be sure whether a class's equals() implementation returns true for two semantically identical instances.
I.e., if a class doesn't override the default equals() implementation, then we would see behavior like:

    new MyPair("A", 1).equals(new MyPair("A", 1)) // returns false

In that case, I would still like to catch no-op updates by comparing the serialized form of the records when we happen to have it serialized anyway (such as when the operation is stateful, or when we're sending to a repartition topic and we have both the "new" and "old" value from upstream).

I didn't mean to suggest we'd try to use reflection to detect whether equals() is implemented, although that is a neat trick. I was thinking more of a belt-and-suspenders algorithm where we do the no-op check based on equals() and then _also_ check the serialized bytes for equality.

Thanks,
-John

On Wed, Feb 5, 2020, at 15:31, Ted Yu wrote:

Thanks for the comments, Matthias.

W.r.t. the requirement of an `equals()` implementation: every generic value type has an equals() method, so we can use the following code to tell whether it is inherited from java.lang.Object or provided by the user:

    boolean customEquals = false;
    try {
        // getMethod() also finds inherited public methods, so it returns Object's equals() if nothing overrides it
        Class<?> cls = value.getClass().getMethod("equals", Object.class).getDeclaringClass();
        if (!Object.class.equals(cls)) {
            customEquals = true;
        }
    } catch (NoSuchMethodException nsme) {
        // equals(Object) is always defined, so this branch is never hit
    }

The next question is: what if the user doesn't provide an equals() method? Would we automatically fall back to emit-on-update?

Cheers

On Tue, Feb 4, 2020 at 1:37 PM Matthias J. Sax <mj...@apache.org> wrote:

First, a high-level comment:

Overall, I would like to take a step back and make sure we are discussing this at the same level.
Originally, I understood this KIP as a proposed change of _semantics_; however, given the latest discussion, it seems it's actually not. It's more of an _optimization_ proposal. Hence, we only need to make sure that this optimization does not break existing semantics. Is this the right way to think about it?

If yes, then it might actually be OK to have different behavior depending on whether there is a materialized KTable or not. So far, we have never defined a public contract about our emit strategy, and it seems this KIP does not define one either.

Hence, I no longer have as strong an opinion about, for example, sending oldValues. I guess the real question is: what can we implement in a reasonable way?

Other comments:

@Richard:

Can you please add the KIP to the KIP overview table? It's missing (https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals).

@Bruno:

You mentioned caching. I think it's irrelevant (orthogonal), and we can discuss this KIP without considering it.

@John:

> Even in the source table, we forward the updated record with the
> higher of the two timestamps. So the example is more like:

That is not correct. Currently, we forward with the smaller, out-of-order timestamp. Changing the timestamp would corrupt the data: we don't know (because we don't check) whether the value is the same or a different one, hence we must emit the out-of-order record as-is.
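Ted's question earlier in the thread (what if a value type does not override equals()?) and John's belt-and-suspenders answer can be combined into a single decision rule. The plain-Java sketch below is illustrative only and is not Kafka Streams code: `NoOpDetector`, `MyPair`, and the `Function<T, byte[]>` stand-in for a serializer are hypothetical names. It treats an update as a no-op if a user-defined equals() reports equality or, failing that, if the serialized bytes match; when neither check is possible, it forwards the record, i.e., it falls back to emit-on-update.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.function.Function;

// Hypothetical helper illustrating a "belt-and-suspenders" no-op check.
// Not part of Kafka Streams; all names are chosen for this example only.
final class NoOpDetector {

    // Ted Yu's reflection trick: is equals(Object) declared by some class below java.lang.Object?
    static boolean hasCustomEquals(Object value) {
        try {
            Class<?> declaring = value.getClass().getMethod("equals", Object.class).getDeclaringClass();
            return !Object.class.equals(declaring);
        } catch (NoSuchMethodException e) {
            // Unreachable: every class inherits the public equals(Object) from Object.
            return false;
        }
    }

    // Returns true if newValue is a no-op update of oldValue and may be dropped.
    // serializer may be null when no serde is available for this operation.
    static <T> boolean isNoOp(T oldValue, T newValue, Function<T, byte[]> serializer) {
        if (oldValue == null || newValue == null) {
            return oldValue == newValue; // both null is a no-op; an insert or a delete is not
        }
        // First check: a user-provided equals() (Object's identity-based equals() is not trusted).
        if (hasCustomEquals(newValue) && newValue.equals(oldValue)) {
            return true;
        }
        // Second check: compare serialized bytes when a serializer happens to be available.
        if (serializer != null) {
            return Arrays.equals(serializer.apply(oldValue), serializer.apply(newValue));
        }
        // Neither check applies: forward the record (emit-on-update fallback).
        return false;
    }

    // A value class that does NOT override equals(), like John's MyPair example.
    static final class MyPair {
        final String name;
        final int count;
        MyPair(String name, int count) { this.name = name; this.count = count; }
    }

    static byte[] serializePair(MyPair p) {
        return (p.name + ":" + p.count).getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        MyPair a = new MyPair("A", 1);
        MyPair b = new MyPair("A", 1);
        System.out.println(a.equals(b));                               // false: identity comparison
        System.out.println(isNoOp(a, b, NoOpDetector::serializePair)); // true: bytes match
        System.out.println(isNoOp(a, b, null));                        // false: no way to tell
        System.out.println(isNoOp("x", "x", null));                    // true: String overrides equals()
    }
}
```

The reflection check only shows that some class below java.lang.Object declares equals(); it cannot show that the implementation is semantically right, which is part of why John proposes comparing bytes as well.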
If we start to do emit-on-change, we also need to emit a new record if the timestamp changes due to out-of-order data. Hence, we would still need to emit <K,V,T1>, because that gives us correct semantics: assume you have a filter() and afterward use the filtered KTable in a stream-table join. The lower T1 timestamp must be propagated to the filtered KTable to ensure that the stream-table join computes the correct result.

Your point about requiring an `equals()` implementation is actually quite an interesting one, and it boils down to my statement above about "what can we actually implement". What I don't understand is:

> This way, we still don't have to rely on the existence of an
> equals() method, but if it is there, we can benefit from it.

Your bullet point (2) says it uses `equals()`, so it seems we actually do rely on it? Also, how can we detect whether there is an `equals()` method to do the comparison? Would we fail if we have neither `equals()` nor corresponding serdes to do the comparison?

> Wow, really good catch! Yes, we absolutely need metrics and logs if
> we're going to drop any records. And, yes, we should propose
> metrics and logs that are similar to the existing ones when we drop
> records for other reasons.

I am not sure about this point. In fact, we already have some no-ops in our Kafka Streams join operators, and we don't report any of those either. Emit-on-change is operator semantics, and I don't see why we would need a metric for it. It seems quite different from dropping late or malformed records.

-Matthias

On 2/4/20 7:13 AM, Thomas Becker wrote:

Thanks John for your thoughtful reply. Some comments inline.
On Mon, 2020-02-03 at 11:51 -0600, John Roesler wrote:
> Hi Tommy,
>
> Thanks for the context. I can see the attraction of considering these use cases together.
>
> To answer your question, if a part of the record is not relevant to downstream consumers, I was thinking you could just use a mapValues to remove it.
>
> E.g., suppose you wanted to do a join between two tables:
>
>     employeeInfo.join(
>         employeePayroll,
>         (info, payroll) -> new Result(info.name(), payroll.salary()));
>
> We only care about one attribute from the Info table (name) and one from the Payroll table (salary), and these attributes change rarely. On the other hand, there might be many other attributes in these tables that change frequently. We can avoid triggering the join unnecessarily by mapping the input tables to drop the unnecessary information before the join:
>
>     var names = employeeInfo.mapValues(info -> info.name());
>     var salaries = employeePayroll.mapValues(payroll -> payroll.salary());
>
>     names.join(salaries, (name, salary) -> new Result(name, salary));

Ahh yes, I see. This works, but in the case where you're using schemas as we are (e.g. Avro), it seems like this approach could lead to a proliferation of "skinny" record types that just drop various fields.
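John's projection idea can be checked with a small simulation. This is a sketch under stated assumptions rather than Streams code: `ProjectThenDedup`, `EmployeeInfo` (with a frequently changing `title` field), and the in-memory map standing in for a materialized store are all hypothetical. With emit-on-change applied after the projection, title-only edits never reach the join.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;

// Plain-Java simulation (not the Kafka Streams API) of "project first, then emit on change".
final class ProjectThenDedup {

    // Hypothetical input value: only `name` matters downstream; `title` changes often.
    static final class EmployeeInfo {
        final String name;
        final String title;
        EmployeeInfo(String name, String title) { this.name = name; this.title = title; }
    }

    // Applies a mapValues-like projection, keeps the last projected value per key
    // (standing in for a materialized store), and forwards only real changes.
    static <K, V, P> List<String> forwardedUpdates(List<Map.Entry<K, V>> updates, Function<V, P> projection) {
        Map<K, P> store = new HashMap<>();
        List<String> forwarded = new ArrayList<>();
        for (Map.Entry<K, V> update : updates) {
            K key = update.getKey();
            P projected = projection.apply(update.getValue());
            boolean seen = store.containsKey(key);
            P previous = store.put(key, projected);
            if (!seen || !Objects.equals(previous, projected)) {
                forwarded.add(key + "=" + projected);
            }
        }
        return forwarded;
    }

    static List<Map.Entry<String, EmployeeInfo>> sampleUpdates() {
        return List.of(
            Map.entry("e1", new EmployeeInfo("Ann", "Engineer")),
            Map.entry("e1", new EmployeeInfo("Ann", "Senior Engineer")),
            Map.entry("e1", new EmployeeInfo("Ann", "Staff Engineer")),
            Map.entry("e1", new EmployeeInfo("Anne", "Staff Engineer")));
    }

    public static void main(String[] args) {
        // Keeping every field: each title change would reach the join.
        System.out.println(forwardedUpdates(sampleUpdates(), info -> info.name + "/" + info.title).size()); // 4
        // Projecting to the name first: only the initial value and the rename are forwarded.
        System.out.println(forwardedUpdates(sampleUpdates(), info -> info.name)); // [e1=Ann, e1=Anne]
    }
}
```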
>
> Especially if we take Matthias's idea to drop non-changes even for stateless operations, this would be quite efficient, and it is also a very straightforward optimization to understand once you know that Streams provides emit-on-change.
>
> From the context that you provided, it seems like a slightly different situation, though. Reading between the lines a little, it sounds like, in contrast to the example above, in which we are filtering out extra _data_, you have some extra _metadata_ that you still wish to pass down with the data when there is a "real" update, but you don't want the metadata itself to cause an update.

Despite my lack of clarity, yes, you've got it right ;) This particular processor is the first stop for this data after it comes in from external users, who often simply post the same content each time, and we're trying to shield downstream consumers from unnecessary churn.

> It does seem handy to be able to plug in a custom ChangeDetector for this purpose, but I worry about the API complexity. Maybe you can help think through how to provide the same benefit while limiting user-facing complexity.
>
> Here's some extra context to consider:
>
> We currently don't impose any extra requirements on the data that you can use in Streams. For example, you don't have to implement hashCode and equals, or compareTo, etc. With the current proposal, we can do an airtight comparison based only on the serialized form of the values, and we actually don't have to deserialize the "prior" value at all for a large number of operations.
> Admittedly, if we extend the proposal to include no-op detection for stateless operations, we'd probably need to rely on equals() for no-op checking; otherwise, we'd wind up requiring serdes for stateless operations as well. Actually, I'd probably argue for doing exactly that:
>
> 1. In stateful operations, drop if the serialized byte[]s are the same. After deserializing, also drop if the objects are equal according to Object#equals().
>
> 2. In stateless operations, compare the "new" and "old" values (if "old" is available) based on Object#equals().
>
> 3. As a final optimization, after serializing and before sending repartition records, compare the serialized data and drop no-ops.
>
> This way, we still don't have to rely on the existence of an equals() method, but if it is there, we can benefit from it. Also, we don't require a serde in any new situations, but we can still leverage it when it is available.
>
> For clarity, in my example above, even if the employeeInfo, employeePayroll, and Result records all have serdes, the naive implementation would need the "name" field (presumably a String) and the "salary" field (presumably a Double) to have serdes as well. But if we can leverage equals(), then the "right thing" happens automatically.

I still don't totally follow why the individual components (name, salary) would have to have serdes here. If Result has one, we compare bytes, and if Result additionally has an equals() method (which presumably includes equals comparisons on the constituent fields), have we not covered our bases?
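Rule (2) depends on the upstream forwarding the old value together with the new one, and it runs the stateless function twice per update, which is the cost Richard weighs in his Feb 6 message. A minimal sketch, assuming a hypothetical `Change` holder modeled on the <old_value, new_value> pair (not the Streams-internal class); when the old value is missing, for example in Bruno's case of a KTable that is not materialized, the no-op cannot be detected and the record is forwarded.

```java
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical sketch of rule (2): a stateless mapValues-like step that applies the
// mapper to both old and new values and drops the result if the two are equal.
final class StatelessNoOpCheck {

    // Mirrors the <old_value, new_value> pair forwarded between KTable processors.
    static final class Change<V> {
        final V oldValue; // null when the old value is unknown (e.g., upstream not materialized)
        final V newValue;
        Change(V oldValue, V newValue) { this.oldValue = oldValue; this.newValue = newValue; }
    }

    // Returns the mapped change to forward, or null if it is a detectable no-op.
    static <V, R> Change<R> mapAndFilter(Change<V> change, Function<V, R> mapper) {
        R newMapped = change.newValue == null ? null : mapper.apply(change.newValue);
        if (change.oldValue == null) {
            // Nothing to compare against: forward, since a no-op cannot be detected.
            return new Change<>(null, newMapped);
        }
        R oldMapped = mapper.apply(change.oldValue); // the second mapper call is the extra cost
        if (Objects.equals(oldMapped, newMapped)) {
            return null; // equal according to equals(): drop
        }
        return new Change<>(oldMapped, newMapped);
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        Function<String, Integer> length = s -> { calls.incrementAndGet(); return s.length(); };

        Change<Integer> dropped = mapAndFilter(new Change<>("abc", "xyz"), length);
        System.out.println(dropped == null);  // true: both values map to 3
        Change<Integer> kept = mapAndFilter(new Change<String>(null, "xyz"), length);
        System.out.println(kept.newValue);    // 3: forwarded because the old value is unknown
        System.out.println(calls.get());      // 3 mapper calls in total
    }
}
```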
> This dovetails with my primary UX concern: where would the ChangeDetector actually be registered? None of the operators in my example have names or topics or any other identifiable characteristic that could be passed to a ChangeDetector class registered via config. You could say that we make ChangeDetector an optional parameter to every operation in Streams, but this seems to carry quite a bit of mental burden with it. People will wonder what it's for and whether or not they should be using it. There would almost certainly be a misconception that it's always preferable to implement it, which would be unfortunate. Plus, to actually implement metadata flowing through the topology as in your use case, you'd have to do two things:
>
> 1. Make sure that all operations actually preserve the metadata alongside the data (e.g., don't accidentally add a mapValues like I did, or you drop the metadata).
> 2. Implement a ChangeDetector for every single operation in the topology, or you don't get the benefit of dropping non-changes internally.
> 2b. Alternatively, you could just add the ChangeDetector to one operation toward the end of the topology. This would not drop redundant computation internally, but only redundant _outputs_. But this is just about the same as your current solution.

I definitely see your point regarding configuration.
I was originally thinking about this when the deduplication was going to be opt-in, and it seemed very natural to say something like:

    employeeInfo.join(employeePayroll, (info, payroll) -> new Result(info.name(), payroll.salary()))
        // duplicatesAccordingTo(...) and someChangeDetector are proposed, not existing Streams API
        .suppress(duplicatesAccordingTo(someChangeDetector));

Alternatively, you can imagine a similar method on Materialized, though obviously this makes less sense if we don't want to require materialization. If we're now talking about changing the default behavior and not having any configuration options, it's harder to find a place for this.

> A final thought: if it really is a metadata question, can we just plan to finish up the support for headers in Streams? I.e., give you a way to control how headers flow through the topology? Then, we could treat headers the same way we treat timestamps in the no-op checking: we completely ignore them for the sake of comparison. Thus, neither the timestamp nor the headers would get updated in internal state or in downstream views as long as the value itself doesn't change. This seems to give us a way to support your use case without adding to the mental overhead of using Streams for simple things.

Agreed, headers could be a decent fit for this particular case because it's mostly metadata, though to be honest we haven't looked at headers much (mostly because, to your point, support seems to be lacking). I feel like there would be other cases where this feature could be valuable, but I admit I can't come up with anything right this second. Perhaps yuzhihong had an example in mind?
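The pluggable ChangeDetector Tommy describes, including his current approach of a store of keys and hashes and his wish to ignore metadata fields, could look roughly like this. Everything here is hypothetical (`ChangeDetector`, `Post`, `postedAtMs`, `dedup`); Kafka Streams has no such interface. The default detector hashes the serialized value with SHA-256 via `java.security.MessageDigest` rather than relying on `hashCode()`; for a single comparison this is equivalent to comparing the bytes directly, and it only saves space if digests are stored instead of values. Suppressed updates leave the store untouched, in line with John's point that neither timestamps nor headers should be updated in internal state when the value does not change.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Sketch of the pluggable ChangeDetector idea from this thread (hypothetical API).
final class ChangeDetectorSketch {

    interface ChangeDetector<V> {
        boolean isChange(V oldValue, V newValue);
    }

    // Default: compare SHA-256 digests of the serialized values.
    static <V> ChangeDetector<V> digestDetector(Function<V, byte[]> serializer) {
        return (oldValue, newValue) ->
            !Arrays.equals(sha256(serializer.apply(oldValue)), sha256(serializer.apply(newValue)));
    }

    static byte[] sha256(byte[] bytes) {
        try {
            return MessageDigest.getInstance("SHA-256").digest(bytes);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 is required on every Java platform", e);
        }
    }

    // Example value: `content` is meaningful; `postedAtMs` is metadata.
    static final class Post {
        final String content;
        final long postedAtMs;
        Post(String content, long postedAtMs) { this.content = content; this.postedAtMs = postedAtMs; }
        byte[] toBytes() { return (content + "|" + postedAtMs).getBytes(StandardCharsets.UTF_8); }
    }

    // Custom detector: ignore the metadata field entirely.
    static final ChangeDetector<Post> IGNORE_POSTED_AT = (o, n) -> !o.content.equals(n.content);

    // Keeps the last forwarded value per key and forwards only detected changes.
    // Suppressed updates do not touch the store, so it stays consistent with what was emitted.
    static <K, V> List<V> dedup(List<Map.Entry<K, V>> updates, ChangeDetector<V> detector) {
        Map<K, V> lastEmitted = new HashMap<>();
        List<V> forwarded = new ArrayList<>();
        for (Map.Entry<K, V> u : updates) {
            V previous = lastEmitted.get(u.getKey());
            if (previous == null || detector.isChange(previous, u.getValue())) {
                lastEmitted.put(u.getKey(), u.getValue());
                forwarded.add(u.getValue());
            }
        }
        return forwarded;
    }

    static List<Map.Entry<String, Post>> samplePosts() {
        return List.of(
            Map.entry("k", new Post("hello", 1_000L)),
            Map.entry("k", new Post("hello", 2_000L)),   // same content re-posted later
            Map.entry("k", new Post("hello!", 3_000L)));
    }

    public static void main(String[] args) {
        System.out.println(dedup(samplePosts(), digestDetector(Post::toBytes)).size()); // 3
        System.out.println(dedup(samplePosts(), IGNORE_POSTED_AT).size());              // 2
    }
}
```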
> I.e., simple things should be easy, and complex things should be possible.
>
> What are your thoughts?
>
> Thanks,
> -John

On Mon, Feb 3, 2020, at 07:19, Thomas Becker wrote:

Hi John,

Can you describe how you'd use filtering/mapping to deduplicate records? To give some background on my suggestion: we currently have a small stream processor that exists solely to deduplicate, which we do using a process that I assume would be similar to what would be done here (with a store of keys and hash values). But the records we are deduplicating have some metadata fields (such as timestamps of when the record was posted) that we don't consider semantically meaningful for downstream consumers, and therefore we also suppress updates that only touch those fields.

-Tommy

On Fri, 2020-01-31 at 19:30 -0600, John Roesler wrote:

Hi Thomas and yuzhihong,

That's an interesting idea. Can you help think of a use case that isn't also served by filtering or mapping beforehand?

Thanks for helping to design this feature!
-John

On Fri, Jan 31, 2020, at 18:56, yuzhih...@gmail.com wrote:

I think this is a good idea.

On Jan 31, 2020, at 4:49 PM, Thomas Becker <thomas.bec...@tivo.com> wrote:

How do folks feel about allowing the mechanism by which no-ops are detected to be pluggable?
Meaning: use something like a hash by default, but optionally let you provide an implementation of something to use instead, like a ChangeDetector. This could be useful, for example, to ignore changes to certain fields that may not be relevant to the operation being performed.

From: John Roesler <vvcep...@apache.org>
Sent: Friday, January 31, 2020 4:51 PM
To: dev@kafka.apache.org
Subject: Re: [KAFKA-557] Add emit on change support for Kafka Streams

Hello all,

Sorry for my silence. It seems like we are getting close to consensus. Hopefully, we can move to a vote soon!

All of the reasoning from Matthias and Bruno around timestamps is compelling. I would be strongly in favor of stating a few things very clearly in the KIP:

1. Streams will drop no-op updates only for KTable operations. That is, we won't make any changes to KStream aggregations at the moment. It does seem like we can potentially revisit the time semantics of that operation in the future, but we don't need to do it now. On the other hand, the proposed semantics for KTable timestamps (marking the beginning of the validity of that record) make sense to me.

2. Streams will only drop no-op updates for _stateful_ KTable operations. We don't want to add a hard guarantee that Streams will _never_ emit a no-op table update, because it would require adding state to otherwise stateless operations.
If someone is really concerned about a particular stateless operation producing a lot of no-op results, all they have to do is materialize it, and Streams will automatically drop the no-ops.

Additionally, I'm +1 on not adding an opt-out at this time.

Regarding the KIP itself, I would clean it up a bit before calling for a vote. There is a lot of "discussion"-type language there, which is very natural to read but makes it a bit hard to see what _exactly_ the KIP is proposing. Richard, would you mind making the "proposed behavior change" a simple and succinct list of bullet points? I.e., please drop glue phrases like "there has been some discussion" or "possibly we could do X". The final version of the KIP should just say, "Streams will do X, Streams will do Y". Feel free to add an elaboration section to explain more about what X and Y mean, but we don't need to talk about possibilities or alternatives except in the "rejected alternatives" section. Accordingly, can you also move the options you presented in the intro to the "rejected alternatives" section and only mention the final proposal itself? This really helps reviewers know what they are voting for, and it helps everyone after the fact when they are trying to get clarity on what exactly the proposal is, versus all the things it could have been.

Thanks,
-John

On Mon, Jan 27, 2020, at 18:14, Richard Yu wrote:

Hello to all,

I've finished making some initial modifications to the KIP. I have decided to keep the implementation section in the KIP for record-keeping purposes. For now, we should focus only on the proposed behavior changes instead. See if you have any comments!
Cheers,
Richard

On Sat, Jan 25, 2020 at 11:12 AM Richard Yu <yohan.richard...@gmail.com> wrote:

Hi all,

Thanks for all the discussion!

@John and @Bruno: I will survey other possible systems and see what I can do. Just a question: by "systems", I suppose you mean the pros and cons of different reporting strategies? I'm not completely certain on this point, so it would be great if you could clarify.

So here's what I got from the discussion so far:

- Since Matthias and John seem to have come to a consensus on this, we will go for an all-round behavioral change for KTables. After some thought, I decided that, for now, an opt-out config will not be added. As John has pointed out, no-op changes tend to explode further down the topology as they are forwarded to more and more processor nodes downstream.
- About using hash codes: after some explanation from John, it looks like hash codes might not be ideal for the implementation. For now, we will omit that detail and save it for the PR.
- @Bruno: you do have valid concerns, though I am not completely certain whether we want to do emit-on-change only for materialized KTables. I will put it down in the KIP regardless.

I will do my best to address all points raised so far in the discussion. Hope we can keep this going!

Best,
Richard

On Fri, Jan 24, 2020 at 6:07 PM Bruno Cadonna <br...@confluent.io> wrote:

Thank you, Matthias, for the use cases! Looking at both use cases, I think you need to elaborate on them in the KIP, Richard.

Emit from plain KTable: I agree with Matthias that the lower timestamp makes sense because it marks the start of the validity of the record.
Idempotent records with a higher timestamp can be safely ignored. A corner case that I discussed with Matthias offline is when we do not materialize a KTable due to optimization. Then we cannot avoid the idempotent records, because we do not keep the first record with the lower timestamp to compare against.

Emit from KTable with aggregations: if we specify that an aggregation result should have the highest timestamp of the records that participated in the aggregation, we cannot ignore any idempotent records. Admittedly, the result of an aggregation usually changes, but there are aggregations where the result may not change, like min and max, or sum when the incoming records have a value of zero. In those cases, we could benefit from emit-on-change, but only if we define the semantics of the aggregations to not use the highest timestamp of the participating records for the result. In Kafka Streams, we do not have min, max, and sum as explicit aggregations, but we would need to provide an API to define which timestamp should be used for the result of an aggregation if we want to go down this path.

None of this blocks this KIP; I just wanted to put these aspects up for discussion. The KIP can limit itself to emitting from materialized KTables. However, the limits should be explicitly stated in the KIP.

Best,
Bruno

On Fri, Jan 24, 2020 at 10:58 AM Matthias J. Sax <matth...@confluent.io> wrote:

IMHO, the question about semantics depends on the use case, in particular on the origin of a KTable.
If there is a changelog topic that one reads directly into a KTable, emit-on-change does actually make sense, because the timestamp indicates _when_ the update became _effective_. For this case, it is semantically sound to _not_ update the timestamp in the store, because the second update is actually idempotent, and advancing the timestamp is not ideal (one could even consider it wrong) because the "valid time" of the record pair did not change. This reasoning also applies to KTable-KTable joins.

However, if the KTable is the result of an aggregation, I think emit-on-update is more natural, because the timestamp reflects the _last_ time (i.e., the highest timestamp) of all input records that contributed to the result. Hence, updating the timestamp and emitting a new record actually sounds correct to me. IMHO, this applies to both windowed and non-windowed aggregations.

However, considering the argument that, in the first case, the timestamp should not be updated in the store to begin with, both cases are actually the same, and both can be modeled as emit-on-change: if a `table()` operator does not update the timestamp when the value does not change, there is _no_ change, and thus nothing is emitted. At the same time, if an aggregation operator does update the timestamp (even if the value does not change), there _is_ a change, and we emit. Note that handling out-of-order data for aggregations would also work seamlessly with this approach: for out-of-order records, the timestamp never changes, and thus we only emit if the result itself changes.
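Matthias's unified model, together with his Feb 4 point that an out-of-order record with an unchanged value must still be emitted with its lower timestamp, can be written as two update rules. This is a minimal sketch, not the Streams implementation: `Stamped`, `sourceTableUpdate`, and `aggregationUpdate` are hypothetical names, and a `HashMap` stands in for the state store. The store is written only when a record is emitted, following John's Jan 24 remark that the store must remain consistent with what has been emitted.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the timestamp rules discussed in this thread (hypothetical code):
// a source table keeps the earliest timestamp of an unchanged value, while an
// aggregation result carries the maximum timestamp of its inputs.
final class TimestampSemantics {

    static final class Stamped {
        final String value;
        final long timestamp;
        Stamped(String value, long timestamp) { this.value = value; this.timestamp = timestamp; }
    }

    // table(): returns true if the update must be emitted; the store changes only when we emit.
    static boolean sourceTableUpdate(Map<String, Stamped> store, String key, String value, long ts) {
        Stamped prior = store.get(key);
        boolean emit = prior == null
                || !prior.value.equals(value)   // a real value change
                || ts < prior.timestamp;        // same value, but validity starts earlier (out-of-order)
        if (emit) {
            store.put(key, new Stamped(value, ts));
        }
        return emit; // same value with a later timestamp is idempotent and dropped
    }

    // Aggregation: the result timestamp is the max input timestamp, so an unchanged
    // result is still emitted when that max advances, but not for out-of-order inputs.
    static boolean aggregationUpdate(Map<String, Stamped> store, String key, String result, long inputTs) {
        Stamped prior = store.get(key);
        long resultTs = prior == null ? inputTs : Math.max(prior.timestamp, inputTs);
        boolean emit = prior == null || !prior.value.equals(result) || resultTs != prior.timestamp;
        if (emit) {
            store.put(key, new Stamped(result, resultTs));
        }
        return emit;
    }

    public static void main(String[] args) {
        Map<String, Stamped> table = new HashMap<>();
        System.out.println(sourceTableUpdate(table, "k", "v", 10)); // true: first value
        System.out.println(sourceTableUpdate(table, "k", "v", 20)); // false: idempotent, store keeps ts 10
        System.out.println(sourceTableUpdate(table, "k", "v", 5));  // true: out-of-order, earlier validity

        Map<String, Stamped> agg = new HashMap<>();
        System.out.println(aggregationUpdate(agg, "k", "max=7", 10)); // true: first result
        System.out.println(aggregationUpdate(agg, "k", "max=7", 20)); // true: same value, timestamp advanced
        System.out.println(aggregationUpdate(agg, "k", "max=7", 15)); // false: out-of-order, nothing changed
    }
}
```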
Therefore, I would argue that we might not even need any config, because the emit-on-change behavior is simply correct and reduces the downstream load, while our current behavior is not ideal (even if it's also correct).

Thoughts?

-Matthias

On 1/24/20 9:37 AM, John Roesler wrote:

Hi Bruno,

Thanks for that idea. I hadn't considered that option before, and it does seem like that would be the right place to put it if we think it might be semantically important to control on a table-by-table basis.

I had been thinking of it less semantically and more practically. In the context of a large topology, or more generally, a large software system that contains many topologies and other event-driven systems, each no-op result becomes an input that is destined to itself become a no-op result, and so on, all the way through the system. Thus, a single pointless processing result becomes amplified into a large number of pointless computations, cache perturbations, and network and disk I/O operations. If you also consider operations with fan-out implications, like branching or foreign-key joins, the wasted resources are amplified not just in proportion to the size of the system, but in proportion to the size of the system times the average fan-out raised to the power of the number of fan-out operations on the path(s) through the system. In my time operating such systems, I've observed these effects to be very real, and actually, the system and use case don't have to be very large before the amplification poses an existential threat to the system as a whole. This is the basis of my advocating for a simple behavior change, rather than an opt-in config of any kind.
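John's amplification argument can be made concrete with a back-of-the-envelope count. The model below is an illustrative assumption, not a measurement: a single no-op record enters a linear path of operators, and `fanOuts[i]` (a hypothetical parameter) is how many records operator i emits per input. The count grows with path length and multiplies at every fan-out step, in line with John's "size times fan-out to the power of the number of fan-out operations"; the exact number depends on where along the path the fan-out occurs.

```java
// Back-of-the-envelope model of no-op amplification (illustrative only).
final class NoOpAmplification {

    // fanOuts[i] is the number of output records operator i produces per input record.
    // Returns how many operator invocations a single pointless input triggers.
    static long wastedInvocations(int[] fanOuts) {
        long inFlight = 1;     // the single no-op record entering the first operator
        long invocations = 0;
        for (int fanOut : fanOuts) {
            invocations += inFlight; // every in-flight record is processed by this operator
            inFlight *= fanOut;      // and produces fanOut records for the next operator
        }
        return invocations;
    }

    public static void main(String[] args) {
        System.out.println(wastedInvocations(new int[] {1, 1, 1, 1, 1, 1})); // 6: grows with path length
        System.out.println(wastedInvocations(new int[] {1, 3, 1, 3, 1, 1})); // 26 = 1+1+3+3+9+9
    }
}
```

Dropping the no-op at the first operator, as emit-on-change would for a stateful KTable operation, reduces the count in both cases to a single invocation.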
It seems like Streams should "do the right thing" for the majority use case. My theory (which may be wrong) is that the majority use case is more like "relational queries" than "CEP queries". Even if you were doing some event-sensitive computation, wouldn't you do it with KStream operations (where this feature is inapplicable anyway)?

In keeping with the "practical" perspective, I suggested the opt-out config only for the (I think unlikely) event that filtering out pointless updates actually harms performance. I'd also be perfectly fine without the opt-out config. I really think that, because of the timestamp semantics work already underway, we're already pre-fetching the prior result most of the time, so there would actually be very little extra I/O involved in implementing emit-on-change.

However, we should consider whether my experience is likely to be general. Do you have some use case in mind for which you'd actually want some KTable results to be emit-on-update for semantic reasons?

Thanks,
-John

On Fri, Jan 24, 2020, at 11:02, Bruno Cadonna wrote:

Hi Richard,

Thank you for the KIP. I agree with John that we should focus on the interface and behavior change in a KIP. We can discuss the implementation later. I am also +1 for the survey.

I had a thought about this. Couldn't we consider emit-on-change to be one config of suppress (like `untilWindowCloses`)? What you basically propose is to suppress updates if they do not change the result. Considering emit-on-change as a flavor of suppress would be more flexible, because it would specify the behavior locally for a KTable instead of globally for all KTables.
Additionally, specifying the behavior in one place instead of multiple places feels more intuitive and consistent to me.

Best,
Bruno

On Fri, Jan 24, 2020 at 7:49 AM John Roesler <vvcep...@apache.org> wrote:

Hi Richard,

Thanks for picking this up! I know of at least one large community member for whom this feature is absolutely essential.

If I understand your two options, it seems like the proposal is to implement it as a behavior change regardless, and the question is whether to provide an opt-out config or not. Given that any implementation of this feature would have some performance impact under some workloads, and also that we don't know whether anyone really depends on emit-on-update time semantics, it seems like we should propose to add an opt-out config. Can you update the KIP to mention the exact config key and value(s) you'd propose? Just to move the discussion forward, maybe something like:

    emit.on := change|update

with the new default being "change".

Thanks for pointing out the timestamp issue in particular. I agree that if we discard the latter update as a no-op, then we also have to discard its timestamp. Obviously, we don't forward the timestamp update, as that's the whole point, but we also can't update the timestamp in the store, as the store must remain consistent with what has been emitted.

I have to confess that I disagree with your implementation proposal, but it's also not necessary to discuss implementation in the KIP. Maybe it would be less controversial if you just drop that section for now, so that the KIP discussion can focus on the behavior change and config.

Just for reference, there is some research into this domain.
>> > > >> For example, see the "Report" section (3.2.3) of the SECRET paper:
>> > > >> http://people.csail.mit.edu/tatbul/publications/maxstream_vldb10.pdf
>> > > >>
>> > > >> It might help to round out the proposal if you take a brief survey of the behaviors of other systems, along with pros and cons if any are reported.
>> > > >>
>> > > >> Thanks, -John
>> > > >>
>> > > >> On Fri, Jan 10, 2020, at 22:27, Richard Yu wrote:
>> > > >>
>> > > >> Hi everybody!
>> > > >>
>> > > >> I'd like to propose a change that we probably should have added a long time ago. The key benefit of this KIP would be reduced traffic in Kafka Streams, since a lot of no-op results would no longer be sent downstream. Here is the KIP for reference:
>> > > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams
>> > > >>
>> > > >> Currently, I seek to formalize our approach for this KIP first, before we determine concrete API additions / configurations. Some configs might warrant adding, while others are not necessary, since adding them would only increase the complexity of Kafka Streams.
>> > > >>
>> > > >> Cheers, Richard
>> > > >>
>> > > >> --
>> > > >> *Tommy Becker* *Principal Engineer*
>> > > >> *Personalized Content Discovery*
>> > > >> *O* +1 919.460.4747 *tivo.com* <http://www.tivo.com/>
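For readers of this archive, here is a minimal, hypothetical Java sketch of the emit-on-change behavior discussed in the thread: a no-op update (new value equal to the stored one) is not forwarded, and, following John's point, its timestamp is not written to the store either, so the store stays consistent with what was emitted. Everything here is illustrative and assumed: `EmitOnChangeSketch`, `Table`, `StoredValue`, `update` and the `EmitOn` enum are not Kafka Streams APIs, and `EmitOn` only mirrors the `emit.on := change|update` placeholder John floated. Values are compared with `Objects.equals()`, so the sketch inherits the caveat raised earlier in the thread that a class's `equals()` may not return true for semantically identical instances.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical sketch, not Kafka Streams code: models a KTable's backing store
// and decides whether each update would be forwarded downstream.
public class EmitOnChangeSketch {

    // Hypothetical stand-in for the "emit.on := change|update" idea.
    enum EmitOn { CHANGE, UPDATE }

    // Stored value plus the timestamp of the last result that was emitted.
    record StoredValue<V>(V value, long timestamp) {}

    static final class Table<K, V> {
        private final Map<K, StoredValue<V>> store = new HashMap<>();
        private final EmitOn mode;

        Table(EmitOn mode) {
            this.mode = mode;
        }

        /** Returns true if this update would be forwarded downstream. */
        boolean update(K key, V newValue, long timestamp) {
            StoredValue<V> prior = store.get(key);
            boolean unchanged = prior != null && Objects.equals(prior.value(), newValue);
            if (mode == EmitOn.CHANGE && unchanged) {
                // No-op: drop the update AND keep the old timestamp in the store,
                // so the store matches what has already been emitted.
                return false;
            }
            store.put(key, new StoredValue<>(newValue, timestamp));
            return true;
        }

        /** Timestamp of the stored result; assumes the key is present. */
        long timestampOf(K key) {
            return store.get(key).timestamp();
        }
    }

    public static void main(String[] args) {
        Table<String, Integer> table = new Table<>(EmitOn.CHANGE);
        System.out.println(table.update("a", 1, 100L)); // first value: forwarded
        System.out.println(table.update("a", 1, 200L)); // same value: dropped
        System.out.println(table.timestampOf("a"));     // still the first timestamp
        System.out.println(table.update("a", 2, 300L)); // real change: forwarded
    }
}
```

Because `mode` is a per-instance field here, each table decides for itself, which is closer to Bruno's suggestion of specifying the behavior locally per KTable (as a flavour of suppress) than to a single global config.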