[ 
https://issues.apache.org/jira/browse/KAFKA-16774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846912#comment-17846912
 ] 

Chia-Ping Tsai commented on KAFKA-16774:
----------------------------------------

[~mjsax] nice question. It seems to me that this is a flaky test. IIRC, 
`onPartitionsAssigned` should be executed by the same thread that calls 
`consumer#poll` [0]. However, in that test case, `onPartitionsAssigned` is 
called by the JUnit thread [1], which causes a race condition.

We can fix it by returning a copy of `pendingTasksToInit` [2], similar to 
how `allTasksPerId` is handled [3]. WDYT?
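
To illustrate the idea (a minimal sketch, not the actual `Tasks` code): the 
class `TaskRegistry` and all of its methods below are hypothetical names made 
up for this example. It contrasts a live view of a `HashMap` with a copy. The 
demo is single-threaded so that it is deterministic; the "writer" is simulated 
by modifying the map in the middle of the iteration.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a task registry; NOT the real Kafka Tasks class.
class TaskRegistry {
    private final Map<String, String> tasksPerId = new HashMap<>();

    void addTask(final String id, final String task) {
        tasksPerId.put(id, task);
    }

    // Live, read-only view: iteration fails fast if the map changes underneath it.
    Collection<String> tasksView() {
        return Collections.unmodifiableCollection(tasksPerId.values());
    }

    // Snapshot: later changes to the map do not affect the returned list.
    List<String> tasksCopy() {
        return new ArrayList<>(tasksPerId.values());
    }

    // Iterates over the view or the copy and adds a task during the first
    // step, imitating a writer that runs while a reader is iterating.
    // Returns true if the iteration threw ConcurrentModificationException.
    static boolean failsWhenModifiedDuringIteration(final boolean useCopy) {
        final TaskRegistry registry = new TaskRegistry();
        registry.addTask("0_0", "task-a");
        registry.addTask("0_1", "task-b");
        final Collection<String> tasks =
            useCopy ? registry.tasksCopy() : registry.tasksView();
        try {
            boolean modified = false;
            for (final String task : tasks) {
                if (!modified) {
                    registry.addTask("0_2", "task-c");
                    modified = true;
                }
            }
            return false;
        } catch (final ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(final String[] args) {
        System.out.println("view fails: " + failsWhenModifiedDuringIteration(false)); // true
        System.out.println("copy fails: " + failsWhenModifiedDuringIteration(true));  // false
    }
}
```

Note that a copy only protects the caller's iteration. Taking the copy itself 
still iterates the underlying map, so it should happen while no other thread 
is mutating the map (for example, under the same lock as the writers).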

[0] 
https://github.com/apache/kafka/blob/7b1fe33d01cb3de9f457a7e4d711eacb7c8f1c4a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java#L147

[1] 
https://github.com/apache/kafka/blob/7b1fe33d01cb3de9f457a7e4d711eacb7c8f1c4a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java#L1408

[2] 
https://github.com/apache/kafka/blob/7b1fe33d01cb3de9f457a7e4d711eacb7c8f1c4a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java#L112

[3] 
https://github.com/apache/kafka/blob/7b1fe33d01cb3de9f457a7e4d711eacb7c8f1c4a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java#L302

> fix flaky StreamThreadTest#shouldCloseAllTaskProducersOnCloseIfEosEnabled
> -------------------------------------------------------------------------
>
>                 Key: KAFKA-16774
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16774
>             Project: Kafka
>          Issue Type: Test
>          Components: streams, unit tests
>            Reporter: Chia-Ping Tsai
>            Priority: Minor
>              Labels: flaky-test
>
> java.util.ConcurrentModificationException
>  at java.base/java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1720)
>  at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
>  at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
>  at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
>  at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>  at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
>  at org.apache.kafka.streams.processor.internals.TaskManager.allTasks(TaskManager.java:1686)
>  at org.apache.kafka.streams.processor.internals.TaskManager.releaseLockedUnassignedTaskDirectories(TaskManager.java:1364)
>  at org.apache.kafka.streams.processor.internals.TaskManager.handleRebalanceComplete(TaskManager.java:208)
>  at org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsAssigned(StreamsRebalanceListener.java:79)
>  at org.apache.kafka.streams.processor.internals.StreamThreadTest.shouldCloseAllTaskProducersOnCloseIfEosEnabled(StreamThreadTest.java:1408)
>  at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)