> On March 3, 2015, 4:10 a.m., Jay Kreps wrote: > > clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java, > > line 560 > > <https://reviews.apache.org/r/29467/diff/4/?file=882247#file882247line560> > > > > This seems to call initiateClose() twice, once in initiateClose and > > then again from forceClose. This seems like it depends on all the things > > getting closed being idempotent to repeated calls (e.g. record accumulator > > etc). Would it make more sense to have forceClose() just set the force flag?
The issue with that is someone can just call sender.forceClose and it will never call accumulator.close which is part of initiate close. Also shouldn't the calls be idempotent given this can be called from multiple threds multiple times? > On March 3, 2015, 4:10 a.m., Jay Kreps wrote: > > clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java, > > line 554 > > <https://reviews.apache.org/r/29467/diff/4/?file=882247#file882247line554> > > > > It's probably worth adding an > > if(timeout > 0) > > on this. Added. On March 3, 2015, 4:10 a.m., Parth Brahmbhatt wrote: > > Two minor changes I noted, but otherwise looks good to me. Needs some unit > > tests, as you mentioned. > > Jay Kreps wrote: > Actually one probably I didn't think of is that forceClose() leaves the > in-flight requests forever incomplete. A better approach would be to fail > them all with TimeoutException. To do this correctly I will need to get the imcomplete and unsent RecordBatches from RecordAccumulator. I can add methods to get these with default scope. The sender will need these to emit correct metrics and failing the bathces. For unit testing I need someway to mock RecordAccumulator as the Seneder's run method where the force close logic lives is a while(true) loop which dependes on the values of record accumulator. RecordAccumulator is a final class right now, is it ok to change that so I can create a MockRecordAccumulator? - Parth ----------------------------------------------------------- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29467/#review74884 ----------------------------------------------------------- On March 2, 2015, 6:41 p.m., Parth Brahmbhatt wrote: > > ----------------------------------------------------------- > This is an automatically generated e-mail. To reply, visit: > https://reviews.apache.org/r/29467/ > ----------------------------------------------------------- > > (Updated March 2, 2015, 6:41 p.m.) > > > Review request for kafka. > > > Bugs: KAFKA-1660 > https://issues.apache.org/jira/browse/KAFKA-1660 > > > Repository: kafka > > > Description > ------- > > Merge remote-tracking branch 'origin/trunk' into KAFKA-1660 > > Conflicts: > > clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java > > Merge remote-tracking branch 'origin/trunk' into KAFKA-1660 > > > Changing log levels as suggested. > > > Diffs > ----- > > clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java > 7397e565fd865214529ffccadd4222d835ac8110 > clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java > 6913090af03a455452b0b5c3df78f266126b3854 > clients/src/main/java/org/apache/kafka/clients/producer/Producer.java > 5b3e75ed83a940bc922f9eca10d4008d67aa37c9 > > clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java > ed9c63a6679e3aaf83d19fde19268553a4c107c2 > > Diff: https://reviews.apache.org/r/29467/diff/ > > > Testing > ------- > > existing unit tests passed. > > > Thanks, > > Parth Brahmbhatt > >