mjsax commented on code in PR #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r832815852


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -262,6 +262,42 @@ private void closeDirtyAndRevive(final Collection<Task> taskWithChangelogs, fina
         }
     }
 
+    private void commitActiveTasks(final Set<Task> activeTasksNeedCommit, final AtomicReference<RuntimeException> activeTasksCommitException) {

Review Comment:
   Can't we reuse the existing `commitTasksAndMaybeUpdateCommittableOffsets()`?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -342,7 +342,19 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
 
         maybeThrowTaskExceptions(taskCloseExceptions);
 
-        createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+        rebalanceInProgress = true;
+        final Collection<Task> newActiveTasks = createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+        // If there are any transactions in flight and there are newly created active tasks, commit the tasks
+        // to avoid potential long restoration times.
+        if (processingMode == EXACTLY_ONCE_V2 && threadProducer().transactionInFlight() && !newActiveTasks.isEmpty()) {
+            log.info("New active tasks were added and there is an inflight transaction. Attempting to commit tasks.");
+            final int numCommitted = commitTasksAndMaybeUpdateCommittableOffsets(tasks.allTasks(), new HashMap<>());

Review Comment:
   We pass in the second parameter to be able to handle timeout exceptions
correctly (cf. other places where we call
`commitTasksAndMaybeUpdateCommittableOffsets`) -- here, we don't catch
`TimeoutException`. What is the impact? It seems it would bubble up into the
consumer and crash us?
   
   Given that we kinda need to commit, it seems there are two things we could
do: either add a loop here and retry the commit until we exceed
`task.timeout.ms` (not my personally preferred solution), or actually move
the whole commit logic into the restore part (not sure if this is easily
possible) -- i.e., each time we enter the restore code, we check if there is an
open TX and commit. Not sure how this would align with the new state-updater
code though.
   
   \cc @cadonna @lucasbru -- can you comment?
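
   To make the first option concrete, here is a minimal sketch of a bounded
retry loop. It is not Kafka Streams code: `CommitTimeoutException`,
`CommitAction`, and `BoundedCommitRetry` are hypothetical stand-ins, and the
clock is injected only so the loop can be exercised without sleeping. The real
code would presumably catch `TimeoutException` and read the timeout from the
task timeout config.

```java
import java.time.Duration;
import java.util.function.LongSupplier;

// Hypothetical stand-in for the timeout exception a commit may raise.
class CommitTimeoutException extends RuntimeException {
    CommitTimeoutException(final String message) {
        super(message);
    }
}

// Hypothetical abstraction of "commit the in-flight transaction".
interface CommitAction {
    void commit();
}

class BoundedCommitRetry {
    // Retries the commit until it succeeds or taskTimeout has elapsed since the
    // first attempt. Returns the number of attempts made; rethrows the last
    // timeout once the deadline has passed.
    static int commitWithDeadline(final CommitAction action,
                                  final Duration taskTimeout,
                                  final LongSupplier clockMs) {
        final long deadlineMs = clockMs.getAsLong() + taskTimeout.toMillis();
        int attempts = 0;
        while (true) {
            attempts++;
            try {
                action.commit();
                return attempts;
            } catch (final CommitTimeoutException e) {
                if (clockMs.getAsLong() >= deadlineMs) {
                    throw e; // deadline exceeded: give up and surface the timeout
                }
                // otherwise fall through and retry
            }
        }
    }
}
```

   The downside is the one already noted: the loop blocks inside the
assignment callback for as long as the commit keeps timing out.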



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -342,7 +342,19 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
 
         maybeThrowTaskExceptions(taskCloseExceptions);
 
-        createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+        rebalanceInProgress = true;
+        final Collection<Task> newActiveTasks = createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+        // If there are any transactions in flight and there are newly created active tasks, commit the tasks
+        // to avoid potential long restoration times.
+        if (processingMode == EXACTLY_ONCE_V2 && threadProducer().transactionInFlight() && !newActiveTasks.isEmpty()) {

Review Comment:
   Why is this limited to EOS_v2? From my understanding, EOS_v1 would have the 
same problem?
   
   It also only seems to be a potential issue when we get stateful tasks
assigned? Not sure if we can limit the last check to
`!newActiveStatefulTasks.isEmpty()`?
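
   A rough sketch of what such a check could look like, returning a boolean
rather than a collection. `TaskView` and `NewTaskCheck` are hypothetical names,
and the sketch assumes that a task counts as stateful when it has at least one
changelog partition; the real `Task` interface may expose this differently.

```java
import java.util.Collection;
import java.util.Set;

// Hypothetical, stripped-down view of a task: only what the check needs.
interface TaskView {
    boolean isActive();

    // Assumption: empty for stateless tasks.
    Set<String> changelogPartitions();
}

class NewTaskCheck {
    // True if at least one newly created task is both active and stateful.
    static boolean hasNewActiveStatefulTask(final Collection<? extends TaskView> newTasks) {
        return newTasks.stream()
            .anyMatch(task -> task.isActive() && !task.changelogPartitions().isEmpty());
    }

    // Small factory used only to build example tasks.
    static TaskView task(final boolean active, final String... changelogs) {
        final Set<String> partitions = Set.of(changelogs);
        return new TaskView() {
            @Override
            public boolean isActive() {
                return active;
            }

            @Override
            public Set<String> changelogPartitions() {
                return partitions;
            }
        };
    }
}
```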



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -342,7 +342,19 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
 
         maybeThrowTaskExceptions(taskCloseExceptions);
 
-        createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+        rebalanceInProgress = true;
+        final Collection<Task> newActiveTasks = createNewTasks(activeTasksToCreate, standbyTasksToCreate);

Review Comment:
   Seems we only need the count (or a boolean indicating whether it is empty),
but not the full collection. Can we simplify this?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -342,7 +342,19 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
 
         maybeThrowTaskExceptions(taskCloseExceptions);
 
-        createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+        rebalanceInProgress = true;

Review Comment:
   If we set `rebalanceInProgress = true`, it seems the later call to
`commitTasksAndMaybeUpdateCommittableOffsets` would exit early and return `-1`?
That does not seem right?
   
   I cannot remember the full purpose of the `rebalanceInProgress` flag, and
given cooperative rebalancing and processing records during a rebalance, I am
wondering if the semantics actually changed?
   
   Edit: just saw that there is a discussion about this below.
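
   As a toy model of the concern (not the actual `TaskManager` code), assume
the commit path returns `-1` without committing anything while the flag is set.
`GuardedCommitter` and its methods are hypothetical names used only for
illustration.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

class GuardedCommitter {
    private boolean rebalanceInProgress = false;
    private final List<String> committed = new ArrayList<>();

    void setRebalanceInProgress(final boolean inProgress) {
        this.rebalanceInProgress = inProgress;
    }

    // Assumed behaviour: skip the commit and signal it with -1 while a
    // rebalance is in progress; otherwise commit and return the task count.
    int commit(final Collection<String> taskIds) {
        if (rebalanceInProgress) {
            return -1;
        }
        committed.addAll(taskIds);
        return taskIds.size();
    }

    List<String> committed() {
        return committed;
    }
}
```

   Under that assumption, setting the flag immediately before the commit call
turns the commit into a no-op, which is the ordering the diff above would
produce.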



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -342,7 +342,18 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
 
         maybeThrowTaskExceptions(taskCloseExceptions);
 
-        createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+        final Collection<Task> newActiveTasks = createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+        // If there are any transactions in flight and there are newly created active tasks, commit the tasks
+        // to avoid potential long restoration times.
+        if (processingMode == EXACTLY_ONCE_V2 && threadProducer().transactionInFlight() && !newActiveTasks.isEmpty()) {
+            log.info("New active tasks were added and there is an inflight transaction. Attempting to commit tasks.");
+            final int numCommitted = commitTasksAndMaybeUpdateCommittableOffsets(newActiveTasks, new HashMap<>());
+            if (numCommitted == -1) {

Review Comment:
   Seems we should not mess with the control flow here. We might end up with
spaghetti code. I would advocate moving the commit logic into the restore code
path if possible, as mentioned further above -- this way, the existing control
flow is not changed and we avoid all these complications.
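
   A minimal sketch of that idea, assuming the restore path can see the
producer. `TxProducer`, `RestoreStep`, and `FakeTxProducer` are hypothetical;
`transactionInFlight()` is taken from the diff above, while
`commitTransaction()` is an assumed name for "commit whatever is open".

```java
// Hypothetical minimal view of the thread producer.
interface TxProducer {
    boolean transactionInFlight();

    void commitTransaction();
}

class RestoreStep {
    private final TxProducer producer;

    RestoreStep(final TxProducer producer) {
        this.producer = producer;
    }

    // Called each time the thread enters the restore code path.
    // Returns true if an open transaction was found and committed.
    boolean maybeCommitOpenTransaction() {
        if (!producer.transactionInFlight()) {
            return false;
        }
        producer.commitTransaction();
        return true;
    }
}

// In-memory fake used only to exercise the sketch.
class FakeTxProducer implements TxProducer {
    private boolean open;
    int commits = 0;

    FakeTxProducer(final boolean open) {
        this.open = open;
    }

    @Override
    public boolean transactionInFlight() {
        return open;
    }

    @Override
    public void commitTransaction() {
        open = false;
        commits++;
    }
}
```

   Because the check is idempotent, calling it on every pass through the
restore path costs nothing once the transaction has been committed, and
`handleAssignment` keeps its current control flow.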



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -493,13 +542,22 @@ boolean tryToCompleteRestoration(final long now, final java.util.function.Consum
                     // we found a restoring task that isn't done restoring, which is evidence that
                     // not all tasks are running
                     allRunning = false;
+                    restoringTasks.add(task);
                 }
             }
         }
 
         if (allRunning) {
             // we can call resume multiple times since it is idempotent.
             mainConsumer.resume(mainConsumer.assignment());
+        } else {
+            // There are still some tasks in RESTORING phase.
+            final AtomicReference<RuntimeException> activeTasksCommitException = new AtomicReference<>(null);
+            commitActiveTasks(restoringTasks, activeTasksCommitException);

Review Comment:
   Why do we commit `restoringTasks`?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -342,7 +342,18 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
 
         maybeThrowTaskExceptions(taskCloseExceptions);
 
-        createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+        final Collection<Task> newActiveTasks = createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+        // If there are any transactions in flight and there are newly created active tasks, commit the tasks
+        // to avoid potential long restoration times.
+        if (processingMode == EXACTLY_ONCE_V2 && threadProducer().transactionInFlight() && !newActiveTasks.isEmpty()) {
+            log.info("New active tasks were added and there is an inflight transaction. Attempting to commit tasks.");

Review Comment:
   Is INFO the right log level? Seems DEBUG might be more appropriate?



