Hi all,

Just some updates. Below is the vote thread:
https://sematext.com/opensee/m/Kafka/uyzND1h1NPW1tLVQR?subj=+VOTE+KIP+557+Add+emit+on+change+support+for+Kafka+Streams
It would be great if we can include this change to Kafka. :)

Cheers,
Richard

On Thu, Feb 27, 2020 at 6:45 PM Richard Yu <yohan.richard...@gmail.com> wrote:
> Hi all,
>
> @John Will add some notes accordingly.
>
> To all: Thanks for all your input!
> It looks like we can wrap up this discussion thread then.
>
> I've started a vote thread, so please feel free to cast your vote there!
>
> We should be pretty close. :)
>
> Cheers,
> Richard
>
> On Thu, Feb 27, 2020 at 2:34 PM John Roesler <vvcep...@apache.org> wrote:
>
>> Hi Richard,
>>
>> Thanks for the update!
>>
>> I read it over, and overall it looks good!
>>
>> I have only a minor concern about the rate metric definition:
>> > The rate option indicates the ratio of records dropped to actual volume of records passing through the task
>> That's not the definition of a "rate". It should be something more like "the average number of dropped idempotent updates per second".
>>
>> Incidentally, I mentioned this KIP to Guozhang, and he brought up an interesting concern I'd like to share with the list. He noted that if we filter out idempotent table updates, stream time will not advance with every input event anymore. He asked if this would have a negative impact on operations that depend on stream time.
>>
>> I think this is a valid concern. For example, you can use Suppress to buffer KTable updates until a certain amount of stream time passes. Specifically, the Suppress processor itself advances stream time as it receives new records to its `process` method. In the pathological case, all updates are idempotent, get dropped, and the Suppress operator never emits anything, even though to an outside observer, stream time should have advanced.
>>
>> Example scenario:
>> > inputTable = builder.table(input)
>> > suppressed = inputTable.suppress(untilTimeLimit(10))
>> >
>> > input: key=A, timestamp=0, value=X
>> > inputTable: key=A, timestamp=0, value=X
>> > suppressed buffer: key=A, timestamp=0, value=X (observed stream time = 0)
>> > output: (nothing)
>> >
>> > input: key=A, timestamp=11, value=X
>> > // update is idempotent, so it gets dropped
>> > inputTable: key=A, timestamp=0, value=X
>> > suppressed buffer: key=A, timestamp=0, value=X (observed stream time = 0)
>> > output: (nothing)
>>
>> Note that even though stream time has technically advanced beyond the suppression config of 10, the suppression buffer doesn't see it because the KTable source already dropped the idempotent update.
>>
>> I'm thinking that this situation is indeed concerning, and we should be aware and make a note of it. However, I don't think that it should change our proposal. To understand why, I'll enumerate all the usages of stream time in Kafka Streams:
>>
>> windowed operations
>> -----------------------------
>> KStreamSessionWindowAggregate & KStreamWindowAggregate:
>> - used to determine if an incoming record belongs to a window that is already closed
>> Suppressed.untilWindowCloses():
>> - used to flush out results for windows, once they are closed
>> AbstractRocksDBSegmentedBytesStore & InMemorySessionStore & InMemoryWindowStore:
>> - used to create new segments and drop old ones that are out of retention
>>
>> non-windowed operations
>> -----------------------------------
>> Suppressed.untilTimeLimit
>> - used to flush out prior updates that have passed the configured age, in stream time
>>
>> Note that most of the usages are in windowed operations. One interesting thing about this context is that records with higher timestamps belong to different windows.
>> In order to advance stream time far enough to close a window or push it out of retention, the new records must have timestamps that are in later windows, which means that they are updates to _new_ keys, which means they would not be suppressed as idempotent.
>>
>> Updates within a window could still be suppressed, though:
>> For example, if the window size is 10, and the grace period is 5, and we get updates all for the same key with timestamps 0, 11, and 16, today, we would emit the record for the [0,10) window as soon as we got the update at time 16 (since stream time is now past the window end + grace period time of 15). But if we drop the time=16 update, we wouldn't emit the [0,10) window results until we get a record with time >= 20.
>>
>> You can see that the maximum amount of (stream) time that dropping idempotent updates could delay updates is one window. This might be surprising, but it doesn't seem too bad, especially since Suppress explicitly does _not_ promise to emit the results at the earliest possible time, just at some time after the window closes.
>>
>> Even better, though, all the windowed aggregations are _stream_ aggregations that _produce_ a KTable, and we already decided that (at least for now), we would include the timestamp in the idempotence check for stream aggregation results. With this strategy, we would actually not suppress _any_ update to a stream aggregation (windowed or otherwise) that advances stream time.
>>
>> So there's no problem at all with windowed operations.
>>
>> This only leaves non-windowed operations, of which there's only one. I have to admit that I regret Suppressed.untilTimeLimit. It turns out that everyone I've heard of who used this API actually wanted it to be wall-clock based, not stream-time based. So, I'm not sure in practice if this sharp edge will actually cut anyone.
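
The untilTimeLimit scenario above can be reproduced with a toy model in plain Java. This is only a sketch under stated assumptions: it does not use or mirror the Kafka Streams implementation, and the class name SuppressStreamTimeSketch, the run method, and its dropIdempotent flag are hypothetical names made up for illustration. The only behavior it models is the one described in the thread: the buffer advances its view of stream time when a record actually reaches it, so a record dropped upstream leaves the buffer's clock untouched.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model (not Kafka Streams code) of a table source feeding a time-limit suppression buffer. */
final class SuppressStreamTimeSketch {

    /**
     * Feeds {key, timestamp, value} records through a source table and a suppression buffer.
     * Returns the emitted "key=value" strings in emission order.
     */
    static List<String> run(String[][] input, long timeLimit, boolean dropIdempotent) {
        Map<String, String> table = new LinkedHashMap<>();          // source table state
        Map<String, String> bufferedValue = new LinkedHashMap<>();  // latest buffered value per key
        Map<String, Long> bufferedAt = new LinkedHashMap<>();       // time each key entered the buffer
        List<String> output = new ArrayList<>();
        long observedStreamTime = Long.MIN_VALUE;                   // stream time as seen by the buffer

        for (String[] rec : input) {
            String key = rec[0];
            long timestamp = Long.parseLong(rec[1]);
            String value = rec[2];

            // Idempotent update: same value as the table already holds for this key.
            if (dropIdempotent && value.equals(table.get(key))) {
                continue; // dropped at the source, so the buffer never sees this timestamp
            }
            table.put(key, value);

            // The buffer only advances stream time when a record reaches it.
            observedStreamTime = Math.max(observedStreamTime, timestamp);
            bufferedValue.put(key, value);
            bufferedAt.putIfAbsent(key, timestamp);

            // Emit every key that has been buffered for at least timeLimit.
            final long now = observedStreamTime;
            bufferedAt.entrySet().removeIf(e -> {
                if (now - e.getValue() < timeLimit) {
                    return false;
                }
                output.add(e.getKey() + "=" + bufferedValue.remove(e.getKey()));
                return true;
            });
        }
        return output;
    }

    public static void main(String[] args) {
        // The two records from the example scenario: key=A, value=X at timestamps 0 and 11.
        String[][] input = {{"A", "0", "X"}, {"A", "11", "X"}};
        System.out.println("dropping idempotent updates: " + run(input, 10, true));
        System.out.println("forwarding every update:     " + run(input, 10, false));
    }
}
```

With the two records from the example scenario and a limit of 10, the sketch emits nothing when idempotent updates are dropped, and emits A=X when every update is forwarded.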
>>
>> I think a "proper" fix would be to introduce some kind of control message to advance stream time independently of records. We've already talked about this a little in KAFKA-8769, and it would also be necessary for global consistency markers. Basically, once a record is dropped as an idempotent update, we could still forward a time marker through the topology, which could trigger suppressions, as well as window boundaries, but wouldn't result in any computations.
>>
>> But practically speaking, I'm really not confident that anyone is actually using the untilTimeLimit suppression, or, even if they are, that they would see consecutive idempotent updates for long enough to have an observable impact on what gets emitted from the suppression buffer. In fact, if some hypothetical person did find themselves on the wrong end of my assumption, they could _remove_ the suppression, and rely instead on idempotence-dropping to get the same level of traffic reduction that they enjoy from the suppression.
>>
>> Anyway, long story short, I'm advocating to continue with the current proposal and just make a note of the situations I've outlined above.
>>
>> Thanks for your time,
>> -John
>>
>>
>> On Thu, Feb 27, 2020, at 14:33, Richard Yu wrote:
>> > Hi all,
>> >
>> > I might've made a minor mistake. The processor node level is level 3, not level 1.
>> > I will correct the KIP accordingly.
>> >
>> > After looking over things, I decided to start the voting thread this afternoon.
>> >
>> > Cheers,
>> > Richard
>> >
>> > On Thu, Feb 27, 2020 at 12:29 PM Richard Yu <yohan.richard...@gmail.com> wrote:
>> >
>> > > Hi Bruno, Hi John,
>> > >
>> > > Thanks for your comments! I updated the KIP accordingly, and it looks like, for quite a few points, I was beating around the bush in ways that could've been avoided.
>> > >
>> > > Looks like we can reduce the metric to Level 1 (per processor node) then.
>> > >
>> > > I've cleaned up most of the unnecessary info, and we should be fairly close.
>> > > I will start working on a PR soon for this KIP. (although we might split that up into stages)
>> > >
>> > > Cheers,
>> > > Richard
>> > >
>> > > On Thu, Feb 27, 2020 at 6:06 AM Bruno Cadonna <br...@confluent.io> wrote:
>> > >
>> > >> Hi John,
>> > >>
>> > >> I agree with you. It is better to measure the metric on processor node level. The users can do the rollup to task-level by themselves.
>> > >>
>> > >> Best,
>> > >> Bruno
>> > >>
>> > >> On Thu, Feb 27, 2020 at 12:09 AM John Roesler <vvcep...@apache.org> wrote:
>> > >> >
>> > >> > Hi Richard,
>> > >> >
>> > >> > I've been making a final pass over the KIP.
>> > >> >
>> > >> > Re: Proposed Behavior Change:
>> > >> >
>> > >> > I think this point is controversial and probably doesn't need to be there at all:
>> > >> > > 2.b. In certain situations where there is a high volume of idempotent updates throughout the Streams DAG, it will be recommended practice to materialize all operations to reduce traffic overall across the entire network of nodes.
>> > >> >
>> > >> > Re-reading all the points, it seems like we can sum them up in a way that's a little more straight to the point, and gives us the right amount of flexibility:
>> > >> >
>> > >> > > Proposed Behavior Changes
>> > >> > >
>> > >> > > Definition: "idempotent update" is one in which the new result and prior result, when serialized, are identical byte arrays
>> > >> > >
>> > >> > > Note: an "update" is a concept that only applies to Table operations, so the concept of an "idempotent update" also only applies to Table operations.
>> > >> > > See https://kafka.apache.org/documentation/streams/developer-guide/dsl-api.html#streams_concepts_ktable
>> > >> > > for more information.
>> > >> > >
>> > >> > > Given that definition, we propose for Streams to drop idempotent updates in any situation where it's possible and convenient to do so. For example, any time we already have both the prior and new results serialized, we may compare them, and drop the update if it is idempotent.
>> > >> > >
>> > >> > > Note that under this proposal, we can implement idempotence checking in the following situations:
>> > >> > > 1. Any aggregation (for example, KGroupedStream, KGroupedTable, TimeWindowedKStream, and SessionWindowedKStream operations)
>> > >> > > 2. Any Materialized KTable operation
>> > >> > > 3. Repartition operations, when we need to send both prior and new results
>> > >> >
>> > >> > Notice that in my proposed wording, we neither limit ourselves to just the situations enumerated, nor promise to implement the optimization in every possible situation. IMHO, this is the best way to propose such a feature. That way, we have the flexibility to implement it in stages, and also to add on to the implementation in the future.
>> > >> >
>> > >> >
>> > >> > Re: Metrics
>> > >> >
>> > >> > I agree with Bruno, although, I think it might just be a confusing statement. It might be clearer to drop all the "discussion", and just say: "We will add a metric to count the number of idempotent updates that we have dropped".
>> > >> >
>> > >> > Also, with respect to the metric, I'm wondering if the metric should be task-level or processor-node-level.
>> > >> > Since the interesting action takes place inside individual processor nodes, I _think_ it would be higher leverage to just measure it at the node level. WDYT?
>> > >> >
>> > >> > Re: Design Reasoning
>> > >> >
>> > >> > This section seems to be a little bit outdated. I also just noticed a "surprise" configuration "timestamp.aggregation.selection.policy" hidden in point 1.a. Is that part of the proposal? We haven't discussed it, and I think we were talking about this KIP being "configuration free".
>> > >> >
>> > >> > There is also some discussion of discarded alternatives in the Design Reasoning section, which is confusing. Finally, there was a point there I didn't understand at all, about stateless operators not being intended to load prior results. This statement doesn't seem to be true, but it also doesn't seem to be relevant, so maybe we can just drop it.
>> > >> >
>> > >> > Overall, it might help if you make a pass on this section, and just discuss as briefly as possible the justification for the proposed behavior change, and for not adding a configuration. Try to avoid talking about things that we are not proposing, since that will just lead to confusion.
>> > >> >
>> > >> > Similarly, I'd just completely remove the "Implementation [discarded]" section. It was good to have this as part of the discussion initially, but as we move toward a vote, it's better to just streamline the KIP document as much as possible. Keeping a "discarded" section in the document will just make it harder for new people to understand the proposal.
>> > >> > We did the same thing with KIP-441, where there were two prior drafts included at the end of the document, and we just deleted them for clarity.
>> > >> >
>> > >> > I liked the "Compatibility" and "Rejected Alternatives" section. Very clear and to the point.
>> > >> >
>> > >> > Thanks again for the contribution! I think once the KIP document is cleaned up, we'll be in good shape to finalize the discussion.
>> > >> > -John
>> > >> >
>> > >> >
>> > >> > On Wed, Feb 26, 2020, at 07:27, Bruno Cadonna wrote:
>> > >> > > Hi Richard,
>> > >> > >
>> > >> > > 1. Could you change "idempotent update operations will only be dropped from KTables, not from other classes." -> idempotent update operations will only be dropped from materialized KTables? For non-materialized KTables -- as they can occur after optimization of the topology -- we cannot drop idempotent updates.
>> > >> > >
>> > >> > > 2. I cannot completely follow the metrics section. Do you want to record all idempotent updates or only the dropped ones? In particular, I do not understand the following sentences:
>> > >> > > "For that matter, even if we don't drop idempotent updates, we should at the very least record the number of idempotent updates that has been seen go through a particular processor."
>> > >> > > "Therefore, we should add some metrics which will count the number of idempotent updates that each node has seen."
>> > >> > > I do not see how we can record idempotent updates that we do not drop. If we see them, we should drop them. If we do not see them, we cannot drop them and we cannot record them.
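
The two readings of the dropped-idempotent-updates metric discussed in this thread, a total count and a rate in the sense of "the average number of dropped idempotent updates per second", can be illustrated with a plain-Java counter. This is a sketch only: it does not use the Kafka metrics classes, the class and method names are hypothetical, and the rate is simplified to a lifetime average rather than an average over a sampling window.

```java
/** Toy counter (not the Kafka metrics API) for dropped idempotent updates. */
final class DroppedIdempotentUpdatesSketch {
    private final long startMs; // time at which counting started
    private long total;         // number of dropped idempotent updates so far

    DroppedIdempotentUpdatesSketch(long startMs) {
        this.startMs = startMs;
    }

    /** Called once for every update that is dropped as idempotent. */
    void recordDrop() {
        total++;
    }

    /** "total": the raw count of dropped updates. */
    long total() {
        return total;
    }

    /** "rate": average number of drops per second since startMs (a simplification). */
    double ratePerSecond(long nowMs) {
        long elapsedMs = nowMs - startMs;
        return elapsedMs <= 0 ? 0.0 : total * 1000.0 / elapsedMs;
    }

    public static void main(String[] args) {
        DroppedIdempotentUpdatesSketch metric = new DroppedIdempotentUpdatesSketch(0L);
        metric.recordDrop();
        metric.recordDrop();
        metric.recordDrop();
        System.out.println("total = " + metric.total());              // 3 drops recorded
        System.out.println("rate  = " + metric.ratePerSecond(2000L)); // 3 drops over 2 seconds
    }
}
```

Note that the rate is expressed per unit of time; it is not a ratio of dropped records to all records seen, which would need a second counter for the total volume.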
>> > >> > >
>> > >> > > Best,
>> > >> > > Bruno
>> > >> > >
>> > >> > > On Wed, Feb 26, 2020 at 4:57 AM Richard Yu <yohan.richard...@gmail.com> wrote:
>> > >> > > >
>> > >> > > > Hi John,
>> > >> > > >
>> > >> > > > Sounds good. It looks like we are close to wrapping things up. If there aren't any other revisions that need to be made (if there are, please comment in the thread), I will start the voting process this Thursday (Pacific Standard Time).
>> > >> > > >
>> > >> > > > Cheers,
>> > >> > > > Richard
>> > >> > > >
>> > >> > > > On Tue, Feb 25, 2020 at 11:59 AM John Roesler <vvcep...@apache.org> wrote:
>> > >> > > >
>> > >> > > > > Hi Richard,
>> > >> > > > >
>> > >> > > > > Sorry for the slow reply. I actually think we should avoid checking equals() for now. Your reasoning is good, but the truth is that depending on the implementation of equals() is non-trivial, semantically, and (though I proposed it before), I'm not convinced it's worth the risk. Much better to start with exactly one kind of "idempotence detection".
>> > >> > > > >
>> > >> > > > > Even if someone does update their serdes, we know that the new serde would still be able to _de_serialize the old format, or the whole app would break. The situation is that the new result gets encoded in the new binary format, which means we don't detect an idempotent update for what it is. In this case, we'd write the new binary format to disk and the changelog, and forward it downstream. However, we only do this once. Now that the binary format for that record has been updated, we would correctly detect idempotence of any subsequent updates.
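
The serde-upgrade argument above can be checked with a small plain-Java model of the byte-array comparison. It is a sketch under assumptions, not the Streams implementation: the class BinaryIdempotenceSketch, its update method, and the two toy serializers v1 and v2 are hypothetical, and stand in for an old and a new serialization format of the same logical value.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Toy model (not Kafka Streams code) of idempotence detection by binary equality. */
final class BinaryIdempotenceSketch {
    // Stand-in for a state store holding the serialized prior result per key.
    private final Map<String, byte[]> store = new HashMap<>();

    /** Returns true if the update is stored and forwarded, false if dropped as idempotent. */
    boolean update(String key, int value, Function<Integer, byte[]> serializer) {
        byte[] newBytes = serializer.apply(value);
        // Arrays.equals is false when there is no prior entry (null) or the bytes differ.
        if (Arrays.equals(store.get(key), newBytes)) {
            return false;
        }
        store.put(key, newBytes);
        return true;
    }

    /** Hypothetical "old" binary format. */
    static byte[] v1(Integer value) {
        return Integer.toString(value).getBytes(StandardCharsets.UTF_8);
    }

    /** Hypothetical "new" binary format for the same logical value. */
    static byte[] v2(Integer value) {
        return ("v2:" + value).getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        BinaryIdempotenceSketch table = new BinaryIdempotenceSketch();
        System.out.println(table.update("A", 42, BinaryIdempotenceSketch::v1)); // true: first write
        System.out.println(table.update("A", 42, BinaryIdempotenceSketch::v1)); // false: identical bytes
        System.out.println(table.update("A", 42, BinaryIdempotenceSketch::v2)); // true: format changed, forwarded once
        System.out.println(table.update("A", 42, BinaryIdempotenceSketch::v2)); // false: detected again
    }
}
```

The third call shows the one-time cost described in the message: the same logical value is forwarded once after the format change, and later updates in the new format are dropped as before.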
>> > >> > > > >
>> > >> > > > > Plus, we would still be able to filter out idempotent updates in repartition sinks, since for those, we use the new serde to serialize both the "old" and "new" result.
>> > >> > > > >
>> > >> > > > > It's certainly a good observation, but I think we can just make a note of it in "rejected alternatives" for now, and plan to refine it later, if it does pose a big performance problem.
>> > >> > > > >
>> > >> > > > > Thanks!
>> > >> > > > > -John
>> > >> > > > >
>> > >> > > > > On Sat, Feb 22, 2020, at 18:14, Richard Yu wrote:
>> > >> > > > > > Hi all,
>> > >> > > > > >
>> > >> > > > > > Updated the KIP.
>> > >> > > > > >
>> > >> > > > > > Just a question: do you think it would be a good idea if we check for both Object#equals() and binary equality?
>> > >> > > > > > Because there might be some subtle changes in the serialization (for example, if the user decides to upgrade their serialization procedure to a new one), but the underlying values of the result might be the same. (therefore equals() might return true)
>> > >> > > > > >
>> > >> > > > > > Do you think this would be plausible?
>> > >> > > > > >
>> > >> > > > > > Cheers,
>> > >> > > > > > Richard
>> > >> > > > > >
>> > >> > > > > > On Fri, Feb 21, 2020 at 2:37 PM Richard Yu <yohan.richard...@gmail.com> wrote:
>> > >> > > > > >
>> > >> > > > > > > Hello,
>> > >> > > > > > >
>> > >> > > > > > > Just to make some updates. I changed the name of the metric so that it was more in line with usual Kafka naming conventions for metrics / sensors.
>> > >> > > > > > > Below is the updated description of the metric:
>> > >> > > > > > >
>> > >> > > > > > > dropped-idempotent-updates : (Level 2 - Per Task) DEBUG (rate | total)
>> > >> > > > > > >
>> > >> > > > > > > Description: This metric will record the number of updates that have been dropped since they are essentially re-performing an earlier operation.
>> > >> > > > > > >
>> > >> > > > > > > Note:
>> > >> > > > > > >
>> > >> > > > > > > - The rate option indicates the ratio of records dropped to actual volume of records passing through the task.
>> > >> > > > > > > - The total option will just give a raw count of the number of records dropped.
>> > >> > > > > > >
>> > >> > > > > > >
>> > >> > > > > > > I hope that this is more on point.
>> > >> > > > > > >
>> > >> > > > > > > Best,
>> > >> > > > > > > Richard
>> > >> > > > > > >
>> > >> > > > > > > On Fri, Feb 21, 2020 at 2:20 PM Richard Yu <yohan.richard...@gmail.com> wrote:
>> > >> > > > > > >
>> > >> > > > > > >> Hi all,
>> > >> > > > > > >>
>> > >> > > > > > >> Thanks for the clarification. I was just a little confused about what was going on.
>> > >> > > > > > >>
>> > >> > > > > > >> So I guess then that for the actual proposal, we've got the following:
>> > >> > > > > > >>
>> > >> > > > > > >> 1. We check for binary equality, and perform no extra look ups.
>> > >> > > > > > >> 2. Emphasize that this applies only to materialized tables.
>> > >> > > > > > >> 3. We drop aggregation updates if key, value and timestamp are the same.
>> > >> > > > > > >>
>> > >> > > > > > >> Then that settles the behavior changes. So it looks like the metric is the only thing that is left.
>> > >> > > > > > >> In this case, I think the metric would be named the following: IdempotentUpdateMetric. This is mostly off the top of my head. So if you think that we should change it, feel free to say so.
>> > >> > > > > > >> The metric will report the number of dropped operations inherently.
>> > >> > > > > > >>
>> > >> > > > > > >> It will probably be added as a Sensor, similar to the dropped records sensor we already have.
>> > >> > > > > > >>
>> > >> > > > > > >> If there isn't anything else, I will probably start the voting process next week!
>> > >> > > > > > >>
>> > >> > > > > > >> Cheers,
>> > >> > > > > > >> Richard
>> > >> > > > > > >>
>> > >> > > > > > >>
>> > >> > > > > > >> On Fri, Feb 21, 2020 at 11:23 AM John Roesler <vvcep...@apache.org> wrote:
>> > >> > > > > > >>
>> > >> > > > > > >>> Hi Bruno,
>> > >> > > > > > >>>
>> > >> > > > > > >>> Thanks for the clarification. Indeed, I was thinking two things:
>> > >> > > > > > >>> 1. For the initial implementation, we can just avoid adding any extra lookups, but only do the comparison when we already happen to have the prior value.
>> > >> > > > > > >>> 2. I think, as a result of the timestamp semantics, we actually _do_ look up the prior value approximately all the time, so the idempotence check should be quite effective.
>> > >> > > > > > >>>
>> > >> > > > > > >>> I think that second point is the same thing you're referring to potentially being unnecessary.
>> > >> > > > > > >>> It does mean that we do fetch the whole value in a lot of cases where we really only need the timestamp, so it could certainly be optimized in the future. In that future, we would need to weigh that optimization against losing the idempotence check. But, that's a problem for tomorrow :)
>> > >> > > > > > >>>
>> > >> > > > > > >>> I'm 100% on board with scrutinizing the performance as we implement this feature.
>> > >> > > > > > >>>
>> > >> > > > > > >>> Thanks again,
>> > >> > > > > > >>> -John
>> > >> > > > > > >>>
>> > >> > > > > > >>> On Thu, Feb 20, 2020, at 03:25, Bruno Cadonna wrote:
>> > >> > > > > > >>> > Hi John,
>> > >> > > > > > >>> >
>> > >> > > > > > >>> > I am glad to help you with your imagination. With overhead, I mainly meant the additional lookup into the state store to get the current value, but I see now in the code that we do that lookup anyways (although I think we could avoid that in the cases where we do not need the old value). With or without config, we need to evaluate the performance benefits of this change, in any case.
>> > >> > > > > > >>> >
>> > >> > > > > > >>> > Best,
>> > >> > > > > > >>> > Bruno
>> > >> > > > > > >>> >
>> > >> > > > > > >>> > On Wed, Feb 19, 2020 at 7:48 PM John Roesler <vvcep...@apache.org> wrote:
>> > >> > > > > > >>> > >
>> > >> > > > > > >>> > > Thanks for your remarks, Bruno!
>> > >> > > > > > >>> > >
>> > >> > > > > > >>> > > I'm in favor of standardizing on terminology like "not forwarding idempotent updates" or "dropping idempotent updates". Maybe we should make a pass on the KIP and just convert everything to this phrasing. In retrospect, even the term "emit-on-change" has too much semantic baggage, since it implies the semantics from the SECRET paper, which we don't really want to imply here.
>> > >> > > > > > >>> > >
>> > >> > > > > > >>> > > I'm also in favor of the metric as you propose.
>> > >> > > > > > >>> > >
>> > >> > > > > > >>> > > Likewise with stream aggregations, I was also under the impression that we agreed on dropping idempotent updates to the aggregation result, any time we find that our "new" (key, value, timestamp) result is identical to the prior one.
>> > >> > > > > > >>> > >
>> > >> > > > > > >>> > > Also, I'm +1 on all your recommendations for updating the KIP document for clarity.
>> > >> > > > > > >>> > >
>> > >> > > > > > >>> > > Regarding the opt-out config. Perhaps I'm suffering from a failure of imagination, but I don't see how the current proposal could really have a measurable impact on latency.
>> > >> > > > > > >>> > > If all we do is make a single extra pass to compare two byte arrays for equality, only in the cases where we already have the byte arrays available, it seems unlikely to measurably affect the processing of non-idempotent updates. It seems guaranteed to _decrease_ the latency of processing idempotent updates, since we get to skip a store#put, at least one producer#send, and also all downstream processing, including all the disk and network operations associated with downstream operations.
>> > >> > > > > > >>> > >
>> > >> > > > > > >>> > > It seems like if we're pretty sure this change would only _help_, we shouldn't introduce the operational burden of an extra configuration. If we want to be more aggressive about dropping idempotent operations in the future, such as depending on equals() or adding a ChangeDetector interface, then we should consider adding a configuration as part of that future work.
>> > >> > > > > > >>> > > In fact, if we add a simple "opt-in/opt-out" switch right now, we might find that it's actually insufficient for whatever future feature we might propose, and then we have a mess of deprecating the opt-out config and replacing it.
>> > >> > > > > > >>> > >
>> > >> > > > > > >>> > > What do you think?
>> > >> > > > > > >>> > > -John
>> > >> > > > > > >>> > >
>> > >> > > > > > >>> > > On Wed, Feb 19, 2020, at 09:50, Bruno Cadonna wrote:
>> > >> > > > > > >>> > > > Hi all,
>> > >> > > > > > >>> > > >
>> > >> > > > > > >>> > > > Sorry for the late reply!
>> > >> > > > > > >>> > > >
>> > >> > > > > > >>> > > > I am also in favour of baby steps.
>> > >> > > > > > >>> > > >
>> > >> > > > > > >>> > > > I am undecided whether the KIP should contain an opt-out config or not. The overhead of emit-on-change might affect latency. For applications where low latency is crucial and there are not too many idempotent updates, it would be better to fall back to emit-on-update. However, we do not know how much emit-on-change impacts latency. We would first need to benchmark that before we can decide about the opt-out-config.
>> > >> > > > > > >>> > > >
>> > >> > > > > > >>> > > > A metric of dropped idempotent updates seems useful to me to be informed about potential upstream applications or upstream operators that produce too many idempotent updates. The KIP should state the name of the metric, its group, its tags, and its recording level (see KIP-444 or KIP-471 for examples). I propose DEBUG as reporting level.
>> > >> > > > > > >>> > > >
>> > >> > > > > > >>> > > > Richard, what competing proposals for emit-on-change for aggregations do you mean? I have the feeling that we agreed to get rid of idempotent updates if the aggregate is updated with the same key, value, AND timestamp. I am also fine if we do not include this into this KIP (remember: baby steps).
>> > >> > > > > > >>> > > >
>> > >> > > > > > >>> > > > You write that "emit-on-change is more correct". Since we agreed that this is an optimization, IMO you cannot argue this way.
>> > >> > > > > > >>> > > >
>> > >> > > > > > >>> > > > Please put "Alternative Approaches" under "Rejected Alternatives", so that it becomes clear that we are not going to implement them. In general, I think the KIP needs a bit of clean-up (probably, you already planned for it).
>> > >> > > > > > >>> > > > "Design Reasoning" mixes behavior changes and rejected alternatives, and partly duplicates the content of those sections.
>> > >> > > > > > >>> > > >
>> > >> > > > > > >>> > > > I do not like the name "no-op operations" or "no-ops", because they are rather generic. I prefer "idempotent updates".
>> > >> > > > > > >>> > > >
>> > >> > > > > > >>> > > > Best,
>> > >> > > > > > >>> > > > Bruno
>> > >> > > > > > >>> > > >
>> > >> > > > > > >>> > > >
>> > >> > > > > > >>> > > > On Tue, Feb 18, 2020 at 7:25 PM Richard Yu <yohan.richard...@gmail.com> wrote:
>> > >> > > > > > >>> > > > >
>> > >> > > > > > >>> > > > > Hi all,
>> > >> > > > > > >>> > > > >
>> > >> > > > > > >>> > > > > We are definitely making progress!
>> > >> > > > > > >>> > > > >
>> > >> > > > > > >>> > > > > @John should I emphasize in the proposed behavior changes that we are only doing binary equality checks for stateful operators?
>> > >> > > > > > >>> > > > > It looks like we have come close to finalizing this part of the KIP. (I will note in the KIP that this proposal is intended for optimization, not semantics correctness)
>> > >> > > > > > >>> > > > >
>> > >> > > > > > >>> > > > > I do think maybe we still have one other detail we need to discuss. So far, there has been quite a bit of back and forth about what the behavior of aggregations should look like in emit on change.
I have seen multiple competing proposals, so I am not completely certain which one we should go with, or how we will be able to compromise in between them.

Let me know what your thoughts are on this matter, since we are probably close to wrapping up most other stuff. @Matthias J. Sax <matth...@confluent.io> and @Bruno, see what you think about this.

Best,
Richard

On Tue, Feb 18, 2020 at 9:06 AM John Roesler <vvcep...@apache.org> wrote:

Thanks, Matthias!

Regarding numbers, it would be hard to know how many applications would benefit, since we don't know how many applications there are, or anything about their data sets or topologies. We could do a survey, but it seems overkill if we take the conservative approach.

I have my own practical stream processing experience that tells me this is absolutely critical for any moderate-to-large relational stream processing use cases. I'll leave it to you to decide if you find that convincing, but it's definitely not an _assumption_. I've also heard from a few Streams users who have already had to implement their own noop-suppression transformers in order to get to production scale.

Regardless, it sounds like we can agree on taking an opportunistic approach and targeting the optimization just to use a binary-equality check at stateful operators. (I'd also suggest in sink nodes, when we are about to send old and new values, since they are also already present and serialized at that point.) We could make the KIP even more vague, and just say that we'll drop no-op updates "when possible".

I'm curious what Bruno and the others think about this.
If it seems like a good starting point, perhaps we could move to a vote soon and get to work on the implementation!

Thanks,
-John

On Mon, Feb 17, 2020, at 20:54, Matthias J. Sax wrote:

Talking about optimizations and reducing downstream load:

Do we actually have any numbers? I have the impression that this KIP is more or less built on the _assumption_ that there is a problem. Yes, there are some use cases that would benefit from this; but how many applications would actually benefit? And how much load reduction would they get?

The simplest approach (following John's idea to make baby steps) would be to apply the emit-on-change pattern only if there is a store. For this case we need to serialize the old and new result anyway, and thus a simple byte-array comparison adds no overhead.
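
To make the byte-array comparison above concrete, here is a minimal Java sketch. It assumes the old and new results are already available in serialized form, as they would be at a stateful operator; the class and method names are hypothetical and are not part of Kafka Streams or of the KIP.

```java
import java.util.Arrays;

// Hypothetical helper, not a Kafka Streams class: decides whether an update
// is worth forwarding by comparing the serialized old and new results.
final class BinaryEqualityCheck {

    private BinaryEqualityCheck() { }

    // Returns true when the update should be forwarded downstream.
    // Arrays.equals treats two nulls as equal and a null vs. non-null pair
    // as different, so a first value for a key is always forwarded.
    static boolean shouldForward(final byte[] oldSerialized, final byte[] newSerialized) {
        return !Arrays.equals(oldSerialized, newSerialized);
    }
}
```

Because both byte arrays already exist at this point, the only added cost in this sketch is the comparison itself.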

Sending `oldValues` by default would become expensive because we would need to serialize the recomputed old result, as well as the new result, to make the comparison (and we know the serialization is not cheap). We are facing a trade-off between CPU overhead and downstream load, and I am not sure if we should hard code this. My original argument for sending `oldValues` was about semantics; but for an optimization, I am not sure if this would be the right choice.

For now, users who want to opt in can force a materialization. A materialization may be expensive, and if we see future demand, we could still add an option to send `oldValues` instead of materialization (this would at least save the store overhead). As we consider the KIP an optimization, a "config" seems to make sense.

-Matthias

On 2/17/20 5:21 PM, Richard Yu wrote:

Hi John!

Thanks for the reply.

About the changes we have discussed so far: I think, upon further consideration, we have been mostly talking about this from the perspective that no stop-gap effort is acceptable. However, given the recent discussion, if we consider this an optimization, then it appears that the perspective I mentioned no longer applies. After all, we are no longer concerned so much with semantics correctness as with reducing traffic as much as possible without performance tradeoffs.

In this case, I think a cache would be a good idea for stateless operations. This cache will not be backed by a store, obviously. We can probably use Kafka's ThreadCache. We should be able to catch a large portion of the no-ops if we at least store some results in the cache. Not all will be caught, but I think the impact will be significant.
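
As a rough illustration of the cache idea above, the following Java sketch keeps the last forwarded serialized result per key in a bounded, in-memory LRU map. It deliberately uses a plain `LinkedHashMap` rather than Kafka's ThreadCache, whose API is not shown in this thread; the class name, the `String` key type, and the size limit are all assumptions made for the example.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of a store-less no-op filter for stateless operations.
final class NoOpFilterCache {

    private final Map<String, byte[]> lastForwarded;

    NoOpFilterCache(final int maxEntries) {
        // Access-ordered LinkedHashMap that evicts the least recently used key.
        this.lastForwarded = new LinkedHashMap<String, byte[]>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(final Map.Entry<String, byte[]> eldest) {
                return size() > maxEntries;
            }
        };
    }

    // Returns false only when the cache still holds an identical result for
    // this key; on a miss or a changed result it records the result and
    // returns true.
    boolean shouldForward(final String key, final byte[] serializedResult) {
        final byte[] previous = lastForwarded.get(key);
        if (previous != null && Arrays.equals(previous, serializedResult)) {
            return false;
        }
        lastForwarded.put(key, serializedResult);
        return true;
    }
}
```

Since evicted keys are forgotten, a repeated result for an evicted key is forwarded again, which matches the expectation above that not all no-ops will be caught.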

On another note, I think that we should implement competing proposals, i.e. one where we forward both old and new values with a reasonable proportion of artificial no-ops (we do not necessarily have to rely on equals so much as comparing the serialized binary data after the operation), and in another scenario, the cache for stateless ops. It would be unreasonable if we completely disregard either approach, since they both have merit. The reason for implementing both is to perform benchmark tests on them, and compare them with the original. This way, we can more clearly see what the drawbacks and the gains are. So far, we have been discussing only hypotheticals, and if we continue to do so, I think it is likely no ground will be gained.

After all, what we seek is optimization, and performance benchmarks will be mandatory for a KIP of this nature.

Hope this helps,
Richard

On Mon, Feb 17, 2020 at 2:12 PM John Roesler <vvcep...@apache.org> wrote:

Hi again, all,

Sorry on my part for my silence.

I've just taken another look over the recent history of this discussion. It seems like the #1 point to clarify (because it affects everything else) is that, yes, I was 100% envisioning this as an _optimization_.

As a consequence, I don't think it's critical to make any hard guarantees about what results get forwarded and what (no-op updates) get dropped.
I'd initially just been thinking about doing this opportunistically in cases where we already had the "old" and "new" result in memory, thanks to a request to "emit old values", or to the implementation of timestamp semantics.

However, whether or not it's semantically critical, I do think that Matthias's idea to use the change-forwarding mechanism to check for no-ops even on stateless operations is pretty interesting. Specifically, this would _really_ let you pare down useless updates by using mapValues to strip down records only to what you really need. However, the dependence on the implementation of equals() is troubling.

It might make sense to table this idea, as well as my complicated no-op detection algorithm, and initially propose just a nonconfigurable feature to check "old" and "new" results for binary equality before forwarding. I.e., if any operation determines that the old and new results are binary-identical, we would not forward.

I'll admit that this doesn't serve Tommy's use case very well, but it might be better to take baby steps with an optimization like this and not risk over-reaching in a way that actually harms performance or correctness. We could always expand the feature to use equals() or some kind of ChangeDetector later on, in a more focused discussion.

Regarding metrics or debug logs, I guess I don't feel strongly, but it feels like two things will happen that make it nicer to add them:

1. This feature is going to surprise/annoy _somebody_, and it would be nice to be able to definitively say the reason that updates are dropped is that they were no-ops. The easiest smoking gun is if there are debug logs that can be enabled. This person might just be looking at the dashboards, wondering why there are 100K updates per second going into their app, but only 1K results per second coming out. Having the metric there makes the accounting easier.

2. Somebody is going to struggle with high-volume updates, and it would be nice for them to know that this feature is saving them X-thousand updates per second, etc.

What does everyone think about this? Note, as I read it, what I've said above is already reflected in the text of the KIP.

Thanks,
-John

On Tue, Feb 11, 2020, at 18:27, Richard Yu wrote:

Hi all,

Bumping this. If you feel that this KIP is not too urgent, then let me know.
:)

Cheers,
Richard

On Thu, Feb 6, 2020 at 4:55 PM Richard Yu <yohan.richard...@gmail.com> wrote:

Hi all,

I've had just a few thoughts regarding the forwarding of <key, change<old_value, new_value>>. As Matthias already mentioned, there are two separate priorities by which we can judge this KIP:

1. An optimization perspective: In this case, the user would prefer the impact of this KIP to be as minimal as possible. By such logic, if stateless operations are performed twice, that could prove unacceptable for them (since operations can prove expensive).

2. Semantics correctness perspective: Unlike the optimization approach, we are more concerned with all KTable operations obeying the same emission policy, i.e. emit on change. In this case, a discrepancy would not be tolerated, even though an extra performance cost will be incurred. Therefore, we will follow Matthias's approach, and then perform the operation once on the old value, and once on the new.

The issue here I think is more black and white than in between. The second option in particular would be favorable for users with inexpensive stateless operations, while for the former option, we are probably dealing with more expensive ones. So the simplest solution is probably to allow the user to choose one of the behaviors, and have a config which can switch between them.

It's the simplest compromise I can come up with at the moment, but if you think you have a better plan which could better balance tradeoffs, then please let us know. :)

Best,
Richard

On Wed, Feb 5, 2020 at 5:12 PM John Roesler <vvcep...@apache.org> wrote:

Hi all,

Thanks for the thoughtful comments!

I need more time to reflect on your thoughts, but just wanted to offer a quick clarification about equals().

I only meant that we can't be sure if a class's equals() implementation returns true for two semantically identical instances.
I.e., if a class doesn't override the default equals() implementation, then we would see behavior like:

    new MyPair("A", 1).equals(new MyPair("A", 1)) returns false

In that case, I would still like to catch no-op updates by comparing the serialized form of the records when we happen to have it serialized anyway (such as when the operation is stateful, or when we're sending to a repartition topic and we have both the "new" and "old" value from upstream).

I didn't mean to suggest we'd try to use reflection to detect whether equals is implemented, although that is a neat trick.
I was thinking more of a belt-and-suspenders algorithm where we do the check for no-ops based on equals() and then _also_ check the serialized bytes for equality.

Thanks,
-John

On Wed, Feb 5, 2020, at 15:31, Ted Yu wrote:

Thanks for the comments, Matthias.

w.r.t. the requirement of an `equals()` implementation, each template type would have an equals() method. We can use the following code to know whether it is provided by the JVM or provided by the user.

    boolean customEquals = false;
    try {
        // Find the class that declares the equals(Object) method seen on value.
        Class<?> cls = value.getClass().getMethod("equals", Object.class).getDeclaringClass();
        if (!Object.class.equals(cls)) {
            // Declared somewhere other than java.lang.Object, so it was overridden.
            customEquals = true;
        }
    } catch (NoSuchMethodException nsme) {
        // equals is always defined, this wouldn't hit
    }

The next question is: what if the user doesn't provide an equals() method? Would we automatically fall back to emit-on-update?

Cheers

On Tue, Feb 4, 2020 at 1:37 PM Matthias J. Sax <mj...@apache.org> wrote:

First a high level comment:

Overall, I would like to take one step back, and make sure we are discussing on the same level.
Originally, I understood this KIP as a proposed change of _semantics_, however, given the latest discussion it seems it's actually not -- it's more an _optimization_ proposal. Hence, we only need to make sure that this optimization does not break existing semantics. Is this the right way to think about it?

If yes, then it might actually be ok to have different behavior depending on whether there is a materialized KTable or not. So far, we never defined a public contract about our emit strategy and it seems this KIP does not define one either.

Hence, I don't have as strong of an opinion about sending oldValues, for example, any longer. I guess the question is really, what can we implement in a reasonable way.

Other comments:

@Richard:

Can you please add the KIP to the KIP overview table: It's missing (https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals).

@Bruno:

You mentioned caching. I think it's irrelevant (orthogonal) and we can discuss this KIP without considering it.

@John:

> Even in the source table, we forward the updated record with the
> higher of the two timestamps. So the example is more like:

That is not correct.
Currently, we forward with the smaller out-of-order timestamp (changing the timestamp would corrupt the data -- we don't know, because we don't check, if the value is the same or a different one, hence, we must emit the out-of-order record as-is).

If we start to do emit-on-change, we also need to emit a new record if the timestamp changes due to out-of-order data, hence, we would still need to emit <K,V,T1> because that gives us correct semantics: assume you have a filter() and afterward use the filtered KTable in a stream-table join -- the lower T1 timestamp must be propagated to the filtered KTable to ensure that the stream-table join computes the correct result.
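
One way to read the rule in the paragraph above is as a small decision function: a changed value is always forwarded, and an unchanged value is forwarded only when it arrives out of order with a lower timestamp. The sketch below is only an illustration under that reading. `EmitOnChangeDecision` and `shouldEmit` are hypothetical names, not Kafka Streams API, and values are assumed to be compared in serialized form.

```java
import java.util.Arrays;

// Hypothetical helper for illustration; not part of Kafka Streams.
final class EmitOnChangeDecision {
    private EmitOnChangeDecision() {}

    /**
     * Returns true if the update should be forwarded downstream.
     * A null array stands for "no value" (absent key or tombstone).
     */
    static boolean shouldEmit(byte[] oldValue, long oldTimestamp,
                              byte[] newValue, long newTimestamp) {
        // A different value (including first insert or delete) is a real update.
        if (!Arrays.equals(oldValue, newValue)) {
            return true;
        }
        // Same value: forward only out-of-order data, so that the lower
        // timestamp reaches downstream operators such as a stream-table join.
        return newTimestamp < oldTimestamp;
    }
}
```

With this reading, the same value arriving with a higher timestamp is dropped as a no-op, while <K,V,T1> with T1 lower than the stored timestamp is still emitted.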

Your point about requiring an `equals()` implementation is actually a quite interesting one and boils down to my statement from above about "what can we actually implement". What I don't understand is:

> This way, we still don't have to rely on the existence of an
> equals() method, but if it is there, we can benefit from it.

Your bullet point (2) says it uses `equals()` -- hence, it seems we actually do rely on it? Also, how can we detect if there is an `equals()` method to do the comparison? Would we fail if we have neither `equals()` nor corresponding serializers to do the comparison?

> Wow, really good catch! Yes, we absolutely need metrics and logs if
> we're going to drop any records.
> And, yes, we should propose metrics and logs that are similar to the
> existing ones when we drop records for other reasons.

I am not sure about this point. In fact, we already have some no-ops in Kafka Streams in our join-operators and don't report any of those either. Emit-on-change is operator semantics and I don't see why we would need to have a metric for it? It seems to be quite different compared to dropping late or malformed records.

-Matthias

On 2/4/20 7:13 AM, Thomas Becker wrote:
> Thanks John for your thoughtful reply. Some comments inline.
>
> On Mon, 2020-02-03 at 11:51 -0600, John Roesler wrote:
>> [EXTERNAL EMAIL] Attention: This email was sent from outside TiVo. DO NOT CLICK any links or attachments unless you expected them.
>> ________________________________
>>
>> Hi Tommy,
>>
>> Thanks for the context. I can see the attraction of considering these use cases together.
>>
>> To answer your question, if a part of the record is not relevant to downstream consumers, I was thinking you could just use a mapValues to remove it.
>>
>> E.g., suppose you wanted to do a join between two tables.
>>
>> employeeInfo.join(employeePayroll, (info, payroll) -> new Result(info.name(), payroll.salary()))
>>
>> We only care about one attribute from the Info table (name), and one from the Payroll table (salary), and these attributes change rarely. On the other hand, there might be many other attributes of these tables that change frequently. We can avoid triggering the join unnecessarily by mapping the input tables to drop the unnecessary information before the join:
>>
>> names = employeeInfo.mapValues(info -> info.name())
>> salaries = employeePayroll.mapValues(payroll -> payroll.salary())
>>
>> names.join(salaries, (name, salary) -> new Result(name, salary))
>
> Ahh yes I see.
> This works, but in the case where you're using schemas as we are (e.g. Avro), it seems like this approach could lead to a proliferation of "skinny" record types that just drop various fields.
>
>> Especially if we take Matthias's idea to drop non-changes even for stateless operations, this would be quite efficient and is also a very straightforward optimization to understand once you know that Streams provides emit-on-change.
>>
>> From the context that you provided, it seems like a slightly different situation, though.
>> Reading between the lines a little, it sounds like: in contrast to the example above, in which we are filtering out extra _data_, you have some extra _metadata_ that you still wish to pass down with the data when there is a "real" update, but you don't want the metadata itself to cause an update.
>
> Despite my lack of clarity, yes you've got it right ;) This particular processor is the first stop for this data after coming in from external users, who often simply post the same content each time and we're trying to shield downstream consumers from unnecessary churn.
>
>> It does seem handy to be able to plug in a custom ChangeDetector for this purpose, but I worry about the API complexity. Maybe you can help think through how to provide the same benefit while limiting user-facing complexity.
>>
>> Here's some extra context to consider:
>>
>> We currently don't make any extra requirements about the nature of data that you can use in Streams. For example, you don't have to implement hashCode and equals, or compareTo, etc. With the current proposal, we can do an airtight comparison based only on the serialized form of the values, and we actually don't have to deserialize the "prior" value at all for a large number of operations.
>> Admittedly, if we extend the proposal to include no-op detection for stateless operations, we'd probably need to rely on equals() for no-op checking, otherwise we'd wind up requiring serdes for stateless operations as well. Actually, I'd probably argue for doing exactly that:
>>
>> 1. In stateful operations, drop if the serialized byte[]s are the same. After deserializing, also drop if the objects are equal according to Object#equals().
>>
>> 2. In stateless operations, compare the "new" and "old" values (if "old" is available) based on Object#equals().
>>
>> 3. As a final optimization, after serializing and before sending repartition records, compare the serialized data and drop no-ops.
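
The three checks listed above can be sketched in plain Java as follows. This is a rough illustration rather than the actual Streams implementation: `NoOpCheck`, its method names, and the `deserializer` function parameter are assumptions made for the example, and a null byte array is taken to mean that no value is stored.

```java
import java.util.Arrays;
import java.util.Objects;
import java.util.function.Function;

// Hypothetical helper for illustration; not part of Kafka Streams.
final class NoOpCheck {
    private NoOpCheck() {}

    /** Steps 1 and 3: compare serialized bytes first, then fall back to equals(). */
    static <V> boolean isNoOpStateful(byte[] oldBytes, byte[] newBytes,
                                      Function<byte[], V> deserializer) {
        if (oldBytes == null || newBytes == null) {
            // An insert or a delete is a change; "nothing -> nothing" is not.
            return oldBytes == null && newBytes == null;
        }
        if (Arrays.equals(oldBytes, newBytes)) {
            return true; // cheap path, no deserialization needed
        }
        // Different bytes may still deserialize to equal objects.
        return Objects.equals(deserializer.apply(oldBytes), deserializer.apply(newBytes));
    }

    /** Step 2: stateless operators can only compare when an old value is available. */
    static <V> boolean isNoOpStateless(V oldValue, V newValue, boolean oldAvailable) {
        return oldAvailable && Objects.equals(oldValue, newValue);
    }
}
```

If a value type does not override equals(), the default identity comparison means two distinct objects are never reported as equal, so the check simply never drops anything for that type.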
>>
>> This way, we still don't have to rely on the existence of an equals() method, but if it is there, we can benefit from it. Also, we don't require a serde in any new situations, but we can still leverage it when it is available.
>>
>> For clarity, in my example above, even if the employeeInfo and employeePayroll and Result records all have serdes, we need the "name" field (presumably String) and the "salary" field (presumably a Double) to have serdes as well in the naive implementation. But if we can leverage equals(), then the "right thing" happens automatically.
>
> I still don't totally follow why the individual components (name, salary) would have to have serdes here.
> If Result has one, we compare bytes, and if Result additionally has an equals() method (which presumably includes equals comparisons on the constituent fields), have we not covered our bases?
>
>> This dovetails in with my primary UX concern; where would the ChangeDetector actually be registered? None of the operators in my example have names or topics or any other identifiable characteristic that could be passed to a ChangeDetector class registered via config. You could say that we make ChangeDetector an optional parameter to every operation in Streams, but this seems to carry quite a bit of mental burden with it. People will wonder what it's for and whether or not they should be using it.
>> There would almost certainly be a misconception that it's preferable to implement it always, which would be unfortunate.
>>
>> Plus, to actually implement metadata flowing through the topology as in your use case, you'd have to do two things:
>> 1. make sure that all operations actually preserve the metadata alongside the data (e.g., don't accidentally add a mapValues like I did, or you drop the metadata).
>> 2. implement a ChangeDetector for every single operation in the topology, or you don't get the benefit of dropping non-changes internally
>> 2b. Alternatively, you could just add the ChangeDetector to one operation toward the end of the topology. This would not drop redundant computation internally, but only drop redundant _outputs_.
>> But this is just about the same as your current solution.
>
> I definitely see your point regarding configuration. I was originally thinking about this when the deduplication was going to be opt-in, and it seemed very natural to say something like:
>
> employeeInfo.join(employeePayroll, (info, payroll) -> new Result(info.name(), payroll.salary()))
>     .suppress(duplicatesAccordingTo(someChangeDetector))
>
> Alternatively you can imagine a similar method being on Materialized, though obviously this makes less sense if we don't want to require materialization. If we're now talking about changing the default behavior and not having any configuration options, it's harder to find a place for this.
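
For readers trying to picture the pluggable ChangeDetector discussed in this exchange, a minimal sketch might look like the following. Nothing here exists in Kafka Streams: the `ChangeDetector` interface, the `Posting` record type, and its `postedAtMs` metadata field are all made up for illustration, and `duplicatesAccordingTo` in the snippet above is likewise only a suggestion from the thread.

```java
import java.util.Objects;

// Hypothetical interface; no such type exists in Kafka Streams.
interface ChangeDetector<V> {
    /** Returns true if newValue is a semantically meaningful change over oldValue. */
    boolean isChanged(V oldValue, V newValue);
}

// Hypothetical value type: user content plus a metadata field.
final class Posting {
    final String content;
    final long postedAtMs; // metadata: when the record was posted

    Posting(String content, long postedAtMs) {
        this.content = content;
        this.postedAtMs = postedAtMs;
    }
}

// Ignores the metadata field, so re-posting the same content is not a change.
final class ContentOnlyChangeDetector implements ChangeDetector<Posting> {
    @Override
    public boolean isChanged(Posting oldValue, Posting newValue) {
        if (oldValue == null || newValue == null) {
            return oldValue != newValue; // insert or delete counts as a change
        }
        return !Objects.equals(oldValue.content, newValue.content);
    }
}
```

The detector only decides whether an update is meaningful. Where such an object would be registered is exactly the open question raised in the thread.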
>
>> A final thought; if it really is a metadata question, can we just plan to finish up the support for headers in Streams? I.e., give you a way to control the way that headers flow through the topology? Then, we could treat headers the same way we treat timestamps in the no-op checking... We completely ignore them for the sake of comparison. Thus, neither the timestamp nor the headers would get updated in internal state or in downstream views as long as the value itself doesn't change. This seems to give us a way to support your use case without adding to the mental overhead of using Streams for simple things.
>
> Agree headers could be a decent fit for this particular case because it's mostly metadata, though to be honest we haven't looked at headers much (mostly because, and to your point, support seems to be lacking). I feel like there would be other cases where this feature could be valuable, but I admit I can't come up with anything right this second. Perhaps yuzhihong had an example in mind?
>
>> I.e., simple things should be easy, and complex things should be possible.
>>
>> What are your thoughts?
>> Thanks,
>> -John
>>
>> On Mon, Feb 3, 2020, at 07:19, Thomas Becker wrote:
>>
>> Hi John, Can you describe how you'd use filtering/mapping to deduplicate records? To give some background on my suggestion we currently have a small stream processor that exists solely to deduplicate, which we do using a process that I assume would be similar to what would be done here (with a store of keys and hash values). But the records we are deduplicating have some metadata fields (such as timestamps of when the record was posted) that we don't consider semantically meaningful for downstream consumers, and therefore we also suppress updates that only touch those fields.
>>
>> -Tommy
>>
>> On Fri, 2020-01-31 at 19:30 -0600, John Roesler wrote:
>>
>> Hi Thomas and yuzhihong, That’s an interesting idea. Can you help think of a use case that isn’t also served by filtering or mapping beforehand? Thanks for helping to design this feature!
>> -John
>>
>> On Fri, Jan 31, 2020, at 18:56, yuzhih...@gmail.com wrote:
>> I think this is a good idea.
>>
>> On Jan 31, 2020, at 4:49 PM, Thomas Becker <thomas.bec...@tivo.com> wrote:
>> How do folks feel about allowing the mechanism by which no-ops are detected to be pluggable? Meaning use something like a hash by default, but you could optionally provide an implementation of something to use instead, like a ChangeDetector. This could be useful for example to ignore changes to certain fields, which may not be relevant to the operation being performed.
>> ________________________________
>> From: John Roesler <vvcep...@apache.org>
>> Sent: Friday, January 31, 2020 4:51 PM
>> To: dev@kafka.apache.org <dev@kafka.apache.org>
>> Subject: Re: [KAFKA-557] Add emit on change support for Kafka Streams
>>
>> Hello all, Sorry for my silence. It seems like we are getting close to consensus. Hopefully, we could move to a vote soon! All of the reasoning from Matthias and Bruno around timestamp is compelling.
I would be strongly in favor of stating a few things very clearly in the KIP:

1. Streams will drop no-op updates only for KTable operations. That is, we won't make any changes to KStream aggregations at the moment. It does seem like we can potentially revisit the time semantics of that operation in the future, but we don't need to do it now. On the other hand, the proposed semantics for KTable timestamps (marking the beginning of the validity of that record) makes sense to me.
2. Streams will only drop no-op updates for _stateful_ KTable operations. We don't want to add a hard guarantee that Streams will _never_ emit a no-op table update because it would require adding state to otherwise stateless operations.
If someone is really concerned about a particular stateless operation producing a lot of no-op results, all they have to do is materialize it, and Streams would automatically drop the no-ops.

Additionally, I'm +1 on not adding an opt-out at this time.

Regarding the KIP itself, I would clean it up a bit before calling for a vote. There is a lot of "discussion"-type language there, which is very natural to read, but makes it a bit hard to see what _exactly_ the KIP is proposing. Richard, would you mind just making the "proposed behavior change" a simple and succinct list of bullet points? I.e., please drop glue phrases like "there has been some discussion" or "possibly we could do X".
For the final version of the KIP, it should just say, "Streams will do X, Streams will do Y". Feel free to add an elaboration section to explain more about what X and Y mean, but we don't need to talk about possibilities or alternatives except in the "rejected alternatives" section.

Accordingly, can you also move the options you presented in the intro to the "rejected alternatives" section and only mention the final proposal itself?

This just really helps reviewers to know what they are voting for, and it helps everyone after the fact when they are trying to get clarity on what exactly the proposal is, versus all the things it could have been.
Thanks,
-John

On Mon, Jan 27, 2020, at 18:14, Richard Yu wrote:

Hello to all,

I've finished making some initial modifications to the KIP. I have decided to keep the implementation section in the KIP for record-keeping purposes. For now, we should focus on only the proposed behavior changes instead.

See if you have any comments!

Cheers,
Richard

On Sat, Jan 25, 2020 at 11:12 AM Richard Yu <yohan.richard...@gmail.com> wrote:

Hi all,

Thanks for all the discussion!

@John and @Bruno I will survey other possible systems and see what I can do. Just a question, by systems, I suppose you would mean the pros and cons of different reporting strategies?
I'm not completely certain on this point, so it would be great if you can clarify on that.

So here's what I got from all the discussion so far:

- Since both Matthias and John seem to have come to a consensus on this, we will go for an all-round behavioral change for KTables. After some thought, I decided that for now, an opt-out config will not be added. As John has pointed out, no-op changes tend to explode further down the topology as they are forwarded to more and more processor nodes downstream.
- About using hash codes, after some explanation from John, it looks like hash codes might not be as ideal (for implementation). For now, we will omit that detail, and save it for the PR.
- @Bruno You do have valid concerns. Though, I am not completely certain if we want to do emit-on-change only for materialized KTables. I will put it down in the KIP regardless.

I will do my best to address all points raised so far on the discussion. Hope we could keep this going!

Best,
Richard

On Fri, Jan 24, 2020 at 6:07 PM Bruno Cadonna <br...@confluent.io> wrote:

Thank you Matthias for the use cases!

Looking at both use cases, I think you need to elaborate on them in the KIP, Richard.

Emit from plain KTable:
I agree with Matthias that the lower timestamp makes sense because it marks the start of the validity of the record. Idempotent records with a higher timestamp can be safely ignored.
A corner case that I discussed with Matthias offline is when we do not materialize a KTable due to optimization. Then we cannot avoid the idempotent records because we do not keep the first record with the lower timestamp to compare to.

Emit from KTable with aggregations:
If we specify that an aggregation result should have the highest timestamp of the records that participated in the aggregation, we cannot ignore any idempotent records. Admittedly, the result of an aggregation usually changes, but there are aggregations where the result may not change like min and max, or sum when the incoming records have a value of zero.
In those cases, we could benefit from emit-on-change, but only if we define the semantics of the aggregations to not use the highest timestamp of the participating records for the result. In Kafka Streams, we do not have min, max, and sum as explicit aggregations, but we need to provide an API to define what timestamp should be used for the result of an aggregation if we want to go down this path.

All of this does not block this KIP and I just wanted to put these aspects up for discussion. The KIP can limit itself to emit from materialized KTables. However, the limits should be explicitly stated in the KIP.

Best,
Bruno

On Fri, Jan 24, 2020 at 10:58 AM Matthias J. Sax <matth...@confluent.io> wrote:

IMHO, the question about semantics depends on the use case, in particular on the origin of a KTable.

If there is a changelog topic that one reads directly into a KTable, emit-on-change does actually make sense, because the timestamp indicates _when_ the update was _effective_. For this case, it is semantically sound to _not_ update the timestamp in the store, because the second update is actually idempotent and advancing the timestamp is not ideal (one could even consider it to be wrong to advance the timestamp) because the "valid time" of the record pair did not change.

This reasoning also applies to KTable-KTable joins.
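The table-source semantics described above can be modeled in a few lines of plain Java. This is only an illustrative sketch, not Kafka Streams code: `EmitOnChangeTable`, `Entry`, and the `update` method are hypothetical names, values are simplified to strings, and out-of-order or null (tombstone) updates are not handled.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Plain-Java model of a table source with emit-on-change semantics.
// Illustration only; none of these names come from Kafka Streams.
final class EmitOnChangeTable {
    // A stored value plus the timestamp at which that value became valid.
    static final class Entry {
        final String value;
        final long timestamp;

        Entry(String value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    private final Map<String, Entry> store = new HashMap<>();

    // Applies an update and returns true if it would be forwarded downstream.
    boolean update(String key, String value, long timestamp) {
        Entry old = store.get(key);
        if (old != null && Objects.equals(old.value, value)) {
            // Idempotent update: drop it and keep the earlier timestamp,
            // so the store stays consistent with what was emitted.
            return false;
        }
        store.put(key, new Entry(value, timestamp));
        return true;
    }

    Entry get(String key) {
        return store.get(key);
    }

    public static void main(String[] args) {
        EmitOnChangeTable table = new EmitOnChangeTable();
        System.out.println(table.update("A", "X", 0L));  // first value for the key
        System.out.println(table.update("A", "X", 11L)); // same value, later timestamp
        System.out.println(table.get("A").timestamp);    // timestamp kept in the store
    }
}
```

The second update is dropped and the stored timestamp stays at the start of the value's validity, which is the behavior argued for in the paragraph above.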
However, if the KTable is the result of an aggregation, I think emit-on-update is more natural, because the timestamp reflects the _last_ time (i.e., highest timestamp) of all input records that contributed to the result. Hence, updating the timestamp and emitting a new record actually sounds correct to me. This applies to windowed and non-windowed aggregations IMHO.

However, considering the argument that the timestamp should not be updated in the first case in the store to begin with, both cases are actually the same, and both can be modeled as emit-on-change: if a `table()` operator does not update the timestamp if the value does not change, there is _no_ change and thus nothing is emitted.
At the same time, if an aggregation operator does update the timestamp (even if the value does not change) there _is_ a change and we emit.

Note that handling out-of-order data for aggregations would also work seamlessly with this approach -- for out-of-order records, the timestamp never changes, and thus, we only emit if the result itself changes.

Therefore, I would argue that we might not even need any config, because the emit-on-change behavior is just correct and reduces the downstream load, while our current behavior is not ideal (even if it's also correct).

Thoughts?

-Matthias

On 1/24/20 9:37 AM, John Roesler wrote:

Hi Bruno,

Thanks for that idea.
I hadn't considered that option before, and it does seem like that would be the right place to put it if we think it might be semantically important to control on a table-by-table basis.

I had been thinking of it less semantically and more practically. In the context of a large topology, or more generally, a large software system that contains many topologies and other event-driven systems, each no-op result becomes an input that is destined to itself become a no-op result, and so on, all the way through the system. Thus, a single pointless processing result becomes amplified into a large number of pointless computations, cache perturbations, and network and disk I/O operations.
If you also consider operations with fan-out implications, like branching or foreign-key joins, the wasted resources are amplified not just in proportion to the size of the system, but the size of the system times the average fan-out (to the power of the number of fan-out operations on the path(s) through the system).

In my time operating such systems, I've observed these effects to be very real, and actually, the system and use case don't have to be very large before the amplification poses an existential threat to the system as a whole.

This is the basis of my advocating for a simple behavior change, rather than an opt-in config of any kind. It seems like Streams should "do the right thing" for the majority use case.
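As a back-of-the-envelope reading of the fan-out argument above, the estimate can be written as system size times average fan-out raised to the number of fan-out operations. The sketch below just evaluates that expression; the class name, the method name, and the input numbers are hypothetical, and this is a rough model rather than a measurement of any real topology.

```java
// Back-of-the-envelope model of no-op amplification; not a measurement.
final class NoOpAmplification {
    // Estimates wasted downstream operations caused by one no-op result as
    // systemSize * avgFanOut^fanOutOps, following the rough formula in the thread.
    static long wastedOperations(long systemSize, long avgFanOut, int fanOutOps) {
        long factor = 1L;
        for (int i = 0; i < fanOutOps; i++) {
            factor *= avgFanOut;
        }
        return systemSize * factor;
    }

    public static void main(String[] args) {
        // Hypothetical numbers: 20 downstream nodes, average fan-out of 3.
        System.out.println(wastedOperations(20, 3, 0)); // no fan-out operations on the path
        System.out.println(wastedOperations(20, 3, 2)); // two fan-out operations on the path
    }
}
```

With these made-up inputs, two fan-out steps turn 20 wasted operations into 180, which illustrates why the growth is worse than linear in the size of the system.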
My theory (which may be wrong) is that the majority use case is more like "relational queries" than "CEP queries". Even if you were doing some event-sensitive computation, wouldn't you do them as Stream operations (where this feature is inapplicable anyway)?

In keeping with the "practical" perspective, I suggested the opt-out config only in the (I think unlikely) event that filtering out pointless updates actually harms performance. I'd also be perfectly fine without the opt-out config. I really think that (because of the timestamp semantics work already underway), we're already pre-fetching the prior result most of the time, so there would actually be very little extra I/O involved in implementing emit-on-change.
However, we should consider whether my experience is likely to be general. Do you have some use case in mind for which you'd actually want some KTable results to be emit-on-update for semantic reasons?

Thanks,
-John

On Fri, Jan 24, 2020, at 11:02, Bruno Cadonna wrote:

Hi Richard,

Thank you for the KIP.

I agree with John that we should focus on the interface and behavior change in a KIP. We can discuss the implementation later.

I am also +1 for the survey.

I had a thought about this. Couldn't we consider emit-on-change to be one config of suppress (like `untilWindowCloses`)? What you basically propose is to suppress updates if they do not change the result.
Considering emit on change as a flavour of suppress would be more flexible because it would specify the behavior locally for a KTable instead of globally for all KTables. Additionally, specifying the behavior in one place instead of multiple places feels more intuitive and consistent to me.

Best,
Bruno

On Fri, Jan 24, 2020 at 7:49 AM John Roesler <vvcep...@apache.org> wrote:

Hi Richard,

Thanks for picking this up! I know of at least one large community member for which this feature is absolutely essential.

If I understand your two options, it seems like the proposal is to implement it as a behavior change regardless, and the question is whether to provide an opt-out config or not.
Given that any implementation of this feature would have some performance impact under some workloads, and also that we don't know if anyone really depends on emit-on-update time semantics, it seems like we should propose to add an opt-out config. Can you update the KIP to mention the exact config key and value(s) you'd propose? Just to move the discussion forward, maybe something like:

    emit.on := change|update

with the new default being "change".

Thanks for pointing out the timestamp issue in particular.
I agree that if we discard the latter update as a no-op, then we also have to discard its timestamp (obviously, we don't forward the timestamp update, as that's the whole point, but we also can't update the timestamp in the store, as the store must remain consistent with what has been emitted).

I have to confess that I disagree with your implementation proposal, but it's also not necessary to discuss implementation in the KIP. Maybe it would be less controversial if you just drop that section for now, so that the KIP discussion can focus on the behavior change and config.

Just for reference, there is some research into this domain.
> For example, see the "Report" section (3.2.3) of the SECRET paper:
> http://people.csail.mit.edu/tatbul/publications/maxstream_vldb10.pdf
>
> It might help to round out the proposal if you take a brief survey of the behaviors of other systems, along with pros and cons if any are reported.
>
> Thanks,
> -John
>
> On Fri, Jan 10, 2020, at 22:27, Richard Yu wrote:
> Hi everybody!
> I'd like to propose a change that we probably should have added a long time ago. The key benefit of this KIP would be reduced traffic in Kafka Streams, since a lot of no-op results would no longer be sent downstream. Here is the KIP for reference:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams
>
> Currently, I seek to
> formalize our approach for this KIP first before we determine concrete API additions / configurations. Some configs might warrant adding, while others are not necessary since adding them would only increase the complexity of Kafka Streams.
>
> Cheers,
> Richard
>
> ________________________________
> This email and any attachments may contain confidential and privileged material for the sole use of the intended recipient. Any review, copying, or distribution of this email (or any attachments) by others is prohibited. If you are not the intended recipient, please contact the sender immediately and permanently delete this email and any attachments.
> No employee or agent of TiVo is authorized to conclude any binding agreement on behalf of TiVo by email. Binding agreements with TiVo may only be made by a signed written agreement.
>
> --
> *Tommy Becker*
> *Principal Engineer*
> *Personalized Content Discovery*
> *O* +1 919.460.4747
> *tivo.com* <http://www.tivo.com/>
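For readers following the quoted discussion, here is a minimal sketch of the behavior John describes: under emit-on-change, a no-op update is dropped together with its timestamp, so the store stays consistent with what was emitted. This is only an illustration under stated assumptions. It does not use the Kafka Streams API, and the names `EmitOnChangeSketch`, `EmitMode`, `Table`, `process`, and `timestampOf` are hypothetical. The two modes simply mirror the "emit.on := change|update" idea floated in the thread, which was a suggestion rather than a settled config.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class EmitOnChangeSketch {

    // Hypothetical modes mirroring the "emit.on := change|update" suggestion.
    enum EmitMode { CHANGE, UPDATE }

    // A stored value plus the timestamp of the update that last wrote it.
    static final class Stamped {
        final String value;
        final long timestamp;

        Stamped(String value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // Toy stand-in for a table processor (an assumption, not Kafka Streams code).
    static final class Table {
        private final EmitMode mode;
        private final Map<String, Stamped> store = new HashMap<>();
        final List<String> emitted = new ArrayList<>();

        Table(EmitMode mode) {
            this.mode = mode;
        }

        // Returns true if the update was stored and forwarded downstream.
        boolean process(String key, String value, long timestamp) {
            Stamped old = store.get(key);
            boolean noOp = old != null && Objects.equals(old.value, value);
            if (mode == EmitMode.CHANGE && noOp) {
                // Drop the update and its timestamp: nothing is forwarded,
                // and the stored timestamp is left untouched.
                return false;
            }
            store.put(key, new Stamped(value, timestamp));
            emitted.add(key + "=" + value + "@" + timestamp);
            return true;
        }

        // Timestamp currently held in the store for the key, or -1 if absent.
        long timestampOf(String key) {
            Stamped s = store.get(key);
            return s == null ? -1L : s.timestamp;
        }
    }

    public static void main(String[] args) {
        Table table = new Table(EmitMode.CHANGE);
        table.process("A", "X", 0L);   // new key: emitted
        table.process("A", "X", 11L);  // same value: dropped, timestamp stays 0
        table.process("A", "Y", 12L);  // value changed: emitted
        System.out.println(table.emitted); // prints [A=X@0, A=Y@12]
    }
}
```

With `EmitMode.UPDATE`, the second call would be stored and forwarded as well, which corresponds to the opt-out behavior discussed in the thread.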