This is an aaaaaancient thread, but I thought I'd point to http://blog.sematext.com/2015/11/04/kafka-real-time-stream-multi-topic-catch-up-trick/ which gives a short description of how we ended up implementing this. It seems to work well for us, but if there are better ways to do it, esp. now with 0.9 around the corner, I'm all eyeballs!
Otis -- Monitoring - Log Management - Alerting - Anomaly Detection Solr & Elasticsearch Consulting Support Training - http://sematext.com/ On Fri, Dec 6, 2013 at 7:09 PM, Joe Stein <joe.st...@stealth.ly> wrote: > yes, also you can read backwards on the stream if you do this in the > consumer > > val maoList: Iterable[MessageAndOffset] = for(messageAndOffset <- > messageSet if(numMessagesConsumed < maxMessages)) yield messageAndOffset > > for(messageAndOffset <- maoList.toList.reverse) { > > this way every read is the latest before the next earliest so when you > fetch 18,19,20 you will see them coming in as 20,19,18 > > /******************************************* > Joe Stein > Founder, Principal Consultant > Big Data Open Source Security LLC > http://www.stealth.ly > Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop> > ********************************************/ > > > On Fri, Dec 6, 2013 at 7:02 PM, Steven Parkes <smpar...@smparkes.net> > wrote: > > > Right. If you're not reading contiguously, you need to remember the > ranges > > that you have/haven't read. As long as you do that, it all works out, I > > think. > > > > A kafka client always has to store the last offset it read. In the > > simplest "both directions" case where you start with current and read in > > both directions, you just need to remember the first offset you've read > as > > well. > > > > On Dec 6, 2013, at 3:50 PM, Joe Stein <joe.st...@stealth.ly> wrote: > > > > > you have to allow the fetchSize to be variable so in your example since > > the > > > new highwatermark is 12 and the last consumsed message is 10 > > > > > > fetchSize = if (highwatermark - lastConsumedOffset < 3) highwatermark - > > > lastConsumedOffset else 3 > > > > > > the real trick though is missing messages having to keep track of more > > than > > > one index > > > > > > so lets say now you have 7 more published (13,14,15,16,17,18,19) > > > > > > you then read 17,18,19 (and while that happens 5 more are published ... > > > 20,21,22,23,24) > > > > > > now when you read 22,23,24 ... you have to keep track of not only 22 as > > the > > > last read so you scoop up 20 and 21 but also remember still 17 so you > get > > > 16,15,14,13 > > > > > > so it can be done with some fancy logic to manage the index and offsets > > > (and persist them) > > > > > > > > > On Fri, Dec 6, 2013 at 6:44 PM, Otis Gospodnetic < > > otis.gospodne...@gmail.com > > >> wrote: > > > > > >> Hi, > > >> > > >> On Fri, Dec 6, 2013 at 6:32 PM, Steven Parkes <smpar...@smparkes.net> > > >> wrote: > > >> > > >>> On Dec 6, 2013, at 2:03 PM, Otis Gospodnetic < > > otis.gospodne...@gmail.com > > >>> > > >>> wrote: > > >>> > > >>>> but I think the > > >>>> problem is that each time we grab we could get some of the same > > >> messages > > >>> we > > >>>> already processed > > >>> > > >>> Doesn't setting the fetchSize to "how far back we need to grab" > handle > > >>> that? > > >> > > >> > > >> I *think* it doesn't, but I'm wrong every day.... N times, so.... > > >> > > >> I think this is what would happen: > > >> 1) imagine 10 messages in the broker m1 - m10 > > >> 2) consumer grabs last N (=3): m8, m9, m10 > > >> 3) while it's doing that and before consumer polls for more messages > > >> producer publishes 2 more: m11 and m12 > > >> 4) consumer now polls again. It asks broker for publisher offset and > > gets > > >> the answer: 12 > > >> 5) good, says consumer, let me then fetch everything after offset > > 12-3=9: > > >> m10, m11, m12 > > >> > > >> Problem: consumer got m10 again, but it was already processed in 2). > > >> > > >> No? Please correct me if I'm wrong anywhere. > > >> > > >> Thanks, > > >> Otis > > >> -- > > >> Performance Monitoring * Log Analytics * Search Analytics > > >> Solr & Elasticsearch Support * http://sematext.com/ > > >> > > > > >