ableegoldman commented on a change in pull request #11738:
URL: https://github.com/apache/kafka/pull/11738#discussion_r809780324



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
##########
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.clients.consumer.CommitFailedException;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.errors.TaskCorruptedException;
+import org.apache.kafka.streams.errors.TaskMigratedException;
+import org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode;
+import org.apache.kafka.streams.processor.TaskId;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.slf4j.Logger;
+
+import static 
org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_ALPHA;
+import static 
org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2;
+
+/**
+ * Single-threaded executor class for the active tasks assigned to this thread.
+ */
+public class TaskExecutor {
+
+    private final Logger log;
+
+    private final ProcessingMode processingMode;
+    private final Tasks tasks;
+
+    public TaskExecutor(final Tasks tasks, final ProcessingMode 
processingMode, final LogContext logContext) {
+        this.tasks = tasks;
+        this.processingMode = processingMode;
+        this.log = logContext.logger(getClass());
+    }
+
+    /**
+     * @throws TaskMigratedException if the task producer got fenced (EOS only)
+     * @throws StreamsException      if any task threw an exception while 
processing
+     */
+    int process(final int maxNumRecords, final Time time) {
+        int totalProcessed = 0;
+
+        for (final Task task : tasks.activeTasks()) {
+            totalProcessed += processTask(task, maxNumRecords, time);
+        }
+
+        return totalProcessed;
+    }
+
+    private long processTask(final Task task, final int maxNumRecords, final 
Time time) {
+        int processed = 0;
+        long now = time.milliseconds();
+
+        final long then = now;
+        try {
+            while (processed < maxNumRecords && task.process(now)) {
+                task.clearTaskTimeout();
+                processed++;
+            }
+        } catch (final TimeoutException timeoutException) {
+            task.maybeInitTaskTimeoutOrThrow(now, timeoutException);
+            log.debug(
+                String.format(
+                    "Could not complete processing records for %s due to the 
following exception; will move to next task and retry later",
+                    task.id()),
+                timeoutException
+            );
+        } catch (final TaskMigratedException e) {
+            log.info("Failed to process stream task {} since it got migrated 
to another thread already. " +
+                "Will trigger a new rebalance and close all tasks as zombies 
together.", task.id());
+            throw e;
+        } catch (final StreamsException e) {
+            log.error("Failed to process stream task {} due to the following 
error:", task.id(), e);
+            e.setTaskId(task.id());
+            throw e;
+        } catch (final RuntimeException e) {
+            log.error("Failed to process stream task {} due to the following 
error:", task.id(), e);
+            throw new StreamsException(e, task.id());
+        } finally {
+            now = time.milliseconds();
+            task.recordProcessBatchTime(now - then);
+        }
+        return processed;
+    }
+
+    /**
+     * @throws TaskMigratedException if committing offsets failed (non-EOS)
+     *                               or if the task producer got fenced (EOS)
+     * @throws TimeoutException if committing offsets failed due to 
TimeoutException (non-EOS)
+     * @throws TaskCorruptedException if committing offsets failed due to 
TimeoutException (EOS)
+     * @param consumedOffsetsAndMetadata an empty map that will be filled in 
with the prepared offsets
+     * @return number of committed offsets, or -1 if we are in the middle of a 
rebalance and cannot commit
+     */
+    int commitTasksAndMaybeUpdateCommittableOffsets(final Collection<Task> 
tasksToCommit,
+                                                            final Map<Task, 
Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadata) {

Review comment:
       ack, I'll fix in the followup




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to