Hi Cody,

The problem with that code is in `seekToBeginning()` followed by
`subscribe(topic)`.

Since `subscribe` call is lazy evaluated, by the time `seekToBeginning()`
is called no partition is assigned yet, and hence it is effectively an
no-op.

Try

    consumer.subscribe(topics)
    consumer.poll(0);  // get assigned partitions
    consumer.seekToBeginning()
    consumer.poll(0)

to see if that works.

I think it is a valid issue that can be fixed in the new consumer that,
upon calling seekToEnd/Beginning with no parameter, while no assigned is
done yet, do the coordination behind the scene; it will though change the
behavior of the functions as they are no longer always lazily evaluated.


Guozhang


On Tue, Mar 8, 2016 at 2:08 PM, Cody Koeninger <c...@koeninger.org> wrote:

> Using the 0.9 consumer, I would like to start consuming at the
> beginning or end, without specifying auto.offset.reset.
>
> This does not seem to be possible:
>
>     val kafkaParams = Map[String, Object](
>       "bootstrap.servers" -> conf.getString("kafka.brokers"),
>       "key.deserializer" -> classOf[StringDeserializer],
>       "value.deserializer" -> classOf[StringDeserializer],
>       "group.id" -> "example",
>       "auto.offset.reset" -> "none"
>     ).asJava
>     val topics = conf.getString("kafka.topics").split(",").toList.asJava
>     val consumer = new KafkaConsumer[String, String](kafkaParams)
>     consumer.subscribe(topics)
>     consumer.seekToBeginning()
>     consumer.poll(0)
>
>
> Results in:
>
> Exception in thread "main"
> org.apache.kafka.clients.consumer.NoOffsetForPartitionException:
> Undefined offset with no reset policy for partition: testtwo-4
>         at
> org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:288)
>         at
> org.apache.kafka.clients.consumer.internals.Fetcher.updateFetchPositions(Fetcher.java:167)
>         at
> org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1302)
>         at
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:895)
>         at
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
>         at example.BasicKafkaConsumer$.main(BasicKafkaConsumer.scala:25)
>
>
> I'm assuming this is because, at the time seekToBeginning() is called,
> subscriptions.assignedPartitions isn't populated.  But polling in
> order to assign topicpartitions results in an error, which creates a
> chicken-or-the-egg situation.
>
> I don't want to set auto.offset.reset, because I want a hard error if
> the offsets are out of range at any other time during consumption.
>



-- 
-- Guozhang

Reply via email to