mjsax commented on code in PR #21091:
URL: https://github.com/apache/kafka/pull/21091#discussion_r2601293275


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java:
##########
@@ -148,8 +154,15 @@ private void registerMetrics() {
         e2eLatencySensor = 
StateStoreMetrics.e2ELatencySensor(taskId.toString(), metricsScope, name(), 
streamsMetrics);
         iteratorDurationSensor = 
StateStoreMetrics.iteratorDurationSensor(taskId.toString(), metricsScope, 
name(), streamsMetrics);
         StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(), 
metricsScope, name(), streamsMetrics,
-                (config, now) -> openIterators.sum());
-        openIterators = new OpenIterators(taskId, metricsScope, name(), 
streamsMetrics);
+                (config, now) -> numOpenIterators.sum());
+        StateStoreMetrics.addOldestOpenIteratorGauge(taskId.toString(), 
metricsScope, name(), streamsMetrics,
+            (config, now) -> {
+                try {
+                    return openIterators.isEmpty() ? 0L : 
openIterators.first().startTimestamp();

Review Comment:
   I think we need both.
   
   We should try to avoid to have a throw-catch on the happy path, but using 
only `return iter.hasNext() ? iter.next().startTimestamp() : 0L;` is not 
sufficient because after `iter.hasNext()` was executed, the last iterator could 
get removed before we call `iter.next()`. Cf 
https://issues.apache.org/jira/browse/KAFKA-17954 and corresponding fix.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to