ableegoldman commented on a change in pull request #8248: URL: https://github.com/apache/kafka/pull/8248#discussion_r430787299
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java ########## @@ -42,48 +41,36 @@ import static org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator.getReadWriteStore; public class ProcessorContextImpl extends AbstractProcessorContext implements RecordCollector.Supplier { - // The below are both null for standby tasks - private final StreamTask streamTask; - private final RecordCollector collector; + private StreamTask streamTask; // always null for standby tasks private final ToInternal toInternal = new ToInternal(); private final static To SEND_TO_ALL = To.all(); final Map<String, String> storeToChangelogTopic = new HashMap<>(); - ProcessorContextImpl(final TaskId id, - final StreamTask streamTask, - final StreamsConfig config, - final RecordCollector collector, - final ProcessorStateManager stateMgr, - final StreamsMetricsImpl metrics, - final ThreadCache cache) { + public ProcessorContextImpl(final TaskId id, + final StreamsConfig config, + final ProcessorStateManager stateMgr, + final StreamsMetricsImpl metrics, + final ThreadCache cache) { super(id, config, metrics, stateMgr, cache); - this.streamTask = streamTask; - this.collector = collector; + } - if (streamTask == null && taskType() == TaskType.ACTIVE) { - throw new IllegalStateException("Tried to create context for active task but the streamtask was null"); - } + @Override + public void transitionTaskType(final TaskType newType, + final ThreadCache cache) { + this.cache = cache; Review comment: good point, all we need is `registerNewTaskAndCache` ########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java ########## @@ -109,7 +94,7 @@ public void logChange(final String storeName, final long timestamp) { throwUnsupportedOperationExceptionIfStandby("logChange"); // Sending null headers to changelog topics (KIP-244) - collector.send( + streamTask.recordCollector().send( 
Review comment: Sure, if the plan is to ultimately remove `streamTask` from the context then it makes sense to just save the collector reference instead ########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java ########## @@ -42,48 +41,36 @@ import static org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator.getReadWriteStore; public class ProcessorContextImpl extends AbstractProcessorContext implements RecordCollector.Supplier { - // The below are both null for standby tasks - private final StreamTask streamTask; - private final RecordCollector collector; + private StreamTask streamTask; // always null for standby tasks private final ToInternal toInternal = new ToInternal(); private final static To SEND_TO_ALL = To.all(); final Map<String, String> storeToChangelogTopic = new HashMap<>(); - ProcessorContextImpl(final TaskId id, - final StreamTask streamTask, - final StreamsConfig config, - final RecordCollector collector, - final ProcessorStateManager stateMgr, - final StreamsMetricsImpl metrics, - final ThreadCache cache) { + public ProcessorContextImpl(final TaskId id, + final StreamsConfig config, + final ProcessorStateManager stateMgr, + final StreamsMetricsImpl metrics, + final ThreadCache cache) { super(id, config, metrics, stateMgr, cache); - this.streamTask = streamTask; - this.collector = collector; + } - if (streamTask == null && taskType() == TaskType.ACTIVE) { - throw new IllegalStateException("Tried to create context for active task but the streamtask was null"); - } + @Override + public void transitionTaskType(final TaskType newType, + final ThreadCache cache) { + this.cache = cache; + streamTask = null; } - ProcessorContextImpl(final TaskId id, - final StreamsConfig config, - final ProcessorStateManager stateMgr, - final StreamsMetricsImpl metrics) { - this( - id, - null, - config, - null, - stateMgr, - metrics, - new ThreadCache( - new LogContext(String.format("stream-thread 
[%s] ", Thread.currentThread().getName())), - 0, - metrics - ) - ); + @Override + public void registerNewTask(final Task task) { Review comment: Sure, but there's also a non-deprecated `schedule` method. Won't that still need the reference to `streamTask` or do we somehow plan to remove that as well? ########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java ########## @@ -189,38 +190,52 @@ private void prepareClose(final boolean clean) { @Override public void closeClean(final Map<TopicPartition, Long> checkpoint) { Objects.requireNonNull(checkpoint); - close(true); + close(true, false); log.info("Closed clean"); } @Override public void closeDirty() { - close(false); + close(false, false); log.info("Closed dirty"); } - private void close(final boolean clean) { + @Override + public void closeAndRecycleState() { + prepareClose(true); + close(true, true); Review comment: I was debating this a lot actually. Ultimately I decided to reuse the existing `close` methods because I was worried we might change or fix something in `close` and forget to add it to `closeAndRecycle`. Since `closeAndRecycle` should always be a subset of the actions taken in `close` I thought this would be more future proof...WDYT? 
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java ########## @@ -112,6 +103,7 @@ static void closeStateManager(final Logger log, final AtomicReference<ProcessorStateException> firstException = new AtomicReference<>(null); try { if (stateDirectory.lock(id)) { + Review comment: nope 🙂 ########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ########## @@ -272,6 +274,36 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks, } } + if (taskCloseExceptions.isEmpty()) { + Task oldTask = null; + final Iterator<Task> transitioningTasksIter = tasksToRecycle.iterator(); + try { + while (transitioningTasksIter.hasNext()) { + oldTask = transitioningTasksIter.next(); + final Task newTask; + if (oldTask.isActive()) { + final Set<TopicPartition> partitions = standbyTasksToCreate.remove(oldTask.id()); + newTask = standbyTaskCreator.createStandbyTaskFromActive((StreamTask) oldTask, partitions); + } else { + final Set<TopicPartition> partitions = activeTasksToCreate.remove(oldTask.id()); + newTask = activeTaskCreator.createActiveTaskFromStandby((StandbyTask) oldTask, partitions, mainConsumer); + } + tasks.remove(oldTask.id()); + addNewTask(newTask); + transitioningTasksIter.remove(); + } + } catch (final RuntimeException e) { + final String uncleanMessage = String.format("Failed to recycle task %s cleanly. Attempting to close remaining tasks before re-throwing:", oldTask.id()); + log.error(uncleanMessage, e); + taskCloseExceptions.put(oldTask.id(), e); + + dirtyTasks.addAll(tasksToRecycle); // contains the tasks we have not yet tried to transition + dirtyTasks.addAll(tasks.values()); // contains the new tasks we just created Review comment: Hm. 
The current logic is to only close dirty those tasks that were involved in a failed action (e.g. close; or, in the case of a failed commit, we close all currently assigned active tasks), so I guess we never end up closing standbys, which makes sense. So maybe we shouldn't close any successfully recycled tasks at all, and just let them be closed during `handleLostAll` if necessary? ########## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java ########## @@ -46,7 +46,6 @@ private final SessionKeySchema keySchema; private final SegmentedCacheFunction cacheFunction; private String cacheName; - private ThreadCache cache; Review comment: Well, the underlying `ThreadCache` changes when we transition task type (as standbys have a dummy size-zero cache). So we need to go through the `context` to make sure we call the right cache. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org