You can replay the messages with the high-level consumer; you can even start at whatever position you want.
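As a rough illustration of the configuration side of such a replay, here is a minimal sketch in plain Java. The class name ReplayConfig, the helper names, the ZooKeeper address "localhost:2181" and the group name "my-group" are all hypothetical. The Kafka calls appear only in comments, so the snippet compiles without the Kafka jar; they assume the 0.8.x API discussed in this thread.

```java
import java.util.Properties;

public class ReplayConfig {

    // ZooKeeper path under which the 0.8 high-level consumer keeps a
    // group's state (offsets, owners, ids), as used in this thread.
    public static String groupPath(String groupId) {
        return "/consumers/" + groupId;
    }

    // Consumer properties for a group that should start from the earliest
    // available offset when no committed offset exists in ZooKeeper.
    public static Properties replayProperties(String zkConnect, String groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zkConnect);
        props.put("group.id", groupId);
        props.put("auto.offset.reset", "smallest");
        return props;
    }

    public static void main(String[] args) {
        String zkConnect = "localhost:2181"; // assumption: local ZooKeeper
        String groupId = "my-group";         // hypothetical group name

        // With the Kafka 0.8 jar on the classpath, and before any consumer
        // in the group starts, the stored offsets would be removed with:
        //   kafka.utils.ZkUtils.maybeDeletePath(zkConnect, groupPath(groupId));
        // and the connector would then be created from these properties:
        //   kafka.consumer.Consumer.createJavaConsumerConnector(
        //       new kafka.consumer.ConsumerConfig(props));
        Properties props = replayProperties(zkConnect, groupId);

        System.out.println("would delete: " + groupPath(groupId));
        System.out.println("auto.offset.reset=" + props.getProperty("auto.offset.reset"));
    }
}
```

Passing a brand new group name to replayProperties, with no delete, corresponds to the other option of simply changing the groupId.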
Before your consumers start, call ZkUtils.maybeDeletePath(zkClientConnection, "/consumers/" + groupId) and make sure your consumer properties include auto.offset.reset="smallest". This way you start at the beginning of the stream once the offsets are gone.

If you have many consumer processes launching within your group, you might want a barrier (http://zookeeper.apache.org/doc/r3.4.6/recipes.html#sc_recipes_eventHandles) so that only one of the launching consumer processes does this. If you have only one process, or can do the operation administratively, there is no need.

You can even trigger this while they are all running. It is more code to write but 100% doable (and works well if you do it right): have them watch a node, get the notification, stop what they are doing, barrier, delete the path (or change the value of the offset so you can start wherever you want), and start again.

You can also just change the groupId to something brand new when you start up with auto.offset.reset="smallest" in your properties; either way works. Deleting the path leaves less lint in zk long term.

It is all just 1s and 0s, and just a matter of how many you put together yourself vs. take out of the box =8^)

/*******************************************
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 http://www.stealth.ly
 Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
********************************************/

On Sat, Jan 17, 2015 at 2:11 PM, Manikumar Reddy <ku...@nmsworks.co.in> wrote:

> AFAIK, we can not replay the messages with high level consumer. We need to
> use simple consumer.
>
> On Sun, Jan 18, 2015 at 12:15 AM, Christopher Piggott <cpigg...@gmail.com>
> wrote:
>
> > Thanks. That helped clear a lot up in my mind.
> >
> > I'm trying to high-level consumer now. Occasionally I need to do a replay
> > of the stream.
> > The example is:
> >
> >     KafkaStream.iterator();
> >
> > which starts at wherever zookeeper recorded as where you left off.
> >
> > With the high level interface, can you request an iterator that starts at
> > the very beginning?
> >
> > On Fri, Jan 16, 2015 at 8:55 PM, Manikumar Reddy <ku...@nmsworks.co.in>
> > wrote:
> >
> > > Hi,
> > >
> > > 1. In SimpleConsumer, you must keep track of the offsets in your
> > >    application. In the example code, "readOffset" variable can be saved
> > >    in redis/zookeeper. You should plugin this logic in your code. High
> > >    Level Consumer stores the last read offset information in ZooKeeper.
> > >
> > > 2. You will get OffsetOutOfRange for any invalid offset. On error, you
> > >    can decide what to do. i.e read from the latest, earliest or some
> > >    other offset.
> > >
> > > 3. https://issues.apache.org/jira/browse/KAFKA-1779
> > >
> > > 4. Yes
> > >
> > > Manikumar
> > >
> > > On Sat, Jan 17, 2015 at 2:25 AM, Christopher Piggott <cpigg...@gmail.com>
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > > I am following this link:
> > > >
> > > > https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
> > > >
> > > > for a consumer design using kafka_2.9.2 version 0.8.1.1 (what I can find
> > > > in maven central). I have a couple of questions about the consumer. I
> > > > checked the archives and didn't see these exact questions asked already,
> > > > but I may have missed them -- I apologize if that is the case.
> > > >
> > > > When I create a consumer I give it a consumer ID. I assumed that it would
> > > > store my consumer's name as well as the last readOffset in zookeeper, but
> > > > looking in zookeeper that doesn't seem to be the case.
> > > > So it seems to me that when my consumers come up they need to either
> > > > get the entire history from the start of time (which could take a long
> > > > time, as I have 14 day durability); or else they need to somehow keep
> > > > track of the read offset themselves.
> > > >
> > > > I have redis in my system already, so I have the choice of keeping track
> > > > of this in either redis or zookeeper. It seems like zookeeper would be a
> > > > better idea. Am I right, though, that the SimpleConsumer and the example
> > > > I linked above don't keep track of this, so if I want to do that I would
> > > > have to do it myself?
> > > >
> > > > Second question: in the example consumer, there is an error handler that
> > > > checks if you received an OffsetOutOfRange response from kafka. If so, it
> > > > gets a new read offset .LatestTime(). My interpretation of this is that
> > > > you have asked it for an offset which doesn't make sense, so it just
> > > > scans you to the end of the stream. That's a guaranteed data loss. A
> > > > simple alternative would be to take the beginning of the stream, which if
> > > > you have idempotent processing would be fine - it would be a replay - but
> > > > it could take a long time.
> > > >
> > > > I don't know for sure what would cause you to get an OffsetOutOfRange -
> > > > the only thing I can really think of is that someone has changed the
> > > > underlying stream on you (like they deleted and recreated it and didn't
> > > > tell all the consumers). I guess it's possible that if I have a 1 day
> > > > stream durability and I stop my consumer for 3 days that it could ask for
> > > > a readOffset that no longer exists; it's not clear to me whether or not
> > > > that would result in an OffsetOutOfRange error or not.
> > > >
> > > > Does that all make sense?
> > > >
> > > > Third question: I set a .maxWait(1000) interpreting that to mean that
> > > > when I make my fetch request the consumer will time out if there are no
> > > > new messages in 1 second. It doesn't seem to work - my call to
> > > > consumer.fetch() seems to return immediately. Is that expected?
> > > >
> > > > Final question: just to confirm:
> > > >
> > > >     new FetchRequestBuilder().addFetch( topic, shardNum, readOffset, FETCH_SIZE )
> > > >
> > > > FETCH_SIZE is in bytes, not number of messages, so presumably it fetches
> > > > as many messages as will fit into that many byte buffer? Is that right?
> > > >
> > > > Thanks.
> > > >
> > > > Christopher Piggott
> > > > Sr. Staff Engineer
> > > > Golisano Institute for Sustainability
> > > > Rochester Institute of Technology