Hi all,

I have decided to pass this KIP with 2 binding votes and 3 non-binding
votes (including mine).
I will update KIP status shortly after this.

Best,
Richard

On Thu, Mar 5, 2020 at 3:45 PM Richard Yu <yohan.richard...@gmail.com>
wrote:

> Hi all,
>
> Just polling for some last changes on the name.
> I think that since there doesn't seem to be much objection to any major
> changes in the KIP, I will pass it this Friday.
>
> If you feel that we still need some more discussion, please let me know. :)
>
> Best,
> Richard
>
> P.S. Will start working on a PR for this one soon.
>
> On Wed, Mar 4, 2020 at 1:30 PM Guozhang Wang <wangg...@gmail.com> wrote:
>
>> Regarding the metric name, I was actually trying to be consistent with the
>> node-level `suppression-emit` as I feel this one's characteristics is
>> closer to that. I other folks feels better to align with the task-level
>> "dropped-records" I think I can be convinced too.
>>
>>
>> Guozhang
>>
>> On Wed, Mar 4, 2020 at 12:09 AM Bruno Cadonna <br...@confluent.io> wrote:
>>
>> > Hi all,
>> >
>> > may I make a non-binding proposal for the metric name? I would prefer
>> > "skipped-idempotent-updates" to be consistent with the
>> > "dropped-records".
>> >
>> > Best,
>> > Bruno
>> >
>> > On Tue, Mar 3, 2020 at 11:57 PM Richard Yu <yohan.richard...@gmail.com>
>> > wrote:
>> > >
>> > > Hi all,
>> > >
>> > > Thanks for the discussion!
>> > >
>> > > @Guozhang, I will make the corresponding changes to the KIP (i.e.
>> > renaming
>> > > the sensor and adding some notes).
>> > > With the current state of things, we are very close. Just need that
>> one
>> > > last binding vote.
>> > >
>> > > @Matthias J. Sax <matth...@confluent.io>  It would be ideal if we can
>> > also
>> > > get your last two cents on this as well.
>> > > Other than that, we are good.
>> > >
>> > > Best,
>> > > Richard
>> > >
>> > >
>> > > On Tue, Mar 3, 2020 at 10:46 AM Guozhang Wang <wangg...@gmail.com>
>> > wrote:
>> > >
>> > > > Hi Bruno, John:
>> > > >
>> > > > 1) That makes sense. If we consider them to be node-specific metrics
>> > that
>> > > > only applies to a subset of built-in processor nodes that are
>> > irrelevant to
>> > > > alert-relevant metrics (just like suppression-emit (rate | total)),
>> > they'd
>> > > > better be per-node instead of per-task and we would not associate
>> such
>> > > > events with warning. With that in mind, I'd suggest we consider
>> > renaming
>> > > > the metric without the `dropped` keyword to distinguish it with the
>> > > > per-task level sensor. How about "idempotent-update-skip (rate |
>> > total)"?
>> > > >
>> > > > Also a minor suggestion: we should clarify in the KIP / javadocs
>> which
>> > > > built-in processor nodes would have this metric while others don't.
>> > > >
>> > > > 2) About stream time tracking, there are multiple known issues that
>> we
>> > > > should close to improve our consistency semantics:
>> > > >
>> > > >  a. preserve stream time of active tasks across rebalances where
>> they
>> > may
>> > > > be migrated. This is what KAFKA-9368
>> > > > <https://issues.apache.org/jira/browse/KAFKA-9368> meant for.
>> > > >  b. preserve stream time of standby tasks to be aligned with the
>> active
>> > > > tasks, via the changelog topics.
>> > > >
>> > > > And what I'm more concerning is b) here. For example: let's say we
>> > have a
>> > > > topology of `source -> A -> repartition -> B` where both A and B
>> have
>> > > > states along with changelogs, and both of them have standbys. If a
>> > record
>> > > > is piped from the source and completed traversed through the
>> topology,
>> > we
>> > > > need to make sure that the stream time inferred across:
>> > > >
>> > > > * active task A (inferred from the source record),
>> > > > * active task B (inferred from the derived record from repartition
>> > topic),
>> > > > * standby task A (inferred from the changelog topic of A's store),
>> > > > * standby task B (inferred from the changelog topic of B's store)
>> > > >
>> > > > are consistent (note I'm not saying they should be "exactly the
>> same",
>> > but
>> > > > consistent, meaning that they may have different values but as long
>> as
>> > that
>> > > > does not impact the time-based queries, it is fine). The main
>> > motivation is
>> > > > that on IQ, where both active and standby tasks could be accessed,
>> we
>> > can
>> > > > eventually improve our consistency guarantee to have 1)
>> > read-your-write, 2)
>> > > > consistency across stores, etc.
>> > > >
>> > > > I agree with John's assessment in the previous email, and just to
>> > clarify
>> > > > more concretely what I'm thinking.
>> > > >
>> > > >
>> > > > Guozhang
>> > > >
>> > > >
>> > > > On Tue, Mar 3, 2020 at 9:03 AM John Roesler <vvcep...@apache.org>
>> > wrote:
>> > > >
>> > > > > Thanks, Guozhang and Bruno!
>> > > > >
>> > > > > 2)
>> > > > > I had a similar though to both of you about the metrics, but I
>> > ultimately
>> > > > > came out with a conclusion like Bruno's. These aren't dropped
>> invalid
>> > > > > records, they're intentionally dropped, valid, but unnecessary,
>> > updates.
>> > > > > A "warning" for this case definitely seems wrong, and I'd also not
>> > > > > recommend
>> > > > > counting these events along with "dropped-records", because those
>> are
>> > > > > all dropped invalid records, e.g., late or null-keyed or couldn't
>> be
>> > > > > deserialized.
>> > > > >
>> > > > > Like Bruno pointed out, an operator should be concerned to see
>> > > > > non-zero "dropped-records", and would then consult the logs for
>> > warnings.
>> > > > > But that same person should be happy to see
>> > "dropped-idempotent-updates"
>> > > > > increasing, since it means they're saving time and money. Maybe
>> the
>> > name
>> > > > > of the metric could be different, but I couldn't think of a better
>> > one.
>> > > > > OTOH,
>> > > > > maybe it just stands out to us because we recently discussed those
>> > other
>> > > > > metrics in KIP-444?
>> > > > >
>> > > > > 1)
>> > > > > Maybe we should discuss this point more. It seems like we should
>> > maintain
>> > > > > an invariant that the following three objects always have exactly
>> the
>> > > > same
>> > > > > state (modulo flush boundaries):
>> > > > > 1. The internal state store
>> > > > > 2. The changelog
>> > > > > 3. The operation's result view
>> > > > >
>> > > > > That is, if I have a materialized Filter, then it seems like I
>> _must_
>> > > > store
>> > > > > exactly the same record in the store and the changelog, and also
>> > forward
>> > > > > the exact same record, including the timestamp, to the downstream
>> > > > > operations.
>> > > > >
>> > > > > If we store something different in the internal state store than
>> the
>> > > > > changelog, we can get a situation where the state is actually
>> > different
>> > > > > after
>> > > > > restoration than it is during processing, and queries against
>> > standbys
>> > > > > would
>> > > > > return different results than queries against the active tasks.
>> > > > >
>> > > > > Regarding storing something different in the store+changelog than
>> we
>> > > > > forward downstream, consider the following topology:
>> > > > > sourceTable
>> > > > >   .filter(someFilter, Materialized.as("f1"))
>> > > > >   .filter(_ -> true, Materialized.as("f2"))
>> > > > >
>> > > > > If we didn't forward exactly the same data we store, then
>> querying f2
>> > > > > would return different results than querying f1, which is clearly
>> not
>> > > > > correct, given the topology.
>> > > > >
>> > > > > It seems like maybe what you have in mind is the preservation of
>> > stream
>> > > > > time across restart/rebalance? This bug is still open, actually:
>> > > > > https://issues.apache.org/jira/browse/KAFKA-9368
>> > > > > It seems like solving that bug would be independent of KIP-557.
>> I.e.,
>> > > > > KIP-557 neither makes that bug worse or better.
>> > > > >
>> > > > > One other thought I had is maybe you were thinking that operators
>> > > > > would update their internally tracked stream time, but still
>> discard
>> > > > > records? I think that _would_ be a bug. That is, if a record gets
>> > > > discarded
>> > > > > as idempotent, it should have no effect at all on the state of the
>> > > > > application.
>> > > > > Reflecting on my prior analysis of stream time, most of the cases
>> > where
>> > > > we
>> > > > > track stream time is in Stream aggregations, and in those cases,
>> if
>> > an
>> > > > > incoming record's timestamp is higher than the previous stream
>> time,
>> > it
>> > > > > would already not be considered idempotent. So we would store,
>> log,
>> > and
>> > > > > forward the result with the new timestamp.
>> > > > > The only other case is Suppress. With respect to idempotence,
>> > Suppress is
>> > > > > equivalent to a stateless no-op transformation. All it does is
>> > collect
>> > > > and
>> > > > > delay
>> > > > > updates. It has no memory of what it previously emitted, so it
>> > wouldn't
>> > > > > be possible for it to check for idempotence anyway.
>> > > > >
>> > > > > Was that what you were thinking?
>> > > > > Thanks,
>> > > > > -John
>> > > > >
>> > > > >
>> > > > > On Tue, Mar 3, 2020, at 02:34, Bruno Cadonna wrote:
>> > > > > > Hi Guozhang,
>> > > > > >
>> > > > > > I also had the same thought about using the existing
>> > "dropped-records"
>> > > > > > metrics. However, I think in this case it would be better to
>> use a
>> > new
>> > > > > > metric because dropped idempotent updates is an optimization,
>> they
>> > do
>> > > > > > not represent missed records. The dropped idempotent updates in
>> > > > > > general do not change the result and so do not need a warn log
>> > > > > > message. Whereas dropped records due to expired windows,
>> > serialization
>> > > > > > errors, or lateness might be something concerning that need a
>> warn
>> > log
>> > > > > > message.
>> > > > > >
>> > > > > > Looking at the metrics, you would be happy to see
>> > > > > > "dropped-idempotent-updates" increase, because that means
>> Streams
>> > gets
>> > > > > > rid of no-ops downstream, but you would be concerned if
>> > > > > > "dropped-records" would increase, because that means your
>> records
>> > or
>> > > > > > the configuration of your app has issues. The
>> > > > > > "dropped-idempotent-updates" metric could also be an indication
>> > that
>> > > > > > you could further optimize your setup, by getting rid of
>> idempotent
>> > > > > > updates further upstream.
>> > > > > >
>> > > > > > Best,
>> > > > > > Bruno
>> > > > > >
>> > > > > > On Tue, Mar 3, 2020 at 7:58 AM Guozhang Wang <
>> wangg...@gmail.com>
>> > > > wrote:
>> > > > > > >
>> > > > > > > Hello Richard,
>> > > > > > >
>> > > > > > > Thanks for the KIP. I once reviewed it and was concerned about
>> > its
>> > > > > effects
>> > > > > > > on stream time advancing. After reading the updated KIP I
>> think
>> > it
>> > > > has
>> > > > > > > answered a lot of them already.
>> > > > > > >
>> > > > > > > I have a couple minor comments still, otherwise I'm +1:
>> > > > > > >
>> > > > > > > 1) I want to clarify that for operations resulted in KTables
>> (not
>> > > > only
>> > > > > > > aggregations, but consider KTable#filter that may also result
>> in
>> > a
>> > > > new
>> > > > > > > KTable), even if we drop emissions to the downstream topics we
>> > would
>> > > > > still
>> > > > > > > append to the corresponding changelog if timestamp has
>> changed.
>> > This
>> > > > is
>> > > > > > > because the timestamps on the changelog is read by the standby
>> > tasks
>> > > > > which
>> > > > > > > relies on them to infer its own stream time advancing.
>> > > > > > >
>> > > > > > > 2) About the metrics, in KIP-444 we are consolidating all
>> types
>> > of
>> > > > > > > scenarios that can cause dropped records to the same metrics:
>> > > > > > >
>> > > > >
>> > > >
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams
>> > > > > > >
>> > > > > > > late-records-drop: INFO at processor node level, replaced by
>> INFO
>> > > > > > > task-level "dropped-records".
>> > > > > > >
>> > > > > > > skipped-records: INFO at thread and processor node level,
>> > replaced by
>> > > > > INFO
>> > > > > > > task-level "dropped-records".
>> > > > > > >
>> > > > > > > expired-window-record-drop: DEBUG at state store level,
>> replaced
>> > by
>> > > > > INFO
>> > > > > > > task-level "dropped-records".
>> > > > > > >
>> > > > > > > The main idea is that instead of using different metrics to
>> > indicate
>> > > > > > > different types of scenarios, and users just alert on that
>> single
>> > > > > metrics.
>> > > > > > > When alert triggers, they can look into the log4j for its
>> causes
>> > (we
>> > > > > made
>> > > > > > > sure that all sensor recordings of this metric would be
>> > associated
>> > > > > with a
>> > > > > > > warning log4j).
>> > > > > > >
>> > > > > > > So I'd suggest that instead of introducing a new per-node
>> > > > > > > "dropped-idempotent-updates", we just piggy-back on the
>> existing
>> > > > > task-level
>> > > > > > > metric; unless we think that idempotent drops are more
>> frequent
>> > than
>> > > > > others
>> > > > > > > and also they do not worth a warning log, in that case we can
>> > > > consider
>> > > > > > > break this metric down with different tags for example.
>> > > > > > >
>> > > > > > > Guozhang
>> > > > > > >
>> > > > > > > On Mon, Mar 2, 2020 at 1:59 PM Richard Yu <
>> > > > yohan.richard...@gmail.com>
>> > > > > > > wrote:
>> > > > > > >
>> > > > > > > > Hi all,
>> > > > > > > >
>> > > > > > > > Thanks for the votes so far!
>> > > > > > > > @Matthias or @Guozhang Wang <guozh...@confluent.io> it
>> would
>> > be
>> > > > > great to
>> > > > > > > > also get your input on this KIP.
>> > > > > > > >
>> > > > > > > > It looks to be pretty close to completion, so the finishing
>> > touches
>> > > > > are all
>> > > > > > > > we need. :)
>> > > > > > > >
>> > > > > > > > Best,
>> > > > > > > > Richard
>> > > > > > > >
>> > > > > > > > On Mon, Mar 2, 2020 at 11:45 AM Ghassan Yammine <
>> > > > > > > > ghassan.yamm...@bazaarvoice.com> wrote:
>> > > > > > > >
>> > > > > > > > > Hello all,
>> > > > > > > > >
>> > > > > > > > > +1 (non-binding)
>> > > > > > > > >
>> > > > > > > > > Thanks,
>> > > > > > > > >
>> > > > > > > > > Ghassan
>> > > > > > > > >
>> > > > > > > > > On 3/2/20, 12:43 PM, "Bruno Cadonna" <br...@confluent.io
>> >
>> > > > wrote:
>> > > > > > > > >
>> > > > > > > > >     EXTERNAL: This email originated from outside of
>> > Bazaarvoice.
>> > > > > Do not
>> > > > > > > > > click any links or open any attachments unless you trust
>> the
>> > > > > sender and
>> > > > > > > > > know the content is safe.
>> > > > > > > > >
>> > > > > > > > >
>> > > > > > > > >     Hi Richard,
>> > > > > > > > >
>> > > > > > > > >     +1 (non-binding)
>> > > > > > > > >
>> > > > > > > > >     Best,
>> > > > > > > > >     Bruno
>> > > > > > > > >
>> > > > > > > > >     On Mon, Mar 2, 2020 at 4:33 PM John Roesler <
>> > > > > vvcep...@apache.org>
>> > > > > > > > > wrote:
>> > > > > > > > >     >
>> > > > > > > > >     > Hi Richard,
>> > > > > > > > >     >
>> > > > > > > > >     > Thanks for the KIP!
>> > > > > > > > >     >
>> > > > > > > > >     > I'm +1 (binding)
>> > > > > > > > >     >
>> > > > > > > > >     > -john
>> > > > > > > > >     >
>> > > > > > > > >     > On Thu, Feb 27, 2020, at 14:40, Richard Yu wrote:
>> > > > > > > > >     > > Hi all,
>> > > > > > > > >     > >
>> > > > > > > > >     > > I am proposing a new optimization to Kafka Streams
>> > which
>> > > > > would
>> > > > > > > > > greatly
>> > > > > > > > >     > > reduce the number of idempotent updates (or
>> no-ops)
>> > in
>> > > > the
>> > > > > Kafka
>> > > > > > > > > Streams
>> > > > > > > > >     > > DAG.
>> > > > > > > > >     > > A number of users have been interested in this
>> > feature,
>> > > > so
>> > > > > it
>> > > > > > > > > would be nice
>> > > > > > > > >     > > to pass this one in.
>> > > > > > > > >     > >
>> > > > > > > > >     > > For information, the KIP is described below:
>> > > > > > > > >     > >
>> > > > > > > > >
>> > > > > > > >
>> > > > >
>> > > >
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams
>> > > > > > > > >     > >
>> > > > > > > > >     > > We aim to make Kafka Streams more efficient by
>> > adopting
>> > > > > the "emit
>> > > > > > > > > on
>> > > > > > > > >     > > change" reporting strategy.
>> > > > > > > > >     > >
>> > > > > > > > >     > > Please cast your vote!
>> > > > > > > > >     > >
>> > > > > > > > >     > > Best,
>> > > > > > > > >     > > Richard
>> > > > > > > > >     > >
>> > > > > > > > >
>> > > > > > > > >
>> > > > > > > > >
>> > > > > > > >
>> > > > > > >
>> > > > > > >
>> > > > > > > --
>> > > > > > > -- Guozhang
>> > > > > >
>> > > > >
>> > > >
>> > > >
>> > > > --
>> > > > -- Guozhang
>> > > >
>> >
>>
>>
>> --
>> -- Guozhang
>>
>

Reply via email to