I'm writing some tests in Clojure to reconcile the results of MirrorMaker replication between two Kafka clusters across an unreliable (Internet) link. We run our production tests, wait for MirrorMaker replication to finish, then drain the topics on both sides of the network and compare the messages. I think this differs from the standard Kafka use case in that the task has a stopping point.
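For context, the comparison step amounts to a multiset diff of the two drained topics. Below is a minimal sketch in plain Java, assuming messages are compared by their payload as strings and that ordering does not matter. The class and method names (`TopicReconciler`, `countPayloads`, `surplus`) are hypothetical and only illustrate the idea; this is not the actual test code.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TopicReconciler {

    /** Counts how often each payload occurs, so duplicates are not collapsed. */
    public static Map<String, Integer> countPayloads(List<String> messages) {
        Map<String, Integer> counts = new HashMap<>();
        for (String m : messages) {
            counts.merge(m, 1, Integer::sum);
        }
        return counts;
    }

    /**
     * Returns the payloads that occur more often in `left` than in `right`,
     * mapped to how many extra copies `left` holds.
     */
    public static Map<String, Integer> surplus(List<String> left, List<String> right) {
        Map<String, Integer> result = new HashMap<>(countPayloads(left));
        for (String m : right) {
            // Cancel one occurrence; returning null removes the entry entirely.
            result.computeIfPresent(m, (k, n) -> n > 1 ? n - 1 : null);
        }
        return result;
    }

    public static void main(String[] args) {
        // Assumed sample data: payloads drained from each cluster.
        List<String> source = List.of("a", "b", "b", "c");
        List<String> mirror = List.of("a", "b", "c", "c");

        System.out.println("missing on mirror: " + surplus(source, mirror)); // {b=1}
        System.out.println("extra on mirror:   " + surplus(mirror, source)); // {c=1}
    }
}
```

Counting occurrences rather than using plain sets means that a message duplicated or dropped by replication still shows up in the diff.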
I'm able to drain all of the messages from a particular topic on one cluster; then, after my timeout period, I get a timeout exception. So far so good.

After adding more data to the topic I tried to kick the job off again, but the second time I ran the consumer I just got a timeout exception. The broker logs show the connection being closed by the broker at INFO level, i.e. no errors on the broker side. Looking at the running threads, I can see half a dozen active threads that look like Kafka consumers and connections to ZooKeeper. I would have thought they would shut down after the timeout, but they haven't.

What is the correct way of handling Kafka consumer threads and shutting them down when you are finished with them? In a long-running Kafka job, how would I restart a consumer process after a timeout? Is a try/catch the idiomatic way to handle this?

Thanks,
Daniel