Hello,

I am writing a simple program in Java using the Kafka 0.8.0 jar compiled with Scala 2.10. I have designed my program with a singleton class which holds a map of (consumer group, ConsumerConnector) and a map of (topic, Producer). This singleton class provides two methods, send(topic, object) and receive(topic, consumerGroup, Class<?> klass).

The receive() method retrieves a ConsumerConnector from the map and then calls createMessageStreams() to get a new ConsumerIterator.
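
To make the design concrete, here is a minimal sketch of that receive() path. It is an illustration under assumptions, not the actual program: `ConnectorRegistry`, `Connector`, `CountingConnector` and `streamCreations` are hypothetical names, and `Connector` is only a stand-in for Kafka's ConsumerConnector, with the stream simplified to a String handle so the example runs without the Kafka jar.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative singleton holding one connector per consumer group.
final class ConnectorRegistry {

    // Hypothetical stand-in for Kafka's ConsumerConnector. The real method
    // returns lists of KafkaStream objects; a String handle is used here.
    interface Connector {
        Map<String, String> createMessageStreams(Map<String, Integer> topicCountMap);
    }

    // Fake connector that only counts how often streams are requested.
    static final class CountingConnector implements Connector {
        private final String group;
        final AtomicInteger calls = new AtomicInteger();

        CountingConnector(String group) {
            this.group = group;
        }

        @Override
        public Map<String, String> createMessageStreams(Map<String, Integer> topicCountMap) {
            int n = calls.incrementAndGet();
            Map<String, String> streams = new HashMap<>();
            for (String topic : topicCountMap.keySet()) {
                streams.put(topic, group + ":" + topic + "#" + n);
            }
            return streams;
        }
    }

    private static final ConnectorRegistry INSTANCE = new ConnectorRegistry();

    // consumer group -> connector, created once and then reused
    private final Map<String, CountingConnector> connectors = new ConcurrentHashMap<>();

    private ConnectorRegistry() {
    }

    static ConnectorRegistry getInstance() {
        return INSTANCE;
    }

    // Looks up the cached connector for the group (creating it on first use)
    // and asks it for a new stream on every call.
    String receive(String topic, String consumerGroup) {
        CountingConnector connector =
                connectors.computeIfAbsent(consumerGroup, CountingConnector::new);
        Map<String, Integer> topicCountMap = new HashMap<>();
        topicCountMap.put(topic, 1); // one stream per topic
        return connector.createMessageStreams(topicCountMap).get(topic);
    }

    // Number of createMessageStreams() calls made so far for a group.
    int streamCreations(String consumerGroup) {
        CountingConnector connector = connectors.get(consumerGroup);
        return connector == null ? 0 : connector.calls.get();
    }

    public static void main(String[] args) {
        ConnectorRegistry registry = ConnectorRegistry.getInstance();
        // First run and second run go through the same cached connector.
        System.out.println(registry.receive("simple_pojo", "simple_pojo_consumer_group"));
        System.out.println(registry.receive("simple_pojo", "simple_pojo_consumer_group"));
        System.out.println(registry.streamCreations("simple_pojo_consumer_group"));
    }
}
```

The point of the sketch is that the connector for a group is created once, while createMessageStreams() is invoked again on that same cached object each time receive() is called.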

The program launches a producer thread and a consumer thread. The producer produces 100 messages; the consumer consumes 100 messages and then stops. The program keeps running and reruns the producer and consumer threads some time later (between 5 and 10 minutes).

The first run works great, but the problem appears on the second run: the producer works fine again, but the consumer DOES NOT consume anything, and I see the following in my logs:

09:19:11,689 INFO ~ [simple_pojo_consumer_group_fenrir-1386749817005-7c45b826], begin registering consumer simple_pojo_consumer_group_fenrir-1386749817005-7c45b826 in ZK
09:19:11,709 INFO ~ conflict in /consumers/simple_pojo_consumer_group/ids/simple_pojo_consumer_group_fenrir-1386749817005-7c45b826 data: { "pattern":"static", "subscription":{ "simple_pojo": 1 }, "timestamp":"1386749951689", "version":1 } stored data: { "pattern":"static", "subscription":{ "simple_pojo": 1 }, "timestamp":"1386749836099", "version":1 }
09:19:11,712 INFO ~ I wrote this conflicted ephemeral node [{ "pattern":"static", "subscription":{ "simple_pojo": 1 }, "timestamp":"1386749951689", "version":1 }] at /consumers/simple_pojo_consumer_group/ids/simple_pojo_consumer_group_fenrir-1386749817005-7c45b826 a while back in a different session, hence I will backoff for this node to be deleted by Zookeeper and retry

This happens every 5 seconds.
The ZooKeeper server log contains the following each time a conflict is logged in the application:

[2013-12-11 09:19:11,622] INFO Got user-level KeeperException when processing sessionid:0x142e0b5d3d80001 type:create cxid:0x1b zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a Error Path:/consumers/simple_pojo_consumer_group/ids/simple_pojo_consumer_group_fenrir-1386749817005-7c45b826 Error:KeeperErrorCode = NodeExists for /consumers/simple_pojo_consumer_group/ids/simple_pojo_consumer_group_fenrir-1386749817005-7c45b826 (org.apache.zookeeper.server.PrepRequestProcessor)

I tried searching for solutions, but did not find anything that could help me. I suspect I'm not using the library correctly, but I can't see how. Is it not OK to keep ConsumerConnector objects in a map and reuse them with createMessageStreams()?

If it is OK, do you have any idea what the problem could be here?

Thank you,
Vincent.
