Thanks a lot, very good hints. I am trying to see what happened in my case.
best, /Shahab On Wed, Dec 11, 2013 at 5:16 PM, Jun Rao <jun...@gmail.com> wrote: > Have you looked at > > https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Myconsumerseemstohavestopped%2Cwhy%3F > ? > > Thanks, > > Jun > > > On Wed, Dec 11, 2013 at 3:59 AM, shahab <shahab.mok...@gmail.com> wrote: > > > Hi, > > > > I have a problem in fetching messages from Kafka. I am using simple > > consumer API in Java to fetch messages from kafka ( the same one which is > > stated in Kafka introduction example). The problem is that after a while > > (could be 30min or couple of hours), the consumer does not receive any > > messages from Kafka, while the data exist there (while the streaming of > > data to Kafka still running, so Kafka has inputs). > > I can see that data exist in Kafka by just running the following command > > and getting the list of messages exist in Kafka, Each message is around > 80 > > bytes : > > > > *bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test > > --from-beginning* > > > > > > Any idea what could be the source of problem?? I also notices that if i > > stress the input to kafka (sending 1000 messages per scond) for an hour > or > > more , the same situation happens again. ?? It seems that something is > > wrong with fetching (consumer) part, right? > > > > best, > > /Shahab > > > > > > > > The kafka is run in one machine, no clusters, replications,....etc, very > > basic configuration. > > > > The consumer config file is ; > > > > "zookeeper.connect", myserver:2181); > > "group.id", group1); > > "zookeeper.session.timeout.ms", "400"); > > "zookeeper.sync.time.ms", "200"); > > "auto.commit.interval.ms", "1000"); > > "fetch.message.max.bytes", "1048576"); > > "auto.offset.reset", "smallest"; > > > > > > > > > > and the server.config looks like this: > > > > Boker.id=0 > > port=9092 > > num.network.threads=5 > > num.io.threads=2 > > socket.send.buffer.bytes=1048576 > > > > socket.receive.buffer.bytes=1048576 > > > > socket.request.max.bytes=104857600 > > > > log.dirs=/tmp/kafka-logs > > > > num.partitions=2 > > > > ############################# Log Flush Policy > > ############################# > > log.flush.interval.messages=1000 > > > > # The maximum amount of time a message can sit in a log before we force a > > flush > > log.flush.interval.ms=1000 > > > > ############################ Log Retention Policy > > ############################# > > > > # The minimum age of a log file to be eligible for deletion > > log.retention.hours=1 > > > > > > log.retention.bytes=10485760 > > > > # The maximum size of a log segment file. When this size is reached a new > > log segment will be created. > > log.segment.bytes=536870912 > > > > # The interval at which log segments are checked to see if they can be > > deleted according > > # to the retention policies > > log.cleanup.interval.mins=1 > > ookeeper.connect=localhost:2181 > > > > # Timeout in ms for connecting to zookeeper > > zookeeper.connection.timeout.ms=1000000 > > >