[ 
https://issues.apache.org/jira/browse/FLINK-9349?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16479391#comment-16479391
 ] 

ASF GitHub Bot commented on FLINK-9349:
---------------------------------------

Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/6040
  
    Thanks for the PR @snuyanzin!
    I had some comments, please let me know what you think.
    
    Also, some general contribution tips:
    1. I would suggest the title of the PR to be something along the lines of 
"[FLINK-9349] [kafka] Fix ConcurrentModificationException when add discovered 
partitions". That directly makes it clear what exactly is being fixed.
    2. The message of the first commit of the PR should also be appropriately 
set to be similar to the title (most of the time if it is a 1-commit PR, the 
title of the PR and the commit message can be identical).


> KafkaConnector Exception  while fetching from multiple kafka topics
> -------------------------------------------------------------------
>
>                 Key: FLINK-9349
>                 URL: https://issues.apache.org/jira/browse/FLINK-9349
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.4.0, 1.5.0
>            Reporter: Vishal Santoshi
>            Assignee: Sergey Nuyanzin
>            Priority: Critical
>         Attachments: Flink9349Test.java
>
>
> ./flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
>  
> It seems the List subscribedPartitionStates was being modified when 
> runFetchLoop iterated the List.
> This can happen if, e.g., FlinkKafkaConsumer runs the following code 
> concurrently:
>                 kafkaFetcher.addDiscoveredPartitions(discoveredPartitions);
>  
> {code:java}
>  java.util.ConcurrentModificationException
>       at 
> java.util.LinkedList$ListItr.checkForComodification(LinkedList.java:966)
>       at java.util.LinkedList$ListItr.next(LinkedList.java:888)
>       at 
> org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:134)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to