I'm finding that if I continuously produce values to a topic (say,
once every 2 seconds), and in another thread, query the head and tail
offsets of a topic, then sometimes I see the head offset increasing,
sometimes its frozen. What's up with that?

I'm using scala client: 0.8.2 and server: 2.9.2-0.8.1.1

I querying the head and tail offsets like this:

  private def getOffset(consumer: SimpleConsumer, topic: String,
partition: Int, whichTime: Long): Long = {
    val topicAndPartition = new TopicAndPartition(topic, partition);
    val response = consumer.earliestOrLatestOffset(
       topicAndPartition,
       earliestOrLatest = whichTime,
       consumerId= 0);
    return response;
  }

  case class HeadAndTailOffsets(head: Long, tail: Long)

  def getHeadAndTailOffsets(consumer: SimpleConsumer, topic: String,
partition: Int = 0): HeadAndTailOffsets =
    HeadAndTailOffsets(
      head = getOffset(consumer, topic, partition,
kafka.api.OffsetRequest.EarliestTime),
      tail = getOffset(consumer, topic, partition,
kafka.api.OffsetRequest.LatestTime))

-----
If I run a producer, consumer, and offset reporter threads. On the
first run I might get something like this:


-----
producer           consumer             offsets
                   offset,message           head,tail
  "MSG-0"           0, "MSG-0"              0,1
  "MSG-1"           1, "MSG-1"              0,2
  "MSG-2"           2, "MSG-2"              0,3
    ...              ...                    ...
-----
On subsequent runs, I might see something like this:
-----
     producer          consumer               offsets
                    offset,message          head,tail
      "MSG-0"          10, "MSG-0”             0,21 ** tail is frozen
      "MSG-1"          11, "MSG-1"             0,21
      "MSG-2"          12, "MSG-2”             0,21 ** lies, damn lies
       ..
      "MSG-31"         31,"MSG-21"             0,21
      "MSG-32"         31,"MSG-22"             0,21
        ...              ...                    ...
-----
i.e. the consumer sees increasing offsets with the received messages,
but the thread reporting the topic's head and tail offsets is frozen.

Is this a client bug or an issue with my usage?

I have a fuller code sample here:
  
http://stackoverflow.com/questions/28663714/why-is-kafkas-latest-offset-report-sometimes-frozen

Thanks
- Stuart

Reply via email to