On 11/12/2013 10:34, Vincent Rischmann wrote:
Hello,
I am writing a simple program in Java using the Kafka 0.8.0 jar
compiled with Scala 2.10.
I have designed my program with a singleton class which holds a map
of (consumer group, ConsumerConnector) and a map of (topic, Producer).
This singleton class provides two methods, send(topic, object) and
receive(topic, consumerGroup, Class<?> klass).
The receive() method retrieves a ConsumerConnector from the map and
then calls createMessageStreams() to get a new ConsumerIterator.
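
To make the design concrete, here is a minimal sketch of the lookup-and-reuse pattern described above. It does not use the real Kafka classes: `StreamConnector`, `openStream()` and `ConnectorRegistry` are hypothetical stand-ins for ConsumerConnector, createMessageStreams() and the singleton, so it only illustrates the call pattern.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical stand-in for Kafka's ConsumerConnector; not the real API.
interface StreamConnector {
    Iterable<byte[]> openStream(String topic);
}

// Holds one connector per consumer group, created lazily and then reused.
final class ConnectorRegistry {
    private final Map<String, StreamConnector> byGroup = new ConcurrentHashMap<>();
    private final Function<String, StreamConnector> factory;

    ConnectorRegistry(Function<String, StreamConnector> factory) {
        this.factory = factory;
    }

    // Mirrors receive(): fetch the group's connector from the map,
    // then ask that same connector for a new stream on every call.
    Iterable<byte[]> receive(String topic, String consumerGroup) {
        StreamConnector connector = byGroup.computeIfAbsent(consumerGroup, factory);
        return connector.openStream(topic);
    }
}
```

With this pattern, the second run asks the connector created during the first run for another stream, which is the reuse I am asking about.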
The program launches a producer thread and a consumer thread. The
producer produces 100 messages, the consumer consumes 100 messages and
then stops.
The program keeps running, and reruns the producer and consumer
threads some time later (between 5 and 10 minutes).
The first run works great, but my problem happens when it runs the
second time: the producer works fine again, but the consumer DOES NOT
consume anything, and I see the following in my logs:
09:19:11,689 INFO ~
[simple_pojo_consumer_group_fenrir-1386749817005-7c45b826], begin
registering consumer
simple_pojo_consumer_group_fenrir-1386749817005-7c45b826 in ZK
09:19:11,709 INFO ~ conflict in
/consumers/simple_pojo_consumer_group/ids/simple_pojo_consumer_group_fenrir-1386749817005-7c45b826
data: { "pattern":"static", "subscription":{ "simple_pojo": 1 },
"timestamp":"1386749951689", "version":1 } stored data: {
"pattern":"static", "subscription":{ "simple_pojo": 1 },
"timestamp":"1386749836099", "version":1 }
09:19:11,712 INFO ~ I wrote this conflicted ephemeral node [{
"pattern":"static", "subscription":{ "simple_pojo": 1 },
"timestamp":"1386749951689", "version":1 }] at
/consumers/simple_pojo_consumer_group/ids/simple_pojo_consumer_group_fenrir-1386749817005-7c45b826
a while back in a different session, hence I will backoff for this
node to be deleted by Zookeeper and retry
This happens every 5 seconds.
The ZooKeeper server logs contain the following each time a conflict
is logged in the application:
[2013-12-11 09:19:11,622] INFO Got user-level KeeperException when
processing sessionid:0x142e0b5d3d80001 type:create cxid:0x1b
zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a Error
Path:/consumers/simple_pojo_consumer_group/ids/simple_pojo_consumer_group_fenrir-1386749817005-7c45b826
Error:KeeperErrorCode = NodeExists for
/consumers/simple_pojo_consumer_group/ids/simple_pojo_consumer_group_fenrir-1386749817005-7c45b826
(org.apache.zookeeper.server.PrepRequestProcessor)
I tried searching for solutions, but did not find anything that
could help me. I think I'm not using the library correctly, but I
can't see how.
Is it not OK to keep ConsumerConnector objects in a map and reuse
them with createMessageStreams()?
If it is OK, do you have any idea what the problem could be here?
Thank you,
Vincent.
Hello,
I'm reading the source code right now, and there's something I'm not
clear about:
In kafka.consumer.javaapi.ZookeeperConsumerConnector.scala, the
createMessageStreams() directly calls underlying.consume() (line 80)
In kafka.consumer.ZookeeperConsumerConnector.scala, the
createMessageStreams() throws an exception if it has been called more
than once (line 133).
I'm guessing the javaapi should do the same and throw an exception if it
is called more than once?
If that is the case, then there is a bug in the method
createMessageStreams() of javaapi.ZookeeperConsumerConnector.scala
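
To illustrate the kind of check I mean, here is a minimal sketch of a call-once guard. It is not the Kafka source: `OneShotStreamFactory` is a hypothetical wrapper, the `Supplier` stands in for the call to underlying.consume(), and the exception type and message are my own choices.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

// Hypothetical wrapper, not part of Kafka: permits stream creation only once.
final class OneShotStreamFactory<S> {
    // Flips to true on the first call; later calls see true and are rejected.
    private final AtomicBoolean created = new AtomicBoolean(false);
    // Stand-in for the underlying connector's stream creation.
    private final Supplier<S> underlying;

    OneShotStreamFactory(Supplier<S> underlying) {
        this.underlying = underlying;
    }

    S createMessageStreams() {
        // compareAndSet succeeds for exactly one caller, even across threads.
        if (!created.compareAndSet(false, true)) {
            throw new IllegalStateException(
                "createMessageStreams() can only be called once per connector");
        }
        return underlying.get();
    }
}
```

With a guard like this, a second call on a reused connector fails immediately instead of silently re-registering the consumer.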
Also, if that is the case, do I need to pre-create all streams? How
costly is it to create a ConsumerConnector? Does it reopen a connection
to the ZK server or the Kafka broker?
Thanks.