See ZookeeperConsumerConnectorTest.

Thanks,

Jun

On Fri, Dec 12, 2014 at 3:59 PM, Shannon Lloyd <shanl...@gmail.com> wrote:

> Hi Jun,
>
> Thanks for that - can you point me to one of your tests that does this?
> i.e. a test which shuts down the consumer and then re-initialises it (or
> rather creates a new consumer with the same config) and then uses it? I'd
> be keen to see what settings (properties) you're using. Maybe I've just got
> something misconfigured.
>
> Cheers,
> Shannon
>
> On 13 December 2014 at 02:29, Jun Rao <j...@confluent.io> wrote:
>
> > Hmm, that's what we do in our unit tests and it seems that we can cleanly
> > shutdown the consumer.
> >
> > Thanks,
> >
> > Jun
> >
> > On Mon, Dec 8, 2014 at 9:17 PM, Shannon Lloyd <shanl...@gmail.com>
> wrote:
> >
> > > Not explicitly. Some additional background might help. I'm running an
> > > integration test using an embedded Kafka cluster and ZK quorum (ie all
> in
> > > process). In my @Before method I fire up the cluster. In my @After
> > method I
> > > shut the cluster down. When I call shutdown on the connector during my
> > > test, the test is actually erroring out and shutting down the cluster.
> > BUT
> > > even with a try catch(Throwable) around the shutdown call, nothing gets
> > > caught, so something outside this call thread is throwing an error and
> > > killing the test.
> > > On 9 Dec 2014 14:45, "Jun Rao" <j...@confluent.io> wrote:
> > >
> > > > kafka.network.Processor is on the broker. Are you killing the brokers
> > as
> > > > well?
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > > On Thu, Dec 4, 2014 at 5:33 PM, Shannon Lloyd <shanl...@gmail.com>
> > > wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > I am using the high-level consumer on 0.8.2-beta. I'm attempting to
> > > > close a
> > > > > ConsumerConnector (actually a handful of connectors), but am not
> > having
> > > > > much luck actually getting it to close cleanly. When I call
> shutdown
> > on
> > > > the
> > > > > connector, I see an error in my application's log (these are always
> > > > > IOExceptions in kafka.network.Processor - either Broken Pipe in
> > > > > FileDispatcherImpl.write0 or else Connection reset by peer in
> > > > > FileDispatcherImpl.read0), but the shutdown call itself does not
> > return
> > > > > until the socket.timeout.ms has expired (I've tested this by
> setting
> > > > this
> > > > > to all sorts of different values and confirmed that shutdown()
> always
> > > > > returns after this timeout, but never before).
> > > > >
> > > > > I don't know if it matters, but my code that works with the
> connector
> > > is
> > > > > running on a separate thread via an ExecutorService (essentially
> I'm
> > > > > consuming with one thread per group/topic combination (yes, one
> > thread
> > > > for
> > > > > all partitions within the topic)).
> > > > >
> > > > > FWIW, everything else seems to work fine - I can connect, set up
> the
> > > > > KafkaStream, pull down messages etc. It's just the shutting down
> that
> > > > > doesn't seem to be working. The reason I need this to work cleanly
> is
> > > > that
> > > > > my use case requires me to shut down specific connectors and
> > re-create
> > > > them
> > > > > later, potentially numerous times during the running of my
> > > application. I
> > > > > could potentially redesign things to keep each connector around
> after
> > > it
> > > > is
> > > > > no longer needed, cache it and re-use it later, but this still
> > doesn't
> > > > > solve the problem of how I eventually shut everything down cleanly.
> > > > >
> > > > > Thanks,
> > > > > Shannon
> > > > >
> > > >
> > >
> >
>

Reply via email to