vvcephei commented on a change in pull request #8248: URL: https://github.com/apache/kafka/pull/8248#discussion_r432109207
########## File path: streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java ########## @@ -1247,4 +1270,33 @@ public void waitForNextStableAssignment(final long maxWaitMs) throws Interrupted ); } } + + public static class TrackingStateRestoreListener implements StateRestoreListener { Review comment: This isn't threadsafe, but it looks like we're using it from multiple threads during the tests. ########## File path: streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java ########## @@ -79,25 +88,27 @@ private static final String APPID = "restore-test"; - @ClassRule - public static final EmbeddedKafkaCluster CLUSTER = - new EmbeddedKafkaCluster(NUM_BROKERS); + public final EmbeddedKafkaCluster cluster = new EmbeddedKafkaCluster(NUM_BROKERS); Review comment: Why do we need to remove the Rule annotation? ########## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java ########## @@ -1468,9 +1456,6 @@ public void shouldCloseActiveTasksAndIgnoreExceptionsOnUncleanShutdown() { resetToStrict(changeLogReader); expect(changeLogReader.completedChangelogs()).andReturn(emptySet()); - // make sure we also remove the changelog partitions from the changelog reader - changeLogReader.remove(eq(singletonList(changelog))); - expectLastCall(); Review comment: I feel like I missed something here. We don't expect the changelog to get unregistered during close anymore? ########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ########## @@ -183,12 +180,15 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks, "\tExisting standby tasks: {}", activeTasks.keySet(), standbyTasks.keySet(), activeTaskIds(), standbyTaskIds()); - final Map<TaskId, Set<TopicPartition>> activeTasksToCreate = new TreeMap<>(activeTasks); - final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate = new TreeMap<>(standbyTasks); Review comment: I don't think there was a reason besides determinism for debugging, etc. ########## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java ########## @@ -263,27 +259,28 @@ private void logValue(final Bytes key, final BufferKey bufferKey, final BufferVa final ByteBuffer buffer = value.serialize(sizeOfBufferTime); buffer.putLong(bufferKey.time()); - collector.send( - changelogTopic, - key, - buffer.array(), - V_2_CHANGELOG_HEADERS, - partition, - null, - KEY_SERIALIZER, - VALUE_SERIALIZER + ((RecordCollector.Supplier) context).recordCollector().send( Review comment: One high-level concern I've developed during this review is whether there's any possibility that something like this could leak into the public API. I.e., is it possible that a Processor, Transformer, or StateStore could have cached some reference from the context that would become invalid when the context gets recycled, similar to the way this recordCollector (which is not public, I know) did. What's your take on that, @ableegoldman ? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org