> On Jan. 6, 2015, 6:43 p.m., Parth Brahmbhatt wrote:
> > clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java, line 225
> > <https://reviews.apache.org/r/29379/diff/1/?file=799704#file799704line225>
> >
> > sender.completeBatch() is only called as part of produce response
> > handling or disconnect, neither of which will ever be invoked when there
> > is no broker. I could add sender as a member of record accumulator or
> > pass it as the callback arg as part of the ready() method. Both options
> > are too hacky.
> >
> > Let me know if you see some other alternative.
Agree that those options are hacky. Maybe return the information in ReadyCheckResult? RecordAccumulator.ready() is only called from Sender.run(), which could then handle calling completeBatch() on any expired batches. This also has the benefit of integrating with the existing retry logic, although I'm not sure if we want to treat this as a retriable error or not.


- Ewen


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/29379/#review66879
-----------------------------------------------------------


On Jan. 6, 2015, 6:44 p.m., Parth Brahmbhatt wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/29379/
> -----------------------------------------------------------
>
> (Updated Jan. 6, 2015, 6:44 p.m.)
>
>
> Review request for kafka.
>
>
> Bugs: KAFKA-1788
>     https://issues.apache.org/jira/browse/KAFKA-1788
>
>
> Repository: kafka
>
>
> Description
> -------
>
> Merge remote-tracking branch 'origin/trunk' into KAFKA-1788
>
>
> KAFKA-1788: addressed Ewen's comments.
>
>
> Diffs
> -----
>
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java f61efb35db7e0de590556e6a94a7b5cb850cdae9
>   clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java a893d88c2f4e21509b6c70d6817b4b2cdd0fd657
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java c15485d1af304ef53691d478f113f332fe67af77
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java dd0af8aee98abed5d4a0dc50989e37888bb353fe
>   clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java 2c9932401d573549c40f16fda8c4e3e11309cb85
>   clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java ef2ca65cabe97b909f17b62027a1bb06827e88fe
>
> Diff: https://reviews.apache.org/r/29379/diff/
>
>
> Testing
> -------
>
> Unit test added.
>
>
> Thanks,
>
> Parth Brahmbhatt
>
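
To make the ReadyCheckResult suggestion in the reply above concrete, here is a rough sketch of the control flow. It is not the actual Kafka code: the Batch, Accumulator and Sender classes are simplified stand-ins, and the expiredBatches field, the timeoutMs setting and the timedOut flag are all assumptions made for illustration. It also treats an expired batch as a plain failure, which may or may not be right given the open question about retriability.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch only; these types do not mirror the real Kafka classes.
public class ExpiredBatchSketch {

    /** Stand-in for RecordBatch: tracks creation time and completion state only. */
    static final class Batch {
        final long createdMs;
        boolean done = false;
        boolean timedOut = false;

        Batch(long createdMs) {
            this.createdMs = createdMs;
        }
    }

    /** Stand-in for ReadyCheckResult, extended with an assumed expiredBatches field. */
    static final class ReadyCheckResult {
        final List<Batch> readyBatches;
        final List<Batch> expiredBatches;

        ReadyCheckResult(List<Batch> readyBatches, List<Batch> expiredBatches) {
            this.readyBatches = readyBatches;
            this.expiredBatches = expiredBatches;
        }
    }

    /** Stand-in for RecordAccumulator; it holds no reference to the sender. */
    static final class Accumulator {
        private final List<Batch> batches = new ArrayList<>();
        private final long timeoutMs; // assumed expiry setting

        Accumulator(long timeoutMs) {
            this.timeoutMs = timeoutMs;
        }

        void append(Batch batch) {
            batches.add(batch);
        }

        /** Reports expired batches to the caller instead of completing them here. */
        ReadyCheckResult ready(long nowMs) {
            List<Batch> ready = new ArrayList<>();
            List<Batch> expired = new ArrayList<>();
            for (Batch batch : batches) {
                if (nowMs - batch.createdMs >= timeoutMs) {
                    expired.add(batch);
                } else {
                    ready.add(batch);
                }
            }
            // Expired batches leave the accumulator; ready ones stay queued.
            batches.removeAll(expired);
            return new ReadyCheckResult(ready, expired);
        }
    }

    /** Stand-in for Sender: the only caller of ready(), so it owns completion. */
    static final class Sender {
        private final Accumulator accumulator;

        Sender(Accumulator accumulator) {
            this.accumulator = accumulator;
        }

        void run(long nowMs) {
            ReadyCheckResult result = accumulator.ready(nowMs);
            for (Batch batch : result.expiredBatches) {
                completeBatch(batch, true);
            }
            // Sending result.readyBatches to brokers is omitted from this sketch.
        }

        void completeBatch(Batch batch, boolean timedOut) {
            batch.done = true;
            batch.timedOut = timedOut;
        }
    }

    public static void main(String[] args) {
        Accumulator accumulator = new Accumulator(100);
        Batch old = new Batch(0);
        Batch fresh = new Batch(950);
        accumulator.append(old);
        accumulator.append(fresh);

        new Sender(accumulator).run(1000);

        System.out.println("old: done=" + old.done + ", timedOut=" + old.timedOut);
        System.out.println("fresh: done=" + fresh.done);
    }
}
```

The point of the shape is that the accumulator never needs a handle on the sender: it only reports what expired, and Sender.run() decides how to complete those batches, which is also where a retry decision could be made.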