-----BEGIN PGP SIGNED MESSAGE-----
Hash: SHA512

Richard,

you cannot close a KIP as accepted with 2 binding votes (cf.
https://cwiki.apache.org/confluence/display/KAFKA/Bylaws). You could
only discard the KIP as long as it's not accepted :D

However, I am +1 (binding) and thus you can close the VOTE as accepted.

Just three minor follow-up comments:

(1) In "Reporting Strategies" you mention in point (2) "Emit on update /
non-empty content" -- I am not sure what "empty content" would be. This
is a little bit confusing. Maybe just remove it?

(2) "Design Reasoning"

> we have decided that we will forward aggregation results if and
> only if the timestamp and the value had not changed

This sounds incorrect. If both value and timestamp have not changed, we
would skip the update, from my understanding? I.e., to phrase it
differently: for a table operation we only consider the value to make a
comparison, and if the value does not change, we don't emit anything
(even if the timestamp changed). For windowed aggregations, however,
even if the value does not change but the timestamp advances, we emit;
i.e., a changing timestamp is not considered idempotent for this case.
(Note that the timestamp can never go backward for this case, because
it's computed as the maximum over all input records for the window.)

(3) The discussion about stream time is very interesting. I agree that
it's an orthogonal concern to this KIP.

- -Matthias

On 3/6/20 1:52 PM, Richard Yu wrote:
> Hi all,
>
> I have decided to pass this KIP with 2 binding votes and 3
> non-binding votes (including mine). I will update KIP status
> shortly after this.
>
> Best, Richard
>
> On Thu, Mar 5, 2020 at 3:45 PM Richard Yu
> <yohan.richard...@gmail.com> wrote:
>
>> Hi all,
>>
>> Just polling for some last changes on the name. I think that
>> since there doesn't seem to be much objection to any major
>> changes in the KIP, I will pass it this Friday.
>>
>> If you feel that we still need some more discussion, please let
>> me know. :)
>>
>> Best, Richard
>>
>> P.S.
>> Will start working on a PR for this one soon.
>>
>> On Wed, Mar 4, 2020 at 1:30 PM Guozhang Wang <wangg...@gmail.com>
>> wrote:
>>
>>> Regarding the metric name, I was actually trying to be
>>> consistent with the node-level `suppression-emit` as I feel
>>> this one's characteristics are closer to that. If other folks
>>> feel it is better to align with the task-level "dropped-records",
>>> I think I can be convinced too.
>>>
>>>
>>> Guozhang
>>>
>>> On Wed, Mar 4, 2020 at 12:09 AM Bruno Cadonna
>>> <br...@confluent.io> wrote:
>>>
>>>> Hi all,
>>>>
>>>> may I make a non-binding proposal for the metric name? I
>>>> would prefer "skipped-idempotent-updates" to be consistent
>>>> with the "dropped-records".
>>>>
>>>> Best, Bruno
>>>>
>>>> On Tue, Mar 3, 2020 at 11:57 PM Richard Yu
>>>> <yohan.richard...@gmail.com> wrote:
>>>>>
>>>>> Hi all,
>>>>>
>>>>> Thanks for the discussion!
>>>>>
>>>>> @Guozhang, I will make the corresponding changes to the KIP
>>>>> (i.e. renaming the sensor and adding some notes).
>>>>> With the current state of things, we are very close. Just need
>>>>> that one last binding vote.
>>>>>
>>>>> @Matthias J. Sax <matth...@confluent.io> It would be ideal
>>>>> if we can also get your last two cents on this as well.
>>>>> Other than that, we are good.
>>>>>
>>>>> Best, Richard
>>>>>
>>>>>
>>>>> On Tue, Mar 3, 2020 at 10:46 AM Guozhang Wang
>>>>> <wangg...@gmail.com> wrote:
>>>>>
>>>>>> Hi Bruno, John:
>>>>>>
>>>>>> 1) That makes sense. If we consider them to be node-specific
>>>>>> metrics that only apply to a subset of built-in processor nodes
>>>>>> that are irrelevant to alert-relevant metrics (just like
>>>>>> suppression-emit (rate | total)), they'd better be per-node
>>>>>> instead of per-task, and we would not associate such events with
>>>>>> a warning.
>>>>>> With that in mind, I'd suggest we consider renaming the metric
>>>>>> without the `dropped` keyword to distinguish it from the
>>>>>> per-task level sensor. How about "idempotent-update-skip
>>>>>> (rate | total)"?
>>>>>>
>>>>>> Also a minor suggestion: we should clarify in the KIP / javadocs
>>>>>> which built-in processor nodes would have this metric while
>>>>>> others don't.
>>>>>>
>>>>>> 2) About stream time tracking, there are multiple known issues
>>>>>> that we should close to improve our consistency semantics:
>>>>>>
>>>>>> a. preserve stream time of active tasks across rebalances where
>>>>>> they may be migrated. This is what KAFKA-9368
>>>>>> <https://issues.apache.org/jira/browse/KAFKA-9368> is meant for.
>>>>>> b. preserve stream time of standby tasks to be aligned with the
>>>>>> active tasks, via the changelog topics.
>>>>>>
>>>>>> And what I'm more concerned about is b) here. For example: let's
>>>>>> say we have a topology of `source -> A -> repartition -> B`
>>>>>> where both A and B have states along with changelogs, and both
>>>>>> of them have standbys. If a record is piped from the source and
>>>>>> completely traversed through the topology, we need to make sure
>>>>>> that the stream time inferred across:
>>>>>>
>>>>>> * active task A (inferred from the source record),
>>>>>> * active task B (inferred from the derived record from
>>>>>>   repartition topic),
>>>>>> * standby task A (inferred from the changelog topic of A's store),
>>>>>> * standby task B (inferred from the changelog topic of B's store)
>>>>>>
>>>>>> are consistent (note I'm not saying they should be "exactly the
>>>>>> same", but consistent, meaning that they may have different
>>>>>> values but as long as that does not impact the time-based
>>>>>> queries, it is fine).
>>>>>> The main motivation is that on IQ, where both active and standby
>>>>>> tasks could be accessed, we can eventually improve our
>>>>>> consistency guarantee to have 1) read-your-write, 2) consistency
>>>>>> across stores, etc.
>>>>>>
>>>>>> I agree with John's assessment in the previous email, and just
>>>>>> to clarify more concretely what I'm thinking.
>>>>>>
>>>>>>
>>>>>> Guozhang
>>>>>>
>>>>>>
>>>>>> On Tue, Mar 3, 2020 at 9:03 AM John Roesler
>>>>>> <vvcep...@apache.org> wrote:
>>>>>>
>>>>>>> Thanks, Guozhang and Bruno!
>>>>>>>
>>>>>>> 2) I had a similar thought to both of you about the metrics,
>>>>>>> but I ultimately came out with a conclusion like Bruno's. These
>>>>>>> aren't dropped invalid records, they're intentionally dropped,
>>>>>>> valid, but unnecessary, updates. A "warning" for this case
>>>>>>> definitely seems wrong, and I'd also not recommend counting
>>>>>>> these events along with "dropped-records", because those are
>>>>>>> all dropped invalid records, e.g., late or null-keyed or
>>>>>>> couldn't be deserialized.
>>>>>>>
>>>>>>> Like Bruno pointed out, an operator should be concerned to see
>>>>>>> non-zero "dropped-records", and would then consult the logs for
>>>>>>> warnings. But that same person should be happy to see
>>>>>>> "dropped-idempotent-updates" increasing, since it means they're
>>>>>>> saving time and money. Maybe the name of the metric could be
>>>>>>> different, but I couldn't think of a better one. OTOH, maybe it
>>>>>>> just stands out to us because we recently discussed those other
>>>>>>> metrics in KIP-444?
>>>>>>>
>>>>>>> 1) Maybe we should discuss this point more. It seems like we
>>>>>>> should maintain an invariant that the following three objects
>>>>>>> always have exactly the same state (modulo flush boundaries):
>>>>>>> 1. The internal state store
>>>>>>> 2. The changelog
>>>>>>> 3. The operation's result view
>>>>>>>
>>>>>>> That is, if I have a materialized Filter, then it seems like I
>>>>>>> _must_ store exactly the same record in the store and the
>>>>>>> changelog, and also forward the exact same record, including
>>>>>>> the timestamp, to the downstream operations.
>>>>>>>
>>>>>>> If we store something different in the internal state store
>>>>>>> than the changelog, we can get a situation where the state is
>>>>>>> actually different after restoration than it is during
>>>>>>> processing, and queries against standbys would return different
>>>>>>> results than queries against the active tasks.
>>>>>>>
>>>>>>> Regarding storing something different in the store+changelog
>>>>>>> than we forward downstream, consider the following topology:
>>>>>>> sourceTable
>>>>>>>   .filter(someFilter, Materialized.as("f1"))
>>>>>>>   .filter(_ -> true, Materialized.as("f2"))
>>>>>>>
>>>>>>> If we didn't forward exactly the same data we store, then
>>>>>>> querying f2 would return different results than querying f1,
>>>>>>> which is clearly not correct, given the topology.
>>>>>>>
>>>>>>> It seems like maybe what you have in mind is the preservation
>>>>>>> of stream time across restart/rebalance? This bug is still
>>>>>>> open, actually:
>>>>>>> https://issues.apache.org/jira/browse/KAFKA-9368
>>>>>>> It seems like solving that bug would be independent of KIP-557.
>>>>>>> I.e., KIP-557 makes that bug neither worse nor better.
>>>>>>>
>>>>>>> One other thought I had is maybe you were thinking that
>>>>>>> operators would update their internally tracked stream time,
>>>>>>> but still discard records? I think that _would_ be a bug.
>>>>>>> That is, if a record gets discarded as idempotent, it should
>>>>>>> have no effect at all on the state of the application.
>>>>>>> Reflecting on my prior analysis of stream time, most of the
>>>>>>> cases where we track stream time are in Stream aggregations,
>>>>>>> and in those cases, if an incoming record's timestamp is higher
>>>>>>> than the previous stream time, it would already not be
>>>>>>> considered idempotent. So we would store, log, and forward the
>>>>>>> result with the new timestamp.
>>>>>>> The only other case is Suppress. With respect to idempotence,
>>>>>>> Suppress is equivalent to a stateless no-op transformation. All
>>>>>>> it does is collect and delay updates. It has no memory of what
>>>>>>> it previously emitted, so it wouldn't be possible for it to
>>>>>>> check for idempotence anyway.
>>>>>>>
>>>>>>> Was that what you were thinking?
>>>>>>> Thanks,
>>>>>>> -John
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Mar 3, 2020, at 02:34, Bruno Cadonna wrote:
>>>>>>>> Hi Guozhang,
>>>>>>>>
>>>>>>>> I also had the same thought about using the existing
>>>>>>>> "dropped-records" metrics. However, I think in this case it
>>>>>>>> would be better to use a new metric because dropped idempotent
>>>>>>>> updates are an optimization; they do not represent missed
>>>>>>>> records. The dropped idempotent updates in general do not
>>>>>>>> change the result and so do not need a warn log message,
>>>>>>>> whereas dropped records due to expired windows, serialization
>>>>>>>> errors, or lateness might be something concerning that needs a
>>>>>>>> warn log message.
>>>>>>>>
>>>>>>>> Looking at the metrics, you would be happy to see
>>>>>>>> "dropped-idempotent-updates" increase, because that means
>>>>>>>> Streams gets rid of no-ops downstream, but you would be
>>>>>>>> concerned if "dropped-records" would increase, because that
>>>>>>>> means your records or the configuration of your app has
>>>>>>>> issues. The "dropped-idempotent-updates" metric could also be
>>>>>>>> an indication that you could further optimize your setup, by
>>>>>>>> getting rid of idempotent updates further upstream.
>>>>>>>>
>>>>>>>> Best, Bruno
>>>>>>>>
>>>>>>>> On Tue, Mar 3, 2020 at 7:58 AM Guozhang Wang
>>>>>>>> <wangg...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> Hello Richard,
>>>>>>>>>
>>>>>>>>> Thanks for the KIP. I once reviewed it and was concerned
>>>>>>>>> about its effects on stream time advancing. After reading the
>>>>>>>>> updated KIP I think it has answered a lot of them already.
>>>>>>>>>
>>>>>>>>> I have a couple of minor comments still, otherwise I'm +1:
>>>>>>>>>
>>>>>>>>> 1) I want to clarify that for operations that result in
>>>>>>>>> KTables (not only aggregations, but consider KTable#filter
>>>>>>>>> that may also result in a new KTable), even if we drop
>>>>>>>>> emissions to the downstream topics we would still append to
>>>>>>>>> the corresponding changelog if the timestamp has changed.
>>>>>>>>> This is because the timestamps on the changelog are read by
>>>>>>>>> the standby tasks, which rely on them to infer their own
>>>>>>>>> stream time advancing.
>>>>>>>>>
>>>>>>>>> 2) About the metrics, in KIP-444 we are consolidating all
>>>>>>>>> types of scenarios that can cause dropped records to the same
>>>>>>>>> metrics:
>>>>>>>>>
>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams
>>>>>>>>>
>>>>>>>>> late-records-drop: INFO at processor node level, replaced by
>>>>>>>>> INFO task-level "dropped-records".
>>>>>>>>>
>>>>>>>>> skipped-records: INFO at thread and processor node level,
>>>>>>>>> replaced by INFO task-level "dropped-records".
>>>>>>>>>
>>>>>>>>> expired-window-record-drop: DEBUG at state store level,
>>>>>>>>> replaced by INFO task-level "dropped-records".
>>>>>>>>>
>>>>>>>>> The main idea is that instead of using different metrics to
>>>>>>>>> indicate different types of scenarios, users just alert on
>>>>>>>>> that single metric. When an alert triggers, they can look
>>>>>>>>> into the log4j for its causes (we made sure that all sensor
>>>>>>>>> recordings of this metric would be associated with a warning
>>>>>>>>> log4j).
>>>>>>>>>
>>>>>>>>> So I'd suggest that instead of introducing a new per-node
>>>>>>>>> "dropped-idempotent-updates", we just piggy-back on the
>>>>>>>>> existing task-level metric; unless we think that idempotent
>>>>>>>>> drops are more frequent than others and also that they are
>>>>>>>>> not worth a warning log, in which case we can consider
>>>>>>>>> breaking this metric down with different tags, for example.
>>>>>>>>>
>>>>>>>>> Guozhang
>>>>>>>>>
>>>>>>>>> On Mon, Mar 2, 2020 at 1:59 PM Richard Yu
>>>>>>>>> <yohan.richard...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi all,
>>>>>>>>>>
>>>>>>>>>> Thanks for the votes so far!
>>>>>>>>>> @Matthias or @Guozhang Wang <guozh...@confluent.io> it would
>>>>>>>>>> be great to also get your input on this KIP.
>>>>>>>>>>
>>>>>>>>>> It looks to be pretty close to completion, so the finishing
>>>>>>>>>> touches are all we need. :)
>>>>>>>>>>
>>>>>>>>>> Best, Richard
>>>>>>>>>>
>>>>>>>>>> On Mon, Mar 2, 2020 at 11:45 AM Ghassan Yammine
>>>>>>>>>> <ghassan.yamm...@bazaarvoice.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hello all,
>>>>>>>>>>>
>>>>>>>>>>> +1 (non-binding)
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>>
>>>>>>>>>>> Ghassan
>>>>>>>>>>>
>>>>>>>>>>> On 3/2/20, 12:43 PM, "Bruno Cadonna" <br...@confluent.io>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>> EXTERNAL: This email originated from outside of
>>>>>>>>>>> Bazaarvoice. Do not click any links or open any attachments
>>>>>>>>>>> unless you trust the sender and know the content is safe.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Hi Richard,
>>>>>>>>>>>
>>>>>>>>>>> +1 (non-binding)
>>>>>>>>>>>
>>>>>>>>>>> Best, Bruno
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Mar 2, 2020 at 4:33 PM John Roesler
>>>>>>>>>>> <vvcep...@apache.org> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> Hi Richard,
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks for the KIP!
>>>>>>>>>>>>
>>>>>>>>>>>> I'm +1 (binding)
>>>>>>>>>>>>
>>>>>>>>>>>> -john
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Feb 27, 2020, at 14:40, Richard Yu wrote:
>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I am proposing a new optimization to Kafka Streams which
>>>>>>>>>>>>> would greatly reduce the number of idempotent updates (or
>>>>>>>>>>>>> no-ops) in the Kafka Streams DAG. A number of users have
>>>>>>>>>>>>> been interested in this feature, so it would be nice to
>>>>>>>>>>>>> pass this one in.
>>>>>>>>>>>>>
>>>>>>>>>>>>> For information, the KIP is described below:
>>>>>>>>>>>>>
>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams
>>>>>>>>>>>>>
>>>>>>>>>>>>> We aim to make Kafka Streams more efficient by adopting
>>>>>>>>>>>>> the "emit on change" reporting strategy.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Please cast your vote!
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best, Richard
>>>>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> -- Guozhang
>>>>>>
>>>>>> --
>>>>>> -- Guozhang
>>>
>>> --
>>> -- Guozhang

-----BEGIN PGP SIGNATURE-----

iQIzBAEBCgAdFiEEI8mthP+5zxXZZdDSO4miYXKq/OgFAl5kC1kACgkQO4miYXKq
/OhHKA/+OewqjX248vjk6GO6Ex/f2kOJuIIDGb4/c0NlTIS/Iyat1+S8N9P58KNP
pg133xwdWHagU7wajYMktoFiPamQ+Cv+PPhr7qz38JdfVAvzpNb8tcsI/wr5apOQ
XNlBsPhQBLtO/JQUve72OqY/TC9unbpBfhA4tvdA/qkLNvDaX542SrZdlwXuqTKH
EBpgEPBrwaqJ5S65KTMs6Fppc5c2V3dWOAC7Ssql30OneUd/RS88oQ07oNkwZwss
tADw+tzXtw8a0C0PGtMoXhLrs9wipEsuGOP8N6uvuQCM7YoIvTyeBf3Cu7jG8NFB
r2caoWY4TZkqCRsrKe37nNbR8KpjkNQBxCZ7nvIJ9B3KCdB0JOFQXwYj1+23z6aX
T1otQ+0ZIg5lzpIFiHCzwzO5mo2VUEYryRvanw/f2S/LaaBIcg83Dz5TJIv8dFcd
mU7Vu1KXtpWTgpg48JkWd9qSwPqBaR+nvbdP/DnStwf9/9n5SSGgcdS83jw/w6RV
N1bX6YlDCFYeIIT14lrsbWiHSZpiFARZ0fn+VBm8DAF0g+mWlX5Hg30yHKujDj+h
qMDZkI2K2eoYRJaUFcS3yvr2RqCtgXMCEr+jrAVGHDaq+Lt4mbEJRZdon3MiF0Ht
WmEiNaQa7Tu5h+8P5Rb05kPAB6ODa7/sC0BxC54uRXLdPnNxQCs=
=nuG1
-----END PGP SIGNATURE-----