vamossagar12 commented on code in PR #11433: URL: https://github.com/apache/kafka/pull/11433#discussion_r851853450
########## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ########## @@ -262,6 +262,42 @@ private void closeDirtyAndRevive(final Collection<Task> taskWithChangelogs, fina } } + private void commitActiveTasks(final Set<Task> activeTasksNeedCommit, final AtomicReference<RuntimeException> activeTasksCommitException) { + + final Map<Task, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsPerTask = new HashMap<>(); + prepareCommitAndAddOffsetsToMap(activeTasksNeedCommit, consumedOffsetsPerTask); + + final Set<Task> dirtyTasks = new HashSet<>(); + try { + taskExecutor.commitOffsetsOrTransaction(consumedOffsetsPerTask); + } catch (final TaskCorruptedException e) { + log.warn("Some tasks were corrupted when trying to commit offsets, these will be cleaned and revived: {}", + e.corruptedTasks()); + + // If we hit a TaskCorruptedException it must be EOS, just handle the cleanup for those corrupted tasks right here + dirtyTasks.addAll(tasks.tasks(e.corruptedTasks())); + closeDirtyAndRevive(dirtyTasks, true); + } catch (final RuntimeException e) { + log.error("Exception caught while committing active tasks: " + consumedOffsetsPerTask.keySet(), e); + activeTasksCommitException.compareAndSet(null, e); + dirtyTasks.addAll(consumedOffsetsPerTask.keySet()); + } + + // for non-revoking active tasks, we should not enforce checkpoint + // as it's EOS enabled in which case no checkpoint should be written while + // the task is in RUNNING tate + for (final Task task : activeTasksNeedCommit) { + if (!dirtyTasks.contains(task)) { + try { + task.postCommit(false); Review Comment: hi.. bumping this thread again... plz review whenever you get the chance. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org