eduwercamacaro commented on code in PR #20954:
URL: https://github.com/apache/kafka/pull/20954#discussion_r2603012438


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java:
##########
@@ -309,6 +311,46 @@ private void closeStartupTasks(final Predicate<Task> 
predicate) {
         }
     }
 
+    public Map<TaskId, Long> taskOffsetSums(final Set<TaskId> tasks) {

Review Comment:
   It would be nice to add a unit test that verifies the output of this method 
after updating the task offsets. 
   
   Also, I think this method should be considered the source of true for the 
current lag of each taskId, so I would suggest modifying this method to return 
an unmodifiable map instead of a modifiable one. However, this will break the 
TaskManager when it tries to put a `Task.LATEST_OFFSET` for the tasks that 
didn't have a cached offset, but we can do that on the scope of this method. 
   
   In other words, it is making this method to return an offset (either a valid 
one or Task.LATEST_OFFSET) for every single TaskId received in the arguments. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to