This functionality is parallel to the stream being consumed into an indexed store: it serves a different point in the latency/completeness/fault-tolerance space. It's actually combined with skipping forward and back to prioritize new over old data, much as Otis describes, I think.
On Dec 6, 2013, at 8:07 AM, Joe Stein <joe.st...@stealth.ly> wrote:

Steven, you might be better off reading the Kafka stream into Cassandra and then doing the reads that way.

/*******************************************
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 http://www.stealth.ly
 Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
********************************************/

On Fri, Dec 6, 2013 at 11:04 AM, Steven Parkes <smpar...@smparkes.net> wrote:

I've figured I may end up doing something like this for reasons that sound similar to what Otis describes.

In my case, I may end up binary searching the log to match message numbers to dates in the actual messages, to find a particular time range of interest.

So far it's just theory: I haven't gotten to the point of a POC or sanity-checking it. But it sounds like it should work...
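A minimal sketch of the binary search Steven describes, written against the Kafka 0.8 SimpleConsumer Java API. It assumes 0.8-style per-message logical offsets, payloads that embed a timestamp that is non-decreasing with offset, and made-up broker/topic names; the timestampOf parser below is entirely hypothetical and depends on how the producers encode messages.

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import kafka.api.FetchRequestBuilder;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

public class OffsetByTimeSearch {

    // Hypothetical: pull an epoch-millis timestamp out of the payload, assuming
    // producers write "timestamp,rest-of-message". Adjust to the real encoding.
    static long timestampOf(MessageAndOffset mo) {
        ByteBuffer payload = mo.message().payload();
        byte[] bytes = new byte[payload.remaining()];
        payload.get(bytes);
        return Long.parseLong(new String(bytes, StandardCharsets.UTF_8).split(",")[0]);
    }

    // Fetch the single message stored at 'offset'. The 64 KB fetch size must be
    // at least as large as the largest message, or nothing comes back.
    static MessageAndOffset messageAt(SimpleConsumer consumer, String topic, int partition, long offset) {
        FetchResponse resp = consumer.fetch(new FetchRequestBuilder()
            .clientId("offset-search")
            .addFetch(topic, partition, offset, 64 * 1024)
            .build());
        for (MessageAndOffset mo : resp.messageSet(topic, partition)) {
            if (mo.offset() == offset) {
                return mo;
            }
        }
        throw new IllegalStateException("no message at offset " + offset);
    }

    // Classic binary search: smallest offset in [lo, hi) whose embedded timestamp
    // is >= targetTime. Requires timestamps to be non-decreasing with offset.
    static long firstOffsetAtOrAfter(SimpleConsumer consumer, String topic, int partition,
                                     long lo, long hi, long targetTime) {
        while (lo < hi) {
            long mid = lo + (hi - lo) / 2;
            if (timestampOf(messageAt(consumer, topic, partition, mid)) < targetTime) {
                lo = mid + 1;
            } else {
                hi = mid;
            }
        }
        return lo;
    }

    public static void main(String[] args) {
        // Hypothetical broker, topic, offset range, and target time.
        SimpleConsumer consumer = new SimpleConsumer("broker-host", 9092, 100000, 64 * 1024, "offset-search");
        try {
            long offset = firstOffsetAtOrAfter(consumer, "metrics", 0, 0L, 1000000L, 1386334800000L);
            System.out.println("start reading at offset " + offset);
        } finally {
            consumer.close();
        }
    }
}

Each probe costs one fetch, so finding the boundary of a time range in a partition of N messages takes on the order of log2(N) fetches; the returned offset can then seed a normal forward read (or the backward-batch read sketched further down).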
On Dec 6, 2013, at 7:56 AM, Joe Stein <joe.st...@stealth.ly> wrote:

You got the hamster on the wheel with this one :)

So one way to make it work without any changes (or at most very minor ones) would be to use your max offset and fetch size this way:

M1 M2 M3 M4 M5 M6 M7 M8 M9 M10 M11 M12

to get

get last 3: M10 M11 M12
get last 3: M7 M8 M9
get last 3: M4 M5 M6
get last 3: M1 M2 M3

You would start at the end to get the high watermark offset, then do maxOffset (since you're at the end) minus 3 with a fetch size of 3, and keep doing that. So technically you are still looking forward, but you are making the start position of your offset 3 behind.

If the offset numbers matched your message numbers (so the offset of M1 is 1 and the offset of M2 is 2), then for this example

fetch((12-3), 3)
fetch((12-3-3), 3)
fetch((12-3-3-3), 3)
fetch((12-3-3-3-3), 3)

would produce

M10 M11 M12
M7 M8 M9
M4 M5 M6
M1 M2 M3

This would mean no broker changes :) and just "tricking" the val fetchRequest = fetchRequestBuilder in your implementation of SimpleConsumerShell.scala to "look backwards": you are just moving the offset backwards from the end and reading forward for your fetch size.

Make sense?

On Fri, Dec 6, 2013 at 10:43 AM, Joe Stein <joe.st...@stealth.ly> wrote:

Hmmm, I just realized that wouldn't work, actually (starting at the end is fine)... the fetch size being taken in is still going to increment forward.

KafkaApis would have to change, because in readMessageSet it is doing a log.read of the FileMessageSet. It should be possible, but not without changing the way the log is read when getting the partition with ReplicaManager.

So let me take that all back and say: it can't be done now, but I think it is feasible with some broker modifications to read the log differently. Off the top of my head I can't think of how to change the log.read to do this without digging further down into the code.

On Fri, Dec 6, 2013 at 10:26 AM, Joe Stein <joe.st...@stealth.ly> wrote:

The fetch requests are very flexible to do what you want with them.

Take a look at SimpleConsumerShell.scala as a reference.

You could pass in OffsetRequest.LatestTime (-1) with a fetch size of 3 and then just keep doing that over and over again.

I think that will do exactly what you are looking to do.
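To make the "start at the high watermark and walk the start offset backwards" idea concrete, here is a rough sketch against the Kafka 0.8 SimpleConsumer Java API. The broker host, topic, partition, and batch size are made-up illustration values, and note that the fetch size in an actual FetchRequest is a byte count rather than a message count, so the sketch bounds each backward batch by offset instead.

import java.util.HashMap;
import java.util.Map;

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

public class BackwardsBatchReader {

    // Hypothetical values for illustration only.
    static final String TOPIC = "metrics";
    static final int PARTITION = 0;
    static final int BATCH = 3;                 // messages per backward step (the "fetch size of 3" above)
    static final int FETCH_BYTES = 1024 * 1024; // the real fetch size, which is in bytes

    public static void main(String[] args) {
        SimpleConsumer consumer =
            new SimpleConsumer("broker-host", 9092, 100000, 64 * 1024, "backwards-reader");
        TopicAndPartition tp = new TopicAndPartition(TOPIC, PARTITION);
        try {
            long logEnd = offsetBefore(consumer, tp, kafka.api.OffsetRequest.LatestTime());
            long logStart = offsetBefore(consumer, tp, kafka.api.OffsetRequest.EarliestTime());

            long end = logEnd; // exclusive upper bound of the next backward batch
            while (end > logStart) {
                long start = Math.max(logStart, end - BATCH);
                FetchRequest req = new FetchRequestBuilder()
                    .clientId("backwards-reader")
                    .addFetch(TOPIC, PARTITION, start, FETCH_BYTES)
                    .build();
                FetchResponse resp = consumer.fetch(req);
                // The broker still reads forward from 'start'; we only moved the start
                // back by BATCH messages, and we drop anything at or past 'end'.
                for (MessageAndOffset mo : resp.messageSet(TOPIC, PARTITION)) {
                    if (mo.offset() >= end) {
                        break;
                    }
                    System.out.println("offset " + mo.offset());
                }
                end = start;
            }
        } finally {
            consumer.close();
        }
    }

    // Ask the broker for the single offset before the given time:
    // LatestTime() returns the log-end offset, EarliestTime() the first offset still on disk.
    static long offsetBefore(SimpleConsumer consumer, TopicAndPartition tp, long time) {
        Map<TopicAndPartition, PartitionOffsetRequestInfo> info =
            new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        info.put(tp, new PartitionOffsetRequestInfo(time, 1));
        kafka.javaapi.OffsetRequest req = new kafka.javaapi.OffsetRequest(
            info, kafka.api.OffsetRequest.CurrentVersion(), "backwards-reader");
        OffsetResponse resp = consumer.getOffsetsBefore(req);
        return resp.offsets(tp.topic(), tp.partition())[0];
    }
}

Batches come out newest to oldest (M10 M11 M12, then M7 M8 M9, and so on) while messages within a batch stay in forward order; re-checking the latest offset between passes would let newly arrived messages be picked up first, as Otis wants. Error handling (FetchResponse.hasError, leader changes) is omitted from this sketch.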
On Fri, Dec 6, 2013 at 10:04 AM, Otis Gospodnetic <otis.gospodne...@gmail.com> wrote:

Hi,

On Fri, Dec 6, 2013 at 9:38 AM, Tom Brown <tombrow...@gmail.com> wrote:
> Do you mean you want to start from the most recent data and go backwards to the oldest data, or that you want to start with old data and consume forwards?

Forwards is the "normal way". I'm looking for the "abnormal way", of course ;) i.e. backwards. If the following are the messages that came in, oldest to newest:

M1 M2 M3 M4 M5 M6 M7 M8 M9 M10 M11 M12

then I'd love to be able to consume from the end, say in batches of 3, like this:

get last 3: M10 M11 M12
get last 3: M7 M8 M9
get last 3: M4 M5 M6
get last 3: M1 M2 M3

Of course, if messages keep coming in, then the new ones that arrive would get picked up first and, eventually, assuming the consumer can consume faster than messages are produced, all messages will get consumed. But the important/key part is that any new ones that arrive will get picked up first.

> If the former, it would be difficult or impossible in 0.7.x, but I think doable in 0.8.x. (They added some sort of message index.) If the latter, that is easily accomplished in both versions.

I'd love to know if that's really so and how to do it!

We are looking to move to Kafka 0.8 in January and to add performance monitoring for Kafka 0.8 to SPM (see
http://blog.sematext.com/2013/10/16/announcement-spm-performance-monitoring-for-kafka/).

Thanks,
Otis
--
Performance Monitoring * Log Analytics * Search Analytics
Solr & Elasticsearch Support * http://sematext.com/

On Friday, December 6, 2013, Otis Gospodnetic wrote:
> Hi,
>
> Does Kafka offer a way to consume messages in batches, but "from the end"?
>
> This would be valuable to have in all systems where the most recent data is a lot more important than older data, such as performance metrics, and maybe even logs... maybe also trading/financial data, and such.
>
> Any chance of this sort of functionality ever making it into Kafka, or is this simply not implementable due to some underlying assumptions, or data structures, or ...?
>
> Thanks,
> Otis