Yes, and you can also read backwards on the stream if you do this in the consumer:
    val maoList: Iterable[MessageAndOffset] =
      for (messageAndOffset <- messageSet if numMessagesConsumed < maxMessages)
        yield messageAndOffset

    for (messageAndOffset <- maoList.toList.reverse) {
      // process each message here, newest first
    }

This way every read is the latest before the next earliest, so when you fetch 18, 19, 20 you will see them coming in as 20, 19, 18.

/*******************************************
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 http://www.stealth.ly
 Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
********************************************/

On Fri, Dec 6, 2013 at 7:02 PM, Steven Parkes <smpar...@smparkes.net> wrote:

> Right. If you're not reading contiguously, you need to remember the ranges
> that you have/haven't read. As long as you do that, it all works out, I
> think.
>
> A kafka client always has to store the last offset it read. In the
> simplest "both directions" case, where you start with the current offset
> and read in both directions, you just need to remember the first offset
> you've read as well.
>
> On Dec 6, 2013, at 3:50 PM, Joe Stein <joe.st...@stealth.ly> wrote:
>
> > you have to allow the fetchSize to be variable, so in your example,
> > since the new highwatermark is 12 and the last consumed message is 10:
> >
> > fetchSize = if (highwatermark - lastConsumedOffset < 3) highwatermark -
> > lastConsumedOffset else 3
> >
> > the real trick, though, is missed messages: having to keep track of
> > more than one index
> >
> > so let's say now you have 7 more published (13,14,15,16,17,18,19)
> >
> > you then read 17,18,19 (and while that happens 5 more are published ...
> > 20,21,22,23,24)
> >
> > now when you read 22,23,24 ...
> > you have to keep track of not only 22 as the last read (so you scoop up
> > 20 and 21) but also still remember 17 so you get 16,15,14,13
> >
> > so it can be done with some fancy logic to manage the index and offsets
> > (and persist them)
> >
> > On Fri, Dec 6, 2013 at 6:44 PM, Otis Gospodnetic <
> > otis.gospodne...@gmail.com> wrote:
> >
> >> Hi,
> >>
> >> On Fri, Dec 6, 2013 at 6:32 PM, Steven Parkes <smpar...@smparkes.net>
> >> wrote:
> >>
> >>> On Dec 6, 2013, at 2:03 PM, Otis Gospodnetic <
> >>> otis.gospodne...@gmail.com> wrote:
> >>>
> >>>> but I think the problem is that each time we grab we could get some
> >>>> of the same messages we already processed
> >>>
> >>> Doesn't setting the fetchSize to "how far back we need to grab" handle
> >>> that?
> >>
> >> I *think* it doesn't, but I'm wrong every day.... N times, so....
> >>
> >> I think this is what would happen:
> >> 1) imagine 10 messages in the broker, m1 - m10
> >> 2) consumer grabs last N (=3): m8, m9, m10
> >> 3) while it's doing that, and before the consumer polls for more
> >> messages, the producer publishes 2 more: m11 and m12
> >> 4) consumer now polls again. It asks the broker for the publisher
> >> offset and gets the answer: 12
> >> 5) good, says consumer, let me then fetch everything after offset
> >> 12-3=9: m10, m11, m12
> >>
> >> Problem: consumer got m10 again, but it was already processed in 2).
> >>
> >> No? Please correct me if I'm wrong anywhere.
> >>
> >> Thanks,
> >> Otis
> >> --
> >> Performance Monitoring * Log Analytics * Search Analytics
> >> Solr & Elasticsearch Support * http://sematext.com/
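To tie the thread together, here is a minimal, self-contained sketch of the bookkeeping being described: Joe's variable-fetchSize clamp, newest-first batches, and the "more than one index" gap tracking Steven mentions. This is plain Scala with made-up names (`BackwardReader`, `nextBatch`, `gaps`), not the Kafka consumer API; it just models offsets as `Long`s so the logic can be checked in isolation.

```scala
// Hypothetical sketch of the bookkeeping discussed in this thread; the
// object and method names are invented, and no Kafka API is used.
object BackwardReader {
  val batchSize = 3L // the fixed "last N" window used in the examples

  // Joe's clamp: never fetch further back than the last offset already
  // consumed, so a new batch cannot overlap messages read earlier.
  def fetchSize(highwatermark: Long, lastConsumedOffset: Long): Long = {
    val unread = highwatermark - lastConsumedOffset
    if (unread < batchSize) unread else batchSize
  }

  // One backward batch: the newest unread offsets, emitted newest first
  // (a fetch of 18..20 comes back as 20, 19, 18).
  def nextBatch(highwatermark: Long, lastConsumedOffset: Long): Seq[Long] = {
    val size = fetchSize(highwatermark, lastConsumedOffset)
    ((highwatermark - size + 1) to highwatermark).reverse
  }

  // The "more than one index" part: given the contiguous runs already read
  // (inclusive ranges) and the oldest offset still wanted, compute the
  // unread gaps that must be filled in later.
  def gaps(readRuns: List[(Long, Long)], floor: Long): List[(Long, Long)] = {
    val sorted = readRuns.sortBy(_._1)
    val starts = floor +: sorted.map(_._2 + 1)
    val ends   = sorted.map(_._1 - 1)
    starts.zip(ends).filter { case (lo, hi) => lo <= hi }
  }
}
```

In Otis's scenario the clamp avoids the duplicate: with highwatermark 12 and lastConsumedOffset 10, fetchSize is 2 and the next batch is 12, 11, so m10 is never re-read. In Joe's scenario, the read runs (17,19) and (22,24) with floor 13 leave gaps (13,16) and (20,21), which is exactly the "scoop up 20 and 21, then 16,15,14,13" bookkeeping described above.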