[ https://issues.apache.org/jira/browse/KAFKA-16774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846912#comment-17846912 ]
Chia-Ping Tsai commented on KAFKA-16774: ---------------------------------------- [~mjsax] nice question. It seems to me that is a flaky. IIRC, `onPartitionsAssigned` should be executed by thread same to the one calling consumer#poll` [0]. However, in that test case, `onPartitionsAssigned` is called by junit tread [1] and so it causes race condition. We can fix it by returning a copy of `pendingTasksToInit` [2]. That is similar to `allTasksPerId` [3]. WDYT? [0] https://github.com/apache/kafka/blob/7b1fe33d01cb3de9f457a7e4d711eacb7c8f1c4a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java#L147 [1] https://github.com/apache/kafka/blob/7b1fe33d01cb3de9f457a7e4d711eacb7c8f1c4a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java#L1408 [2] https://github.com/apache/kafka/blob/7b1fe33d01cb3de9f457a7e4d711eacb7c8f1c4a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java#L112 [3] https://github.com/apache/kafka/blob/7b1fe33d01cb3de9f457a7e4d711eacb7c8f1c4a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java#L302 > fix flaky StreamThreadTest#shouldCloseAllTaskProducersOnCloseIfEosEnabled > ------------------------------------------------------------------------- > > Key: KAFKA-16774 > URL: https://issues.apache.org/jira/browse/KAFKA-16774 > Project: Kafka > Issue Type: Test > Components: streams, unit tests > Reporter: Chia-Ping Tsai > Priority: Minor > Labels: flaky-test > > java.util.ConcurrentModificationException > at > java.base/java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1720) > at > java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509) > at > java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499) > at > java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921) > at > java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at > java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682) > at > org.apache.kafka.streams.processor.internals.TaskManager.allTasks(TaskManager.java:1686) > at > org.apache.kafka.streams.processor.internals.TaskManager.releaseLockedUnassignedTaskDirectories(TaskManager.java:1364) > at > org.apache.kafka.streams.processor.internals.TaskManager.handleRebalanceComplete(TaskManager.java:208) > at > org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsAssigned(StreamsRebalanceListener.java:79) > at > org.apache.kafka.streams.processor.internals.StreamThreadTest.shouldCloseAllTaskProducersOnCloseIfEosEnabled(StreamThreadTest.java:1408) > at > java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) -- This message was sent by Atlassian Jira (v8.20.10#820010)