I thought about ConsumerRebalanceListener, but seeking to the
beginning any time there's a rebalance for whatever reason is not
necessarily the same thing as seeking to the beginning before first
starting the consumer.
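
To make the distinction concrete, here is a minimal sketch of a guard that separates "first assignment" from "any rebalance". FirstAssignmentTracker and firstTimeOnly are hypothetical names, not part of the Kafka client, and the type parameter P stands in for TopicPartition so the sketch compiles without kafka-clients on the classpath. It only remembers partitions within one running process, so treat it as an approximation of "before first starting the consumer", not a replacement for it.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper, not part of the Kafka client API.
// P stands in for TopicPartition (an assumption made to keep this self-contained).
class FirstAssignmentTracker<P> {
    // Partitions this consumer instance has been assigned at least once.
    private final Set<P> seen = new HashSet<>();

    // Returns only the partitions that were never assigned before,
    // and records them so a later rebalance does not return them again.
    public synchronized List<P> firstTimeOnly(Collection<P> assigned) {
        List<P> fresh = new ArrayList<>();
        for (P p : assigned) {
            if (seen.add(p)) { // Set.add returns false if p was already present
                fresh.add(p);
            }
        }
        return fresh;
    }
}
```

Inside onPartitionsAssigned, one would pass only the result of firstTimeOnly(partitions) to consumer.seekToBeginning, so a rebalance that hands back already-seen partitions leaves their positions alone.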

On Wed, Mar 9, 2016 at 2:24 AM, Kamal C <kamaltar...@gmail.com> wrote:
> Cody,
>
> Use ConsumerRebalanceListener to achieve that:
>
> ConsumerRebalanceListener listener = new ConsumerRebalanceListener() {
>
>     @Override
>     public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
>     }
>
>     @Override
>     public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
>         consumer.seekToBeginning(partitions.toArray(new TopicPartition[0]));
>     }
> };
>
> consumer.subscribe(topics, listener);
>
> On Wed, Mar 9, 2016 at 12:05 PM, Cody Koeninger <c...@koeninger.org> wrote:
>
>> That suggestion doesn't work, for pretty much the same reason - at the
>> time poll is first called, there is no reset policy and no committed
>> offset, so NoOffsetForPartitionException is thrown
>>
>> I feel like the underlying problem isn't so much that seekToEnd needs
>> special-case behavior.  It's more that topic metadata fetches,
>> consumer position fetches, and message fetches are all lumped together
>> under a single poll() call, with no way to do them individually if
>> necessary.
>>
>> What does "work" in this situation is to just catch the exception
>> (which leaves the consumer in a state where topics are assigned) and
>> then seek.  But that is not exactly an elegant interface.
>>
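>>     consumer.subscribe(topics)
>>     try {
>>       consumer.poll(0)  // triggers partition assignment, then throws
>>     } catch {
>>       // expected here: no committed offset and no reset policy
>>       case x: NoOffsetForPartitionException =>
>>     }
>>     consumer.seekToBeginning()  // partitions are assigned by now
>>     consumer.poll(0)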
>>
>>
>>
>>
>> On Tue, Mar 8, 2016 at 11:22 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>> > Hi Cody,
>> >
>> > The problem with that code is in calling `seekToBeginning()` right
>> > after `subscribe(topics)`.
>> >
>> > Since the `subscribe` call is lazily evaluated, by the time
>> > `seekToBeginning()` is called no partition is assigned yet, and hence
>> > it is effectively a no-op.
>> >
>> > Try
>> >
>> >     consumer.subscribe(topics)
>> >     consumer.poll(0);  // get assigned partitions
>> >     consumer.seekToBeginning()
>> >     consumer.poll(0)
>> >
>> > to see if that works.
>> >
>> > I think it is a valid issue that can be fixed in the new consumer: upon
>> > calling seekToEnd/Beginning with no parameter while no assignment has
>> > been done yet, do the coordination behind the scenes. It will, though,
>> > change the behavior of the functions, as they would no longer always be
>> > lazily evaluated.
>> >
>> >
>> > Guozhang
>> >
>> >
>> > On Tue, Mar 8, 2016 at 2:08 PM, Cody Koeninger <c...@koeninger.org> wrote:
>> >
>> >> Using the 0.9 consumer, I would like to start consuming at the
>> >> beginning or end, without specifying auto.offset.reset.
>> >>
>> >> This does not seem to be possible:
>> >>
>> >>     val kafkaParams = Map[String, Object](
>> >>       "bootstrap.servers" -> conf.getString("kafka.brokers"),
>> >>       "key.deserializer" -> classOf[StringDeserializer],
>> >>       "value.deserializer" -> classOf[StringDeserializer],
>> >>       "group.id" -> "example",
>> >>       "auto.offset.reset" -> "none"
>> >>     ).asJava
>> >>     val topics = conf.getString("kafka.topics").split(",").toList.asJava
>> >>     val consumer = new KafkaConsumer[String, String](kafkaParams)
>> >>     consumer.subscribe(topics)
>> >>     consumer.seekToBeginning()
>> >>     consumer.poll(0)
>> >>
>> >>
>> >> Results in:
>> >>
>> >> Exception in thread "main" org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partition: testtwo-4
>> >>         at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:288)
>> >>         at org.apache.kafka.clients.consumer.internals.Fetcher.updateFetchPositions(Fetcher.java:167)
>> >>         at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1302)
>> >>         at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:895)
>> >>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
>> >>         at example.BasicKafkaConsumer$.main(BasicKafkaConsumer.scala:25)
>> >>
>> >>
>> >> I'm assuming this is because, at the time seekToBeginning() is called,
>> >> subscriptions.assignedPartitions isn't populated.  But polling in
>> >> order to assign topicpartitions results in an error, which creates a
>> >> chicken-or-the-egg situation.
>> >>
>> >> I don't want to set auto.offset.reset, because I want a hard error if
>> >> the offsets are out of range at any other time during consumption.
>> >>
>> >
>> >
>> >
>> > --
>> > -- Guozhang
>>
