W.r.t. the new metric, there is already a droppedRecordsSensor, which logs: "Skipping record due to null key. topic=[{}] partition=[{}] offset=[{}]".
It seems we should introduce another metric which records the skipped (duplicate) values. This way, it is easier to observe the effect when this feature is in production. Cheers > > ---------- Forwarded message --------- > From: Richard Yu <yohan.richard...@gmail.com> > Date: Sun, Feb 2, 2020 at 10:21 AM > Subject: Re: [KAFKA-557] Add emit on change support for Kafka Streams > To: <dev@kafka.apache.org> > > > Hi Bruno, > > Thanks for the reply! > > I've included some basic description on the reporting strategies in the > KIP (I might include more information on that later). I've also worked to > add some more details on behavior changes as well as rejected alternatives. > Hope it will help facilitate the process. :) > > I just want to add something on a relevant topic: we need metrics. I think > this should also be included with this change for a number of reasons. For > some users, they already know that their Streams application is > experiencing a lot of no-op traffic. But that doesn't mean other users are > aware of the same problem. Also, if we are dropping no-ops, then we might > as well record exactly how many we have dropped out of how many total > operations we've done. Therefore, I argue that we also include some metric > which records this data and reports it to the user. > > Beyond that, let me know if we might need to address anything else. :) > > Cheers, > Richard > > > > On Sun, Feb 2, 2020 at 3:57 AM Bruno Cadonna <br...@confluent.io> wrote: > >> Hi, >> >> Richard, thank you for the updated KIP. >> >> Regarding your question about the survey, IMO the survey should >> contain a brief description of the emit (report) strategy of each >> system and a list of pros and cons. I personally would be interested >> what emit strategy Flink uses. >> >> I have a few comments about the KIP and its documentation: >> >> KIP-specific: >> >> 1. I agree with Matthias that we should also include aggregations >> where neither the value nor the timestamp change. 
>>
>> 2. Regarding Matthias' concerns about the dependency of the result of
>> a stateless operation on the materialization, I have two
>> questions/observations:
>> a) Is the result not already dependent on the materialization, since in
>> case of materialized results the cache would not emit all records for
>> the same key downstream?
>> b) Emitting more records from a non-materialized operation should not
>> affect the semantics because we are emitting changelog records. The
>> next time these changelog records are materialized the result should
>> be correct. Right? However, I see the issue when a KTable is
>> transformed to a KStream with `toStream()`. The stream would then
>> differ depending on the materialization. But that seems to me an issue
>> that is not specific to the emit strategy and that we already have
>> when we use a cache, don't we? Is it even an issue?
>>
>> 3. With out-of-order records we would emit more records. Let's assume
>> the following records
>> K, V, T3
>> K, V, T1
>> K, V, T2
>> with T1 < T2 < T3
>>
>> A KTable that reads these records in this order would emit (assuming no
>> cache)
>> K, V, T3
>> K, V, T1
>>
>> The record at T3 is emitted because it is the first.
>> The record at T1 is emitted because T1 < T3.
>> The record at T2 is not emitted because T2 >= T1.
>> Correct?
>>
>> Richard, it would be good to add a section about out-of-order records
>> to the KIP.
>>
>>
>> Documentation-specific:
>>
>> 1. I agree with John on his feedback on the KIP document. It is really
>> important to clearly state what this KIP will improve and what not,
>> otherwise it becomes hard to vote on the KIP and to decide whether the
>> KIP is fully implemented or not.
>>
>> 2. Could you please already state in the "Motivation" section of the
>> KIP where you list the current emit strategies that the emit strategy
>> only applies to operations that involve a KTable?
Probably for most it >> will be clear what you mean, but IMO KIPs should be easily >> approachable and it doesn't cost much to add this information. >> >> 3. Could you please list the rejected suppress extension in the >> "Rejected Alternatives" section? >> >> 4. In the discussion about materializing results of stateless >> operations, could you please add that those stateless operations are >> on KTables? IMO adding this information makes the KIP easier >> approachable by people that are not that familiar with the matter. >> Best, >> Bruno >> >> On Sat, Feb 1, 2020 at 11:33 PM Richard Yu <yohan.richard...@gmail.com> >> wrote: >> > >> > Hi all, >> > >> > You all have good points! >> > >> > I took a look, and I thought it over. After some thinking, it appears >> the >> > main point of contention is whether or not we can support emit on change >> > for stateless operations. I agree with John in that we probably should >> > restrict ourselves to materialized KTables: >> > >> > 1. Loading any prior results would incur performance hits, regardless >> > how much one looks at it. >> > 2. Matthias's approach does have its merits, but my primary concern >> is >> > that we are effectively performing the same operation twice. And some >> > stateless operations can be quite expensive, so if performed twice, >> can >> > incur considerable performance hits. The idea isn't bad, but it will >> need >> > some work. >> > 3. About stateless operations: Kafka Streams by design was never >> *intended >> > *to have stateless operations load prior results. After all, it >> appears >> > to me that only stateful operations should have that capability. If >> we also >> > load prior results for stateless operations, then we will have >> considerable >> > redundancy. (If we load something similar to hash codes, then that >> is a >> > different matter, but we already covered that hash codes is >> unreliable for >> > comparisons). 
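The points above turn on comparing each new result against a stored prior result, which is what a materialized table makes possible. The following is a minimal, self-contained sketch of that comparison, replaying Bruno's out-of-order example from this thread (K, V at T3, then T1, then T2, with T1 < T2 < T3). `EmitOnChangeTable` and its methods are hypothetical names for illustration and are not Kafka Streams classes. The rule it applies (emit on first record, on a value change, or when an identical value arrives with an earlier timestamp) is one reading of the semantics discussed here, not the KIP's final wording. The dropped counter is only a nod to the metric suggested at the top of the thread.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a materialized KTable; not a Kafka Streams class.
class EmitOnChangeTable {
    // Latest stored value and timestamp per key (plays the role of the state store).
    private final Map<String, String> values = new HashMap<>();
    private final Map<String, Long> timestamps = new HashMap<>();
    // Records forwarded downstream, rendered as "key,value,timestamp".
    private final List<String> emitted = new ArrayList<>();
    // Count of identical updates that were not forwarded.
    private int droppedNoOps = 0;

    // Assumes non-null values; tombstones are out of scope for this sketch.
    void process(String key, String value, long timestamp) {
        String priorValue = values.get(key);
        Long priorTimestamp = timestamps.get(key);
        boolean first = priorTimestamp == null;
        boolean valueChanged = !first && !value.equals(priorValue);
        // Same value with a lower timestamp: the start of validity moves earlier.
        boolean earlierValidity = !first && timestamp < priorTimestamp;

        if (first || valueChanged || earlierValidity) {
            values.put(key, value);
            timestamps.put(key, timestamp);
            emitted.add(key + "," + value + "," + timestamp);
        } else {
            // Idempotent update: keep the stored timestamp, forward nothing.
            droppedNoOps++;
        }
    }

    List<String> emitted() { return emitted; }

    int droppedNoOps() { return droppedNoOps; }

    public static void main(String[] args) {
        EmitOnChangeTable table = new EmitOnChangeTable();
        table.process("K", "V", 3L); // T3: first record for K
        table.process("K", "V", 1L); // T1 < T3
        table.process("K", "V", 2L); // T2 >= T1
        System.out.println(table.emitted());      // [K,V,3, K,V,1]
        System.out.println(table.droppedNoOps()); // 1
    }
}
```

Under these assumptions the table emits the records at T3 and T1 and drops the one at T2, matching the outcome Bruno describes. A non-materialized operator has no `values` map to consult, which is the cost being weighed in the list above.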
>> >
>> > Indeed, there is a discrepancy if stateless operations don't drop no-ops
>> > while stateful ones do. But I think that we shouldn't restrict ourselves to
>> > such a perspective. After all, performance should take priority over
>> > resolving such a discrepancy.
>> >
>> > This is just my point of view, but some of my points might need work. Let
>> > me know what you think. :)
>> >
>> > Cheers,
>> > Richard
>> >
>> > On Fri, Jan 31, 2020 at 5:47 PM Matthias J. Sax <matth...@confluent.io>
>> > wrote:
>> >
>> > > I did not read the updated KIP itself yet. However, I do have concerns
>> > > about the idea to have different behavior for different operators.
>> > >
>> > >
>> > > (1) If there is a KStream aggregation for which neither the
>> > > aggregation value nor the result timestamp changes, there is no reason
>> > > to emit if we do emit-on-change semantics. Hence, why would we need to
>> > > stay on an emit-on-update model?
>> > >
>> > >
>> > > (2) Whether or not a KTable is materialized into a local state store is
>> > > semantically irrelevant and an implementation detail IMHO. Hence, I
>> > > think we need to ensure that we have the same behavior for both cases:
>> > >
>> > > Example:
>> > >
>> > > stream.groupByKey()
>> > >       .count()
>> > >       .filter(...)
>> > >       .toStream().to(...);
>> > >
>> > > stream.groupByKey()
>> > >       .count()
>> > >       .filter(..., Materialized.as("filtered-table"))
>> > >       .toStream().to(...);
>> > >
>> > > It would be rather confusing for users if the two produced different
>> > > results.
>> > >
>> > > However, I actually believe we can achieve emit-on-change semantics for
>> > > both cases. Note that internally, the output of `count()` is a
>> > > `<key,change<newValue,oldValue>>` changelog.
Atm, we don't enable >> "emit >> > > old value" for all cases, but I think if we always enable it if there >> is >> > > no downstream state store, the downstream operator can actually >> > > recompute its "current result" (that would otherwise be in the store) >> > > based on the old value, the new result based on the new value, compare >> > > old and new result and make the correct decision to emit or not. >> > > >> > > However, we should verify that this really works as expected before we >> > > decide on this KIP. >> > > >> > > >> > > (3) I think we also need to think a little bit about the handling of >> > > out-of-order data. Atm, I don't see any issue in particular, but it >> > > would be great if everybody could think about out-of-order handling >> and >> > > if/how it affects emit-on-change behavior. Also note, that KIP-280 is >> > > allowing a timestamp-based compaction that might allow us to fix a >> > > potential issue (in case there is one). >> > > >> > > >> > > -Matthias >> > > >> > > >> > > On 1/31/20 5:30 PM, John Roesler wrote: >> > > > Hi Thomas and yuzhihong, >> > > > >> > > > That’s an interesting idea. Can you help think of a use case that >> isn’t >> > > also served by filtering or mapping beforehand? >> > > > >> > > > Thanks for helping to design this feature! >> > > > -John >> > > > >> > > > On Fri, Jan 31, 2020, at 18:56, yuzhih...@gmail.com wrote: >> > > >> I think this is good idea. >> > > >> >> > > >>> On Jan 31, 2020, at 4:49 PM, Thomas Becker < >> thomas.bec...@tivo.com> >> > > wrote: >> > > >>> >> > > >>> How do folks feel about allowing the mechanism by which no-ops >> are >> > > detected to be pluggable? Meaning use something like a hash by >> default, but >> > > you could optionally provide an implementation of something to use >> instead, >> > > like a ChangeDetector. This could be useful for example to ignore >> changes >> > > to certain fields, which may not be relevant to the operation being >> > > performed. 
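Thomas's pluggable no-op detection could look roughly like the sketch below. Everything in it is an assumption for illustration: no `ChangeDetector` interface exists in Kafka Streams, and the method names `isChanged`, `byEquality`, and `byField`, as well as the `Profile` type, are invented here. The default shown compares with `equals` rather than a hash, since hash codes are described elsewhere in this thread as unreliable for comparisons.

```java
import java.util.Objects;
import java.util.function.Function;

// Hypothetical plug-in point; not part of the Kafka Streams API.
interface ChangeDetector<V> {
    // Returns true if newValue should be treated as a real change from oldValue.
    boolean isChanged(V oldValue, V newValue);

    // Default strategy for this sketch: plain equality on the whole value.
    static <V> ChangeDetector<V> byEquality() {
        return (oldValue, newValue) -> !Objects.equals(oldValue, newValue);
    }

    // Compares only one extracted field, ignoring changes to everything else.
    static <V, F> ChangeDetector<V> byField(Function<V, F> extractor) {
        return (oldValue, newValue) -> {
            if (oldValue == null || newValue == null) {
                // A transition to or from "no value" counts as a change.
                return oldValue != newValue;
            }
            return !Objects.equals(extractor.apply(oldValue), extractor.apply(newValue));
        };
    }
}

// Illustrative value type: lastSeenMs changes often but may be irrelevant downstream.
class Profile {
    final String name;
    final long lastSeenMs;

    Profile(String name, long lastSeenMs) {
        this.name = name;
        this.lastSeenMs = lastSeenMs;
    }
}

class ChangeDetectorDemo {
    public static void main(String[] args) {
        ChangeDetector<Profile> nameOnly = ChangeDetector.byField(p -> p.name);
        Profile before = new Profile("alice", 1000L);
        Profile after = new Profile("alice", 2000L);
        // Only lastSeenMs differs, so this detector reports a no-op.
        System.out.println(nameOnly.isChanged(before, after)); // false
        System.out.println(ChangeDetector.<String>byEquality().isChanged("a", "b")); // true
    }
}
```

A field-based detector like `nameOnly` covers the "ignore changes to certain fields" case. Whether that is meaningfully different from mapping the value down to the relevant fields beforehand is the question John raises in his reply.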
>> > > >>> ________________________________ >> > > >>> From: John Roesler <vvcep...@apache.org> >> > > >>> Sent: Friday, January 31, 2020 4:51 PM >> > > >>> To: dev@kafka.apache.org <dev@kafka.apache.org> >> > > >>> Subject: Re: [KAFKA-557] Add emit on change support for Kafka >> Streams >> > > >>> >> > > >>> [EXTERNAL EMAIL] Attention: This email was sent from outside >> TiVo. DO >> > > NOT CLICK any links or attachments unless you expected them. >> > > >>> ________________________________ >> > > >>> >> > > >>> >> > > >>> Hello all, >> > > >>> >> > > >>> Sorry for my silence. It seems like we are getting close to >> consensus. >> > > >>> Hopefully, we could move to a vote soon! >> > > >>> >> > > >>> All of the reasoning from Matthias and Bruno around timestamp is >> > > compelling. I >> > > >>> would be strongly in favor of stating a few things very clearly >> in the >> > > KIP: >> > > >>> 1. Streams will drop no-op updates only for KTable operations. >> > > >>> >> > > >>> That is, we won't make any changes to KStream aggregations at >> the >> > > moment. It >> > > >>> does seem like we can potentially revisit the time semantics of >> that >> > > operation >> > > >>> in the future, but we don't need to do it now. >> > > >>> >> > > >>> On the other hand, the proposed semantics for KTable timestamps >> > > (marking the >> > > >>> beginning of the validity of that record) makes sense to me. >> > > >>> >> > > >>> 2. Streams will only drop no-op updates for _stateful_ KTable >> > > operations. >> > > >>> >> > > >>> We don't want to add a hard guarantee that Streams will _never_ >> emit >> > > a no-op >> > > >>> table update because it would require adding state to otherwise >> > > stateless >> > > >>> operations. If someone is really concerned about a particular >> > > stateless >> > > >>> operation producing a lot of no-op results, all they have to do >> is >> > > >>> materialize it, and Streams would automatically drop the no-ops. 
>> > > >>> >> > > >>> Additionally, I'm +1 on not adding an opt-out at this time. >> > > >>> >> > > >>> Regarding the KIP itself, I would clean it up a bit before >> calling for >> > > a vote. >> > > >>> There is a lot of "discussion"-type language there, which is very >> > > natural to >> > > >>> read, but makes it a bit hard to see what _exactly_ the kip is >> > > proposing. >> > > >>> >> > > >>> Richard, would you mind just making the "proposed behavior >> change" a >> > > simple and >> > > >>> succinct list of bullet points? I.e., please drop glue phrases >> like >> > > "there has >> > > >>> been some discussion" or "possibly we could do X". For the final >> > > version of the >> > > >>> KIP, it should just say, "Streams will do X, Streams will do Y". >> Feel >> > > free to >> > > >>> add an elaboration section to explain more about what X and Y >> mean, >> > > but we don't >> > > >>> need to talk about possibilities or alternatives except in the >> > > "rejected >> > > >>> alternatives" section. >> > > >>> >> > > >>> Accordingly, can you also move the options you presented in the >> intro >> > > to the >> > > >>> "rejected alternatives" section and only mention the final >> proposal >> > > itself? >> > > >>> >> > > >>> This just really helps reviewers to know what they are voting >> for, and >> > > it helps >> > > >>> everyone after the fact when they are trying to get clarity on >> what >> > > exactly the >> > > >>> proposal is, versus all the things it could have been. >> > > >>> >> > > >>> Thanks, >> > > >>> -John >> > > >>> >> > > >>> >> > > >>>> On Mon, Jan 27, 2020, at 18:14, Richard Yu wrote: >> > > >>>> Hello to all, >> > > >>>> >> > > >>>> I've finished making some initial modifications to the KIP. >> > > >>>> I have decided to keep the implementation section in the KIP for >> > > >>>> record-keeping purposes. >> > > >>>> >> > > >>>> For now, we should focus on only the proposed behavior changes >> > > instead. 
>> > > >>>>
>> > > >>>> See if you have any comments!
>> > > >>>>
>> > > >>>> Cheers,
>> > > >>>> Richard
>> > > >>>>
>> > > >>>> On Sat, Jan 25, 2020 at 11:12 AM Richard Yu <yohan.richard...@gmail.com>
>> > > >>>> wrote:
>> > > >>>>
>> > > >>>>> Hi all,
>> > > >>>>>
>> > > >>>>> Thanks for all the discussion!
>> > > >>>>>
>> > > >>>>> @John and @Bruno I will survey other possible systems and see what I can
>> > > >>>>> do.
>> > > >>>>> Just a question: by systems, I suppose you mean the pros and cons of
>> > > >>>>> different reporting strategies?
>> > > >>>>>
>> > > >>>>> I'm not completely certain on this point, so it would be great if you could
>> > > >>>>> clarify that.
>> > > >>>>>
>> > > >>>>> So here's what I got from all the discussion so far:
>> > > >>>>>
>> > > >>>>> - Since both Matthias and John seem to have come to a consensus on
>> > > >>>>> this, we will go for an all-round behavioral change for KTables. After
>> > > >>>>> some thought, I decided that for now, an opt-out config will not be added.
>> > > >>>>> As John has pointed out, no-op changes tend to explode further down the
>> > > >>>>> topology as they are forwarded to more and more processor nodes downstream.
>> > > >>>>> - About using hash codes, after some explanation from John, it looks
>> > > >>>>> like hash codes might not be ideal (for implementation). For now, we
>> > > >>>>> will omit that detail, and save it for the PR.
>> > > >>>>> - @Bruno You do have valid concerns. Though, I am not completely
>> > > >>>>> certain if we want to do emit-on-change only for materialized KTables. I
>> > > >>>>> will put it down in the KIP regardless.
>> > > >>>>>
>> > > >>>>> I will do my best to address all points raised so far in the discussion.
>> > > >>>>> Hope we can keep this going!
>> > > >>>>> >> > > >>>>> Best, >> > > >>>>> Richard >> > > >>>>> >> > > >>>>>> On Fri, Jan 24, 2020 at 6:07 PM Bruno Cadonna < >> br...@confluent.io> >> > > wrote: >> > > >>>>> >> > > >>>>>> Thank you Matthias for the use cases! >> > > >>>>>> >> > > >>>>>> Looking at both use cases, I think you need to elaborate on >> them in >> > > >>>>>> the KIP, Richard. >> > > >>>>>> >> > > >>>>>> Emit from plain KTable: >> > > >>>>>> I agree with Matthias that the lower timestamp makes sense >> because >> > > it >> > > >>>>>> marks the start of the validity of the record. Idempotent >> records >> > > with >> > > >>>>>> a higher timestamp can be safely ignored. A corner case that I >> > > >>>>>> discussed with Matthias offline is when we do not materialize a >> > > KTable >> > > >>>>>> due to optimization. Then we cannot avoid the idempotent >> records >> > > >>>>>> because we do not keep the first record with the lower >> timestamp to >> > > >>>>>> compare to. >> > > >>>>>> >> > > >>>>>> Emit from KTable with aggregations: >> > > >>>>>> If we specify that an aggregation result should have the >> highest >> > > >>>>>> timestamp of the records that participated in the aggregation, >> we >> > > >>>>>> cannot ignore any idempotent records. Admittedly, the result >> of an >> > > >>>>>> aggregation usually changes, but there are aggregations where >> the >> > > >>>>>> result may not change like min and max, or sum when the >> incoming >> > > >>>>>> records have a value of zero. In those cases, we could benefit >> of >> > > the >> > > >>>>>> emit on change, but only if we define the semantics of the >> > > >>>>>> aggregations to not use the highest timestamp of the >> participating >> > > >>>>>> records for the result. 
In Kafka Streams, we do not have min, >> max, >> > > and >> > > >>>>>> sum as explicit aggregations, but we need to provide an API to >> > > define >> > > >>>>>> what timestamp should be used for the result of an aggregation >> if we >> > > >>>>>> want to go down this path. >> > > >>>>>> >> > > >>>>>> All of this does not block this KIP and I just wanted to put >> this >> > > >>>>>> aspects up for discussion. The KIP can limit itself to emit >> from >> > > >>>>>> materialized KTables. However, the limits should be explicitly >> > > stated >> > > >>>>>> in the KIP. >> > > >>>>>> >> > > >>>>>> Best, >> > > >>>>>> Bruno >> > > >>>>>> >> > > >>>>>> >> > > >>>>>> >> > > >>>>>> On Fri, Jan 24, 2020 at 10:58 AM Matthias J. Sax < >> > > matth...@confluent.io> >> > > >>>>>> wrote: >> > > >>>>>>> >> > > >>>>>>> IMHO, the question about semantics depends on the use case, in >> > > >>>>>>> particular on the origin of a KTable. >> > > >>>>>>> >> > > >>>>>>> If there is a changlog topic that one reads directly into a >> KTable, >> > > >>>>>>> emit-on-change does actually make sense, because the timestamp >> > > indicates >> > > >>>>>>> _when_ the update was _effective_. For this case, it is >> > > semantically >> > > >>>>>>> sound to _not_ update the timestamp in the store, because the >> > > second >> > > >>>>>>> update is actually idempotent and advancing the timestamp is >> not >> > > ideal >> > > >>>>>>> (one could even consider it to be wrong to advance the >> timestamp) >> > > >>>>>>> because the "valid time" of the record pair did not change. >> > > >>>>>>> >> > > >>>>>>> This reasoning also applies to KTable-KTable joins. >> > > >>>>>>> >> > > >>>>>>> However, if the KTable is the result of an aggregation, I >> think >> > > >>>>>>> emit-on-update is more natural, because the timestamp >> reflects the >> > > >>>>>>> _last_ time (ie, highest timestamp) of all input records the >> > > contributed >> > > >>>>>>> to the result. 
Hence, updating the timestamp and emitting a >> new >> > > record >> > > >>>>>>> actually sounds correct to me. This applies to windowed and >> > > non-windowed >> > > >>>>>>> aggregations IMHO. >> > > >>>>>>> >> > > >>>>>>> However, considering the argument that the timestamp should >> not be >> > > >>>>>>> update in the first case in the store to begin with, both >> cases are >> > > >>>>>>> actually the same, and both can be modeled as emit-on-change: >> if a >> > > >>>>>>> `table()` operator does not update the timestamp if the value >> does >> > > not >> > > >>>>>>> change, there is _no_ change and thus nothing is emitted. At >> the >> > > same >> > > >>>>>>> time, if an aggregation operator does update the timestamp >> (even >> > > if the >> > > >>>>>>> value does not change) there _is_ a change and we emit. >> > > >>>>>>> >> > > >>>>>>> Note that handling out-of-order data for aggregations would >> also >> > > work >> > > >>>>>>> seamlessly with this approach -- for out-of-order records, the >> > > timestamp >> > > >>>>>>> does never change, and thus, we only emit if the result itself >> > > changes. >> > > >>>>>>> >> > > >>>>>>> Therefore, I would argue that we might not even need any >> config, >> > > because >> > > >>>>>>> the emit-on-change behavior is just correct and reduced the >> > > downstream >> > > >>>>>>> load, while our current behavior is not ideal (even if it's >> also >> > > >>>>>> correct). >> > > >>>>>>> >> > > >>>>>>> Thoughts? >> > > >>>>>>> >> > > >>>>>>> -Matthias >> > > >>>>>>> >> > > >>>>>>> On 1/24/20 9:37 AM, John Roesler wrote: >> > > >>>>>>>> Hi Bruno, >> > > >>>>>>>> >> > > >>>>>>>> Thanks for that idea. I hadn't considered that >> > > >>>>>>>> option before, and it does seem like that would be >> > > >>>>>>>> the right place to put it if we think it might be >> > > >>>>>>>> semantically important to control on a >> > > >>>>>>>> table-by-table basis. 
>> > > >>>>>>>> >> > > >>>>>>>> I had been thinking of it less semantically and >> > > >>>>>>>> more practically. In the context of a large >> > > >>>>>>>> topology, or more generally, a large software >> > > >>>>>>>> system that contains many topologies and other >> > > >>>>>>>> event-driven systems, each no-op result becomes an >> > > >>>>>>>> input that is destined to itself become a no-op >> > > >>>>>>>> result, and so on, all the way through the system. >> > > >>>>>>>> Thus, a single pointless processing result becomes >> > > >>>>>>>> amplified into a large number of pointless >> > > >>>>>>>> computations, cache perturbations, and network >> > > >>>>>>>> and disk I/O operations. If you also consider >> > > >>>>>>>> operations with fan-out implications, like >> > > >>>>>>>> branching or foreign-key joins, the wasted >> > > >>>>>>>> resources are amplified not just in proportion to >> > > >>>>>>>> the size of the system, but the size of the system >> > > >>>>>>>> times the average fan-out (to the power of the >> > > >>>>>>>> number of fan-out operations on the path(s) >> > > >>>>>>>> through the system). >> > > >>>>>>>> >> > > >>>>>>>> In my time operating such systems, I've observed >> > > >>>>>>>> these effects to be very real, and actually, the >> > > >>>>>>>> system and use case doesn't have to be very large >> > > >>>>>>>> before the amplification poses an existential >> > > >>>>>>>> threat to the system as a whole. >> > > >>>>>>>> >> > > >>>>>>>> This is the basis of my advocating for a simple >> > > >>>>>>>> behavior change, rather than an opt-in config of >> > > >>>>>>>> any kind. It seems like Streams should "do the >> > > >>>>>>>> right thing" for the majority use case. My theory >> > > >>>>>>>> (which may be wrong) is that the majority use case >> > > >>>>>>>> is more like "relational queries" than "CEP >> > > >>>>>>>> queries". 
Even if you were doing some >> > > >>>>>>>> event-sensitive computation, wouldn't you do them >> > > >>>>>>>> as Stream operations (where this feature is >> > > >>>>>>>> inapplicable anyway)? >> > > >>>>>>>> >> > > >>>>>>>> In keeping with the "practical" perspective, I >> > > >>>>>>>> suggested the opt-out config only in the (I think >> > > >>>>>>>> unlikely) event that filtering out pointless >> > > >>>>>>>> updates actually harms performance. I'd also be >> > > >>>>>>>> perfectly fine without the opt-out config. I >> > > >>>>>>>> really think that (because of the timestamp >> > > >>>>>>>> semantics work already underway), we're already >> > > >>>>>>>> pre-fetching the prior result most of the time, so >> > > >>>>>>>> there would actually be very little extra I/O >> > > >>>>>>>> involved in implementing emit-on-change. >> > > >>>>>>>> >> > > >>>>>>>> However, we should consider whether my experience >> > > >>>>>>>> is likely to be general. Do you have some use >> > > >>>>>>>> case in mind for which you'd actually want some >> > > >>>>>>>> KTable results to be emit-on-update for semantic >> > > >>>>>>>> reasons? >> > > >>>>>>>> >> > > >>>>>>>> Thanks, >> > > >>>>>>>> -John >> > > >>>>>>>> >> > > >>>>>>>> >> > > >>>>>>>> On Fri, Jan 24, 2020, at 11:02, Bruno Cadonna wrote: >> > > >>>>>>>>> Hi Richard, >> > > >>>>>>>>> >> > > >>>>>>>>> Thank you for the KIP. >> > > >>>>>>>>> >> > > >>>>>>>>> I agree with John that we should focus on the interface and >> > > behavior >> > > >>>>>>>>> change in a KIP. We can discuss the implementation later. >> > > >>>>>>>>> >> > > >>>>>>>>> I am also +1 for the survey. >> > > >>>>>>>>> >> > > >>>>>>>>> I had a thought about this. Couldn't we consider >> emit-on-change >> > > to be >> > > >>>>>>>>> one config of suppress (like `untilWindowCloses`)? What you >> > > basically >> > > >>>>>>>>> propose is to suppress updates if they do not change the >> result. 
>> > > >>>>>>>>> Considering emit on change as a flavour of suppress would >> be more >> > > >>>>>>>>> flexible because it would specify the behavior locally for a >> > > KTable >> > > >>>>>>>>> instead of globally for all KTables. Additionally, >> specifying the >> > > >>>>>>>>> behavior in one place instead of multiple places feels more >> > > intuitive >> > > >>>>>>>>> and consistent to me. >> > > >>>>>>>>> >> > > >>>>>>>>> Best, >> > > >>>>>>>>> Bruno >> > > >>>>>>>>> >> > > >>>>>>>>> On Fri, Jan 24, 2020 at 7:49 AM John Roesler < >> > > vvcep...@apache.org> >> > > >>>>>> wrote: >> > > >>>>>>>>>> >> > > >>>>>>>>>> Hi Richard, >> > > >>>>>>>>>> >> > > >>>>>>>>>> Thanks for picking this up! I know of at least one large >> > > community >> > > >>>>>> member >> > > >>>>>>>>>> for which this feature is absolutely essential. >> > > >>>>>>>>>> >> > > >>>>>>>>>> If I understand your two options, it seems like the >> proposal is >> > > to >> > > >>>>>> implement >> > > >>>>>>>>>> it as a behavior change regardless, and the question is >> whether >> > > to >> > > >>>>>> provide >> > > >>>>>>>>>> an opt-out config or not. >> > > >>>>>>>>>> >> > > >>>>>>>>>> Given that any implementation of this feature would have >> some >> > > >>>>>> performance >> > > >>>>>>>>>> impact under some workloads, and also that we don't know if >> > > anyone >> > > >>>>>> really >> > > >>>>>>>>>> depends on emit-on-update time semantics, it seems like we >> > > should >> > > >>>>>> propose >> > > >>>>>>>>>> to add an opt-out config. Can you update the KIP to >> mention the >> > > >>>>>> exact >> > > >>>>>>>>>> config key and value(s) you'd propose? >> > > >>>>>>>>>> >> > > >>>>>>>>>> Just to move the discussion forward, maybe something like: >> > > >>>>>>>>>> emit.on := change|update >> > > >>>>>>>>>> with the new default being "change" >> > > >>>>>>>>>> >> > > >>>>>>>>>> Thanks for pointing out the timestamp issue in particular. 
>> I >> > > agree >> > > >>>>>> that if >> > > >>>>>>>>>> we discard the latter update as a no-op, then we also have >> to >> > > >>>>>> discard its >> > > >>>>>>>>>> timestamp (obviously, we don't forward the timestamp >> update, as >> > > >>>>>> that's >> > > >>>>>>>>>> the whole point, but we also can't update the timestamp in >> the >> > > >>>>>> store, as >> > > >>>>>>>>>> the store must remain consistent with what has been >> emitted). >> > > >>>>>>>>>> >> > > >>>>>>>>>> I have to confess that I disagree with your implementation >> > > >>>>>> proposal, but >> > > >>>>>>>>>> it's also not necessary to discuss implementation in the >> KIP. >> > > Maybe >> > > >>>>>> it would >> > > >>>>>>>>>> be less controversial if you just drop that section for >> now, so >> > > >>>>>> that the KIP >> > > >>>>>>>>>> discussion can focus on the behavior change and config. >> > > >>>>>>>>>> >> > > >>>>>>>>>> Just for reference, there is some research into this >> domain. For >> > > >>>>>> example, >> > > >>>>>>>>>> see the "Report" section (3.2.3) of the SECRET paper: >> > > >>>>>>>>>> >> > > >>>>>> >> > > >> https://nam04.safelinks.protection.outlook.com/?url=http%3A%2F%2Fpeople.csail.mit.edu%2Ftatbul%2Fpublications%2Fmaxstream_vldb10.pdf&data=02%7C01%7CThomas.Becker%40tivo.com%7C63670904ae324e62575508d7a697c3ad%7Cd05b7c6912014c0db45d7f1dcc227e4d%7C1%7C1%7C637161043078978612&sdata=yCdlYShUf2y3mqZQHA8ZGR83%2B99CZp%2B5r0HksqS%2B%2FPc%3D&reserved=0 >> > > >>>>>>>>>> >> > > >>>>>>>>>> It might help to round out the proposal if you take a brief >> > > survey >> > > >>>>>> of the >> > > >>>>>>>>>> behaviors of other systems, along with pros and cons if >> any are >> > > >>>>>> reported. >> > > >>>>>>>>>> >> > > >>>>>>>>>> Thanks, >> > > >>>>>>>>>> -John >> > > >>>>>>>>>> >> > > >>>>>>>>>> >> > > >>>>>>>>>> On Fri, Jan 10, 2020, at 22:27, Richard Yu wrote: >> > > >>>>>>>>>>> Hi everybody! 
>> > > >>>>>>>>>>>
>> > > >>>>>>>>>>> I'd like to propose a change that we probably should have added a
>> > > >>>>>>>>>>> long time ago.
>> > > >>>>>>>>>>>
>> > > >>>>>>>>>>> The key benefit of this KIP would be reduced traffic in Kafka Streams, since
>> > > >>>>>>>>>>> a lot of no-op results would no longer be sent downstream.
>> > > >>>>>>>>>>> Here is the KIP for reference.
>> > > >>>>>>>>>>>
>> > > >>>>>>>>>>> https://nam04.safelinks.protection.outlook.com/?url=https%3A%2F%2Fcwiki.apache.org%2Fconfluence%2Fdisplay%2FKAFKA%2FKIP-557%253A%2BAdd%2Bemit%2Bon%2Bchange%2Bsupport%2Bfor%2BKafka%2BStreams&data=02%7C01%7CThomas.Becker%40tivo.com%7C63670904ae324e62575508d7a697c3ad%7Cd05b7c6912014c0db45d7f1dcc227e4d%7C1%7C1%7C637161043078988604&sdata=eVtuuyDX6aNsYcw8wOmM1HSinOq5ptPPUaTxXqgyA7g%3D&reserved=0
>> > > >>>>>>>>>>>
>> > > >>>>>>>>>>> Currently, I seek to formalize our approach for this KIP first before we
>> > > >>>>>>>>>>> determine concrete API additions / configurations.
>> > > >>>>>>>>>>> Some configs might warrant adding, while others are not necessary since
>> > > >>>>>>>>>>> adding them would only increase the complexity of Kafka Streams.
>> > > >>>>>>>>>>>
>> > > >>>>>>>>>>> Cheers,
>> > > >>>>>>>>>>> Richard