We are developing a new Java consumer that addresses the offset reset issue better.
Thanks,
Jun

On Thu, Feb 19, 2015 at 11:53 PM, Arunkumar Srambikkal (asrambik) <asram...@cisco.com> wrote:

> If I may use the same thread to discuss the exact same issue ....
>
> Assuming one can store the offset in an external location (redis/db etc),
> along with the rest of the state that a program requires, wouldn't it be
> possible to manage things such that, you use the High Level API with auto
> commit turned off and do your custom offset management followed by the
> kafka commit api call (probably delayed to give a breather to zookeeper)?
>
> That way in the failure scenario, the high level consumer offset would
> ALWAYS be only smaller than what is actually valid and you can skip forward
> and avoid using the simple consumer.
>
> I assume one needs the simple consumer in the offset management use case,
> only we want to skip back to an older offset / use Kafka for storing
> offsets?
>
> I was trying to handle the customer failure scenario but avoiding the
> simple consumer and all the complexities it ensues.
>
> Does this work or is there anything wrong with this picture?
>
> Thanks
> Arun
>
> On Thu, Feb 19, 2015 at 03:29:19PM +0000, Suren wrote:
> > We are using the High Level Consumer API to interact with Kafka for our
> > normal use cases.
> >
> > However, on consumer restart in the case of consumer failures, we want
> > to be able to manually reset offsets in certain situations.
> > And ideally we'd like to use the same api in 0.8.1 and 0.8.2. :-) It
> > looked like instantiating a SimpleConsumer just to reset offsets on
> > restart was a viable option, while continuing to use the High Level
> > Consumer for our normal operations. Not sure if there is a better way that
> > is compatible across 0.8.1 and 0.8.2.
> > -Suren
> >
> > On Thursday, February 19, 2015 10:25 AM, Joel Koshy <jjkosh...@gmail.com> wrote:
> >
> > Not sure what you mean by using the SimpleConsumer on failure
> > recovery. Can you elaborate on this?
> >
> > On Thu, Feb 19, 2015 at 03:04:47PM +0000, Suren wrote:
> > > Haven't used either one now. Sounds like 0.8.2.1 will help.
> > > We are using the High Level Consumer generally but are thinking to use
> > > the SimpleConsumer on failure recovery to set the offsets.
> > > Is that the recommended approach for this use case?
> > > Thanks.
> > > -Suren
> > >
> > > On Thursday, February 19, 2015 9:40 AM, Joel Koshy <jjkosh...@gmail.com> wrote:
> > >
> > > Are you using it from Java or Scala? i.e., are you using the
> > > javaapi.SimpleConsumer or kafka.consumer.SimpleConsumer
> > >
> > > In 0.8.2 javaapi we explicitly set version 0 of the
> > > OffsetCommitRequest/OffsetFetchRequest which means it will
> > > commit/fetch to/from ZooKeeper only. If you use the scala API you
> > > can create an OffsetCommitRequest with version set to 1 (which will
> > > allow you to commit to Kafka).
> > >
> > > Since we are doing an 0.8.2.1 release we will make the above more
> > > consistent. i.e., you can create OffsetCommitRequests with version 1
> > > even from the javaapi. I will be updating the documentation on this
> > > to make it clearer.
> > >
> > > Thanks,
> > >
> > > Joel
> > >
> > > On Thu, Feb 19, 2015 at 02:28:32PM +0000, Suren wrote:
> > > > Joel,
> > > > Looking at SimpleConsumer in the 0.8.2 code, it is using
> > > > OffsetCommitRequest and sending that over to a broker.
> > > > Is the broker storing that in ZK?
> > > > -Suren
> > > >
> > > > On Tuesday, February 17, 2015 12:22 PM, Joel Koshy <jjkosh...@gmail.com> wrote:
> > > >
> > > > Hi Chris,
> > > >
> > > > In 0.8.2, the simple consumer Java API supports
> > > > committing/fetching offsets that are stored in ZooKeeper. You
> > > > don't need to issue any ConsumerMetadataRequest for this.
> > > > Unfortunately, the API currently does not support fetching offsets
> > > > that are stored in Kafka.
> > > >
> > > > Thanks,
> > > >
> > > > Joel
> > > >
> > > > On Mon, Feb 16, 2015 at 05:02:08PM -0500, Christopher Piggott wrote:
> > > > > Hi,
> > > > >
> > > > > I am still using 0.8.1.1 because of the CPU use concerns.
> > > > >
> > > > > I'm confused about why the SimpleConsumer has:
> > > > >
> > > > > OffsetCommitResponse commitOffsets(OffsetCommitRequest request)
> > > > >
> > > > > and
> > > > >
> > > > > OffsetFetchResponse fetchOffsets(OffsetFetchRequest request)
> > > > >
> > > > > but no way that I can see to issue a ConsumerMetadataRequest,
> > > > > which is what I think when restarting my consumers so that they
> > > > > can begin working where they last left off (in the event that
> > > > > they were stopped for a while then restarted some time later,
> > > > > and new messages had come in).
> > > > >
> > > > > The fetchOffsets() works on time, usually it looks like you send
> > > > > it Earliest or Latest (beginning or end of what's currently in
> > > > > the stream).
> > > > >
> > > > > I realize the documentation says this:
> > > > >
> > > > > > *Downsides of using SimpleConsumer*
> > > > > > The SimpleConsumer does require a significant amount of work
> > > > > > not needed in the Consumer Groups:
> > > > > >
> > > > > > 1. You must keep track of the offsets in your application to
> > > > > > know where you left off consuming.
> > > > >
> > > > > But that's not really quite true ... not as long as
> > > > > commitOffsets() has been provided. It seems the SimpleConsumer provides
> > > > > you with a solution to only one half of the problem of offset management.
> > > > >
> > > > > Using some zookeeper python scripts I wrote I can see that the
> > > > > commitOffsets() is doing its job and writing to
> > > > >
> > > > > /consumers/myGroupId/offsets/myTopic/0
> > > > >
> > > > > That has this value:
> > > > >
> > > > > ('32757408', ZnodeStat(czxid=2211679, mzxid=14779964,
> > > > > ctime=1423777630972, mtime=1424122117397, version=12568262,
> > > > > cversion=0, aversion=0, ephemeralOwner=0, dataLength=8,
> > > > > numChildren=0, pzxid=2211679))
> > > > >
> > > > > Now the question is just how to retrieve that - do I really have
> > > > > to have my client connect to ZK directly? If that's the case,
> > > > > future upgrades would break (e.g. 0.8.2 having its own storage
> > > > > for commit watermarks).
> > > > >
> > > > > What was the intent here, and what's the advice on how to
> > > > > proceed being that 0.8.2 is in an iffy state right now?
> > > > >
> > > > > --Chris
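
For readers following the thread, here is a rough sketch of the "skip forward" idea Arun describes: the offset of the last processed message is kept in an external store, the consumer's committed offset is allowed to lag behind it, and after a restart any redelivered message at or below the stored offset is skipped. This is only an illustration under stated assumptions. It makes no Kafka or ZooKeeper calls, and `OffsetSkipSketch`, `Message`, `ExternalOffsetStore`, and `replay` are hypothetical names invented here, not part of any Kafka API. The in-memory map stands in for redis or a database.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: none of these types are Kafka classes.
class OffsetSkipSketch {

    // Stand-in for a consumed message: its partition offset plus a payload.
    static final class Message {
        final long offset;
        final String payload;

        Message(long offset, String payload) {
            this.offset = offset;
            this.payload = payload;
        }
    }

    // Stand-in for the external location (redis/db) that holds the offset of
    // the last processed message per topic-partition. Assumption: in a real
    // system this write happens together with the application's own state.
    static final class ExternalOffsetStore {
        private final Map<String, Long> lastProcessed = new HashMap<>();

        long get(String topicPartition) {
            // -1 means nothing has been processed yet.
            return lastProcessed.getOrDefault(topicPartition, -1L);
        }

        void put(String topicPartition, long offset) {
            lastProcessed.put(topicPartition, offset);
        }
    }

    // Processes the messages delivered after a restart, skipping every message
    // whose offset is already covered by the external store.
    static List<String> replay(String topicPartition, List<Message> delivered,
                               ExternalOffsetStore store) {
        List<String> processed = new ArrayList<>();
        for (Message m : delivered) {
            if (m.offset <= store.get(topicPartition)) {
                continue; // already processed before the failure, skip forward
            }
            processed.add(m.payload);            // "process" the message
            store.put(topicPartition, m.offset); // record progress externally
        }
        return processed;
    }

    public static void main(String[] args) {
        ExternalOffsetStore store = new ExternalOffsetStore();
        store.put("myTopic-0", 101L); // processed through offset 101 before the crash

        // The consumer's committed offset lagged, so delivery restarts at 100.
        List<Message> redelivered = Arrays.asList(
                new Message(100L, "m100"), new Message(101L, "m101"),
                new Message(102L, "m102"), new Message(103L, "m103"));

        System.out.println(replay("myTopic-0", redelivered, store)); // prints [m102, m103]
    }
}
```

The sketch only covers the case Arun raises, where the committed offset is at or behind the externally stored one. Rewinding to an older offset is a separate problem that skipping cannot solve.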