vvcephei commented on a change in pull request #8248:
URL: https://github.com/apache/kafka/pull/8248#discussion_r432109207



##########
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
##########
@@ -1247,4 +1270,33 @@ public void waitForNextStableAssignment(final long maxWaitMs) throws Interrupted
             );
         }
     }
+
+    public static class TrackingStateRestoreListener implements StateRestoreListener {

Review comment:
       This isn't threadsafe, but it looks like we're using it from multiple 
threads during the tests.
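
       One possible shape for a thread-safe version, as a minimal sketch only. ThreadSafeRestoreTracker and its String keys are hypothetical stand-ins; in the test utility the class would implement StateRestoreListener and key by TopicPartition.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the test listener. The real class would implement
// org.apache.kafka.streams.processor.StateRestoreListener and use TopicPartition keys.
class ThreadSafeRestoreTracker {
    // ConcurrentHashMap + AtomicLong make concurrent updates from several threads safe.
    private final Map<String, AtomicLong> restoredByChangelog = new ConcurrentHashMap<>();
    private final AtomicLong totalRestored = new AtomicLong();

    // Plays the role of onBatchRestored(...); may be invoked from multiple threads at once.
    void onBatchRestored(final String changelog, final long numRestored) {
        restoredByChangelog
            .computeIfAbsent(changelog, k -> new AtomicLong())
            .addAndGet(numRestored);
        totalRestored.addAndGet(numRestored);
    }

    // Number of records restored so far for one changelog (0 if never seen).
    long restored(final String changelog) {
        final AtomicLong count = restoredByChangelog.get(changelog);
        return count == null ? 0L : count.get();
    }

    long totalRestored() {
        return totalRestored.get();
    }
}
```

       The per-changelog count and the total are each updated atomically, but not together, so a reader can briefly observe one without the other. That is probably fine for test assertions made after restoration finishes.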

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
##########
@@ -79,25 +88,27 @@
 
     private static final String APPID = "restore-test";
 
-    @ClassRule
-    public static final EmbeddedKafkaCluster CLUSTER =
-            new EmbeddedKafkaCluster(NUM_BROKERS);
+    public final EmbeddedKafkaCluster cluster = new EmbeddedKafkaCluster(NUM_BROKERS);

Review comment:
       Why do we need to remove the @ClassRule annotation?

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
##########
@@ -1468,9 +1456,6 @@ public void shouldCloseActiveTasksAndIgnoreExceptionsOnUncleanShutdown() {
 
         resetToStrict(changeLogReader);
         expect(changeLogReader.completedChangelogs()).andReturn(emptySet());
-        // make sure we also remove the changelog partitions from the changelog reader
-        changeLogReader.remove(eq(singletonList(changelog)));
-        expectLastCall();

Review comment:
       I feel like I missed something here. We don't expect the changelog to 
get unregistered during close anymore?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -183,12 +180,15 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
                      "\tExisting standby tasks: {}",
                  activeTasks.keySet(), standbyTasks.keySet(), activeTaskIds(), standbyTaskIds());
 
-        final Map<TaskId, Set<TopicPartition>> activeTasksToCreate = new TreeMap<>(activeTasks);
-        final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate = new TreeMap<>(standbyTasks);

Review comment:
       I don't think there was a reason for the TreeMap besides deterministic ordering for debugging, etc.
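
       To illustrate the determinism point, a small sketch: copying a map into a TreeMap fixes the iteration order by key, regardless of how the source map happened to order its entries. SortedTaskOrder and the String task ids are hypothetical, used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper; task ids are plain Strings here instead of TaskId.
class SortedTaskOrder {
    // Returns the keys in the order a TreeMap copy would iterate them (natural key order).
    static List<String> iterationOrder(final Map<String, Integer> tasks) {
        return new ArrayList<>(new TreeMap<>(tasks).keySet());
    }
}
```

       With this, log lines and debugger views list tasks in the same order on every run, which is the only benefit the TreeMap copy appears to have provided.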

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
##########
@@ -263,27 +259,28 @@ private void logValue(final Bytes key, final BufferKey bufferKey, final BufferVa
         final ByteBuffer buffer = value.serialize(sizeOfBufferTime);
         buffer.putLong(bufferKey.time());
 
-        collector.send(
-            changelogTopic,
-            key,
-            buffer.array(),
-            V_2_CHANGELOG_HEADERS,
-            partition,
-            null,
-            KEY_SERIALIZER,
-            VALUE_SERIALIZER
+        ((RecordCollector.Supplier) context).recordCollector().send(

Review comment:
       One high-level concern I've developed during this review is whether 
there's any possibility that something like this could leak into the public 
API. I.e., is it possible that a Processor, Transformer, or StateStore could 
have cached some reference from the context that would become invalid when the 
context gets recycled, similar to the way this recordCollector (which is not 
public, I know) did?
   
   What's your take on that, @ableegoldman ?
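
   To make the concern concrete, here is a minimal sketch of the failure mode. All four classes are hypothetical stand-ins, not Kafka Streams types: a store that caches the collector at construction keeps writing to the old one after the context is recycled, while a store that resolves it on each call does not.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins; none of these are Kafka Streams classes.
class Collector {
    final List<String> sent = new ArrayList<>();

    void send(final String record) {
        sent.add(record);
    }
}

class Context {
    private Collector collector;

    Context(final Collector collector) {
        this.collector = collector;
    }

    Collector recordCollector() {
        return collector;
    }

    // Recycling the context swaps in a fresh collector.
    void recycle(final Collector fresh) {
        this.collector = fresh;
    }
}

class CachingStore {
    // Captured once at construction; becomes stale after Context.recycle().
    private final Collector cached;

    CachingStore(final Context context) {
        this.cached = context.recordCollector();
    }

    void log(final String record) {
        cached.send(record);
    }
}

class LookupStore {
    private final Context context;

    LookupStore(final Context context) {
        this.context = context;
    }

    // Resolved on every call, so it always reaches the current collector.
    void log(final String record) {
        context.recordCollector().send(record);
    }
}
```

   The question above is whether user code could end up in the CachingStore position with respect to anything reachable from the public context.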





