Steve - thanks for this demonstration. As you explained, the way Kafka works makes perfect sense: it uses the next offset.

I will say that when I first hit this error a couple of years ago (with Storm Spouts) it startled me; it does feel like a bug to the uninitiated. But it's perfectly normal if one doesn't consume in time.
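If you'd rather handle it in code than just tune retention, the catch-and-reseek approach is only a few lines with kafka-python. A rough, untested sketch (the broker and group names are made up, and I'm assuming the fetch surfaces the broker error as kafka.common.OffsetOutOfRangeError):

from kafka.client import KafkaClient
from kafka.common import OffsetOutOfRangeError
from kafka.consumer import SimpleConsumer

kafka = KafkaClient('mybroker:9092')
consumer = SimpleConsumer(client=kafka, group='mygroup',
                          topic='test.deletion', auto_commit=False)

try:
    messages = consumer.get_messages(count=1000, block=True)
except OffsetOutOfRangeError:
    # Our offset aged out of the log: jump to the oldest message still
    # on the broker (seek(0, 2) would jump to the newest instead).
    consumer.seek(0, 0)
    messages = consumer.get_messages(count=1000, block=True)

for message in messages:
    print message.message.value

Whether you reseek to the head or the tail depends on whether you care more about completeness or recency, as Steve notes further down the thread.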
Philip

-----------------------------------------
http://www.philipotoole.com

> On Aug 20, 2014, at 2:28 PM, Steve Miller <st...@idrathernotsay.com> wrote:
>
> OK, so I recreated this, because I wasn't sure if the offsets were
> preserved when the data was evaporated, or if somehow they reset to zero:
>
> /opt/kafka/bin/kafka-topics.sh --zookeeper whatever --topic test.deletion \
>     --replication-factor 1 --partitions 1 --config "retention.ms=60000" --create
>
> which makes a topic where messages are deleted after approximately 60 seconds.
>
> Then I published to the topic:
>
> /opt/kafka/bin/kafka-console-producer.sh --broker-list mybroker:9092 \
>     --topic test.deletion
> type
> some
> stuff
>
> And I verified that I saw that stuff:
>
> /opt/kafka/bin/kafka-console-consumer.sh --zookeeper whatever \
>     --topic test.deletion --from-beginning
>
> (And I did: I got three messages, "type", "some", and "stuff", at offsets
> 0, 1, and 2, or so I remember.)
>
> Then I used a kafka-python program to poke at the topic, since I'm about a
> thousand times more familiar with the python API than with the Java one.
> (See below. You can get the python API at
> https://github.com/mumrah/kafka-python.)
>
> I could use this to print stuff in the topic, or tell it to go to a
> specific offset. And sure enough, if I knew that (say) offset 2 existed,
> then let it age out, then ran this with "-o 2", I'd get the expected
> OffsetOutOfRangeException.
>
> But if I feed in data, the next offset is used, as one would expect: that
> is, if I'd fed in messages until the offset of the current message was 2,
> then let the data age out, then fed in one more message, I could fetch the
> message with offset 3, no problem. You could also see from the filenames of
> the log files for that topic that the most-recent offset is preserved -- not
> the message, of course, just the fact that the most-recent message was (say)
> #2.
>
> If this did reset the offset to 0, I'd call that a bug, but it doesn't, so
> this is reasonable behavior, even if it's a little surprising. In addition
> to what Philip suggested (increase retention, consume sooner), you could try
> catching this specific exception and doing more or less what's suggested
> here, using the SimpleConsumer API:
>
> https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
>
> (look at "Finding Starting Offset For Reads"). It's interesting that the
> python API makes that quite a bit easier than the Java one does, since the
> python API gives you the handy-dandy seek() method, which behaves more or
> less like the seek() syscall in Unix.
>
> Note that that section even makes a passing reference to messages aging
> out, as it says "Don't assume that offset 0 is the beginning offset, since
> messages age out of the log over time."
>
> I hope this helps.
>
> -Steve
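(For the curious: the python equivalent of that "Finding Starting Offset For Reads" section is just a couple of calls. Roughly the following, assuming kafka-python's OffsetRequest payloads work the way I remember -- treat it as an untested sketch:

from kafka.client import KafkaClient
from kafka.common import OffsetRequest

kafka = KafkaClient('mybroker:9092')

# A time of -2 asks the broker for the earliest available offset, and -1
# for the latest, mirroring EarliestTime()/LatestTime() in the Java example.
(earliest,) = kafka.send_offset_request([OffsetRequest('test.deletion', 0, -2, 1)])
(latest,) = kafka.send_offset_request([OffsetRequest('test.deletion', 0, -1, 1)])
print 'valid offsets: %d to %d' % (earliest.offsets[0], latest.offsets[0])

That range is exactly what the broker quotes at you in the OffsetOutOfRangeException text.)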
> (snip)
> #!/usr/bin/python
>
> import logging
> import socket
> # Should use argparse, but we shouldn't use python 2.6, either...
> from optparse import OptionParser
>
> from kafka.client import KafkaClient
> from kafka.consumer import SimpleConsumer
>
> #logging.basicConfig(level=logging.DEBUG)
>
> def main():
>     parser = OptionParser()
>     parser.add_option('-t', '--topic', dest='toget',
>                       help='topic to which we should subscribe')
>     parser.add_option('-p', '--partitions', dest='partlist',
>                       help='comma-separated list of partitions we should fetch from')
>     parser.add_option('-b', '--broker', dest='kafkaHost',
>                       help='Kafka broker to which we should connect',
>                       default='mybroker')
>     parser.add_option('-o', '--offset', dest='offset',
>                       help='offset of first message we should read')
>
>     (options, args) = parser.parse_args()
>
>     if options.toget:
>         topic = options.toget
>     else:
>         topic = 'test.deletion'
>
>     partitions = []
>     if options.partlist is not None and options.partlist != '':
>         for part in options.partlist.split(','):
>             partitions.append(int(part))
>     else:
>         partitions.append(0)
>
>     kafka = KafkaClient('%s:9092' % options.kafkaHost)
>
>     # show the partition ids the broker knows about for this topic
>     print kafka.topic_partitions[topic]
>
>     consumer = SimpleConsumer(client=kafka,
>                               group="wombat.%s" % socket.gethostname(),
>                               topic=topic, partitions=partitions,
>                               fetch_size_bytes=1024 * 1024,
>                               auto_commit=False, buffer_size=256 * 1024,
>                               max_buffer_size=2048 * 1024)
>
>     # seek() takes (offset, whence), like lseek(): whence=0 is relative to
>     # the start of the log, so this starts at the requested offset, or at
>     # the oldest available message if no offset was given.
>     if options.offset:
>         consumer.seek(int(options.offset), 0)
>     else:
>         consumer.seek(0, 0)
>
>     while True:
>         messages = consumer.get_messages(count=1000, block=True)
>         #print '### got messages'
>         for message in messages:
>             print message.message.value
>             print message
>
>     kafka.close()
>
> if __name__ == "__main__":
>     main()
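(Saving Steve's script as, say, kafkaseek.py -- the filename is mine -- you can reproduce the whole thing by publishing a few messages, letting them age out, and then asking for an offset you know existed:

python kafkaseek.py -b mybroker -t test.deletion -o 2

The fetch inside get_messages() should be what raises the exception.)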
>> On Wed, Aug 20, 2014 at 09:28:12AM -0700, Philip O'Toole wrote:
>> It's not a bug, right? It's the way the system works (if I have been
>> following the thread correctly) -- when the retention time passes, the
>> message is gone. Either consume your messages sooner, or increase your
>> retention time. Kafka is not magic; it can only do what it's told.
>>
>> In practice I have found compression to be a big help -- big savings on
>> disk space.
>>
>> Philip
>>
>> -----------------------------------------
>> http://www.philipotoole.com
>>
>>
>> On Wednesday, August 20, 2014 1:42 AM, "pradeep.si...@wipro.com"
>> <pradeep.si...@wipro.com> wrote:
>>
>> Sure, I will try setting longer retention hours. But I feel this would
>> not be a good approach? Should we raise it as a bug?
>>
>> Thanks,
>> Pradeep Simha
>> Technical Lead
>>
>> -----Original Message-----
>> From: Manjunath Shivakumar [mailto:manjunath.shivaku...@betfair.com]
>> Sent: Wednesday, August 20, 2014 1:31 PM
>> To: users@kafka.apache.org
>> Subject: RE: Keep on getting kafka.common.OffsetOutOfRangeException: Random times
>>
>> We had a similar issue in our dev environments, where we had to configure
>> aggressive log retention to save space, and the clients kept failing with
>> this error on Mondays, because the messages from Friday had been deleted.
>> Perhaps compaction would help in this scenario too?
>> https://cwiki.apache.org/confluence/display/KAFKA/Log+Compaction
>>
>> ________________________________________
>> From: Steve Miller [st...@idrathernotsay.com]
>> Sent: 20 August 2014 08:47
>> To: users@kafka.apache.org
>> Subject: Re: Keep on getting kafka.common.OffsetOutOfRangeException: Random times
>>
>> That seems likely. I'd try either catching the exception and resetting the
>> offset, or upping log.retention.hours. I'd be interested in hearing if that
>> fixes the problem.
>>
>>     -Steve
>>
>>> On Aug 19, 2014, at 11:54 PM, <pradeep.si...@wipro.com> wrote:
>>>
>>> Thank you for your reply. Oh, do retention hours have an effect on this?
>>> I didn't know that. I have log.retention.hours set to 1, and during
>>> development we test this once every 15 minutes or every hour or two. So
>>> do you think this is causing the issue?
>>>
>>> Thanks,
>>> Pradeep Simha
>>> Technical Lead
>>>
>>> -----Original Message-----
>>> From: Steve Miller [mailto:st...@idrathernotsay.com]
>>> Sent: Tuesday, August 19, 2014 6:13 PM
>>> To: users@kafka.apache.org
>>> Subject: Re: Keep on getting kafka.common.OffsetOutOfRangeException: Random times
>>>
>>> Also, what do you have log.retention.hours set to? How often do you
>>> publish messages?
>>>
>>> I can envision a scenario in which you don't publish to a topic often,
>>> and in fact publish so infrequently that everything in the topic ages out
>>> from log.retention.hours first.
>>>
>>> I don't know exactly what happens should that occur, but I've seen some
>>> stuff that makes me think that the offsets might go back to zero -- or
>>> maybe they do if the broker restarts, so you might check to be sure that's
>>> not happening.
>>>
>>> From what I've seen in that regard, I've been wondering whether most
>>> long-running Kafka consumers shouldn't be designed to catch that exception
>>> and set their offset to either the first available message or the last
>>> available message, depending on whether their priority is to get every
>>> message or to get the most recent messages. Though in that scenario maybe
>>> the first and last messages are the same by definition, since there aren't
>>> any messages left in the topic. (-:
>>>
>>> It's also possible that the specific topic weirdness my specific
>>> installation has been running into is causing that and it only happens for
>>> the stuff I work on, so definitely take this with a grain of salt; I'm no
>>> expert, just relating the local folklore.
>>>
>>>     -Steve
>>>
>>>> On Tue, Aug 19, 2014 at 09:12:30AM +0000, pradeep.si...@wipro.com wrote:
>>>> Hi Team,
>>>>
>>>> Can someone please help me with this? It is really becoming a roadblock
>>>> for our project: we have to decide whether to continue with Kafka or
>>>> with some other project, as it is becoming too unstable.
>>>>
>>>> Thanks,
>>>> Pradeep Simha
>>>> Technical Lead
>>>>
>>>> -----Original Message-----
>>>> From: pradeep.si...@wipro.com [mailto:pradeep.si...@wipro.com]
>>>> Sent: Tuesday, August 19, 2014 9:30 AM
>>>> To: users@kafka.apache.org
>>>> Subject: RE: Keep on getting kafka.common.OffsetOutOfRangeException: Random times
>>>>
>>>> Hi Neha,
>>>>
>>>> Yes, I am using the latest version, i.e. 0.8.1.1.
>>>>
>>>> Hi Guozhang,
>>>>
>>>> These are the values:
>>>>
>>>> #log.retention.bytes=1073741824 (yes, this was commented out by default)
>>>>
>>>> log.retention.check.interval.ms=60000
>>>>
>>>> Am I doing anything wrong here? I haven't touched this properties file.
>>>>
>>>> Thanks,
>>>> Pradeep Simha
>>>> Technical Lead
>>>>
>>>> -----Original Message-----
>>>> From: Neha Narkhede [mailto:neha.narkh...@gmail.com]
>>>> Sent: Tuesday, August 19, 2014 2:27 AM
>>>> To: users@kafka.apache.org
>>>> Subject: Re: Keep on getting kafka.common.OffsetOutOfRangeException: Random times
>>>>
>>>> Also, what version of Kafka are you using? 0.8.1.1 is the latest, most
>>>> stable version.
>>>>
>>>>
>>>>> On Mon, Aug 18, 2014 at 9:36 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>
>>>>> Hi Pradeep,
>>>>>
>>>>> It seems your logs get deleted due to the retention policy. Could you
>>>>> check the config values for log.retention.bytes and
>>>>> log.retention.check.interval.ms?
>>>>>
>>>>> http://kafka.apache.org/documentation.html#brokerconfigs
>>>>>
>>>>> Guozhang
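(For reference, those knobs live in the broker's server.properties, and in 0.8.x they take effect after a broker restart. An illustrative snippet -- the values shown are what I recall the 0.8 defaults being, not recommendations:

# time-based retention: keep messages for one week
log.retention.hours=168
# size-based retention; commented out (no size limit) by default
#log.retention.bytes=1073741824
# how often the broker checks for log segments eligible for deletion
log.retention.check.interval.ms=60000

With log.retention.hours=1, as Pradeep has it, anything older than an hour is fair game for deletion.)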
>>>>>
>>>>>> On Mon, Aug 18, 2014 at 5:49 AM, <pradeep.si...@wipro.com> wrote:
>>>>>>
>>>>>> Hi Team,
>>>>>>
>>>>>> Of late I am facing a strange issue w.r.t. Kafka. At random times I
>>>>>> keep getting these strange errors while consuming a topic:
>>>>>>
>>>>>> kafka.common.OffsetOutOfRangeException: Request for offset 19 but
>>>>>> we only have log segments in the range 0 to 0.
>>>>>>
>>>>>> Sometimes I get one like this:
>>>>>>
>>>>>> kafka.common.OffsetOutOfRangeException: Request for offset 19 but
>>>>>> we only have log segments in the range 19 to 22.
>>>>>>
>>>>>> The number keeps changing (with random ranges). I don't know what the
>>>>>> problem is here. Both producer and consumer work perfectly, but I keep
>>>>>> getting these errors randomly. In that situation, if I clear the logs
>>>>>> and restart the broker, it starts working fine again.
>>>>>>
>>>>>> Can anyone please help me in this regard? This is affecting our
>>>>>> application's stability. If any more information is required I can
>>>>>> provide it; also, we are using only the defaults provided by Kafka, we
>>>>>> didn't change any settings.
>>>>>>
>>>>>> Thanks,
>>>>>> Pradeep Simha
>>>>>> Technical Lead
>>>>>
>>>>>
>>>>> --
>>>>> -- Guozhang