Hi Jun,

Thanks for that - can you point me to one of your tests that does this? i.e. a test which shuts down the consumer and then re-initialises it (or rather creates a new consumer with the same config) and then uses it? I'd be keen to see what settings (properties) you're using. Maybe I've just got something misconfigured.

Cheers,
Shannon

On 13 December 2014 at 02:29, Jun Rao <j...@confluent.io> wrote:

> Hmm, that's what we do in our unit tests and it seems that we can cleanly
> shutdown the consumer.
>
> Thanks,
>
> Jun
>
> On Mon, Dec 8, 2014 at 9:17 PM, Shannon Lloyd <shanl...@gmail.com> wrote:
>
> > Not explicitly. Some additional background might help. I'm running an
> > integration test using an embedded Kafka cluster and ZK quorum (ie all in
> > process). In my @Before method I fire up the cluster. In my @After method I
> > shut the cluster down. When I call shutdown on the connector during my
> > test, the test is actually erroring out and shutting down the cluster. BUT
> > even with a try catch(Throwable) around the shutdown call, nothing gets
> > caught, so something outside this call thread is throwing an error and
> > killing the test.
> > On 9 Dec 2014 14:45, "Jun Rao" <j...@confluent.io> wrote:
> >
> > > kafka.network.Processor is on the broker. Are you killing the brokers as
> > > well?
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Thu, Dec 4, 2014 at 5:33 PM, Shannon Lloyd <shanl...@gmail.com> wrote:
> > >
> > > > Hi,
> > > >
> > > > I am using the high-level consumer on 0.8.2-beta. I'm attempting to close a
> > > > ConsumerConnector (actually a handful of connectors), but am not having
> > > > much luck actually getting it to close cleanly. When I call shutdown on the
> > > > connector, I see an error in my application's log (these are always
> > > > IOExceptions in kafka.network.Processor - either Broken Pipe in
> > > > FileDispatcherImpl.write0 or else Connection reset by peer in
> > > > FileDispatcherImpl.read0), but the shutdown call itself does not return
> > > > until the socket.timeout.ms has expired (I've tested this by setting this
> > > > to all sorts of different values and confirmed that shutdown() always
> > > > returns after this timeout, but never before).
> > > >
> > > > I don't know if it matters, but my code that works with the connector is
> > > > running on a separate thread via an ExecutorService (essentially I'm
> > > > consuming with one thread per group/topic combination (yes, one thread for
> > > > all partitions within the topic)).
> > > >
> > > > FWIW, everything else seems to work fine - I can connect, set up the
> > > > KafkaStream, pull down messages etc. It's just the shutting down that
> > > > doesn't seem to be working. The reason I need this to work cleanly is that
> > > > my use case requires me to shut down specific connectors and re-create them
> > > > later, potentially numerous times during the running of my application. I
> > > > could potentially redesign things to keep each connector around after it is
> > > > no longer needed, cache it and re-use it later, but this still doesn't
> > > > solve the problem of how I eventually shut everything down cleanly.
> > > >
> > > > Thanks,
> > > > Shannon
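
For reference, the shut-down-and-re-create cycle described in the original message can be sketched without a broker. This is only an illustration of the ordering (stop the connector first, then the worker pool), not Kafka code: `Connector`, `FakeConnector`, `stopCleanly` and `runCycle` are hypothetical names, and the sketch assumes that shutting the connector down makes the worker's consume loop end on its own.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class ConnectorLifecycle {

    /** Hypothetical stand-in for a consumer connector; NOT a Kafka type. */
    interface Connector {
        String take() throws InterruptedException; // returns null once shut down
        void shutdown();
    }

    /** In-memory fake so the sketch runs without a broker. */
    static final class FakeConnector implements Connector {
        // Distinct String instance, compared by identity, marks end of stream.
        private static final String END = new String("END");
        private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

        void publish(String msg) { queue.add(msg); }

        @Override public String take() throws InterruptedException {
            String m = queue.take();
            return m == END ? null : m;
        }

        @Override public void shutdown() { queue.add(END); }
    }

    /** Shuts the connector down first, then the pool; true if the worker exited in time. */
    static boolean stopCleanly(Connector connector, ExecutorService pool, long timeoutMs)
            throws InterruptedException {
        connector.shutdown();   // assumption: this ends the worker's consume loop
        pool.shutdown();        // accept no new tasks
        if (pool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {
            return true;
        }
        pool.shutdownNow();     // last resort: interrupt the worker
        return false;
    }

    /** One create, consume, shut down cycle; returns the number of messages consumed. */
    static int runCycle(int messages) throws Exception {
        FakeConnector connector = new FakeConnector();
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Future<Integer> consumed = pool.submit(() -> {
            int n = 0;
            while (connector.take() != null) {
                n++;
            }
            return n;
        });
        for (int i = 0; i < messages; i++) {
            connector.publish("m" + i);
        }
        if (!stopCleanly(connector, pool, 1000)) {
            throw new IllegalStateException("worker did not stop in time");
        }
        return consumed.get();
    }

    public static void main(String[] args) throws Exception {
        // Re-create the connector and its worker several times in one process.
        for (int cycle = 1; cycle <= 3; cycle++) {
            System.out.println("cycle " + cycle + " consumed " + runCycle(cycle));
        }
    }
}
```

With a real connector, the return value of `stopCleanly` would show whether the worker thread exited by itself or had to be interrupted, which may help narrow down where the shutdown is blocking.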