Hi all,

I have decided to pass this KIP with 2 binding votes and 3 non-binding votes (including mine). I will update the KIP status shortly after this.

Best,
Richard

On Thu, Mar 5, 2020 at 3:45 PM Richard Yu <yohan.richard...@gmail.com> wrote:

> Hi all,
>
> Just polling for some last changes on the name.
> I think that since there doesn't seem to be much objection to any major changes in the KIP, I will pass it this Friday.
>
> If you feel that we still need some more discussion, please let me know. :)
>
> Best,
> Richard
>
> P.S. Will start working on a PR for this one soon.
>
> On Wed, Mar 4, 2020 at 1:30 PM Guozhang Wang <wangg...@gmail.com> wrote:
>
>> Regarding the metric name, I was actually trying to be consistent with the node-level `suppression-emit`, as I feel this one's characteristics are closer to that. If other folks feel it is better to align with the task-level "dropped-records", I think I can be convinced too.
>>
>> Guozhang
>>
>> On Wed, Mar 4, 2020 at 12:09 AM Bruno Cadonna <br...@confluent.io> wrote:
>>
>> > Hi all,
>> >
>> > may I make a non-binding proposal for the metric name? I would prefer "skipped-idempotent-updates" to be consistent with the "dropped-records".
>> >
>> > Best,
>> > Bruno
>> >
>> > On Tue, Mar 3, 2020 at 11:57 PM Richard Yu <yohan.richard...@gmail.com> wrote:
>> > >
>> > > Hi all,
>> > >
>> > > Thanks for the discussion!
>> > >
>> > > @Guozhang, I will make the corresponding changes to the KIP (i.e. renaming the sensor and adding some notes).
>> > > With the current state of things, we are very close. Just need that one last binding vote.
>> > >
>> > > @Matthias J. Sax <matth...@confluent.io> It would be ideal if we can also get your last two cents on this.
>> > > Other than that, we are good.
>> > >
>> > > Best,
>> > > Richard
>> > >
>> > > On Tue, Mar 3, 2020 at 10:46 AM Guozhang Wang <wangg...@gmail.com> wrote:
>> > >
>> > > > Hi Bruno, John:
>> > > >
>> > > > 1) That makes sense.
>> > > > If we consider them to be node-specific metrics that only apply to a subset of built-in processor nodes and are not alert-relevant (just like suppression-emit (rate | total)), they'd better be per-node instead of per-task, and we would not associate such events with a warning. With that in mind, I'd suggest we consider renaming the metric without the `dropped` keyword to distinguish it from the per-task level sensor. How about "idempotent-update-skip (rate | total)"?
>> > > >
>> > > > Also a minor suggestion: we should clarify in the KIP / javadocs which built-in processor nodes would have this metric and which would not.
>> > > >
>> > > > 2) About stream time tracking, there are multiple known issues that we should close to improve our consistency semantics:
>> > > >
>> > > > a. preserve stream time of active tasks across rebalances where they may be migrated. This is what KAFKA-9368 <https://issues.apache.org/jira/browse/KAFKA-9368> is meant for.
>> > > > b. preserve stream time of standby tasks to be aligned with the active tasks, via the changelog topics.
>> > > >
>> > > > And what I'm more concerned about is b) here. For example: let's say we have a topology of `source -> A -> repartition -> B` where both A and B have states along with changelogs, and both of them have standbys.
>> > > > If a record is piped from the source and has completely traversed the topology, we need to make sure that the stream time inferred across:
>> > > >
>> > > > * active task A (inferred from the source record),
>> > > > * active task B (inferred from the derived record from the repartition topic),
>> > > > * standby task A (inferred from the changelog topic of A's store),
>> > > > * standby task B (inferred from the changelog topic of B's store)
>> > > >
>> > > > are consistent (note I'm not saying they should be "exactly the same", but consistent, meaning that they may have different values but as long as that does not impact the time-based queries, it is fine). The main motivation is that on IQ (Interactive Queries), where both active and standby tasks could be accessed, we can eventually improve our consistency guarantee to have 1) read-your-write, 2) consistency across stores, etc.
>> > > >
>> > > > I agree with John's assessment in the previous email, and just wanted to clarify more concretely what I'm thinking.
>> > > >
>> > > > Guozhang
>> > > >
>> > > > On Tue, Mar 3, 2020 at 9:03 AM John Roesler <vvcep...@apache.org> wrote:
>> > > >
>> > > > > Thanks, Guozhang and Bruno!
>> > > > >
>> > > > > 2)
>> > > > > I had a similar thought to both of you about the metrics, but I ultimately came out with a conclusion like Bruno's. These aren't dropped invalid records, they're intentionally dropped, valid, but unnecessary, updates. A "warning" for this case definitely seems wrong, and I'd also not recommend counting these events along with "dropped-records", because those are all dropped invalid records, e.g., late or null-keyed or couldn't be deserialized.
>> > > > >
>> > > > > Like Bruno pointed out, an operator should be concerned to see non-zero "dropped-records", and would then consult the logs for warnings. But that same person should be happy to see "dropped-idempotent-updates" increasing, since it means they're saving time and money. Maybe the name of the metric could be different, but I couldn't think of a better one. OTOH, maybe it just stands out to us because we recently discussed those other metrics in KIP-444?
>> > > > >
>> > > > > 1)
>> > > > > Maybe we should discuss this point more. It seems like we should maintain an invariant that the following three objects always have exactly the same state (modulo flush boundaries):
>> > > > > 1. The internal state store
>> > > > > 2. The changelog
>> > > > > 3. The operation's result view
>> > > > >
>> > > > > That is, if I have a materialized Filter, then it seems like I _must_ store exactly the same record in the store and the changelog, and also forward the exact same record, including the timestamp, to the downstream operations.
>> > > > >
>> > > > > If we store something different in the internal state store than the changelog, we can get a situation where the state is actually different after restoration than it is during processing, and queries against standbys would return different results than queries against the active tasks.
>> > > > >
>> > > > > Regarding storing something different in the store+changelog than we forward downstream, consider the following topology:
>> > > > > sourceTable
>> > > > >   .filter(someFilter, Materialized.as("f1"))
>> > > > >   .filter((k, v) -> true, Materialized.as("f2"))
>> > > > >
>> > > > > If we didn't forward exactly the same data we store, then querying f2 would return different results than querying f1, which is clearly not correct, given the topology.
>> > > > >
>> > > > > It seems like maybe what you have in mind is the preservation of stream time across restart/rebalance? This bug is still open, actually:
>> > > > > https://issues.apache.org/jira/browse/KAFKA-9368
>> > > > > It seems like solving that bug would be independent of KIP-557. I.e., KIP-557 makes that bug neither worse nor better.
>> > > > >
>> > > > > One other thought I had is maybe you were thinking that operators would update their internally tracked stream time, but still discard records? I think that _would_ be a bug. That is, if a record gets discarded as idempotent, it should have no effect at all on the state of the application.
>> > > > > Reflecting on my prior analysis of stream time, most of the cases where we track stream time are in Stream aggregations, and in those cases, if an incoming record's timestamp is higher than the previous stream time, it would already not be considered idempotent. So we would store, log, and forward the result with the new timestamp.
>> > > > > The only other case is Suppress. With respect to idempotence, Suppress is equivalent to a stateless no-op transformation. All it does is collect and delay updates.
>> > > > > It has no memory of what it previously emitted, so it wouldn't be possible for it to check for idempotence anyway.
>> > > > >
>> > > > > Was that what you were thinking?
>> > > > > Thanks,
>> > > > > -John
>> > > > >
>> > > > > On Tue, Mar 3, 2020, at 02:34, Bruno Cadonna wrote:
>> > > > > > Hi Guozhang,
>> > > > > >
>> > > > > > I also had the same thought about using the existing "dropped-records" metrics. However, I think in this case it would be better to use a new metric because dropping idempotent updates is an optimization; the dropped updates do not represent missed records. The dropped idempotent updates in general do not change the result and so do not need a warn log message. Whereas dropped records due to expired windows, serialization errors, or lateness might be something concerning that needs a warn log message.
>> > > > > >
>> > > > > > Looking at the metrics, you would be happy to see "dropped-idempotent-updates" increase, because that means Streams gets rid of no-ops downstream, but you would be concerned if "dropped-records" increased, because that means your records or the configuration of your app have issues. The "dropped-idempotent-updates" metric could also be an indication that you could further optimize your setup, by getting rid of idempotent updates further upstream.
>> > > > > >
>> > > > > > Best,
>> > > > > > Bruno
>> > > > > >
>> > > > > > On Tue, Mar 3, 2020 at 7:58 AM Guozhang Wang <wangg...@gmail.com> wrote:
>> > > > > > >
>> > > > > > > Hello Richard,
>> > > > > > >
>> > > > > > > Thanks for the KIP. I once reviewed it and was concerned about its effects on stream time advancing.
>> > > > > > > After reading the updated KIP I think it has answered a lot of those concerns already.
>> > > > > > >
>> > > > > > > I have a couple of minor comments still, otherwise I'm +1:
>> > > > > > >
>> > > > > > > 1) I want to clarify that for operations resulting in KTables (not only aggregations, but consider KTable#filter that may also result in a new KTable), even if we drop emissions to the downstream topics we would still append to the corresponding changelog if the timestamp has changed. This is because the timestamps on the changelog are read by the standby tasks, which rely on them to infer their own stream time advancing.
>> > > > > > >
>> > > > > > > 2) About the metrics, in KIP-444 we are consolidating all types of scenarios that can cause dropped records to the same metrics:
>> > > > > > >
>> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams
>> > > > > > >
>> > > > > > > late-records-drop: INFO at processor node level, replaced by INFO task-level "dropped-records".
>> > > > > > >
>> > > > > > > skipped-records: INFO at thread and processor node level, replaced by INFO task-level "dropped-records".
>> > > > > > >
>> > > > > > > expired-window-record-drop: DEBUG at state store level, replaced by INFO task-level "dropped-records".
>> > > > > > >
>> > > > > > > The main idea is that instead of using different metrics to indicate different types of scenarios, we use one, and users just alert on that single metric.
>> > > > > > > When the alert triggers, they can look into the log4j logs for its causes (we made sure that all sensor recordings of this metric would be associated with a log4j warning).
>> > > > > > >
>> > > > > > > So I'd suggest that instead of introducing a new per-node "dropped-idempotent-updates", we just piggy-back on the existing task-level metric; unless we think that idempotent drops are more frequent than others and also that they are not worth a warning log, in which case we can consider breaking this metric down with different tags, for example.
>> > > > > > >
>> > > > > > > Guozhang
>> > > > > > >
>> > > > > > > On Mon, Mar 2, 2020 at 1:59 PM Richard Yu <yohan.richard...@gmail.com> wrote:
>> > > > > > >
>> > > > > > > > Hi all,
>> > > > > > > >
>> > > > > > > > Thanks for the votes so far!
>> > > > > > > > @Matthias or @Guozhang Wang <guozh...@confluent.io> it would be great to also get your input on this KIP.
>> > > > > > > >
>> > > > > > > > It looks to be pretty close to completion, so the finishing touches are all we need. :)
>> > > > > > > >
>> > > > > > > > Best,
>> > > > > > > > Richard
>> > > > > > > >
>> > > > > > > > On Mon, Mar 2, 2020 at 11:45 AM Ghassan Yammine <ghassan.yamm...@bazaarvoice.com> wrote:
>> > > > > > > >
>> > > > > > > > > Hello all,
>> > > > > > > > >
>> > > > > > > > > +1 (non-binding)
>> > > > > > > > >
>> > > > > > > > > Thanks,
>> > > > > > > > >
>> > > > > > > > > Ghassan
>> > > > > > > > >
>> > > > > > > > > On 3/2/20, 12:43 PM, "Bruno Cadonna" <br...@confluent.io> wrote:
>> > > > > > > > >
>> > > > > > > > > Hi Richard,
>> > > > > > > > >
>> > > > > > > > > +1 (non-binding)
>> > > > > > > > >
>> > > > > > > > > Best,
>> > > > > > > > > Bruno
>> > > > > > > > >
>> > > > > > > > > On Mon, Mar 2, 2020 at 4:33 PM John Roesler <vvcep...@apache.org> wrote:
>> > > > > > > > > >
>> > > > > > > > > > Hi Richard,
>> > > > > > > > > >
>> > > > > > > > > > Thanks for the KIP!
>> > > > > > > > > >
>> > > > > > > > > > I'm +1 (binding)
>> > > > > > > > > >
>> > > > > > > > > > -john
>> > > > > > > > > >
>> > > > > > > > > > On Thu, Feb 27, 2020, at 14:40, Richard Yu wrote:
>> > > > > > > > > > > Hi all,
>> > > > > > > > > > >
>> > > > > > > > > > > I am proposing a new optimization to Kafka Streams which would greatly reduce the number of idempotent updates (or no-ops) in the Kafka Streams DAG.
>> > > > > > > > > > > A number of users have been interested in this feature, so it would be nice to pass this one in.
>> > > > > > > > > > >
>> > > > > > > > > > > For information, the KIP is described below:
>> > > > > > > > > > >
>> > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams
>> > > > > > > > > > >
>> > > > > > > > > > > We aim to make Kafka Streams more efficient by adopting the "emit on change" reporting strategy.
>> > > > > > > > > > >
>> > > > > > > > > > > Please cast your vote!
>> > > > > > > > > > >
>> > > > > > > > > > > Best,
>> > > > > > > > > > > Richard
>> > > > > > >
>> > > > > > > --
>> > > > > > > -- Guozhang
>> > > >
>> > > > --
>> > > > -- Guozhang
>>
>> --
>> -- Guozhang
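
For readers skimming the thread, the "emit on change" behaviour under discussion can be sketched roughly as follows. This is only an illustration under stated assumptions, not the KIP-557 implementation: the class `EmitOnChangeSketch`, its methods, and the in-memory map and list are all hypothetical stand-ins and are not Kafka Streams APIs. It assumes idempotence is detected by comparing serialized value bytes, and it follows John's reading that a skipped update has no effect on state at all (the stored timestamp is left untouched). The counter name is borrowed from the "idempotent-update-skip" proposal above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of emit-on-change; none of these names are Kafka Streams APIs. */
final class EmitOnChangeSketch {

    /** A serialized value together with its record timestamp. */
    static final class Stamped {
        final byte[] value;
        final long timestamp;

        Stamped(byte[] value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // Stand-in for the node's state store.
    private final Map<String, Stamped> store = new HashMap<>();
    // Stand-in for both the changelog and the downstream output; the same record goes to each.
    private final List<Stamped> forwarded = new ArrayList<>();
    // Stand-in for a per-node "idempotent-update-skip" total.
    private long idempotentUpdateSkipTotal = 0;

    /** Returns true if the update was stored and forwarded, false if it was skipped as a no-op. */
    boolean process(String key, byte[] newValue, long timestamp) {
        Stamped old = store.get(key);
        if (old != null && Arrays.equals(old.value, newValue)) {
            // Assumption: a skipped update changes nothing, not even the stored timestamp.
            idempotentUpdateSkipTotal++;
            return false;
        }
        Stamped updated = new Stamped(newValue, timestamp);
        store.put(key, updated);   // store ...
        forwarded.add(updated);    // ... and log/forward exactly the same record
        return true;
    }

    int forwardedCount() {
        return forwarded.size();
    }

    long skippedTotal() {
        return idempotentUpdateSkipTotal;
    }

    /** Timestamp currently stored for a key that is known to be present. */
    long storedTimestamp(String key) {
        return store.get(key).timestamp;
    }

    public static void main(String[] args) {
        EmitOnChangeSketch node = new EmitOnChangeSketch();
        node.process("k", new byte[] {1}, 10L); // new key: stored and forwarded
        node.process("k", new byte[] {1}, 20L); // same bytes: skipped
        node.process("k", new byte[] {2}, 30L); // changed bytes: stored and forwarded
        System.out.println("forwarded=" + node.forwardedCount() + " skipped=" + node.skippedTotal());
        // prints "forwarded=2 skipped=1"
    }
}
```

Keeping the skip counter separate from any "dropped-records" style counter mirrors the point made by Bruno and John: a rising skip count reflects saved work rather than a problem with the input.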