[ https://issues.apache.org/jira/browse/FLINK-9349?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16486697#comment-16486697 ]
ASF GitHub Bot commented on FLINK-9349: --------------------------------------- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/6040#discussion_r190111529 --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java --- @@ -416,9 +520,16 @@ protected TestFetcher( false); } + /** + * Emulation of partition's iteration which is required for + * {@link AbstractFetcherTest#testConcurrentPartitionsDiscoveryAndLoopFetching}. + * @throws Exception + */ @Override public void runFetchLoop() throws Exception { - throw new UnsupportedOperationException(); + for (KafkaTopicPartitionState ignored: subscribedPartitionStates()) { + Thread.sleep(10L); --- End diff -- This would only let the test fail "occasionally", right? I would like this to be changed, so that we always have the test failing without the copy on write fix. We could do this by having a dummy source context that blocks on record emit. > KafkaConnector Exception while fetching from multiple kafka topics > ------------------------------------------------------------------- > > Key: FLINK-9349 > URL: https://issues.apache.org/jira/browse/FLINK-9349 > Project: Flink > Issue Type: Bug > Components: Kafka Connector > Affects Versions: 1.4.0, 1.5.0 > Reporter: Vishal Santoshi > Assignee: Sergey Nuyanzin > Priority: Critical > Attachments: Flink9349Test.java > > > ./flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java > > It seems the List subscribedPartitionStates was being modified when > runFetchLoop iterated the List. > This can happen if, e.g., FlinkKafkaConsumer runs the following code > concurrently: > kafkaFetcher.addDiscoveredPartitions(discoveredPartitions); > > {code:java} > java.util.ConcurrentModificationException > at > java.util.LinkedList$ListItr.checkForComodification(LinkedList.java:966) > at java.util.LinkedList$ListItr.next(LinkedList.java:888) > at > org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:134) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)