Hi,

I am following this link:

https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example

for a consumer design using kafka_2.9.2 version 0.8.1.1 (the version I can
find in Maven Central).  I have a couple of questions about the consumer.
I checked the archives and didn't see these exact questions asked already,
but I may have missed them -- I apologize if that is the case.


When I create a consumer I give it a consumer ID.  I assumed that it would
store my consumer's name as well as the last readOffset in ZooKeeper, but
looking in ZooKeeper that doesn't seem to be the case.  So it seems to me
that when my consumers come up they need to either re-read the entire
history from the start of time (which could take a long time, as I have
14-day retention), or else they need to keep track of the read offset
themselves.

I have Redis in my system already, so I have the choice of keeping track of
this in either Redis or ZooKeeper, and ZooKeeper seems like the better
idea.  Am I right, though, that the SimpleConsumer and the example I linked
above don't keep track of this, so if I want that I would have to do it
myself?
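
If I do end up tracking it myself in ZooKeeper, I'm imagining something
like this rough sketch, using the ZkClient library that Kafka itself
depends on.  The OffsetStore class, the znode path layout, and the use of
ZkClient's default Java serialization are all my own invention, not
anything the SimpleConsumer provides:

    import org.I0Itec.zkclient.ZkClient;

    // Sketch: persist the last-read offset for one partition ourselves.
    public class OffsetStore {
        private final ZkClient zk = new ZkClient("localhost:2181");

        private String path(String group, String topic, int partition) {
            return "/myapp/offsets/" + group + "/" + topic + "/" + partition;
        }

        public void save(String group, String topic, int partition, long offset) {
            String p = path(group, topic, partition);
            if (!zk.exists(p)) {
                zk.createPersistent(p, true);  // also create parent znodes
            }
            zk.writeData(p, offset);  // ZkClient's default serializer handles the Long
        }

        public Long load(String group, String topic, int partition) {
            // null if we have never stored an offset for this partition
            return zk.readData(path(group, topic, partition), true);
        }
    }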

Second question: in the example consumer, there is an error handler that
checks whether you received an OffsetOutOfRange response from Kafka.  If
so, it gets a new read offset using kafka.api.OffsetRequest.LatestTime().
My interpretation of this is that you have asked it for an offset which
doesn't make sense, so it just skips you to the end of the stream.  That's
guaranteed data loss.  A simple alternative would be to take the beginning
of the stream, which would be fine if you have idempotent processing - it
would be a replay - but it could take a long time.
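
In other words, I'm imagining replacing the example's error handler with
something like the following, where getLastOffset() is the helper from the
linked example and the only change is EarliestTime() in place of
LatestTime():

    if (fetchResponse.hasError()) {
        short code = fetchResponse.errorCode(topic, shardNum);
        if (code == ErrorMapping.OffsetOutOfRangeCode()) {
            // The example resets to LatestTime() here, silently dropping
            // everything between our stale offset and the head of the log.
            // Resetting to EarliestTime() replays from the start of the
            // retained log instead: safe with idempotent processing, but slow.
            readOffset = getLastOffset(consumer, topic, shardNum,
                    kafka.api.OffsetRequest.EarliestTime(), clientName);
        }
    }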

I don't know for sure what would cause you to get an OffsetOutOfRange - the
only thing I can really think of is that someone has changed the underlying
stream on you (like they deleted and recreated it and didn't tell all the
consumers).  I guess it's also possible that if I have 1-day retention and
I stop my consumer for 3 days, it could ask for a readOffset that no longer
exists; it's not clear to me whether that would result in an
OffsetOutOfRange error.

Does that all make sense?

Third question: I set .maxWait(1000), interpreting that to mean that my
fetch request will time out if there are no new messages within 1 second.
It doesn't seem to work - my call to consumer.fetch() seems to return
immediately.  Is that expected?
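
For reference, my fetch looks roughly like this (clientName, topic,
shardNum, readOffset, and FETCH_SIZE are my own variables; the minBytes()
line is an unconfirmed guess on my part - I'm wondering whether the broker
only honors maxWait until minBytes has accumulated, and I haven't been
setting minBytes at all):

    FetchRequest req = new FetchRequestBuilder()
            .clientId(clientName)
            .maxWait(1000)  // I expected this to block for up to 1 second
            .minBytes(1)    // my guess at what's missing - not confirmed
            .addFetch(topic, shardNum, readOffset, FETCH_SIZE)
            .build();
    FetchResponse fetchResponse = consumer.fetch(req);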

Final question: just to confirm:

    new FetchRequestBuilder().addFetch(topic, shardNum, readOffset, FETCH_SIZE)

FETCH_SIZE is in bytes, not a number of messages, so presumably it fetches
as many messages as will fit into a buffer of that size?  Is that right?
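
I.e., I would expect to consume the response the way the linked example
does, advancing the offset per message, with however many messages happened
to fit in FETCH_SIZE bytes:

    long numRead = 0;
    for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topic, shardNum)) {
        readOffset = messageAndOffset.nextOffset();  // offset to ask for next
        numRead++;
    }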

Thanks.


Christopher Piggott
Sr. Staff Engineer
Golisano Institute for Sustainability
Rochester Institute of Technology
