ableegoldman commented on a change in pull request #8775:
URL: https://github.com/apache/kafka/pull/8775#discussion_r435585609

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
##########
@@ -302,14 +300,19 @@ public void computeTaskLags(final UUID uuid, final Map<TaskId, Long> allTaskEndO
      * @return end offset sum - offset sum
      *          Task.LATEST_OFFSET if this was previously an active running task on this client
      */
-    long lagFor(final TaskId task) {
-        final Long totalLag = taskLagTotals.get(task);
+    public long lagFor(final TaskId task) {
+        final Long totalLag;
+        if (taskLagTotals.isEmpty()) {
+            // If we couldn't compute the task lags due to failure to fetch offsets, just return a flat constant
+            totalLag = 0L;

Review comment:
The value itself doesn't matter, just that it's constant across all tasks. But I'm guessing you meant, why not use the existing `UNKNOWN_OFFSET_SUM` sentinel, in which case the answer is probably just that I forgot about it.

Anyway, I did a slight additional refactoring beyond this, just FYI: instead of skipping the lag computation when we fail to fetch offsets, we now always initialize the lags and just pass in `UNKNOWN_OFFSET_SUM` for all stateful tasks when the offset fetch fails.
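
For illustration, the refactoring described above might look roughly like the sketch below. This is not the actual `ClientState` or assignor code: the class `LagSketch`, the helper `endOffsetsOrSentinel`, the use of plain strings as task ids, and the numeric value given to `UNKNOWN_OFFSET_SUM` are all assumptions made for the example. The point it tries to show is that the lags are always initialized, and that a failed offset fetch yields the same sentinel for every stateful task.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class LagSketch {
    // Placeholder value for this sketch only; the real sentinel is defined in Kafka Streams.
    static final long UNKNOWN_OFFSET_SUM = -3L;

    // Hypothetical helper: builds the end offsets handed to the lag computation.
    // A null fetchedEndOffsets stands for "the offset fetch failed", in which case
    // every stateful task receives the same sentinel.
    static Map<String, Long> endOffsetsOrSentinel(final List<String> statefulTasks,
                                                  final Map<String, Long> fetchedEndOffsets) {
        final Map<String, Long> endOffsets = new HashMap<>();
        for (final String task : statefulTasks) {
            endOffsets.put(task, fetchedEndOffsets == null
                ? UNKNOWN_OFFSET_SUM
                : fetchedEndOffsets.getOrDefault(task, UNKNOWN_OFFSET_SUM));
        }
        return endOffsets;
    }

    // Always produces an entry for every task, so a lookup never has to special-case
    // an empty lag map. Unknown end offsets propagate as the sentinel.
    static Map<String, Long> computeTaskLags(final Map<String, Long> endOffsets,
                                             final Map<String, Long> clientOffsetSums) {
        final Map<String, Long> lags = new HashMap<>();
        for (final Map.Entry<String, Long> entry : endOffsets.entrySet()) {
            final long endOffset = entry.getValue();
            final long offsetSum = clientOffsetSums.getOrDefault(entry.getKey(), 0L);
            lags.put(entry.getKey(), endOffset == UNKNOWN_OFFSET_SUM
                ? UNKNOWN_OFFSET_SUM
                : Math.max(endOffset - offsetSum, 0L));
        }
        return lags;
    }
}
```

With this shape, calling `computeTaskLags(endOffsetsOrSentinel(tasks, null), offsetSums)` gives every task the same lag value, which is all that matters when the real lags are unknown.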