Pankraz76 commented on code in PR #21160:
URL: https://github.com/apache/kafka/pull/21160#discussion_r2628502136
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -2103,4 +2120,46 @@ Admin adminClient() {
Optional<StreamsRebalanceData> streamsRebalanceData() {
return streamsRebalanceData;
}
+
+ /**
+ * Initialize both WindowedSum instances at exactly the same timestamp so
+ * their windows are aligned from the very beginning.
+ */
+ private void initLatencyWindowsIfNeeded(final long now) {
+ if (!latencyWindowsInitialized) {
+ // Start both windows at the same instant with a zero record
+ pollLatencyWindowedSum.record(metricsConfig, 0.0, now);
+ totalCommitLatencyWindowedSum.record(metricsConfig, 0, now);
+ processLatencyWindowedSum.record(metricsConfig, 0, now);
+ punctuateLatencyWindowedSum.record(metricsConfig, 0, now);
+ runOnceLatencyWindowedSum.record(metricsConfig, 0.0, now);
+ latencyWindowsInitialized = true;
+ }
+ }
+
+ private void recordWindowedSum(final long now,
+ final double pollLatency,
+ final double totalCommitLatency,
+ final double processLatency,
+ final double punctuateLatency,
+ final double runOnceLatency) {
+ pollLatencyWindowedSum.record(metricsConfig, pollLatency, now);
+ totalCommitLatencyWindowedSum.record(metricsConfig,
totalCommitLatency, now);
+ processLatencyWindowedSum.record(metricsConfig, processLatency, now);
+ punctuateLatencyWindowedSum.record(metricsConfig, punctuateLatency,
now);
+ runOnceLatencyWindowedSum.record(metricsConfig, runOnceLatency, now);
+ }
+
+ private void recordRatio(final long now, final WindowedSum windowedSum,
final Sensor ratioSensor) {
+ final double runOnceLatencyWindow =
+ runOnceLatencyWindowedSum.measure(metricsConfig, now);
+
+ if (runOnceLatencyWindow > 0.0) {
+ final double latencyWindow =
+ windowedSum.measure(metricsConfig, now);
+ ratioSensor.record(latencyWindow / runOnceLatencyWindow);
Review Comment:
```suggestion
ratioSensor.record(windowedSum.measure(metricsConfig, now) /
runOnceLatencyWindow);
```
##########
docs/upgrade.html:
##########
@@ -66,6 +66,12 @@ <h5><a id="upgrade_420_notable"
href="#upgrade_420_notable">Notable changes in 4
<li>The behavior of
<code>org.apache.kafka.streams.KafkaStreams#removeStreamThread</code> has been
changed. The consumer has no longer remove once <code>removeStreamThread</code>
finished.
Instead, consumer would be kicked off from the group after
<code>org.apache.kafka.streams.processor.internals.StreamThread</code>
completes its <code>run</code> function.
</li>
+ <li>The streams thread metrics <code>commit-ratio</code>,
<code>process-ratio</code>, <code>punctuate-ratio</code>, and
<code>poll-ratio</code> have been updated.
+ They now report, over a rolling measurement window,
+ the ratio of time this thread spends performing the given action
(<code>{action}</code>) to the total elapsed time in that window.
+ The effective window duration is determined by your metrics
configuration: <code>metrics.sample.window.ms</code> (per-sample window length)
Review Comment:
```suggestion
The effective window duration is determined by the metrics
configuration: <code>metrics.sample.window.ms</code> (per-sample window length)
```
sry idk as well your might be kind of the only usage here.
Could use the generic phrase.
<img width="390" height="77" alt="Image"
src="https://github.com/user-attachments/assets/0c232347-740e-4aff-ba6f-fdf61538fe21"
/>
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]