[ 
https://issues.apache.org/jira/browse/KAFKA-2129?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14517553#comment-14517553
 ] 

Tim Brooks commented on KAFKA-2129:
-----------------------------------

This looks like a larger issue than I realized when I first opened this 
ticket. Suppose thread 1 is calling poll() on the KafkaConsumer, and thread 2 
then calls partitionsFor() for a topic that is not locally known to the 
KafkaConsumer.

Both threads will make it into the poll() method on the Selector, since 
partitionsFor() is not synchronized. If thread 1 is in the middle of a poll(), 
the intermediate state it has accumulated will be lost when thread 2 calls the 
clear() method on the Selector:

this.completedSends.clear();
this.completedReceives.clear();
this.connected.clear();
this.disconnected.clear();
this.disconnected.addAll(this.failedSends);
this.failedSends.clear();
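
To make the interleaving concrete, here is a minimal sketch that replays it in 
a fixed order on a single thread. ToySelector and replayInterleaving() are 
hypothetical names used only for illustration; this is a toy model of the 
behavior described above, not the actual Selector code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy stand-in for the Selector; the class and method names are hypothetical.
class ToySelector {
    // Results of the most recent poll(), analogous to completedReceives.
    final List<String> completedReceives = new ArrayList<>();

    // As described above, each poll() starts by clearing the previous results.
    void poll(List<String> arrivedThisPoll) {
        completedReceives.clear();
        completedReceives.addAll(arrivedThisPoll);
    }

    // Replays the problematic interleaving deterministically on one thread.
    static List<String> replayInterleaving() {
        ToySelector selector = new ToySelector();
        // Thread 1: poll() completes a receive but has not processed it yet.
        selector.poll(Arrays.asList("fetch-response"));
        // Thread 2: partitionsFor() triggers its own poll(), which clears first.
        selector.poll(Arrays.asList("metadata-response"));
        // Thread 1 resumes and reads the shared list: its receive is gone.
        return new ArrayList<>(selector.completedReceives);
    }
}
```

In this model the list thread 1 reads back holds only thread 2's response, 
which is the kind of lost state that would explain the test failures.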

I can generate a number of failing integration tests by adding:

scala.concurrent.future {
  Thread.sleep(30)
  consumer.partitionsFor("weird-topic")
}

in consumeRecords() in the ConsumerTest right before the call is made to poll().

If I add the synchronized keyword to the partitionsFor() method, these errors 
go away. Is this the correct approach to this ticket? Those errors are clearly 
an issue, since the KafkaConsumer documentation indicates that the class is 
threadsafe.

But adding synchronized to the method means that a call to partitionsFor() 
will block on any poll() that is in progress, even though, hopefully, the 
majority of the time partitionsFor() will not require a network call.

Anyway, I added a patch to synchronize that method. But if we are interested 
in a non-synchronized method to get the locally-known partitions for a topic, 
we will need a different change.
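
As a rough sketch of what that different change might look like, the following 
keeps a synchronized partitionsFor() and adds a separate lookup that only reads 
locally-known metadata and never waits on poll(). ToyConsumer, 
locallyKnownPartitionsFor(), networkFetchCount() and fetchFromBroker() are all 
hypothetical names, and the fixed partition list is made up; none of this is 
the KafkaConsumer API or the attached patch.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Toy consumer; every name here is hypothetical, not the KafkaConsumer API.
class ToyConsumer {
    // Partition ids per topic, safe to read without holding the consumer lock.
    private final Map<String, List<Integer>> knownPartitions = new ConcurrentHashMap<>();
    // Counts simulated network round trips (guarded by the consumer lock).
    private int networkFetches = 0;

    // Non-blocking: returns null when the topic is not locally known.
    List<Integer> locallyKnownPartitionsFor(String topic) {
        return knownPartitions.get(topic);
    }

    // Blocking variant: waits for any in-progress poll(), then fetches if needed.
    synchronized List<Integer> partitionsFor(String topic) {
        List<Integer> cached = knownPartitions.get(topic);
        if (cached != null) {
            return cached;
        }
        List<Integer> fetched = fetchFromBroker(topic);
        knownPartitions.put(topic, fetched);
        return fetched;
    }

    // Shares the lock with partitionsFor(), so the two never interleave.
    synchronized void poll() {
        // Network I/O would happen here.
    }

    synchronized int networkFetchCount() {
        return networkFetches;
    }

    // Stand-in for a metadata request; always reports partitions 0 and 1.
    private List<Integer> fetchFromBroker(String topic) {
        networkFetches++;
        return Arrays.asList(0, 1);
    }
}
```

The trade-off in this sketch is that the non-blocking lookup can return null 
for an unknown topic, so callers would have to decide whether to fall back to 
the blocking method.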

> Consumer could make multiple concurrent metadata requests
> ---------------------------------------------------------
>
>                 Key: KAFKA-2129
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2129
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>            Reporter: Tim Brooks
>            Priority: Minor
>
> The NetworkClient's metadataFetchInProgress is neither volatile nor atomic. 
> This protects against multiple metadata requests being made and is read on 
> poll() on the NetworkClient. It is written to when a request is initiated.
> This is fine for the producer, which seems to have one thread writing. The 
> KafkaConsumer's poll() method is synchronized, so there will not be more 
> than one writer entering from there. However, the NetworkClient's poll() 
> method is also accessed from the Consumer's partitionsFor() method, which 
> could be accessed by a separate thread.
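
For reference, one common way to make such an in-progress flag safe across 
threads is an atomic compare-and-set. This is only a sketch under that 
assumption: MetadataFetchGuard and its methods are hypothetical names, not 
NetworkClient code, and it addresses only the flag, not the shared Selector 
state discussed in the comment above.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical guard that allows at most one metadata fetch in flight.
class MetadataFetchGuard {
    private final AtomicBoolean fetchInProgress = new AtomicBoolean(false);

    // Returns true only for the single caller that flips the flag to true.
    boolean tryStartFetch() {
        return fetchInProgress.compareAndSet(false, true);
    }

    // Called when the response (or a failure) arrives, re-enabling fetches.
    void fetchCompleted() {
        fetchInProgress.set(false);
    }
}
```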



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
