Correct.
New data needs to get in first.
Old data can get backfilled slowly.
How quickly old data gets backfilled depends on how fast Consumer is
relative to Producer.
If Consumer is faster (it better be!) it will always consume the latest N
messages, but each time some portion of those N will be "older" messages,
so with every poll to the Kafka Broker it will slowly "chew backwards".
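
To make the "chew backwards" idea concrete, here is a rough simulation in
Python. It is only a sketch: it does not talk to Kafka at all, the log is a
plain in-memory list, and the names BackwardsConsumer, poll, pending and
seen_end are made up for illustration. The assumption is that each poll may
take at most N messages, spends that budget on the newest unread messages
first, and uses whatever is left over to backfill older ones.

```python
from typing import List


class BackwardsConsumer:
    """Toy model of newest-first consumption (hypothetical, not a Kafka API)."""

    def __init__(self, batch_size: int) -> None:
        self.batch_size = batch_size
        # Stack of unread [start, end) index ranges; the newest range is on top.
        self.pending: List[List[int]] = []
        # Length of the log as of the previous poll.
        self.seen_end = 0

    def poll(self, log: List[str]) -> List[str]:
        # Anything appended since the last poll becomes the top-priority range.
        if len(log) > self.seen_end:
            self.pending.append([self.seen_end, len(log)])
            self.seen_end = len(log)

        batch: List[str] = []
        budget = self.batch_size
        while budget > 0 and self.pending:
            rng = self.pending[-1]
            start, end = rng
            take = min(budget, end - start)
            # Read from the END of the range, so newer messages go first.
            batch = log[end - take:end] + batch
            rng[1] = end - take
            budget -= take
            if rng[0] == rng[1]:
                self.pending.pop()
        return batch


if __name__ == "__main__":
    log = ["M%d" % i for i in range(1, 13)]  # M1 .. M12, oldest to newest
    consumer = BackwardsConsumer(batch_size=3)
    print(consumer.poll(log))  # ['M10', 'M11', 'M12']
    print(consumer.poll(log))  # ['M7', 'M8', 'M9']
    log.append("M13")          # a new message arrives mid-backfill
    print(consumer.poll(log))  # ['M5', 'M6', 'M13']
    print(consumer.poll(log))  # ['M2', 'M3', 'M4']
    print(consumer.poll(log))  # ['M1']
```

Note how the third poll picks up M13 first and only then spends the rest of
its budget on M5 and M6. Against a real broker the ranges would presumably be
tracked as offsets and read with ordinary forward fetches, along the lines
Joe describes.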

It sounds like I'm not the only one needing this! :)

Otis
--
Performance Monitoring * Log Analytics * Search Analytics
Solr & Elasticsearch Support * http://sematext.com/


On Fri, Dec 6, 2013 at 11:17 AM, Steven Parkes <smpar...@smparkes.net> wrote:

> This functionality is parallel to the stream being consumed into an
> indexed store: it serves a different point in the
> latency/completeness/fault tolerance space. It's actually combined with
> skipping forward and back, to prioritize new over old data, much as Otis
> describes, I think.
>
> On Dec 6, 2013, at 8:07 AM, Joe Stein <joe.st...@stealth.ly> wrote:
>
> > Steven, you might be better off reading the Kafka stream into Cassandra
> and
> > then doing the reads that way
> >
> > /*******************************************
> > Joe Stein
> > Founder, Principal Consultant
> > Big Data Open Source Security LLC
> > http://www.stealth.ly
> > Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> > ********************************************/
> >
> >
> > On Fri, Dec 6, 2013 at 11:04 AM, Steven Parkes <smpar...@smparkes.net
> >wrote:
> >
> >> I've figured I may end up doing something like this for reasons that
> sound
> >> similar to what Otis describes.
> >>
> >> In my case, I may end up binary searching the log to match msg #s to
> dates
> >> in the actual messages to find a particular time range of interest.
> >>
> >> So far it's just theory: I haven't gotten to the point of POC/sanity
> >> checking it. But it sounds like it should work ...
> >>
> >> On Dec 6, 2013, at 7:56 AM, Joe Stein <joe.st...@stealth.ly> wrote:
> >>
> >>> You got the hamster on the wheel with this one :)
> >>>
> >>> So one way to make it work without any changes (or at least maybe very
> >>> minor changes if at all) would possibly be to use your max offset and
> >> fetch
> >>> size this way
> >>>
> >>> M1 M2 M3 M4 M5 M6 M7 M8 M9 M10 M11 M12
> >>>
> >>> to get
> >>>
> >>> get last 3: M10 M11 M12
> >>> get last 3: M7 M8 M9
> >>> get last 3: M4 M5 M6
> >>> get last 3: M1 M2 M3
> >>>
> >>> you would start at the end to get the high watermark offset, then you
> >>> would do
> >>>
> >>> maxOffset (since you're at the end) - 3 with a fetch size of 3 and then
> >>> keep doing that
> >>>
> >>> so technically you are still looking forward but you are making the
> start
> >>> position of your offset 3 behind
> >>>
> >>> so if the offset numbers matched your numbers (so offset of M1 is 1 and
> >>> offset of M2 is 2) for this example...
> >>>
> >>> fetch((12-3),3)
> >>> fetch((12-3-3),3)
> >>> fetch((12-3-3-3),3)
> >>> fetch((12-3-3-3-3),3)
> >>>
> >>> would produce
> >>>
> >>> M10 M11 M12
> >>> M7 M8 M9
> >>> M4 M5 M6
> >>> M1 M2 M3
> >>>
> >>> This would mean no broker changes :) and just "tricking" the val
> >>> fetchRequest = fetchRequestBuilder in your implementation of the
> >>> SimpleConsumerShell.scala to "look backwards" but you are just moving
> the
> >>> offset backwards from the end looking forward for your fetch size
> >>>
> >>> make sense?
> >>>
> >>>
> >>> /*******************************************
> >>> Joe Stein
> >>> Founder, Principal Consultant
> >>> Big Data Open Source Security LLC
> >>> http://www.stealth.ly
> >>> Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> >>> ********************************************/
> >>>
> >>>
> >>> On Fri, Dec 6, 2013 at 10:43 AM, Joe Stein <joe.st...@stealth.ly>
> wrote:
> >>>
> >>>> hmmm, I just realized that wouldn't work actually (starting at the end
> >> is
> >>>> fine)... the fetch size being taken in is still going to increment
> >> forward
> >>>> ...
> >>>>
> >>>> The KafkaApi would have to change because in readMessageSet it is
> doing
> >> a
> >>>> log.read of the FileMessageSet ...
> >>>>
> >>>> it should be possible though but not without changing the way the log
> is
> >>>> read when getting the partition with ReplicaManager
> >>>>
> >>>> so let me take that all back and say... can't be done now but I think
> it
> >>>> is feasible to be done with some broker modifications to read the log
> >>>> differently... off the top of my head can't think of how to change the
> >>>> log.read to do this without digging more down into the code
> >>>>
> >>>>
> >>>>
> >>>> /*******************************************
> >>>> Joe Stein
> >>>> Founder, Principal Consultant
> >>>> Big Data Open Source Security LLC
> >>>> http://www.stealth.ly
> >>>> Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> >>>> ********************************************/
> >>>>
> >>>>
> >>>> On Fri, Dec 6, 2013 at 10:26 AM, Joe Stein <joe.st...@stealth.ly>
> >> wrote:
> >>>>
> >>>>> The fetch requests are flexible enough to do what you want with them.
> >>>>>
> >>>>> Take a look at SimpleConsumerShell.scala as a reference
> >>>>>
> >>>>> You could pass in OffsetRequest.LatestTime (-1) with a fetch size of
> 3
> >>>>> and then just keep doing that over and over again.
> >>>>>
> >>>>> I think that will do exactly what you are looking to do.
> >>>>>
> >>>>> /*******************************************
> >>>>> Joe Stein
> >>>>> Founder, Principal Consultant
> >>>>> Big Data Open Source Security LLC
> >>>>> http://www.stealth.ly
> >>>>> Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> >>>>> ********************************************/
> >>>>>
> >>>>>
> >>>>> On Fri, Dec 6, 2013 at 10:04 AM, Otis Gospodnetic <
> >>>>> otis.gospodne...@gmail.com> wrote:
> >>>>>
> >>>>>> Hi,
> >>>>>>
> >>>>>> On Fri, Dec 6, 2013 at 9:38 AM, Tom Brown <tombrow...@gmail.com>
> >> wrote:
> >>>>>>
> >>>>>>> Do you mean you want to start from the most recent data and go
> >>>>>> backwards to
> >>>>>>> the oldest data, or that you want to start with old data and
> consume
> >>>>>>> forwards?
> >>>>>>>
> >>>>>>
> >>>>>> Forwards is the "normal way".  I'm looking for the "abnormal way",
> of
> >>>>>> course ;) i.e. backwards.
> >>>>>> If the following are the messages that came in, oldest to newest:
> >>>>>>
> >>>>>> M1 M2 M3 M4 M5 M6 M7 M8 M9 M10 M11 M12
> >>>>>>
> >>>>>> Then I'd love to be able to consume from the end, say in batches of
> 3,
> >>>>>> like
> >>>>>> this:
> >>>>>>
> >>>>>> get last 3: M10 M11 M12
> >>>>>> get last 3: M7 M8 M9
> >>>>>> get last 3: M4 M5 M6
> >>>>>> get last 3: M1 M2 M3
> >>>>>>
> >>>>>> Of course, if messages keep coming in, then the new ones that arrive
> >>>>>> would
> >>>>>> get picked up first and, eventually, assuming Consumer can consume
> >> faster
> >>>>>> than messages are produced, all messages will get consumed.
> >>>>>>
> >>>>>> But the important/key part is that any new ones that arrive will get
> >>>>>> picked
> >>>>>> up first.
> >>>>>>
> >>>>>> If the former, it would be difficult or impossible in 0.7.x, but I
> >> think
> >>>>>>> doable in 0.8.x. (They added some sort of message index). If the
> >>>>>> latter,
> >>>>>>> that is easily accomplished in both versions.
> >>>>>>>
> >>>>>>
> >>>>>> I'd love to know if that's really so and how to do it!
> >>>>>>
> >>>>>> We are looking to move to Kafka 0.8 in January and to add
> performance
> >>>>>> monitoring for Kafka 0.8 to SPM (see
> >>>>>>
> >>>>>>
> >>
> http://blog.sematext.com/2013/10/16/announcement-spm-performance-monitoring-for-kafka/
> >>>>>> )
> >>>>>>
> >>>>>> Thanks,
> >>>>>> Otis
> >>>>>> --
> >>>>>> Performance Monitoring * Log Analytics * Search Analytics
> >>>>>> Solr & Elasticsearch Support * http://sematext.com/
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>> On Friday, December 6, 2013, Otis Gospodnetic wrote:
> >>>>>>>
> >>>>>>>> Hi,
> >>>>>>>>
> >>>>>>>> Does Kafka offer a way to consume messages in batches, but "from
> the
> >>>>>>> end"?
> >>>>>>>>
> >>>>>>>> This would be valuable to have in all systems where the most
> recent
> >>>>>> data
> >>>>>>> is
> >>>>>>>> a lot more important than older data, such as performance metrics,
> >>>>>> and
> >>>>>>>> maybe even logs....maybe also trading/financial data, and such.
> >>>>>>>>
> >>>>>>>> Any chance of this sort of functionality ever making it into
> Kafka,
> >>>>>> or is
> >>>>>>>> this simply not implementable due to some underlying assumptions,
> or
> >>>>>> data
> >>>>>>>> structures, or ... ?
> >>>>>>>>
> >>>>>>>> Thanks,
> >>>>>>>> Otis
> >>>>>>>> --
> >>>>>>>> Performance Monitoring * Log Analytics * Search Analytics
> >>>>>>>> Solr & Elasticsearch Support * http://sematext.com/
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>>
> >>>>
> >>
> >>
>
>
