Hi, I am using the high-level consumer on 0.8.2-beta. I'm attempting to close a ConsumerConnector (actually a handful of connectors), but am not having much luck actually getting it to close cleanly. When I call shutdown on the connector, I see an error in my application's log (these are always IOExceptions in kafka.network.Processor - either Broken Pipe in FileDispatcherImpl.write0 or else Connection reset by peer in FileDispatcherImpl.read0), but the shutdown call itself does not return until the socket.timeout.ms has expired (I've tested this by setting this to all sorts of different values and confirmed that shutdown() always returns after this timeout, but never before).
I don't know if it matters, but my code that works with the connector is running on a separate thread via an ExecutorService (essentially I'm consuming with one thread per group/topic combination (yes, one thread for all partitions within the topic)). FWIW, everything else seems to work fine - I can connect, set up the KafkaStream, pull down messages etc. It's just the shutting down that doesn't seem to be working. The reason I need this to work cleanly is that my use case requires me to shut down specific connectors and re-create them later, potentially numerous times during the running of my application. I could potentially redesign things to keep each connector around after it is no longer needed, cache it and re-use it later, but this still doesn't solve the problem of how I eventually shut everything down cleanly. Thanks, Shannon
