Have you looked at https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Myconsumerseemstohavestopped%2Cwhy%3F?
Thanks,

Jun

On Wed, Dec 11, 2013 at 3:59 AM, shahab <shahab.mok...@gmail.com> wrote:

> Hi,
>
> I have a problem fetching messages from Kafka. I am using the simple
> consumer API in Java to fetch messages from Kafka (the same one shown in
> the Kafka introduction example). The problem is that after a while
> (could be 30 minutes or a couple of hours), the consumer stops receiving
> messages from Kafka, even though the data exists there (the streaming of
> data to Kafka is still running, so Kafka has input).
> I can see that the data exists in Kafka by running the following command
> and getting the list of messages in Kafka. Each message is around 80
> bytes:
>
> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
>
> Any idea what could be the source of the problem? I also noticed that if I
> stress the input to Kafka (sending 1000 messages per second) for an hour or
> more, the same situation happens again. It seems that something is
> wrong with the fetching (consumer) part, right?
>
> best,
> /Shahab
>
> Kafka runs on one machine, with no cluster, replication, etc.; a very
> basic configuration.
>
> The consumer config is:
>
> "zookeeper.connect", myserver:2181);
> "group.id", group1);
> "zookeeper.session.timeout.ms", "400");
> "zookeeper.sync.time.ms", "200");
> "auto.commit.interval.ms", "1000");
> "fetch.message.max.bytes", "1048576");
> "auto.offset.reset", "smallest";
>
> and the server.config looks like this:
>
> broker.id=0
> port=9092
> num.network.threads=5
> num.io.threads=2
> socket.send.buffer.bytes=1048576
> socket.receive.buffer.bytes=1048576
> socket.request.max.bytes=104857600
> log.dirs=/tmp/kafka-logs
> num.partitions=2
>
> ############################# Log Flush Policy #############################
> log.flush.interval.messages=1000
>
> # The maximum amount of time a message can sit in a log before we force a flush
> log.flush.interval.ms=1000
>
> ############################# Log Retention Policy #############################
>
> # The minimum age of a log file to be eligible for deletion
> log.retention.hours=1
>
> log.retention.bytes=10485760
>
> # The maximum size of a log segment file. When this size is reached a new
> # log segment will be created.
> log.segment.bytes=536870912
>
> # The interval at which log segments are checked to see if they can be
> # deleted according to the retention policies
> log.cleanup.interval.mins=1
> zookeeper.connect=localhost:2181
>
> # Timeout in ms for connecting to zookeeper
> zookeeper.connection.timeout.ms=1000000
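
The consumer settings quoted above look like the arguments of a series of Properties.put(...) calls with the leading part cut off. A minimal sketch of how they might be assembled, assuming that reading is right: the keys and values are taken from the quoted message, while the class name ConsumerConfigSketch and the method consumerProperties() are hypothetical names used only for illustration. It uses only java.util, so it runs without Kafka on the classpath.

```java
import java.util.Properties;
import java.util.TreeSet;

// Hypothetical helper class; the name is illustrative and not from the original post.
final class ConsumerConfigSketch {

    // Collects the consumer settings quoted in the message into a Properties object.
    // Values are copied from the post; the quoting of the values is an assumption.
    static Properties consumerProperties() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "myserver:2181");
        props.put("group.id", "group1");
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        props.put("fetch.message.max.bytes", "1048576");
        props.put("auto.offset.reset", "smallest");
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProperties();
        // Print the settings in sorted key order so the output is stable.
        for (String key : new TreeSet<>(props.stringPropertyNames())) {
            System.out.println(key + "=" + props.getProperty(key));
        }
    }
}
```

With the Kafka 0.8 jars available, a Properties object like this would typically be wrapped in kafka.consumer.ConsumerConfig before creating the consumer connector; that step is left out here to keep the sketch self-contained.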