ableegoldman commented on a change in pull request #8248:
URL: https://github.com/apache/kafka/pull/8248#discussion_r430787299



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
##########
@@ -42,48 +41,36 @@
 import static 
org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator.getReadWriteStore;
 
 public class ProcessorContextImpl extends AbstractProcessorContext implements 
RecordCollector.Supplier {
-    // The below are both null for standby tasks
-    private final StreamTask streamTask;
-    private final RecordCollector collector;
+    private StreamTask streamTask;     // always null for standby tasks
 
     private final ToInternal toInternal = new ToInternal();
     private final static To SEND_TO_ALL = To.all();
 
     final Map<String, String> storeToChangelogTopic = new HashMap<>();
 
-    ProcessorContextImpl(final TaskId id,
-                         final StreamTask streamTask,
-                         final StreamsConfig config,
-                         final RecordCollector collector,
-                         final ProcessorStateManager stateMgr,
-                         final StreamsMetricsImpl metrics,
-                         final ThreadCache cache) {
+    public ProcessorContextImpl(final TaskId id,
+                                final StreamsConfig config,
+                                final ProcessorStateManager stateMgr,
+                                final StreamsMetricsImpl metrics,
+                                final ThreadCache cache) {
         super(id, config, metrics, stateMgr, cache);
-        this.streamTask = streamTask;
-        this.collector = collector;
+    }
 
-        if (streamTask == null && taskType() == TaskType.ACTIVE) {
-            throw new IllegalStateException("Tried to create context for 
active task but the streamtask was null");
-        }
+    @Override
+    public void transitionTaskType(final TaskType newType,
+                                   final ThreadCache cache) {
+        this.cache = cache;

Review comment:
       Good point, all we need is `registerNewTaskAndCache`.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
##########
@@ -109,7 +94,7 @@ public void logChange(final String storeName,
                           final long timestamp) {
         throwUnsupportedOperationExceptionIfStandby("logChange");
         // Sending null headers to changelog topics (KIP-244)
-        collector.send(
+        streamTask.recordCollector().send(

Review comment:
       Sure, if the plan is to ultimately remove `streamTask` from the context, 
then it makes sense to just save the collector reference instead.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
##########
@@ -42,48 +41,36 @@
 import static 
org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator.getReadWriteStore;
 
 public class ProcessorContextImpl extends AbstractProcessorContext implements 
RecordCollector.Supplier {
-    // The below are both null for standby tasks
-    private final StreamTask streamTask;
-    private final RecordCollector collector;
+    private StreamTask streamTask;     // always null for standby tasks
 
     private final ToInternal toInternal = new ToInternal();
     private final static To SEND_TO_ALL = To.all();
 
     final Map<String, String> storeToChangelogTopic = new HashMap<>();
 
-    ProcessorContextImpl(final TaskId id,
-                         final StreamTask streamTask,
-                         final StreamsConfig config,
-                         final RecordCollector collector,
-                         final ProcessorStateManager stateMgr,
-                         final StreamsMetricsImpl metrics,
-                         final ThreadCache cache) {
+    public ProcessorContextImpl(final TaskId id,
+                                final StreamsConfig config,
+                                final ProcessorStateManager stateMgr,
+                                final StreamsMetricsImpl metrics,
+                                final ThreadCache cache) {
         super(id, config, metrics, stateMgr, cache);
-        this.streamTask = streamTask;
-        this.collector = collector;
+    }
 
-        if (streamTask == null && taskType() == TaskType.ACTIVE) {
-            throw new IllegalStateException("Tried to create context for 
active task but the streamtask was null");
-        }
+    @Override
+    public void transitionTaskType(final TaskType newType,
+                                   final ThreadCache cache) {
+        this.cache = cache;
+        streamTask = null;
     }
 
-    ProcessorContextImpl(final TaskId id,
-                         final StreamsConfig config,
-                         final ProcessorStateManager stateMgr,
-                         final StreamsMetricsImpl metrics) {
-        this(
-            id,
-            null,
-            config,
-            null,
-            stateMgr,
-            metrics,
-            new ThreadCache(
-                new LogContext(String.format("stream-thread [%s] ", 
Thread.currentThread().getName())),
-                0,
-                metrics
-            )
-        );
+    @Override
+    public void registerNewTask(final Task task) {

Review comment:
       Sure, but there's also a non-deprecated `schedule` method. Won't that 
still need the reference to `streamTask`, or do we somehow plan to remove that 
as well?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
##########
@@ -189,38 +190,52 @@ private void prepareClose(final boolean clean) {
     @Override
     public void closeClean(final Map<TopicPartition, Long> checkpoint) {
         Objects.requireNonNull(checkpoint);
-        close(true);
+        close(true, false);
 
         log.info("Closed clean");
     }
 
     @Override
     public void closeDirty() {
-        close(false);
+        close(false, false);
 
         log.info("Closed dirty");
     }
 
-    private void close(final boolean clean) {
+    @Override
+    public void closeAndRecycleState() {
+        prepareClose(true);
+        close(true, true);

Review comment:
       I was debating this a lot, actually. Ultimately I decided to reuse the 
existing `close` methods because I was worried we might change or fix something 
in `close` and forget to add it to `closeAndRecycleState`. Since 
`closeAndRecycleState` should always be a subset of the actions taken in 
`close`, I thought this would be more future-proof... WDYT?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
##########
@@ -112,6 +103,7 @@ static void closeStateManager(final Logger log,
         final AtomicReference<ProcessorStateException> firstException = new 
AtomicReference<>(null);
         try {
             if (stateDirectory.lock(id)) {
+

Review comment:
       nope 🙂 

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -272,6 +274,36 @@ public void handleAssignment(final Map<TaskId, 
Set<TopicPartition>> activeTasks,
             }
         }
 
+        if (taskCloseExceptions.isEmpty()) {
+            Task oldTask = null;
+            final Iterator<Task> transitioningTasksIter = 
tasksToRecycle.iterator();
+            try {
+                while (transitioningTasksIter.hasNext()) {
+                    oldTask = transitioningTasksIter.next();
+                    final Task newTask;
+                    if (oldTask.isActive()) {
+                        final Set<TopicPartition> partitions = 
standbyTasksToCreate.remove(oldTask.id());
+                        newTask = 
standbyTaskCreator.createStandbyTaskFromActive((StreamTask) oldTask, 
partitions);
+                    } else {
+                        final Set<TopicPartition> partitions = 
activeTasksToCreate.remove(oldTask.id());
+                        newTask = 
activeTaskCreator.createActiveTaskFromStandby((StandbyTask) oldTask, 
partitions, mainConsumer);
+                    }
+                    tasks.remove(oldTask.id());
+                    addNewTask(newTask);
+                    transitioningTasksIter.remove();
+                }
+            } catch (final RuntimeException e) {
+                final String uncleanMessage = String.format("Failed to recycle 
task %s cleanly. Attempting to close remaining tasks before re-throwing:", 
oldTask.id());
+                log.error(uncleanMessage, e);
+                taskCloseExceptions.put(oldTask.id(), e);
+
+                dirtyTasks.addAll(tasksToRecycle); // contains the tasks we 
have not yet tried to transition
+                dirtyTasks.addAll(tasks.values());         // contains the new 
tasks we just created

Review comment:
       Hm. The current logic is to close dirty only those tasks that were part 
of a failed action (e.g. a failed close, or, in the case of a failed commit, all 
currently assigned active tasks), so I guess we never end up closing standbys, 
which makes sense.
   
   So maybe we shouldn't close any successfully recycled tasks at all, and just 
let them be closed during `handleLostAll` if necessary?
   

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
##########
@@ -46,7 +46,6 @@
     private final SessionKeySchema keySchema;
     private final SegmentedCacheFunction cacheFunction;
     private String cacheName;
-    private ThreadCache cache;

Review comment:
       Well, the underlying `ThreadCache` changes when we transition task type 
(as standbys have a dummy size-zero cache). So we need to go through the 
`context` to make sure we use the right cache.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to