Hi,

obviously this should be build different IMHO (unless I fail to see something that prevents you from doing this).
When you realize you fall behind do this:

1. remember your current
2. get the latest offset
3. fork a process to replicate from the current offset +1 to the latest one just fetched.
4. reset your offset to the latest +1

I fail to see the reason why a topic change would be needed. If the downstream app cant handle the data, you are in trouble anyways, and if your message processing is the bottleneck more partitions might also be an option. I would try to not do any tricks and find out what the lag is caused by and then fix whatever causes the lag. Its 1 am in Germany there might be off by one errors in the algorithm above.

Best
Jan


On 04.11.2015 18:13, Otis Gospodnetić wrote:
This is an aaaaaancient thread, but I thought I'd point to
http://blog.sematext.com/2015/11/04/kafka-real-time-stream-multi-topic-catch-up-trick/
which gives a short description of how we ended up implementing this.
It seems to work well for us, but if there are better ways to do it, esp.
now with 0.9 around the corner, I'm all eyeballs!

Otis
--
Monitoring - Log Management - Alerting - Anomaly Detection
Solr & Elasticsearch Consulting Support Training - http://sematext.com/


On Fri, Dec 6, 2013 at 7:09 PM, Joe Stein <joe.st...@stealth.ly> wrote:

yes, also you can read backwards on the stream if you do this in the
consumer

val maoList: Iterable[MessageAndOffset] = for(messageAndOffset <-
messageSet if(numMessagesConsumed < maxMessages)) yield messageAndOffset

for(messageAndOffset <- maoList.toList.reverse) {

this way every read is the latest before the next earliest so when you
fetch 18,19,20 you will see them coming in as 20,19,18

/*******************************************
  Joe Stein
  Founder, Principal Consultant
  Big Data Open Source Security LLC
  http://www.stealth.ly
  Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
********************************************/


On Fri, Dec 6, 2013 at 7:02 PM, Steven Parkes <smpar...@smparkes.net>
wrote:

Right. If you're not reading contiguously, you need to remember the
ranges
that you have/haven't read. As long as you do that, it all works out, I
think.

A kafka client always has to store the last offset it read. In the
simplest "both directions" case where you start with current and read in
both directions, you just need to remember the first offset you've read
as
well.

On Dec 6, 2013, at 3:50 PM, Joe Stein <joe.st...@stealth.ly> wrote:

you have to allow the fetchSize to be variable so in your example since
the
new highwatermark is 12 and the last consumsed message is 10

fetchSize = if (highwatermark - lastConsumedOffset < 3) highwatermark -
lastConsumedOffset else 3

the real trick though is missing messages having to keep track of more
than
one index

so lets say now you have 7 more published (13,14,15,16,17,18,19)

you then read 17,18,19 (and while that happens 5 more are published ...
20,21,22,23,24)

now when you read 22,23,24 ... you have to keep track of not only 22 as
the
last read so you scoop up 20 and 21 but also remember still 17 so you
get
16,15,14,13

so it can be done with some fancy logic to manage the index and offsets
(and persist them)


On Fri, Dec 6, 2013 at 6:44 PM, Otis Gospodnetic <
otis.gospodne...@gmail.com
wrote:
Hi,

On Fri, Dec 6, 2013 at 6:32 PM, Steven Parkes <smpar...@smparkes.net>
wrote:

On Dec 6, 2013, at 2:03 PM, Otis Gospodnetic <
otis.gospodne...@gmail.com
wrote:

but I think the
problem is that each time we grab we could get some of the same
messages
we
already processed
Doesn't setting the fetchSize to "how far back we need to grab"
handle
that?

I *think* it doesn't, but I'm wrong every day.... N times, so....

I think this is what would happen:
1) imagine 10 messages in the broker m1 - m10
2) consumer grabs last N (=3): m8, m9, m10
3) while it's doing that and before consumer polls for more messages
producer publishes 2 more: m11 and m12
4) consumer now polls again. It asks broker for publisher offset and
gets
the answer: 12
5) good, says consumer, let me then fetch everything after offset
12-3=9:
m10, m11, m12

Problem: consumer got m10 again, but it was already processed in 2).

No?  Please correct me if I'm wrong anywhere.

Thanks,
Otis
--
Performance Monitoring * Log Analytics * Search Analytics
Solr & Elasticsearch Support * http://sematext.com/



Reply via email to