Hi all,

May I make a non-binding proposal for the metric name? I would prefer
"skipped-idempotent-updates", to be consistent with
"dropped-records".

Best,
Bruno

On Tue, Mar 3, 2020 at 11:57 PM Richard Yu <yohan.richard...@gmail.com> wrote:
>
> Hi all,
>
> Thanks for the discussion!
>
> @Guozhang, I will make the corresponding changes to the KIP (i.e. renaming
> the sensor and adding some notes).
> With the current state of things, we are very close. Just need that one
> last binding vote.
>
> @Matthias J. Sax <matth...@confluent.io>  It would be ideal if we could get
> your two cents on this as well.
> Other than that, we are good.
>
> Best,
> Richard
>
>
> On Tue, Mar 3, 2020 at 10:46 AM Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hi Bruno, John:
> >
> > 1) That makes sense. If we consider them to be node-specific metrics that
> > only apply to a subset of built-in processor nodes and are not relevant for
> > alerting (just like suppression-emit (rate | total)), they'd better be
> > per-node instead of per-task, and we would not associate such events with a
> > warning. With that in mind, I'd suggest we consider renaming the metric
> > without the `dropped` keyword to distinguish it from the per-task level
> > sensor. How about "idempotent-update-skip (rate | total)"?
> >
> > Also a minor suggestion: we should clarify in the KIP / javadocs which
> > built-in processor nodes would have this metric and which would not.
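> >
> > As a purely illustrative sketch of how an operator could read such a
> > per-node metric once it exists, one could filter the result of
> > KafkaStreams#metrics(); note the metric name below is only the proposal
> > above, not a shipped API:
> >
> > import java.util.Map;
> > import org.apache.kafka.common.Metric;
> > import org.apache.kafka.common.MetricName;
> > import org.apache.kafka.streams.KafkaStreams;
> >
> > // assumes an already-built and started KafkaStreams instance named "streams"
> > Map<MetricName, ? extends Metric> metrics = streams.metrics();
> > metrics.forEach((name, metric) -> {
> >     // "idempotent-update-skip" is the name proposed in this thread, not an existing metric
> >     if (name.name().startsWith("idempotent-update-skip")) {
> >         System.out.println(name.group() + "/" + name.name() + " = " + metric.metricValue());
> >     }
> > });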
> >
> > 2) About stream time tracking, there are multiple known issues that we
> > should close to improve our consistency semantics:
> >
> >  a. preserve the stream time of active tasks across rebalances where they
> > may be migrated. This is what KAFKA-9368
> > <https://issues.apache.org/jira/browse/KAFKA-9368> is meant to address.
> >  b. preserve stream time of standby tasks to be aligned with the active
> > tasks, via the changelog topics.
> >
> > And what concerns me more is b) here. For example: let's say we have a
> > topology of `source -> A -> repartition -> B` where both A and B have
> > states along with changelogs, and both of them have standbys. If a record
> > is piped from the source and completely traverses the topology, we need
> > to make sure that the stream times inferred across:
> >
> > * active task A (inferred from the source record),
> > * active task B (inferred from the derived record from repartition topic),
> > * standby task A (inferred from the changelog topic of A's store),
> > * standby task B (inferred from the changelog topic of B's store)
> >
> > are consistent (note I'm not saying they should be "exactly the same", but
> > consistent, meaning that they may have different values as long as that
> > does not impact time-based queries). The main motivation is that for IQ,
> > where both active and standby tasks could be accessed, we can eventually
> > improve our consistency guarantees to 1) read-your-writes, 2) consistency
> > across stores, etc.
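> >
> > For reference, here is a minimal sketch of such a topology (topic and store
> > names are made up for illustration): two stateful nodes A and B, each with a
> > changelog and potentially a standby replica, separated by a repartition topic:
> >
> > import org.apache.kafka.common.serialization.Serdes;
> > import org.apache.kafka.streams.StreamsBuilder;
> > import org.apache.kafka.streams.kstream.Consumed;
> > import org.apache.kafka.streams.kstream.Materialized;
> >
> > StreamsBuilder builder = new StreamsBuilder();
> > builder.stream("source", Consumed.with(Serdes.String(), Serdes.Long()))
> >     .groupByKey()
> >     .reduce(Long::sum, Materialized.as("store-a"))   // stateful node A, with changelog
> >     .toStream()
> >     .groupBy((key, total) -> total % 10 + "")        // new key => repartition topic
> >     .count(Materialized.as("store-b"));              // stateful node B, with changelog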
> >
> > I agree with John's assessment in the previous email; the above is just to
> > clarify more concretely what I'm thinking.
> >
> >
> > Guozhang
> >
> >
> > On Tue, Mar 3, 2020 at 9:03 AM John Roesler <vvcep...@apache.org> wrote:
> >
> > > Thanks, Guozhang and Bruno!
> > >
> > > 2)
> > > I had a similar thought to both of you about the metrics, but I ultimately
> > > came to a conclusion like Bruno's. These aren't dropped invalid records;
> > > they're intentionally dropped, valid, but unnecessary, updates.
> > > A "warning" for this case definitely seems wrong, and I'd also not
> > > recommend counting these events along with "dropped-records", because
> > > those are all dropped invalid records, e.g., late, null-keyed, or records
> > > that couldn't be deserialized.
> > >
> > > Like Bruno pointed out, an operator should be concerned to see
> > > non-zero "dropped-records", and would then consult the logs for warnings.
> > > But that same person should be happy to see "dropped-idempotent-updates"
> > > increasing, since it means they're saving time and money. Maybe the name
> > > of the metric could be different, but I couldn't think of a better one.
> > > OTOH,
> > > maybe it just stands out to us because we recently discussed those other
> > > metrics in KIP-444?
> > >
> > > 1)
> > > Maybe we should discuss this point more. It seems like we should maintain
> > > an invariant that the following three objects always have exactly the same
> > > state (modulo flush boundaries):
> > > 1. The internal state store
> > > 2. The changelog
> > > 3. The operation's result view
> > >
> > > That is, if I have a materialized Filter, then it seems like I _must_ store
> > > exactly the same record in the store and the changelog, and also forward
> > > the exact same record, including the timestamp, to the downstream
> > > operations.
> > >
> > > If we store something different in the internal state store than the
> > > changelog, we can get a situation where the state is actually different
> > > after
> > > restoration than it is during processing, and queries against standbys
> > > would
> > > return different results than queries against the active tasks.
> > >
> > > Regarding storing something different in the store+changelog than we
> > > forward downstream, consider the following topology:
> > > sourceTable
> > >   .filter(someFilter, Materialized.as("f1"))
> > >   .filter((key, value) -> true, Materialized.as("f2"))
> > >
> > > If we didn't forward exactly the same data we store, then querying f2
> > > would return different results than querying f1, which is clearly not
> > > correct, given the topology.
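> > >
> > > For completeness, a self-contained sketch of that snippet (the topic name
> > > "source-topic" and the non-trivial predicate are made up):
> > >
> > > import org.apache.kafka.common.serialization.Serdes;
> > > import org.apache.kafka.streams.StreamsBuilder;
> > > import org.apache.kafka.streams.kstream.Consumed;
> > > import org.apache.kafka.streams.kstream.KTable;
> > > import org.apache.kafka.streams.kstream.Materialized;
> > >
> > > StreamsBuilder builder = new StreamsBuilder();
> > > KTable<String, String> sourceTable =
> > >     builder.table("source-topic", Consumed.with(Serdes.String(), Serdes.String()));
> > > sourceTable
> > >     .filter((key, value) -> value != null && !value.isEmpty(), Materialized.as("f1"))
> > >     .filter((key, value) -> true, Materialized.as("f2"));   // a no-op filter
> > > // Querying "f1" and "f2" (e.g. via interactive queries) must return the
> > > // same data, so whatever is stored and logged for f1 must also be
> > > // forwarded into f2.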
> > >
> > > It seems like maybe what you have in mind is the preservation of stream
> > > time across restart/rebalance? This bug is still open, actually:
> > > https://issues.apache.org/jira/browse/KAFKA-9368
> > > It seems like solving that bug would be independent of KIP-557. I.e.,
> > > KIP-557 makes that bug neither worse nor better.
> > >
> > > One other thought I had is maybe you were thinking that operators
> > > would update their internally tracked stream time, but still discard
> > > records? I think that _would_ be a bug. That is, if a record gets discarded
> > > as idempotent, it should have no effect at all on the state of the
> > > application.
> > > Reflecting on my prior analysis of stream time, most of the cases where we
> > > track stream time are in Stream aggregations, and in those cases, if an
> > > incoming record's timestamp is higher than the previous stream time, it
> > > would already not be considered idempotent. So we would store, log, and
> > > forward the result with the new timestamp.
> > > The only other case is Suppress. With respect to idempotence, Suppress is
> > > equivalent to a stateless no-op transformation. All it does is collect and
> > > delay updates. It has no memory of what it previously emitted, so it
> > > wouldn't be possible for it to check for idempotence anyway.
> > >
> > > Was that what you were thinking?
> > > Thanks,
> > > -John
> > >
> > >
> > > On Tue, Mar 3, 2020, at 02:34, Bruno Cadonna wrote:
> > > > Hi Guozhang,
> > > >
> > > > I also had the same thought about using the existing "dropped-records"
> > > > metric. However, I think in this case it would be better to use a new
> > > > metric because dropped idempotent updates are an optimization; they do
> > > > not represent missed records. Dropped idempotent updates in general do
> > > > not change the result and so do not need a warn log message. Dropped
> > > > records due to expired windows, serialization errors, or lateness, on
> > > > the other hand, might be concerning and do need a warn log message.
> > > >
> > > > Looking at the metrics, you would be happy to see
> > > > "dropped-idempotent-updates" increase, because that means Streams gets
> > > > rid of no-ops downstream, but you would be concerned if
> > > > "dropped-records" increased, because that means your records or the
> > > > configuration of your app have issues. The "dropped-idempotent-updates"
> > > > metric could also be an indication that you could further optimize your
> > > > setup by getting rid of idempotent updates further upstream.
> > > >
> > > > Best,
> > > > Bruno
> > > >
> > > > On Tue, Mar 3, 2020 at 7:58 AM Guozhang Wang <wangg...@gmail.com>
> > wrote:
> > > > >
> > > > > Hello Richard,
> > > > >
> > > > > Thanks for the KIP. I reviewed it once before and had concerns about
> > > > > its effects on stream time advancement. After reading the updated KIP
> > > > > I think it has answered a lot of them already.
> > > > >
> > > > > I have a couple minor comments still, otherwise I'm +1:
> > > > >
> > > > > 1) I want to clarify that for operations that result in KTables (not
> > > > > only aggregations, but consider KTable#filter that may also result in
> > > > > a new KTable), even if we drop emissions to the downstream topics we
> > > > > would still append to the corresponding changelog if the timestamp has
> > > > > changed. This is because the timestamps on the changelog are read by
> > > > > the standby tasks, which rely on them to infer their own stream time.
> > > > >
> > > > > 2) About the metrics, in KIP-444 we are consolidating all types of
> > > > > scenarios that can cause dropped records into the same metric:
> > > > >
> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-444%3A+Augment+metrics+for+Kafka+Streams
> > > > >
> > > > > late-records-drop: INFO at processor node level, replaced by INFO
> > > > > task-level "dropped-records".
> > > > >
> > > > > skipped-records: INFO at thread and processor node level, replaced by
> > > > > INFO task-level "dropped-records".
> > > > >
> > > > > expired-window-record-drop: DEBUG at state store level, replaced by
> > > > > INFO task-level "dropped-records".
> > > > >
> > > > > The main idea is that instead of using different metrics to indicate
> > > > > different types of scenarios, users just alert on that single metric.
> > > > > When the alert triggers, they can look into the log4j logs for the
> > > > > cause (we made sure that all sensor recordings of this metric are
> > > > > associated with a warning log4j entry).
> > > > >
> > > > > So I'd suggest that instead of introducing a new per-node
> > > > > "dropped-idempotent-updates", we just piggy-back on the existing
> > > > > task-level metric; unless we think that idempotent drops are more
> > > > > frequent than the others and do not warrant a warning log, in which
> > > > > case we can consider breaking this metric down with different tags,
> > > > > for example.
> > > > >
> > > > > Guozhang
> > > > >
> > > > > On Mon, Mar 2, 2020 at 1:59 PM Richard Yu <
> > yohan.richard...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi all,
> > > > > >
> > > > > > Thanks for the votes so far!
> > > > > > @Matthias or @Guozhang Wang <guozh...@confluent.io> it would be
> > > great to
> > > > > > also get your input on this KIP.
> > > > > >
> > > > > > It looks to be pretty close to completion, so the finishing touches
> > > are all
> > > > > > we need. :)
> > > > > >
> > > > > > Best,
> > > > > > Richard
> > > > > >
> > > > > > On Mon, Mar 2, 2020 at 11:45 AM Ghassan Yammine <
> > > > > > ghassan.yamm...@bazaarvoice.com> wrote:
> > > > > >
> > > > > > > Hello all,
> > > > > > >
> > > > > > > +1 (non-binding)
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Ghassan
> > > > > > >
> > > > > > > On 3/2/20, 12:43 PM, "Bruno Cadonna" <br...@confluent.io>
> > wrote:
> > > > > > >
> > > > > > >     Hi Richard,
> > > > > > >
> > > > > > >     +1 (non-binding)
> > > > > > >
> > > > > > >     Best,
> > > > > > >     Bruno
> > > > > > >
> > > > > > >     On Mon, Mar 2, 2020 at 4:33 PM John Roesler <
> > > vvcep...@apache.org>
> > > > > > > wrote:
> > > > > > >     >
> > > > > > >     > Hi Richard,
> > > > > > >     >
> > > > > > >     > Thanks for the KIP!
> > > > > > >     >
> > > > > > >     > I'm +1 (binding)
> > > > > > >     >
> > > > > > >     > -john
> > > > > > >     >
> > > > > > >     > On Thu, Feb 27, 2020, at 14:40, Richard Yu wrote:
> > > > > > >     > > Hi all,
> > > > > > >     > >
> > > > > > >     > > I am proposing a new optimization to Kafka Streams which
> > > > > > >     > > would greatly reduce the number of idempotent updates (or
> > > > > > >     > > no-ops) in the Kafka Streams DAG.
> > > > > > >     > > A number of users have been interested in this feature, so
> > > > > > >     > > it would be nice to pass this one in.
> > > > > > >     > >
> > > > > > >     > > For information, the KIP is described below:
> > > > > > >     > >
> > > > > > >     > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams
> > > > > > >     > >
> > > > > > >     > > We aim to make Kafka Streams more efficient by adopting
> > > > > > >     > > the "emit on change" reporting strategy.
> > > > > > >     > >
> > > > > > >     > > Please cast your vote!
> > > > > > >     > >
> > > > > > >     > > Best,
> > > > > > >     > > Richard
> > > > > > >     > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > >
> > >
> >
> >
> > --
> > -- Guozhang
> >
