I'm doing some testing in Clojure to reconcile the results of MirrorMaker
replication between two Kafka clusters connected over an unreliable (Internet)
link. We run our production tests, wait for MirrorMaker replication to finish,
then drain the topics on both sides of the link and compare the messages.
Unlike the standard Kafka use case (I think), this is a task with a definite
stopping point.
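For the compare step, here is a minimal sketch. It assumes each side's drained messages have been collected as string payloads, and `Reconcile` and `diff` are hypothetical names used only for illustration. It compares the two sides as multisets rather than as sequences, because Kafka only guarantees ordering within a partition, and a retried send over a flaky link can deliver the same message twice.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class Reconcile {
    // Returns payload -> (count on source - count on target), omitting payloads
    // whose counts match. Positive: missing on the target; negative: extra copies.
    static Map<String, Integer> diff(List<String> source, List<String> target) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String m : source) {
            counts.merge(m, 1, Integer::sum);
        }
        for (String m : target) {
            counts.merge(m, -1, Integer::sum);
        }
        // Drop payloads that appear the same number of times on both sides.
        counts.values().removeIf(c -> c == 0);
        return counts;
    }

    public static void main(String[] args) {
        List<String> source = List.of("a", "b", "c", "c");
        List<String> target = List.of("c", "a", "c", "c");
        System.out.println(diff(source, target)); // prints {b=1, c=-1}
    }
}
```

In this example, `b` is missing on the target and `c` arrived one more time than expected. In Clojure, you can do the same counting by calling `frequencies` on each side.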

I can drain all of the messages from a particular topic on one cluster, and
once my timeout period elapses I get a timeout exception. So far so good.
After adding more data to the topic, I tried to kick the job off again, but the
second time I ran the consumer all I got was a timeout exception. The logs show
the broker closing the connection, logged at INFO level, i.e. no errors on the
broker side.

Looking at the running threads, I can see half a dozen still active that look
like Kafka consumer threads and ZooKeeper connections. I would have expected
them to shut down after the timeout, but they haven't. What is the correct way
to handle Kafka consumer threads and shut them down when you are finished with
them? In a long-running Kafka job, how would I restart a consumer after a
timeout? Is try/catch the idiomatic way to handle this?
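The drain-then-stop pattern can be sketched in Java under a few stated assumptions. `MessageSource`, `FakeSource`, and `drain` are hypothetical stand-ins, not Kafka API. The sketch treats the timeout as the expected end of a drain run rather than as an error, and it releases the consumer in a `finally` (here via try-with-resources) so that its threads do not outlive the run. If this is the ZooKeeper-based high-level consumer, the real counterparts are probably `kafka.consumer.ConsumerTimeoutException` (thrown once `consumer.timeout.ms` elapses) and `ConsumerConnector.shutdown()`, but check these against your client version.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeoutException;

public class DrainTopic {
    // Hypothetical stand-in for a consumer: next() returns a message, or throws
    // TimeoutException once no message has arrived within the timeout.
    interface MessageSource extends AutoCloseable {
        String next() throws TimeoutException;
        @Override void close(); // releases threads/connections; no checked exception
    }

    // Drains everything available. The timeout is the normal "drained" signal,
    // and try-with-resources guarantees close() runs however the loop ends.
    static List<String> drain(MessageSource source) {
        List<String> drained = new ArrayList<>();
        try (source) {
            while (true) {
                drained.add(source.next());
            }
        } catch (TimeoutException e) {
            // Expected end of a drain run, not an error.
        }
        return drained;
    }

    // Fake source for demonstration: yields the given messages, then times out.
    static class FakeSource implements MessageSource {
        private final Iterator<String> it;
        boolean closed = false;

        FakeSource(List<String> messages) { this.it = messages.iterator(); }

        @Override public String next() throws TimeoutException {
            if (it.hasNext()) return it.next();
            throw new TimeoutException("no message within timeout");
        }

        @Override public void close() { closed = true; }
    }

    public static void main(String[] args) {
        // Each run builds its own source and closes it when the run ends.
        for (int run = 1; run <= 2; run++) {
            FakeSource src = new FakeSource(List.of("m1", "m2"));
            List<String> got = drain(src);
            System.out.println("run " + run + ": " + got + ", closed=" + src.closed);
        }
    }
}
```

Each run creates its own source and closes it at the end. The same try/finally shape carries over to Clojure's `try`/`finally`.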

Thanks,

---
Daniel