eduwercamacaro commented on code in PR #20954:
URL: https://github.com/apache/kafka/pull/20954#discussion_r2603012438
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java:
##########
@@ -309,6 +311,46 @@ private void closeStartupTasks(final Predicate<Task> predicate) {
}
}
+ public Map<TaskId, Long> taskOffsetSums(final Set<TaskId> tasks) {
Review Comment:
It would be nice to add a unit test that verifies the output of this method
after the task offsets have been updated.
Also, I think this method should be considered the source of truth for the
current lag of each TaskId, so I would suggest changing it to return an
unmodifiable map instead of a modifiable one. However, this will break the
TaskManager when it tries to put `Task.LATEST_OFFSET` for the tasks that
didn't have a cached offset, but we can do that within the scope of this method.
In other words, this method would return an offset (either a valid one or
`Task.LATEST_OFFSET`) for every single TaskId received in the arguments.
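To make the suggestion concrete, here is a rough, self-contained sketch of the
shape I have in mind. It is not the actual `StateDirectory` code: the class
name, the `cachedOffsetSums` field and `updateOffsetSum` are hypothetical,
`String` stands in for `TaskId`, and the local `LATEST_OFFSET` constant stands
in for `Task.LATEST_OFFSET` (its value here is only an assumption for
illustration).

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class TaskOffsetSumsSketch {
    // Stand-in for Task.LATEST_OFFSET; the value is assumed for illustration only.
    static final long LATEST_OFFSET = -2L;

    // Hypothetical cache of offset sums, keyed by a String stand-in for TaskId.
    private final Map<String, Long> cachedOffsetSums = new HashMap<>();

    // Hypothetical helper that records the latest offset sum for a task.
    void updateOffsetSum(final String taskId, final long offsetSum) {
        cachedOffsetSums.put(taskId, offsetSum);
    }

    // Returns one entry per requested task: the cached offset sum if present,
    // otherwise LATEST_OFFSET. The result is wrapped so callers cannot modify it.
    Map<String, Long> taskOffsetSums(final Set<String> tasks) {
        final Map<String, Long> result = new HashMap<>();
        for (final String taskId : tasks) {
            result.put(taskId, cachedOffsetSums.getOrDefault(taskId, LATEST_OFFSET));
        }
        return Collections.unmodifiableMap(result);
    }

    public static void main(final String[] args) {
        final TaskOffsetSumsSketch sketch = new TaskOffsetSumsSketch();
        sketch.updateOffsetSum("0_0", 42L);

        final Map<String, Long> sums = sketch.taskOffsetSums(Set.of("0_0", "0_1"));
        System.out.println(sums.get("0_0")); // cached offset sum
        System.out.println(sums.get("0_1")); // falls back to LATEST_OFFSET

        try {
            sums.put("0_2", 1L);
        } catch (final UnsupportedOperationException e) {
            System.out.println("result is unmodifiable");
        }
    }
}
```

With something along these lines, the TaskManager would no longer need to fill
in the missing entries itself, and a unit test could assert on both the cached
value and the fallback after an offset update.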