See ZookeeperConsumerConnectorTest.

Thanks,

Jun

On Fri, Dec 12, 2014 at 3:59 PM, Shannon Lloyd <shanl...@gmail.com> wrote:

> Hi Jun,
>
> Thanks for that - can you point me to one of your tests that does this?
> i.e. a test which shuts down the consumer and then re-initialises it (or
> rather creates a new consumer with the same config) and then uses it? I'd
> be keen to see what settings (properties) you're using. Maybe I've just
> got something misconfigured.
>
> Cheers,
> Shannon
>
> On 13 December 2014 at 02:29, Jun Rao <j...@confluent.io> wrote:
>
> > Hmm, that's what we do in our unit tests and it seems that we can
> > cleanly shutdown the consumer.
> >
> > Thanks,
> >
> > Jun
> >
> > On Mon, Dec 8, 2014 at 9:17 PM, Shannon Lloyd <shanl...@gmail.com>
> > wrote:
> >
> > > Not explicitly. Some additional background might help. I'm running
> > > an integration test using an embedded Kafka cluster and ZK quorum
> > > (ie all in process). In my @Before method I fire up the cluster. In
> > > my @After method I shut the cluster down. When I call shutdown on
> > > the connector during my test, the test is actually erroring out and
> > > shutting down the cluster. BUT even with a try catch(Throwable)
> > > around the shutdown call, nothing gets caught, so something outside
> > > this call thread is throwing an error and killing the test.
> > >
> > > On 9 Dec 2014 14:45, "Jun Rao" <j...@confluent.io> wrote:
> > >
> > > > kafka.network.Processor is on the broker. Are you killing the
> > > > brokers as well?
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > > On Thu, Dec 4, 2014 at 5:33 PM, Shannon Lloyd
> > > > <shanl...@gmail.com> wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > I am using the high-level consumer on 0.8.2-beta. I'm attempting
> > > > > to close a ConsumerConnector (actually a handful of connectors),
> > > > > but am not having much luck actually getting it to close
> > > > > cleanly. When I call shutdown on the connector, I see an error
> > > > > in my application's log (these are always IOExceptions in
> > > > > kafka.network.Processor - either Broken Pipe in
> > > > > FileDispatcherImpl.write0 or else Connection reset by peer in
> > > > > FileDispatcherImpl.read0), but the shutdown call itself does not
> > > > > return until the socket.timeout.ms has expired (I've tested this
> > > > > by setting this to all sorts of different values and confirmed
> > > > > that shutdown() always returns after this timeout, but never
> > > > > before).
> > > > >
> > > > > I don't know if it matters, but my code that works with the
> > > > > connector is running on a separate thread via an ExecutorService
> > > > > (essentially I'm consuming with one thread per group/topic
> > > > > combination (yes, one thread for all partitions within the
> > > > > topic)).
> > > > >
> > > > > FWIW, everything else seems to work fine - I can connect, set up
> > > > > the KafkaStream, pull down messages etc. It's just the shutting
> > > > > down that doesn't seem to be working. The reason I need this to
> > > > > work cleanly is that my use case requires me to shut down
> > > > > specific connectors and re-create them later, potentially
> > > > > numerous times during the running of my application. I could
> > > > > potentially redesign things to keep each connector around after
> > > > > it is no longer needed, cache it and re-use it later, but this
> > > > > still doesn't solve the problem of how I eventually shut
> > > > > everything down cleanly.
> > > > >
> > > > > Thanks,
> > > > > Shannon
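
For anyone following the thread, here is a rough sketch of the lifecycle described in the original message: consume on an ExecutorService thread, shut the connector down, wait for the worker to finish, then build a fresh connector and repeat. It does not use Kafka at all. FakeConnector, publish(), next() and runOneCycle() are hypothetical stand-ins invented for this illustration, and the fake simply assumes that the stream ends once shutdown() has been called. It only shows the ordering of the shutdown steps, not how the real ConsumerConnector behaves.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class ConnectorLifecycleSketch {

    /** Hypothetical in-memory stand-in for a connector plus its single stream. */
    static final class FakeConnector {
        // Unique marker object; compared by identity so no real message can match it.
        private static final String END = new String("end-of-stream");
        private final BlockingQueue<String> stream = new LinkedBlockingQueue<>();

        void publish(String message) {
            stream.add(message);
        }

        /** Blocks for the next message; returns null once shutdown() has been called. */
        String next() throws InterruptedException {
            String message = stream.take();
            return message == END ? null : message;
        }

        /** Assumption of this sketch: shutdown ends the stream after queued messages. */
        void shutdown() {
            stream.add(END);
        }
    }

    /** One create / consume / shutdown cycle; returns how many messages were consumed. */
    static int runOneCycle(int messages) throws Exception {
        FakeConnector connector = new FakeConnector();
        ExecutorService executor = Executors.newSingleThreadExecutor();

        // One worker thread drains the stream until it ends.
        Future<Integer> consumed = executor.submit(() -> {
            int count = 0;
            while (connector.next() != null) {
                count++;
            }
            return count;
        });

        for (int i = 0; i < messages; i++) {
            connector.publish("message-" + i);
        }

        // Stop the connector first so the worker can leave its loop,
        // then stop the executor and wait a bounded time for it to finish.
        connector.shutdown();
        executor.shutdown();
        if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
            executor.shutdownNow();
            throw new IllegalStateException("consumer thread did not stop in time");
        }
        return consumed.get();
    }

    public static void main(String[] args) throws Exception {
        // Re-create the connector several times, as in the use case above.
        for (int cycle = 1; cycle <= 3; cycle++) {
            System.out.println("cycle " + cycle + " consumed " + runOneCycle(4) + " messages");
        }
    }
}
```

The bounded awaitTermination is the part worth keeping in a real test: if the worker never leaves its loop, the test fails with a clear message instead of hanging or being torn down by the @After method.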