If there is a TTL/expiration hook, that's another possibility as well. Add a hook so that when a message times out, it gets moved onto a separate "low priority" queue, or something along those lines. That way the main queue only holds recent messages to consume.

-Suren
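Kafka (as of 0.8) has no per-message TTL or expiration hook, so this would have to live client-side: whichever consumer notices an expired message republishes it to a low-priority topic. A minimal sketch under those assumptions, using the 0.8 producer API; the topic name, broker list, and the application-level producedAt timestamp are all hypothetical:

    import java.util.Properties
    import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

    object LowPriorityRequeue {
      private val props = new Properties()
      props.put("metadata.broker.list", "localhost:9092") // placeholder broker list
      // The default serializer (kafka.serializer.DefaultEncoder) passes bytes through.
      private val producer =
        new Producer[Array[Byte], Array[Byte]](new ProducerConfig(props))

      val ttlMs = 60 * 1000L // one-minute TTL, purely for illustration

      // producedAt is a hypothetical application-embedded timestamp;
      // Kafka 0.8 messages carry no timestamp of their own.
      def handle(payload: Array[Byte], producedAt: Long): Unit = {
        if (System.currentTimeMillis() - producedAt > ttlMs)
          producer.send(new KeyedMessage[Array[Byte], Array[Byte]]("metrics-low-priority", payload))
        else
          process(payload) // normal, recent-data path
      }

      def process(payload: Array[Byte]): Unit = () // real consumer logic goes here
    }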
On Fri, Dec 6, 2013 at 11:11 AM, Surendranauth Hiraman <suren.hira...@sociocast.com> wrote:

Depending on your exact requirements, you may consider priority queues as well. I'm not sure Kafka has these (without rolling your own), but they exist in JMS implementations.

I guess you aren't looking for strict LIFO order, since messages are constantly being put on the queue, so "LIFO" only holds with respect to a very transient moment in time.

-Suren

On Fri, Dec 6, 2013 at 11:07 AM, Joe Stein <joe.st...@stealth.ly> wrote:

Steven, you might be better off reading the Kafka stream into Cassandra and then doing the reads that way.

On Fri, Dec 6, 2013 at 11:04 AM, Steven Parkes <smpar...@smparkes.net> wrote:

I've figured I may end up doing something like this, for reasons that sound similar to what Otis describes.

In my case, I may end up binary searching the log, matching message numbers to dates in the actual messages, to find a particular time range of interest. (See the sketch at the end of this thread.)

So far it's just theory: I haven't gotten to the point of a POC or sanity-checking it. But it sounds like it should work...

On Dec 6, 2013, at 7:56 AM, Joe Stein <joe.st...@stealth.ly> wrote:

You got the hamster on the wheel with this one :)

One way to make it work without any changes (or at most very minor ones) would be to use your max offset and fetch size this way. Given

M1 M2 M3 M4 M5 M6 M7 M8 M9 M10 M11 M12

you get

get last 3: M10 M11 M12
get last 3: M7 M8 M9
get last 3: M4 M5 M6
get last 3: M1 M2 M3

You would start at the end to get the high-water-mark offset, then fetch from maxOffset (since you're at the end) minus 3 with a fetch size of 3, and keep repeating that.

So technically you are still reading forward, but you move the start position of each fetch 3 further back.

Kafka offsets are 0-based and the high-water mark is one past the newest message, so in this example M1 sits at offset 0 and the high-water mark is 12:

fetch((12-3), 3)
fetch((12-3-3), 3)
fetch((12-3-3-3), 3)
fetch((12-3-3-3-3), 3)

would produce

M10 M11 M12
M7 M8 M9
M4 M5 M6
M1 M2 M3

This would mean no broker changes :) and just "tricking" the val fetchRequest = fetchRequestBuilder in your implementation of SimpleConsumerShell.scala into "looking backwards": you move the start offset back from the end, then read forward for your fetch size.

Make sense?
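Joe's arithmetic can be sketched against the 0.8 SimpleConsumer API that SimpleConsumerShell.scala uses. Two caveats the prose above glosses over: the fetch size in a real FetchRequest is a byte budget, not a message count, so the sketch trims each returned batch to the window by offset; and the host, topic, and partition below are placeholders (error handling and partition-leader lookup are omitted):

    import kafka.api.{FetchRequestBuilder, OffsetRequest, PartitionOffsetRequestInfo}
    import kafka.common.TopicAndPartition
    import kafka.consumer.SimpleConsumer

    object BackwardsReader {
      def main(args: Array[String]) {
        val topic     = "metrics" // placeholder topic
        val partition = 0
        val batch     = 3         // messages per backwards "page"
        // Must point at the partition leader; leader lookup omitted here.
        val consumer = new SimpleConsumer("localhost", 9092, 10000, 64 * 1024, "backwards-reader")
        val tap      = TopicAndPartition(topic, partition)

        // The high-water mark: the offset one past the newest message.
        val highWater = consumer.getOffsetsBefore(
          OffsetRequest(Map(tap -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
        ).partitionErrorAndOffsets(tap).offsets.head

        // Walk backwards in windows: [hw-3, hw), [hw-6, hw-3), ... down to 0.
        var windowEnd = highWater
        while (windowEnd > 0) {
          val windowStart = math.max(0L, windowEnd - batch)
          val response = consumer.fetch(
            new FetchRequestBuilder()
              .clientId("backwards-reader")
              .addFetch(topic, partition, windowStart, 100 * 1024) // fetch size is in BYTES
              .build())
          // The byte budget may cover more than `batch` messages; keep only this window.
          for (mao <- response.messageSet(topic, partition) if mao.offset < windowEnd) {
            val buf   = mao.message.payload
            val bytes = new Array[Byte](buf.limit())
            buf.get(bytes)
            println("offset " + mao.offset + ": " + new String(bytes, "UTF-8"))
          }
          windowEnd = windowStart
        }
        consumer.close()
      }
    }

Each iteration still reads forward within its window, exactly as described above; only the window's start offset moves backwards.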
On Fri, Dec 6, 2013 at 10:43 AM, Joe Stein <joe.st...@stealth.ly> wrote:

Hmmm, I just realized that wouldn't work, actually (starting at the end is fine)... the fetch for the given size is still going to increment forward...

KafkaApis would have to change, because in readMessageSet it does a log.read of the FileMessageSet...

It should be possible, but not without changing the way the log is read when getting the partition from the ReplicaManager.

So let me take that all back and say: it can't be done now, but I think it is feasible with some broker modifications to read the log differently. Off the top of my head I can't think of how to change the log.read to do this without digging further into the code.

On Fri, Dec 6, 2013 at 10:26 AM, Joe Stein <joe.st...@stealth.ly> wrote:

The fetch requests are flexible enough to do what you want with them.

Take a look at SimpleConsumerShell.scala as a reference.

You could pass in OffsetRequest.LatestTime (-1) with a fetch size of 3 and then just keep doing that over and over again.

I think that will do exactly what you are looking to do.
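For contrast, a minimal sketch of what the 10:26 suggestion amounts to: resolve OffsetRequest.LatestTime (-1) to a concrete offset and keep fetching from there. As the 10:43 message points out, this tails new messages forward from the end; it never pages backwards. Host, topic, and sizes are placeholders:

    import kafka.api.{FetchRequestBuilder, OffsetRequest, PartitionOffsetRequestInfo}
    import kafka.common.TopicAndPartition
    import kafka.consumer.SimpleConsumer

    object TailFromLatest {
      def main(args: Array[String]) {
        val consumer = new SimpleConsumer("localhost", 9092, 10000, 64 * 1024, "tail-reader")
        val tap      = TopicAndPartition("metrics", 0)

        // LatestTime (-1) resolves to the current high-water mark.
        var offset = consumer.getOffsetsBefore(
          OffsetRequest(Map(tap -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
        ).partitionErrorAndOffsets(tap).offsets.head

        while (true) {
          val response = consumer.fetch(
            new FetchRequestBuilder().clientId("tail-reader")
              .addFetch(tap.topic, tap.partition, offset, 64 * 1024).build())
          // Only messages produced after we started ever show up here.
          for (mao <- response.messageSet(tap.topic, tap.partition)) {
            println("offset " + mao.offset)
            offset = mao.offset + 1
          }
          Thread.sleep(1000) // poll; the offset only ever moves forward
        }
      }
    }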
On Fri, Dec 6, 2013 at 10:04 AM, Otis Gospodnetic <otis.gospodne...@gmail.com> wrote:

Hi,

On Fri, Dec 6, 2013 at 9:38 AM, Tom Brown <tombrow...@gmail.com> wrote:
> Do you mean you want to start from the most recent data and go backwards
> to the oldest data, or that you want to start with old data and consume
> forwards?

Forwards is the "normal way". I'm looking for the "abnormal way", of course ;) i.e. backwards.

If the following are the messages that came in, oldest to newest:

M1 M2 M3 M4 M5 M6 M7 M8 M9 M10 M11 M12

then I'd love to be able to consume from the end, say in batches of 3, like this:

get last 3: M10 M11 M12
get last 3: M7 M8 M9
get last 3: M4 M5 M6
get last 3: M1 M2 M3

Of course, if messages keep coming in, the new ones that arrive would get picked up first and, eventually, assuming the consumer can consume faster than messages are produced, all messages would get consumed.

But the important/key part is that any new ones that arrive get picked up first.

> If the former, it would be difficult or impossible in 0.7.x, but I think
> it's doable in 0.8.x (they added some sort of message index). If the
> latter, that is easily accomplished in both versions.

I'd love to know if that's really so and how to do it!

We are looking to move to Kafka 0.8 in January and to add performance monitoring for Kafka 0.8 to SPM (see http://blog.sematext.com/2013/10/16/announcement-spm-performance-monitoring-for-kafka/).

Thanks,
Otis

On Friday, December 6, 2013, Otis Gospodnetic wrote:

Hi,

Does Kafka offer a way to consume messages in batches, but "from the end"?

This would be valuable in all systems where the most recent data is a lot more important than older data, such as performance metrics, maybe even logs, and maybe also trading/financial data.

Any chance of this sort of functionality ever making it into Kafka, or is it simply not implementable due to some underlying assumptions, or data structures, or...?

Thanks,
Otis
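Steven Parkes's binary-search idea from earlier in the thread could be sketched as below: binary search the offset range (0.8 offsets are consecutive integers), fetching one message at a time and comparing a timestamp parsed out of the payload. parseTimestamp and the "epochMillis,rest" record layout are hypothetical, since Kafka 0.8 messages carry no timestamp of their own, and timestamps are assumed non-decreasing along the log:

    import kafka.api.{FetchRequestBuilder, OffsetRequest, PartitionOffsetRequestInfo}
    import kafka.common.TopicAndPartition
    import kafka.consumer.SimpleConsumer

    object OffsetByDate {
      // Hypothetical: records are "epochMillis,rest..." so the timestamp is field 0.
      def parseTimestamp(payload: Array[Byte]): Long =
        new String(payload, "UTF-8").split(',')(0).toLong

      // Fetch the single message sitting at `offset`.
      def messageAt(consumer: SimpleConsumer, tap: TopicAndPartition, offset: Long): Array[Byte] = {
        val response = consumer.fetch(
          new FetchRequestBuilder().clientId("offset-by-date")
            .addFetch(tap.topic, tap.partition, offset, 64 * 1024).build())
        val mao   = response.messageSet(tap.topic, tap.partition).iterator.next()
        val buf   = mao.message.payload
        val bytes = new Array[Byte](buf.limit())
        buf.get(bytes)
        bytes
      }

      // First offset whose embedded timestamp is >= target. lo0/hi0 would come from
      // an OffsetRequest with EarliestTime and LatestTime respectively.
      def search(consumer: SimpleConsumer, tap: TopicAndPartition,
                 lo0: Long, hi0: Long, target: Long): Long = {
        var lo = lo0
        var hi = hi0
        while (lo < hi) {
          val mid = lo + (hi - lo) / 2
          if (parseTimestamp(messageAt(consumer, tap, mid)) < target) lo = mid + 1
          else hi = mid
        }
        lo
      }
    }

Running search twice, once for each end of the time range of interest, yields the offset window to fetch.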