----------------------------------------------------------- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34789/#review85644 -----------------------------------------------------------
I think close() isn't quite right, and is probably harder than just wakeup(). Also, I think there are other cases where NetworkClient.poll() is called in a loop that aren't handled, e.g. NetworkCLient.completeAll. I'm not sure these can be handled without pushing the closed flag into NetworkClient (maybe changing the name to "closing" to allow some operations to continue normally so code using NetworkClient can finish up whatever it was doing). clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java <https://reviews.apache.org/r/34789/#comment137308> Missed removing one of the synchronized keywords. clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java <https://reviews.apache.org/r/34789/#comment137313> I think this requires a bit more coordination between the threads. As written, won't this wakeup the selector, but this thread could continue running and close metrics/client/serializers before the other thread is done with them? This gets confusing if we need to support both close() from the poll() thread and from another I guess -- in one case you need to wait for another thread to finish, in the other you can proceed immediately. clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java <https://reviews.apache.org/r/34789/#comment137314> Why do these all have ensureNotClosed(), but the KafkaConsumer methods don't all have it? - Ewen Cheslack-Postava On May 28, 2015, 10:58 p.m., Jason Gustafson wrote: > > ----------------------------------------------------------- > This is an automatically generated e-mail. To reply, visit: > https://reviews.apache.org/r/34789/ > ----------------------------------------------------------- > > (Updated May 28, 2015, 10:58 p.m.) > > > Review request for kafka. > > > Bugs: KAFKA-2168 > https://issues.apache.org/jira/browse/KAFKA-2168 > > > Repository: kafka > > > Description > ------- > > KAFKA-2168; Remove synchronization of KafkaConsumer to enable non-blocking > close > > > Diffs > ----- > > > clients/src/main/java/org/apache/kafka/clients/consumer/ClosedConsumerException.java > PRE-CREATION > clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java > d301be4709f7b112e1f3a39f3c04cfa65f00fa60 > > clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java > b2764df11afa7a99fce46d1ff48960d889032d14 > > clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java > ef9dd5238fbc771496029866ece1d85db6d7b7a5 > > Diff: https://reviews.apache.org/r/34789/diff/ > > > Testing > ------- > > > Thanks, > > Jason Gustafson > >