Hello Bruno,

Thanks for the feedback; replies are inline.
On Mon, Jul 1, 2019 at 7:08 AM Bruno Cadonna <br...@confluent.io> wrote:
>
> Hi Guozhang,
>
> Thank you for the KIP.
>
> 1) As far as I understand, the StreamsMetrics interface is there for
> user-defined processors. Would it make sense to also add a method to
> the interface to specify a sensor that records skipped records?
>

Not sure I follow: if users want to add a specific skipped-records sensor, they can still do that as a "throughput" sensor via "addThroughputSensor" and then "record", right?

As an afterthought, maybe it is better to rename `throughput` to `rate` in the public APIs, since the latter is really the intended semantics. I did not change it only to keep the API changes smaller and deprecate fewer functions, but if we feel it is important we can change it as well.

> 2) What are the semantics of active-task-process and standby-task-process?
>

Ah, good catch; I think I put it in the wrong column. Some explanation here: within a thread's looped iterations, it will first try to process some records from the active tasks, and then see whether there are any standby tasks that can be processed as well (i.e., just reading from the restore consumer and applying the records to the local stores). The ratio metrics indicate 1) which tasks (active or standby) this thread currently owns, and 2) what percentage of its time it spends on each of them.

But this metric should really be a task-level one that includes both the thread-id and the task-id, and upon task migration the sensors will be dynamically deleted / (re-)created. Each task-id may be owned by multiple threads, one as active and the others as standby, and hence the separation into active / standby still seems necessary.

> 3) How do dropped-late-records and expired-window-record-drop relate
> to each other? I guess the former is for records that fall outside the
> grace period and the latter is for records that are processed after
> the retention period of the window. Is this correct?
>

Yes, that's correct.
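To make the difference concrete, here is a minimal, self-contained sketch (illustrative Python, not actual Streams code; all class names and metric fields are made up for the example) of where each of the two drops happens:

```python
# Illustrative model of the two drop points: a DSL processor node drops
# records older than (stream_time - grace), while the window store ignores
# puts older than (stream_time - retention).

class WindowStore:
    """Ignores puts whose timestamp is older than (stream_time - retention)."""
    def __init__(self, retention_ms):
        self.retention_ms = retention_ms
        self.data = {}
        self.expired_window_record_drop = 0  # store-level metric

    def put(self, record_ts, value, stream_time):
        if record_ts < stream_time - self.retention_ms:
            self.expired_window_record_drop += 1  # recorded by the store
            return
        self.data[record_ts] = value

class DslWindowedNode:
    """DSL operator: drops late records up front, before touching the store."""
    def __init__(self, store, grace_ms):
        self.store = store
        self.grace_ms = grace_ms
        self.dropped_late_records = 0  # node-level metric

    def process(self, record_ts, value, stream_time):
        if record_ts < stream_time - self.grace_ms:
            self.dropped_late_records += 1  # never reaches the store
            return
        self.store.put(record_ts, value, stream_time)

store = WindowStore(retention_ms=60_000)
node = DslWindowedNode(store, grace_ms=10_000)

# Stream time is 100_000. A record at ts=85_000 is older than the grace
# deadline (90_000) but newer than the retention deadline (40_000):
node.process(85_000, "v", stream_time=100_000)
print(node.dropped_late_records)         # 1: the DSL node drops it
print(store.expired_window_record_drop)  # 0: the store never sees it

# A PAPI processor has no grace period, so it would call put() directly;
# only a put older than the retention deadline is ignored by the store:
store.put(30_000, "v", stream_time=100_000)
print(store.expired_window_record_drop)  # 1: recorded at the store level
```

So the same late record is counted by dropped-late-records in the DSL case, but by expired-window-record-drop in the PAPI case.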
The names are indeed a bit confusing, since they were added in different releases historically. More precisely, the `grace period` is a notion of the operator (hence the metric is node-level, though it would only be used by DSL operators), while the `retention` is a notion of the store (hence the metric is store-level). Usually the grace period will be smaller than the store retention, though.

The processor node is aware of the `grace period`, and when it receives a record that is older than the grace deadline, the record is dropped immediately; otherwise the record is still processed, and maybe a new update is "put" into the store. The store is aware of its `retention period`, and upon a "put" call, if it realizes the record is older than the retention deadline, that put call is ignored and the metric is recorded.

We have to separate the two here since the window store can be used in both the DSL and the PAPI. In the former case a late record would likely already have been dropped at the processor-node level due to the grace period, which is usually smaller than retention; but in the PAPI there is no grace period, and hence the processor would likely still process the record and call "put" on the store.

> 4) Is there an actual difference between skipped and dropped records?
> If not, shall we unify the terminology?
>

There is. Dropped records are only due to lateness, whereas skipped records can be due to serde errors (when the user's error handler indicates "skip and continue"), timestamp errors, etc.

I've considered whether a better (more extensible) way would be to define a single metric name, say skipped-records, but use different tags to indicate the skipping reason (errors, windowing semantics, etc.). But there is still a tricky difference: records skipped because of serde errors, for example, are skipped at the very beginning and have no effects at all, while some others, e.g. a null key / value at a reduce operator, are only skipped in the middle of the processing, i.e.
some effects may have already been taken in upstream sub-topologies. That's why I've defined skipped-records at both the task level and the node level, where the aggregate of the latter may still be smaller than the former, whereas dropped-records is node-level only.

So how about an even more significant change then: we broaden `dropped-late-records` into `dropped-records`, which is node-level only but covers reasons from lateness to semantics (like a null key) as well; and then we have a task-level-only `skipped-records` which records only those records dropped at the very beginning that did not make it into the processing topology at all. I feel this is a clearer distinction, but also a bigger change for users.

> 5) What happens with removed metrics when the user sets the version of
> "built.in.metrics.version" to 2.2-
>

I think the redundant ones like "forward-rate" and "destroy-rate" can still be removed with 2.2- as well; the other ones that are removed / replaced, like the thread-level skipped-records, should still be maintained.

> Best,
> Bruno
>
> On Thu, Jun 27, 2019 at 6:11 PM Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > Hello folks,
> >
> > As 2.3 is released now, I'd like to bump up this KIP discussion again
> > for your reviews.
> >
> > Guozhang
> >
> > On Thu, May 23, 2019 at 4:44 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > >
> > > Hello Patrik,
> > >
> > > Since we are rolling out 2.3 and everyone is busy with the release
> > > now, this KIP does not have much discussion involved yet and will
> > > slip into the next release cadence.
> > >
> > > This KIP itself contains several parts: 1. refactoring the existing
> > > metrics hierarchy to clean up some redundancy and also get more
> > > clarity; 2. adding instance-level metrics like rebalance and state
> > > metrics, as well as other static metrics.
> > >
> > > Guozhang
> > >
> > > On Thu, May 23, 2019 at 5:34 AM Patrik Kleindl <pklei...@gmail.com> wrote:
> > >
> > >> Hi Guozhang
> > >> Thanks for the KIP, this looks very helpful.
> > >> Could you please provide more detail on the metrics planned for the
> > >> state?
> > >> We were just considering how to implement this ourselves because we
> > >> need to track the history of state changes.
> > >> The idea was to have an accumulated "seconds in state x" metric for
> > >> every state.
> > >> The new rebalance metric might solve part of our use case, but it is
> > >> interesting what you have planned for the state metric.
> > >> best regards
> > >> Patrik
> > >>
> > >> On Fri, 29 Mar 2019 at 18:56, Guozhang Wang <wangg...@gmail.com> wrote:
> > >>
> > >> > Hello folks,
> > >> >
> > >> > I'd like to propose the following KIP to improve the Kafka Streams
> > >> > metrics mechanism for users. This includes 1) a minor change in
> > >> > the public StreamsMetrics API, and 2) a major cleanup of the
> > >> > Streams' own built-in metrics hierarchy.
> > >> >
> > >> > Details can be found here:
> > >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-444%3A+Augment+metrics+for+Kafka+Streams
> > >> >
> > >> > I'd love to hear your thoughts and feedback. Thanks!
> > >> >
> > >> > --
> > >> > -- Guozhang
> > >
> > > --
> > > -- Guozhang
> >
> > --
> > -- Guozhang

--
-- Guozhang
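P.S. To make the proposal in 4) above more concrete, here is a rough, self-contained sketch (illustrative Python, not actual Streams / JMX code; all names and tag keys are made up) of a task-level `skipped-records` for records rejected before entering the topology, plus a node-level `dropped-records` whose reason is carried as a tag:

```python
# Rough sketch of tag-based metric recording: one metric name per level,
# with tags distinguishing the task, the processor node, and the drop reason.

from collections import defaultdict

metrics = defaultdict(int)  # key: (metric-name, frozen set of tag pairs)

def record(name, **tags):
    metrics[(name, frozenset(tags.items()))] += 1

# A deserialization error: the record never enters the topology at all,
# so only the task-level metric is bumped.
record("skipped-records", task_id="0_0")

# A late record and a null-key record both reach processor nodes, so the
# node-level metric is bumped, with the reason carried as a tag.
record("dropped-records", task_id="0_0",
       processor_node="KSTREAM-AGGREGATE-01", reason="late")
record("dropped-records", task_id="0_0",
       processor_node="KSTREAM-REDUCE-02", reason="null-key")

total_dropped = sum(v for (name, _), v in metrics.items()
                    if name == "dropped-records")
print(total_dropped)  # 2
```

Under this scheme the node-level aggregate never includes the records that were skipped up front, which keeps the two counts cleanly separated.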