[ https://issues.apache.org/jira/browse/KAFKA-2129?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14517553#comment-14517553 ]
Tim Brooks commented on KAFKA-2129:
-----------------------------------

It looks like this is a larger issue than I realized when I first opened this ticket. Suppose thread 1 is calling poll() on the KafkaClient, and thread 2 then calls partitionsFor() for a topic that is not known locally. Both threads will make it into the poll() method on the Selector, because partitionsFor() is not synchronized. If thread 1 is in the middle of a poll(), a lot of intermediate state will be lost when thread 2 calls the clear() method on the Selector:

    this.completedSends.clear();
    this.completedReceives.clear();
    this.connected.clear();
    this.disconnected.clear();
    this.disconnected.addAll(this.failedSends);
    this.failedSends.clear();

I can make a number of integration tests fail by adding the following to consumeRecords() in ConsumerTest, right before the call to poll():

    scala.concurrent.future {
      Thread.sleep(30)
      consumer.partitionsFor("weird-topic")
    }

If I add the synchronized keyword to the partitionsFor() method, these errors go away. Is this the correct approach for this ticket? Those errors are clearly a problem, since the KafkaConsumer documentation states that the class is thread-safe. But adding synchronized to the method means that a call to partitionsFor() will block on any poll() that is in progress. Hopefully, most of the time partitionsFor() will not require a network call.

Anyway, I attached a patch that synchronizes that method. But if we want a non-synchronized method that returns the locally known partitions for a topic, we will need a different change.

> Consumer could make multiple concurrent metadata requests
> ---------------------------------------------------------
>
> Key: KAFKA-2129
> URL: https://issues.apache.org/jira/browse/KAFKA-2129
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Reporter: Tim Brooks
> Priority: Minor
>
> The NetworkClient's metadataFetchInProgress is neither volatile nor atomic.
> This flag protects against multiple metadata requests being made. It is read in
> poll() on the NetworkClient and written when a request is initiated.
> This is fine for the producer, which seems to have only one thread writing. The
> KafkaConsumer's poll() method is synchronized, so there will not be more
> than one writer entering from there. However, the NetworkClient's poll()
> method is also called from the consumer's partitionsFor() method, which could
> be accessed by a separate thread.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
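The two fixes discussed in this ticket can be illustrated with a small Java sketch. It is hypothetical and simplified: `MetadataFetchGuard` and `SketchConsumer` are illustrative names, not Kafka's actual classes, and no networking is involved. The guard shows how a plain boolean like metadataFetchInProgress could become an `AtomicBoolean`, so that "check the flag, then set it" happens in one atomic step. The facade shows the approach from the comment, where poll() and partitionsFor() lock the same monitor so one cannot clear shared state while the other is running.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the NetworkClient's metadataFetchInProgress flag.
// compareAndSet(false, true) succeeds for exactly one caller until finishFetch()
// resets the flag, so two threads cannot both decide to send a metadata request.
class MetadataFetchGuard {
    private final AtomicBoolean inProgress = new AtomicBoolean(false);

    /** Returns true only for the caller that should send the metadata request. */
    boolean tryStartFetch() {
        return inProgress.compareAndSet(false, true);
    }

    /** Call once the metadata response (or failure) has been handled. */
    void finishFetch() {
        inProgress.set(false);
    }
}

// Hypothetical consumer facade. Both methods are synchronized on `this`, so a
// partitionsFor() call waits for any in-progress poll() instead of clearing
// the shared, non-thread-safe list underneath it.
class SketchConsumer {
    // Mirrors one of the Selector's per-poll lists; ArrayList is not thread-safe.
    private final List<String> completedReceives = new ArrayList<>();

    synchronized int poll() {
        completedReceives.clear();       // like the Selector's clear() step
        completedReceives.add("record"); // pretend one response arrived
        return completedReceives.size();
    }

    synchronized List<String> partitionsFor(String topic) {
        completedReceives.clear();       // safe only because of the shared lock
        return Collections.singletonList(topic + "-0");
    }
}
```

The trade-off raised in the comment still applies: with a shared monitor, partitionsFor() blocks behind any poll() in progress. The `AtomicBoolean` alone would only prevent duplicate metadata requests; it would not make the Selector's other per-poll state safe to share between threads.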