> On July 9, 2015, 7:19 p.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/server/AbstractFetcherThread.scala, line 71
> > <https://reviews.apache.org/r/34965/diff/2/?file=977751#file977751line71>
> >
> > As you probably noticed, synchronization in the
> > AbstractFetcherManager/Thread classes is unfortunately pretty nightmarish.
> > Since the simple consumer is force-closed without the SimpleConsumer’s lock,
> > consider the following sequence:
> > - You call forceClose
> > - In the meantime (before isClosed is set to true) an ongoing call to
> >   sendRequest recreates the connection
> > - The fetcher thread will subsequently exit (since the
> >   ShutdownableThread’s isRunning flag is false)
> > - So even though the SimpleConsumer is _closed_ at that point, the
> >   connection will remain
> >
> > Can you verify or is it a non-issue?
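
The sequence described above can be replayed with a small toy model. This is only a sketch, not the patch under review: `ToyConsumer`, `forceDisconnect`, `markClosed`, `retryReconnect`, and `RaceSketch` are hypothetical names, and the assumption that the retry path checks `isClosed` before reconnecting is made purely for illustration. The interleaving is run in a fixed order on one thread so the outcome is deterministic.

```scala
// Toy model only: ToyConsumer and its methods are hypothetical stand-ins,
// not Kafka's SimpleConsumer.
final class ToyConsumer {
  private val lock = new Object
  @volatile var connected: Boolean = true
  @volatile var isClosed: Boolean = false

  // First half of a lock-free force-close: drop the connection.
  def forceDisconnect(): Unit = { connected = false }

  // Second half of a lock-free force-close: record that we are closed.
  def markClosed(): Unit = { isClosed = true }

  // Stand-in for the retry path of an in-flight request. It holds the lock and
  // (by assumption) recreates the connection unless already marked closed.
  def retryReconnect(): Unit = lock.synchronized {
    if (!isClosed) connected = true
  }

  // Regular close: takes the lock, so it cannot interleave with retryReconnect.
  def close(): Unit = lock.synchronized {
    connected = false
    isClosed = true
  }
}

object RaceSketch {
  // Replays the interleaving in a fixed order and reports whether a
  // connection is still open once the "fetcher thread" has finished.
  def connectionLeaks(alsoCallClose: Boolean): Boolean = {
    val c = new ToyConsumer
    c.forceDisconnect()          // force-close begins without the lock
    c.retryReconnect()           // in-flight request retries and reconnects
    c.markClosed()               // force-close finishes; connection is back
    if (alsoCallClose) c.close() // retained locked close releases it
    c.connected
  }
}
```

In this model, `RaceSketch.connectionLeaks(alsoCallClose = false)` leaves the connection open, while `RaceSketch.connectionLeaks(alsoCallClose = true)` does not, which is why a retained locked close is attractive even if it is usually redundant.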

Thanks for the catch! Yes, this is an issue. After looking through the code carefully, I think we have to keep the simpleConsumer.close() call to avoid this problem. I have also changed the function name and added comments to document the use of the new API.


> On July 9, 2015, 7:19 p.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/server/AbstractFetcherThread.scala, line 76
> > <https://reviews.apache.org/r/34965/diff/2/?file=977751#file977751line76>
> >
> > You could get around the above by retaining this call to
> > simpleConsumer.close (although it would be mostly redundant). However, this
> > is still not ideal, since it is a caveat that the user of the (public)
> > forceClose API needs to be aware of.

I agree. I have updated the code and comments to hopefully avoid any confusion for users.


- Dong


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34965/#review91159
-----------------------------------------------------------


On July 9, 2015, 10:35 p.m., Dong Lin wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/34965/
> -----------------------------------------------------------
>
> (Updated July 9, 2015, 10:35 p.m.)
>
>
> Review request for kafka.
>
>
> Bugs: KAFKA-2241
>     https://issues.apache.org/jira/browse/KAFKA-2241
>
>
> Repository: kafka
>
>
> Description
> -------
>
> KAFKA-2241; AbstractFetcherThread.shutdown() should not block on
> ReadableByteChannel.read(buffer)
>
>
> Diffs
> -----
>
>   core/src/main/scala/kafka/consumer/SimpleConsumer.scala
>     c16f7edd322709060e54c77eb505c44cbd77a4ec
>   core/src/main/scala/kafka/server/AbstractFetcherThread.scala
>     83fc47417dd7fe4edf030217fa7fd69d99b170b0
>
> Diff: https://reviews.apache.org/r/34965/diff/
>
>
> Testing
> -------
>
>
> Thanks,
>
> Dong Lin
>
>