Thanks for the help. FWIW, I ended up writing a simple utility that I can call while my consumer is starting up to move its offset back. It *seems* to work decently. Thoughts? Would this be worth contributing back to Kafka, or is the idea just a poor one?

/**
 * Attempts to move the consumer offset back. If it has any issues, throws an exception.
 */
def moveConsumerOffsetBack(@Nonnull groupId: String, @Nonnull topic: String, partition: Long, approximateMessagesBack: Long) {
  Preconditions.checkNotNull(groupId)
  Preconditions.checkNotNull(topic)
  Preconditions.checkArgument(partition >= 0)
  Preconditions.checkArgument(approximateMessagesBack > 0)

  val path = ZkUtils.ConsumersPath + "/" + groupId + "/offsets/" + topic + "/" + partition
  if (zkClient.exists(path)) {
    // We get this from ZK as Any because the exact type is unpredictable. It might be Long or String.
    val currentOffset = zkClient.readData[Any](path)
    // Clamp at 0 so we never write a negative offset.
    val desiredOffset = math.max(0L, currentOffset.toString.toLong - approximateMessagesBack)
    zkClient.writeData(path, desiredOffset.toString)
    warn("Reset the " + topic + " consumer to " + desiredOffset)
  } else {
    throw new RuntimeException("Unable to find the consumer offset path in ZK. This may or may not be an issue, depending on whether you expect the path to exist. Path: " + path)
  }
}

On Tue, Mar 19, 2013 at 2:05 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:

> I guess I missed a step between 4 and 5 -
>
> 4. Replace the exported offsets with these offsets
> *Use ImportZkOffsets to import the offsets from the modified export file.*
> 5. Restart the consumer.
>
> Thanks,
> Neha
>
>
> On Tue, Mar 19, 2013 at 11:00 AM, S Ahmed <sahmed1...@gmail.com> wrote:
>
> > I thought since the offsets in .8 are numeric and not byte offsets like in
> > 0.7x, you can simply just take say the current offset - 10000.
> >
> >
> > On Tue, Mar 19, 2013 at 12:16 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
> >
> > > Jim,
> > >
> > > You can leverage the ExportZkOffsets/ImportZkOffsets tools to do this.
> > > ExportZkOffsets exports the consumer offsets for your group to a file in a
> > > certain format. You can then place the desired offset per partition you
> > > want to reset your consumer to in the exported file.
> > >
> > > 1. Shutdown the consumer
> > > 2. Export current offsets
> > > 3. Get the desired offset (current offset - 10K). As David mentions, this
> > > is approximate and might get you more than 10K messages if the data is
> > > compressed.
> > > 4. Replace the exported offsets with these offsets
> > > 5. Restart the consumer.
> > >
> > > HTH,
> > > Neha
> > >
> > >
> > > On Tue, Mar 19, 2013 at 8:49 AM, David Arthur <mum...@gmail.com> wrote:
> > >
> > > > This API is exposed through the SimpleConsumer scala class. See
> > > > https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/consumer/SimpleConsumer.scala#L60
> > > >
> > > > You will need to set earliestOrLatest to -1 for the latest offset.
> > > >
> > > > There is also a command line tool
> > > > https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/tools/GetOffsetShell.scala
> > > >
> > > > -David
> > > >
> > > >
> > > > On 3/19/13 11:25 AM, James Englert wrote:
> > > >
> > > >> I'm still a bit lost. Where is the offsets API? I.e. which class?
> > > >>
> > > >>
> > > >> On Tue, Mar 19, 2013 at 11:16 AM, David Arthur <mum...@gmail.com> wrote:
> > > >>
> > > >>> Using the Offsets API, you can get the latest offset by setting time to
> > > >>> -1. Then you subtract 10000
> > > >>>
> > > >>> There is no guarantee that 10k prior messages exist of course, so you'd
> > > >>> need to handle that case.
> > > >>>
> > > >>> -David
> > > >>>
> > > >>>
> > > >>> On 3/19/13 11:04 AM, James Englert wrote:
> > > >>>
> > > >>>> Hi,
> > > >>>>
> > > >>>> I'm using Kafka 0.8. I would like to setup a consumer to fetch the last
> > > >>>> 10,000 messages and then start consuming messages.
> > > >>>>
> > > >>>> I see the configuration autooffset.reset, but that isn't quite what I
> > > >>>> want. I want only the last 10,000 messages.
> > > >>>>
> > > >>>> Is there a good way to achieve this in 0.8, besides just hacking the
> > > >>>> data in ZK?
> > > >>>>
> > > >>>> Thanks,
> > > >>>> Jim
> > > >>>>

--
Jim Englert
Gilt Groupe
2 Park Ave South, 5th Floor
New York, NY 10011
M: 847-707-2942
Please accept my invitation to join Gilt: http://www.giltgroupe.com/invite/jenglert
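
The rewind arithmetic discussed in this thread (current offset minus roughly 10,000, with no guarantee that many earlier messages exist) can be looked at in isolation from ZooKeeper. What follows is only a minimal sketch, not code from Kafka: the object name `OffsetRewind`, the function `rewindOffset`, and the `earliest` parameter are all hypothetical names introduced here for illustration. It assumes 0.8-style logical offsets and that the caller has some way of learning the earliest offset still available (it defaults to 0).

```scala
// Hypothetical helper, not part of Kafka: pure arithmetic only, no ZooKeeper access.
object OffsetRewind {

  /**
   * Returns the offset to rewind to: `current - messagesBack`,
   * clamped so it never drops below `earliest`.
   *
   * `earliest` is an assumption of this sketch: the smallest offset the
   * caller believes is still available (0 if nothing has been deleted).
   */
  def rewindOffset(current: Long, messagesBack: Long, earliest: Long = 0L): Long = {
    require(messagesBack > 0, "messagesBack must be positive")
    require(earliest >= 0 && earliest <= current, "expected 0 <= earliest <= current")
    math.max(earliest, current - messagesBack)
  }
}
```

For example, `OffsetRewind.rewindOffset(25000L, 10000L)` yields 15000, while `OffsetRewind.rewindOffset(4000L, 10000L)` is clamped to 0 because fewer than 10,000 earlier messages exist. Keeping the calculation separate from the ZooKeeper read and write makes the "not enough earlier messages" case easy to check on its own.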