ableegoldman commented on a change in pull request #11675:
URL: https://github.com/apache/kafka/pull/11675#discussion_r787472259



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -84,6 +84,7 @@
     private final StateDirectory stateDirectory;
     private final StreamThread.ProcessingMode processingMode;
     private final Tasks tasks;
+    private final List<Task> successfullyProcessed = new ArrayList<>();

Review comment:
       nit: put this in the `Tasks` class with the other metadata.
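Here is a minimal sketch of that move. It assumes a heavily simplified, hypothetical `Tasks` class and `Task` stand-in; the real classes in `org.apache.kafka.streams.processor.internals` have many more members. The method names `addToSuccessfullyProcessed`, `successfullyProcessed` and `clearSuccessfullyProcessed` are illustrative only and do not come from the PR.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for org.apache.kafka.streams.processor.internals.Task.
interface Task {
    String id();
}

// Hypothetical, simplified Tasks container: the per-iteration bookkeeping lives
// next to the other task metadata instead of directly in TaskManager.
class Tasks {
    private final List<Task> successfullyProcessed = new ArrayList<>();

    void addToSuccessfullyProcessed(final Task task) {
        successfullyProcessed.add(task);
    }

    // Read-only view so callers cannot mutate the bookkeeping directly.
    List<Task> successfullyProcessed() {
        return Collections.unmodifiableList(successfullyProcessed);
    }

    void clearSuccessfullyProcessed() {
        successfullyProcessed.clear();
    }
}
```

With this shape, `TaskManager` would call `tasks.addToSuccessfullyProcessed(task)` in its processing loop instead of owning the list itself.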

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -788,7 +788,15 @@ void runOnce() {
              */
             do {
                 log.debug("Processing tasks with {} iterations.", numIterations);
-                final int processed = taskManager.process(numIterations, time);
+                final int processed;
+                try {
+                    processed = taskManager.process(numIterations, time);
+                } catch (final Exception e) {
+                    log.error("encountered an error when processing tasks." +
+                        " Will commit all previously successfully processed tasks", e);
+                    taskManager.commitSuccessfullyProcessedTasks();

Review comment:
       nit: why not just keep this logic within `taskManager#process` and make 
`#commitSuccessfullyProcessedTasks` private?
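Here is a minimal sketch of that shape. It uses hypothetical, stripped-down `Task` and `TaskManager` stand-ins rather than the real Kafka Streams classes. The failure handling lives inside `process`, and the partial-commit helper is private. Rethrowing after the commit is an assumption, since the diff above is cut off right after the commit call. The stub `commitSuccessfullyProcessedTasks` only records which tasks it would commit. The list is cleared in the same two places as in the PR.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a Kafka Streams task: returns how many records it processed.
interface Task {
    int process(int maxNumRecords);
}

// Hypothetical, stripped-down TaskManager: the partial-commit handling stays inside process().
class TaskManager {
    private final List<Task> tasks;
    private final List<Task> successfullyProcessed = new ArrayList<>();
    private final List<Task> committed = new ArrayList<>(); // what the stub commit received

    TaskManager(final List<Task> tasks) {
        this.tasks = tasks;
    }

    int process(final int maxNumRecords) {
        int totalProcessed = 0;
        try {
            for (final Task task : tasks) {
                totalProcessed += task.process(maxNumRecords);
                successfullyProcessed.add(task);
            }
        } catch (final RuntimeException e) {
            // Commit only the tasks that finished before the failure, then rethrow
            // so the caller (StreamThread) still handles the error.
            commitSuccessfullyProcessedTasks();
            throw e;
        }
        successfullyProcessed.clear();
        return totalProcessed;
    }

    // Private: callers no longer need to know that a partial commit can happen.
    private int commitSuccessfullyProcessedTasks() {
        final int committedCount = successfullyProcessed.size();
        committed.addAll(successfullyProcessed); // stub for the real commit(...)
        successfullyProcessed.clear();
        return committedCount;
    }

    List<Task> committedTasks() {
        return committed;
    }
}
```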

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -1321,11 +1322,22 @@ int process(final int maxNumRecords, final Time time) {
                 totalProcessed += processed;
                 task.recordProcessBatchTime(now - then);
             }
+            successfullyProcessed.add(task);
         }
-
+        successfullyProcessed.clear();
         return totalProcessed;
     }
 
+    public int commitSuccessfullyProcessedTasks() {
+        final int committed = commit(successfullyProcessed);

Review comment:
       @mjsax please double-check my understanding here, since it's been a while
since I thought about or worked with this stuff, but IIRC it's important to note
that, under the current Producer semantics at least, we can only do this kind of
partial commit when using ALOS or EOS-v1. In KIP-447/EOS-v2, all tasks on a
thread share a single producer and therefore a single transaction, so if one
task/partition is bad the transaction will need to be aborted, and all
partitions/tasks will unfortunately need to be aborted along with it.
   
   The API is a bit misleading since it implies you can choose which offsets to
send to the transaction, but I recall @mjsax mentioning that the transaction is
applied across all partitions at once.
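The following toy sketch makes the constraint above concrete. It uses a hypothetical `ProcessingMode` enum and a hypothetical `PartialCommitPolicy` helper that only model the reasoning in this comment: no transaction under ALOS, one producer and transaction per task under EOS-v1, and one producer and transaction shared per thread under EOS-v2. It is not Kafka Streams code. The API being referred to is presumably `Producer#sendOffsetsToTransaction`. That method takes a per-partition offsets map, yet the offsets it adds are committed or aborted together with the rest of the transaction.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical mirror of the three guarantees discussed in the comment.
enum ProcessingMode { AT_LEAST_ONCE, EXACTLY_ONCE_V1, EXACTLY_ONCE_V2 }

final class PartialCommitPolicy {
    private PartialCommitPolicy() { }

    // ALOS: no transaction, so committed offsets can be chosen per task.
    // EOS-v1: one producer (and transaction) per task, so healthy tasks commit independently.
    // EOS-v2: one producer (and transaction) shared by all tasks on the thread, so a failure
    //         in any task forces the whole transaction, and every task in it, to abort.
    static boolean partialCommitAllowed(final ProcessingMode mode) {
        return mode != ProcessingMode.EXACTLY_ONCE_V2;
    }

    // Returns the tasks whose progress may still be committed after a processing failure.
    static <T> List<T> committableAfterFailure(final ProcessingMode mode,
                                               final List<T> successfullyProcessed) {
        if (partialCommitAllowed(mode)) {
            return new ArrayList<>(successfullyProcessed);
        }
        return new ArrayList<>(); // EOS-v2: nothing can be committed; abort everything
    }
}
```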

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -1321,11 +1322,22 @@ int process(final int maxNumRecords, final Time time) {
                 totalProcessed += processed;
                 task.recordProcessBatchTime(now - then);
             }
+            successfullyProcessed.add(task);
         }
-
+        successfullyProcessed.clear();

Review comment:
       nit: if we just clear it at the beginning of `#process`, then we only
need to clear it in one place, rather than clearing it both here and in
`#commitSuccessfullyProcessedTasks`.
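Here is a minimal sketch of the clear-once variant. It uses a hypothetical `ProcessLoop` class and `Task` stand-in, not the real `TaskManager`. The list is reset only at the top of `process`, so neither the normal return path nor a commit helper has to clear it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a Kafka Streams task: returns how many records it processed.
interface Task {
    int process(int maxNumRecords);
}

// Hypothetical sketch: the bookkeeping is reset exactly once, at the top of process().
class ProcessLoop {
    private final List<Task> tasks;
    private final List<Task> successfullyProcessed = new ArrayList<>();

    ProcessLoop(final List<Task> tasks) {
        this.tasks = tasks; // kept by reference, not copied
    }

    int process(final int maxNumRecords) {
        successfullyProcessed.clear(); // the only place the list is cleared
        int totalProcessed = 0;
        for (final Task task : tasks) {
            totalProcessed += task.process(maxNumRecords);
            successfullyProcessed.add(task);
        }
        return totalProcessed; // no clear() here, and none needed in a commit helper
    }

    // After a failure, this holds exactly the tasks that completed in the failed call.
    List<Task> successfullyProcessed() {
        return successfullyProcessed;
    }
}
```

The trade-off is that the list keeps the previous call's tasks until the next `process` call. It should therefore only be read on the failure path, for example from a private commit helper.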




-- 
This is an automated message from the Apache Git Service.