[ https://issues.apache.org/jira/browse/FLINK-9349?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16486692#comment-16486692 ]
ASF GitHub Bot commented on FLINK-9349: --------------------------------------- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/6040#discussion_r190110928 --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java --- @@ -390,6 +398,102 @@ public void testPeriodicWatermarksWithNoSubscribedPartitionsShouldYieldNoWaterma assertEquals(100, sourceContext.getLatestWatermark().getTimestamp()); } + @Test + public void testConcurrentPartitionsDiscoveryAndLoopFetching() throws Exception { + // test data + final KafkaTopicPartition testPartition = new KafkaTopicPartition("test", 42); + + final Map<KafkaTopicPartition, Long> testCommitData = new HashMap<>(); + testCommitData.put(testPartition, 11L); + + // ----- create the test fetcher ----- + + @SuppressWarnings("unchecked") + SourceContext<String> sourceContext = PowerMockito.mock(SourceContext.class); --- End diff -- It is unnecessary to use a power mock here. A dummy implementation of a `SourceContext` will be better. > KafkaConnector Exception while fetching from multiple kafka topics > ------------------------------------------------------------------- > > Key: FLINK-9349 > URL: https://issues.apache.org/jira/browse/FLINK-9349 > Project: Flink > Issue Type: Bug > Components: Kafka Connector > Affects Versions: 1.4.0, 1.5.0 > Reporter: Vishal Santoshi > Assignee: Sergey Nuyanzin > Priority: Critical > Attachments: Flink9349Test.java > > > ./flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java > > It seems the List subscribedPartitionStates was being modified when > runFetchLoop iterated the List. > This can happen if, e.g., FlinkKafkaConsumer runs the following code > concurrently: > kafkaFetcher.addDiscoveredPartitions(discoveredPartitions); > > {code:java} > java.util.ConcurrentModificationException > at > java.util.LinkedList$ListItr.checkForComodification(LinkedList.java:966) > at java.util.LinkedList$ListItr.next(LinkedList.java:888) > at > org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:134) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)