----------------------------------------------------------- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/23266/#review47334 -----------------------------------------------------------
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java <https://reviews.apache.org/r/23266/#comment83089> Is there a race condition here? What if the sender immediately executes and the flag isn't set yet? Maybe the usage needs to be something like long version = metadata.version() metadata.requestUpdate() sender.wakeup() metadata.awaitUpdate(version) The version would just be a long counter that we increment in Metadata.update(). clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java <https://reviews.apache.org/r/23266/#comment83087> This method seems a bit ad hoc. Would it better to just have the caller directly do metadata.fetch().partitionsForTopic != null? or if we want to make it a little more readable, add a .hasTopic method to Cluster and do metadata.fetch().hasTopic(t) clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java <https://reviews.apache.org/r/23266/#comment83086> All the java apis that wait with some timeout take a max wait duration not an end millisecond timestamp. Would that be better? clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java <https://reviews.apache.org/r/23266/#comment83088> Not sure about this logic. Imagine that the last refresh was an hour ago and the original max wait time was 30 secs. How long does this wait? - Jay Kreps On July 3, 2014, 11:42 p.m., Guozhang Wang wrote: > > ----------------------------------------------------------- > This is an automatically generated e-mail. To reply, visit: > https://reviews.apache.org/r/23266/ > ----------------------------------------------------------- > > (Updated July 3, 2014, 11:42 p.m.) > > > Review request for kafka. > > > Bugs: KAFKA-1515 > https://issues.apache.org/jira/browse/KAFKA-1515 > > > Repository: kafka > > > Description > ------- > > 1. Move the waiting logic out of Metadata into KafkaProducer (KafkaConsumer > would not wait on fetch metadata, so will not share this code); and wake-up > sender upon waiting metadata 2. Set the refresh timestamp to now instead of 0 > > > Diffs > ----- > > clients/src/main/java/org/apache/kafka/clients/NetworkClient.java > d21f9225539b070f9b50b7a76601d80b83daf7ef > clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java > d85ca30001dc3d6122a890c34092551654315458 > > clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java > 8890aa2e3ce5fffc159b3c8528138226e8c8cfd3 > > clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java > 37b9d1a462d42b811fffd2af3c418e3a9179f00f > clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java > 0d7d04ca5d71d4db22da54ed2153f7c0e10cdf78 > > Diff: https://reviews.apache.org/r/23266/diff/ > > > Testing > ------- > > > Thanks, > > Guozhang Wang > >