You got the hamster on the wheel with this one :)

So one way to make it work without any changes (or at most very minor
changes, if any) would be to use your max offset and fetch size this way:

M1 M2 M3 M4 M5 M6 M7 M8 M9 M10 M11 M12

to get

get last 3: M10 M11 M12
get last 3: M7 M8 M9
get last 3: M4 M5 M6
get last 3: M1 M2 M3

You would start at the end to get the high watermark offset, then do

maxOffset (since you're at the end) - 3 with a fetch size of 3, and keep
doing that.

So technically you are still reading forward, but you are moving the start
position of each fetch 3 offsets back from the previous one.

So if the offset numbers matched your message numbers (offset of M1 is 1
and offset of M2 is 2), for this example...

fetch((12-3),3)
fetch((12-3-3),3)
fetch((12-3-3-3),3)
fetch((12-3-3-3-3),3)

would produce

M10 M11 M12
M7 M8 M9
M4 M5 M6
M1 M2 M3

This would mean no broker changes :) and just "tricking" the val
fetchRequest = fetchRequestBuilder in your implementation of
SimpleConsumerShell.scala into "looking backwards": you are really just
moving the start offset backwards from the end and fetching forward by
your fetch size.
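As a rough, untested sketch against the 0.8 SimpleConsumer API (the host,
port, topic, and client id below are placeholders, and partition leader
lookup and error handling are left out):

import kafka.api.{FetchRequestBuilder, OffsetRequest, PartitionOffsetRequestInfo}
import kafka.common.TopicAndPartition
import kafka.consumer.SimpleConsumer

object BackwardsFetch {
  def main(args: Array[String]): Unit = {
    val topic = "test-topic"  // placeholder topic
    val partition = 0
    val batchSize = 3
    val consumer =
      new SimpleConsumer("localhost", 9092, 100000, 64 * 1024, "backwards-shell")

    // ask the broker for the high watermark (latest) offset
    val tp = TopicAndPartition(topic, partition)
    val latest = consumer.getOffsetsBefore(OffsetRequest(
      Map(tp -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1))))
      .partitionErrorAndOffsets(tp).offsets.head

    // walk an exclusive upper bound backwards; each fetch still reads forward
    var end = latest
    while (end > 0) {
      val start = math.max(0L, end - batchSize)
      val fetchRequest = new FetchRequestBuilder()
        .clientId("backwards-shell")
        .addFetch(topic, partition, start, 1024 * 1024)
        .build()
      // a fetch is bounded by bytes, not message count, so trim to this batch
      consumer.fetch(fetchRequest)
        .messageSet(topic, partition)
        .filter(mo => mo.offset >= start && mo.offset < end)
        .foreach(mo => println("offset " + mo.offset))
      end = start
    }
    consumer.close()
  }
}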

make sense?


/*******************************************
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 http://www.stealth.ly
 Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
********************************************/


On Fri, Dec 6, 2013 at 10:43 AM, Joe Stein <joe.st...@stealth.ly> wrote:

> hmmm, I just realized that wouldn't work actually (starting at the end is
> fine)... the fetch is still going to read forward from whatever offset it
> is given ...
>
> KafkaApis would have to change because in readMessageSet it is doing a
> log.read of the FileMessageSet ...
>
> It should be possible, though not without changing the way the log is
> read when getting the partition with ReplicaManager.
>
> So let me take that all back and say... it can't be done now, but I think
> it is feasible with some broker modifications to read the log
> differently... off the top of my head I can't think of how to change the
> log.read to do this without digging deeper into the code.
>
>
>
> /*******************************************
>  Joe Stein
>  Founder, Principal Consultant
>  Big Data Open Source Security LLC
>  http://www.stealth.ly
>  Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> ********************************************/
>
>
> On Fri, Dec 6, 2013 at 10:26 AM, Joe Stein <joe.st...@stealth.ly> wrote:
>
>> The fetch requests are very flexible to do what you want with them.
>>
>> Take a look at SimpleConsumerShell.scala as a reference
>>
>> You could pass in OffsetRequest.LatestTime (-1) with a fetch size of 3
>> and then just keep doing that over and over again.
>>
>> I think that will do exactly what you are looking to do.
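>>
>> A rough sketch of that lookup with the 0.8 SimpleConsumer (host, port,
>> topic, and client id here are just placeholders):
>>
>> import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
>> import kafka.common.TopicAndPartition
>> import kafka.consumer.SimpleConsumer
>>
>> val consumer = new SimpleConsumer("localhost", 9092, 100000, 64 * 1024, "shell")
>> val tp = TopicAndPartition("test-topic", 0)
>> // OffsetRequest.LatestTime is -1: ask for the latest offset in the partition
>> val latest = consumer.getOffsetsBefore(OffsetRequest(
>>   Map(tp -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1))))
>>   .partitionErrorAndOffsets(tp).offsets.head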
>>
>> /*******************************************
>>  Joe Stein
>>  Founder, Principal Consultant
>>  Big Data Open Source Security LLC
>>  http://www.stealth.ly
>>  Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
>> ********************************************/
>>
>>
>> On Fri, Dec 6, 2013 at 10:04 AM, Otis Gospodnetic <
>> otis.gospodne...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> On Fri, Dec 6, 2013 at 9:38 AM, Tom Brown <tombrow...@gmail.com> wrote:
>>>
>>> > Do you mean you want to start from the most recent data and go
>>> > backwards to the oldest data, or that you want to start with old data
>>> > and consume forwards?
>>> >
>>>
>>> Forwards is the "normal way".  I'm looking for the "abnormal way", of
>>> course ;) i.e. backwards.
>>> If the following are the messages that came in, oldest to newest:
>>>
>>> M1 M2 M3 M4 M5 M6 M7 M8 M9 M10 M11 M12
>>>
>>> Then I'd love to be able to consume from the end, say in batches of 3,
>>> like
>>> this:
>>>
>>> get last 3: M10 M11 M12
>>> get last 3: M7 M8 M9
>>> get last 3: M4 M5 M6
>>> get last 3: M1 M2 M3
>>>
>>> Of course, if messages keep coming in, then the new ones that arrive
>>> would get picked up first and, eventually, assuming the consumer can
>>> consume faster than messages are produced, all messages will get
>>> consumed.
>>>
>>> But the key part is that any new ones that arrive will get picked up
>>> first.
>>>
>>> > If the former, it would be difficult or impossible in 0.7.x, but I
>>> > think doable in 0.8.x. (They added some sort of message index.) If the
>>> > latter, that is easily accomplished in both versions.
>>> >
>>>
>>> I'd love to know if that's really so and how to do it!
>>>
>>> We are looking to move to Kafka 0.8 in January and to add performance
>>> monitoring for Kafka 0.8 to SPM (see
>>>
>>> http://blog.sematext.com/2013/10/16/announcement-spm-performance-monitoring-for-kafka/
>>> )
>>>
>>> Thanks,
>>> Otis
>>> --
>>> Performance Monitoring * Log Analytics * Search Analytics
>>> Solr & Elasticsearch Support * http://sematext.com/
>>>
>>>
>>>
>>> > On Friday, December 6, 2013, Otis Gospodnetic wrote:
>>> >
>>> > > Hi,
>>> > >
>>> > > Does Kafka offer a way to consume messages in batches, but "from
>>> > > the end"?
>>> > >
>>> > > This would be valuable to have in all systems where the most recent
>>> > > data is a lot more important than older data, such as performance
>>> > > metrics, and maybe even logs....maybe also trading/financial data,
>>> > > and such.
>>> > >
>>> > > Any chance of this sort of functionality ever making it into Kafka,
>>> > > or is this simply not implementable due to some underlying
>>> > > assumptions, or data structures, or ... ?
>>> > >
>>> > > Thanks,
>>> > > Otis
>>> > > --
>>> > > Performance Monitoring * Log Analytics * Search Analytics
>>> > > Solr & Elasticsearch Support * http://sematext.com/
>>> > >
>>> >
>>>
>>
>>
>
