ableegoldman commented on a change in pull request #11787:
URL: https://github.com/apache/kafka/pull/11787#discussion_r812573407
##########
File path:
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
##########
@@ -63,8 +68,21 @@ public TaskExecutor(final Tasks tasks, final ProcessingMode processingMode, fina
     int process(final int maxNumRecords, final Time time) {
         int totalProcessed = 0;
-        for (final Task task : tasks.activeTasks()) {
-            totalProcessed += processTask(task, maxNumRecords, time);
+        for (final Map.Entry<String, Set<StreamTask>> topologyEntry : tasks.activeTasksByTopology().entrySet()) {
Review comment:
Guess this was a case of premature optimization -- I'll update with your
suggestion for this PR and work out a better solution that doesn't skew
processing for the later PR where this matters. (For some context, I did this in
part because in one of the followups we will back off entire named topologies
when one task is failing repeatedly, to avoid getting out of sync, in which
case it seemed wasteful to check each task in the topology if we already know
it's not ready to process.)
But we can revisit this when we get to that PR 🙂
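
For readers following along, here is a rough sketch of the idea described above: iterating per named topology so that a topology that is backing off can be skipped in one check, instead of testing each of its tasks. This is not the code from the PR or the followup. The class `TopologyBackoffSketch`, the `backingOff` set, and the use of plain task-id strings in place of `StreamTask` are all assumptions made for illustration.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.ToIntFunction;

// Hypothetical illustration only; none of these names come from Kafka Streams.
final class TopologyBackoffSketch {

    private TopologyBackoffSketch() { }

    /**
     * Processes tasks grouped by named topology and returns the total number of
     * records processed.
     *
     * @param tasksByTopology stand-in for the grouping in the diff; tasks are plain ids here
     * @param backingOff      assumed set of topology names that are currently backing off
     * @param processTask     stand-in for processTask(...); returns records processed for one task
     */
    static int process(final Map<String, List<String>> tasksByTopology,
                       final Set<String> backingOff,
                       final ToIntFunction<String> processTask) {
        int totalProcessed = 0;
        for (final Map.Entry<String, List<String>> topologyEntry : tasksByTopology.entrySet()) {
            // One check per topology: skip every task in a topology that is backing off.
            if (backingOff.contains(topologyEntry.getKey())) {
                continue;
            }
            for (final String taskId : topologyEntry.getValue()) {
                totalProcessed += processTask.applyAsInt(taskId);
            }
        }
        return totalProcessed;
    }
}
```

With two topologies and one of them in `backingOff`, only the tasks of the other topology are handed to `processTask`. The sketch says nothing about how to avoid skewing processing across topologies, which is the open question deferred to the later PR.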