That suggestion doesn't work, for pretty much the same reason - at the
time poll() is first called, there is no reset policy and no committed
offset, so NoOffsetForPartitionException is thrown.

I feel like the underlying problem isn't so much that seekToEnd needs
special-case behavior.  It's more that topic metadata fetches,
consumer position fetches, and message fetches are all lumped together
under a single poll() call, with no way to do them individually if
necessary.
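
To illustrate what I mean (hypothetical method names, not the actual
KafkaConsumer API - ensureAssignment doesn't exist, and
updateFetchPositions is currently internal, per the stack trace below):

    consumer.subscribe(topics)
    consumer.ensureAssignment()      // hypothetical: join group, get partitions assigned
    consumer.seekToBeginning()       // now safe, partitions are assigned
    consumer.updateFetchPositions()  // hypothetical: resolve fetch positions, nothing else
    val records = consumer.poll(0)   // fetch messages only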

What does "work" in this situation is to just catch the exception
(which leaves the consumer in a state where topics are assigned) and
then seek.  But that is not exactly an elegant interface.

    import org.apache.kafka.clients.consumer.NoOffsetForPartitionException

    consumer.subscribe(topics)
    try {
      // the first poll throws, but assigns partitions as a side effect first
      consumer.poll(0)
    } catch {
      case _: NoOffsetForPartitionException => // expected here; swallow it
    }
    consumer.seekToBeginning()
    consumer.poll(0)
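
As an aside (and I may be missing something), if you don't need group
management, manual assignment seems to sidestep the problem entirely,
since assign() takes effect immediately rather than lazily.  The
partition list here is just for illustration:

    import java.util.Arrays
    import org.apache.kafka.common.TopicPartition

    consumer.assign(Arrays.asList(new TopicPartition("testtwo", 0)))
    consumer.seekToBeginning()  // partitions are assigned, so this is not a no-op
    consumer.poll(0)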


On Tue, Mar 8, 2016 at 11:22 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> Hi Cody,
>
> The problem with that code is calling `seekToBeginning()` right after
> `subscribe(topics)`.
>
> Since the `subscribe` call is lazily evaluated, by the time
> `seekToBeginning()` is called no partitions are assigned yet, and hence
> it is effectively a no-op.
>
> Try
>
>     consumer.subscribe(topics)
>     consumer.poll(0);  // get assigned partitions
>     consumer.seekToBeginning()
>     consumer.poll(0)
>
> to see if that works.
>
> I think it is a valid issue that could be fixed in the new consumer:
> upon calling seekToEnd/Beginning with no parameters while no assignment
> has been done yet, do the coordination behind the scenes. It would,
> though, change the behavior of those functions, as they would no longer
> always be lazily evaluated.
>
>
> Guozhang
>
>
> On Tue, Mar 8, 2016 at 2:08 PM, Cody Koeninger <c...@koeninger.org> wrote:
>
>> Using the 0.9 consumer, I would like to start consuming at the
>> beginning or end, without specifying auto.offset.reset.
>>
>> This does not seem to be possible:
>>
>>     import scala.collection.JavaConverters._
>>     import org.apache.kafka.clients.consumer.KafkaConsumer
>>     import org.apache.kafka.common.serialization.StringDeserializer
>>
>>     val kafkaParams = Map[String, Object](
>>       "bootstrap.servers" -> conf.getString("kafka.brokers"),
>>       "key.deserializer" -> classOf[StringDeserializer],
>>       "value.deserializer" -> classOf[StringDeserializer],
>>       "group.id" -> "example",
>>       "auto.offset.reset" -> "none"
>>     ).asJava
>>     val topics = conf.getString("kafka.topics").split(",").toList.asJava
>>     val consumer = new KafkaConsumer[String, String](kafkaParams)
>>     consumer.subscribe(topics)
>>     consumer.seekToBeginning()
>>     consumer.poll(0)
>>
>>
>> Results in:
>>
>> Exception in thread "main"
>> org.apache.kafka.clients.consumer.NoOffsetForPartitionException:
>> Undefined offset with no reset policy for partition: testtwo-4
>>         at
>> org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:288)
>>         at
>> org.apache.kafka.clients.consumer.internals.Fetcher.updateFetchPositions(Fetcher.java:167)
>>         at
>> org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1302)
>>         at
>> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:895)
>>         at
>> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
>>         at example.BasicKafkaConsumer$.main(BasicKafkaConsumer.scala:25)
>>
>>
>> I'm assuming this is because, at the time seekToBeginning() is called,
>> subscriptions.assignedPartitions isn't populated.  But polling in
>> order to assign TopicPartitions results in an error, which creates a
>> chicken-and-egg situation.
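>>
>> As a sanity check, consumer.assignment() (the real API call) really is
>> empty at that point, which seems to confirm the lazy evaluation:
>>
>>     consumer.subscribe(topics)
>>     println(consumer.assignment())  // prints [] - nothing assigned yet
>>     consumer.seekToBeginning()      // so there is nothing to seek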
>>
>> I don't want to set auto.offset.reset, because I want a hard error if
>> the offsets are out of range at any other time during consumption.
>>
>
>
>
> --
> -- Guozhang
