On 9/8/15 6:58 PM, Jason Gustafson wrote:
> Hey Phil,
>
> You've stumbled onto one of the tricky aspects of the new consumer that
> we've been talking about recently. KafkaConsumer.subscribe() is
> asynchronous in the sense that it will return before partitions have been
> assigned. We could make it synchronous, but we wouldn't be able to
> guarantee how long the assignment would be active since other members of
> the group or metadata changes can cause the coordinator to rebalance the
> assignment. The best place to perform a seek would probably be in the
> rebalance callback, which can be passed through the alternative subscribe
> API. The code might look something like this:
>
> consumer.subscribe(topics, new RebalanceListener() {
>   void onPartitionsAssigned(List<TopicPartition> partitions) {
>     // seek to the initial offset for the assigned partitions here
>   }
>   void onPartitionsRevoked(List<TopicPartition> partitions) {
>     // commit offsets if you need to
>   }
> });
>
> while (true) {
>   ConsumerRecords records = consumer.poll(100);
>   // do stuff with records
> }
>
> Does that make sense?

Yes, this makes sense.  Thanks!

Phil
>
>
> Thanks,
> Jason
>
>
> On Tue, Sep 8, 2015 at 2:59 PM, Phil Steitz <phil.ste...@gmail.com> wrote:
>
>> I have been experimenting with the KafkaConsumer currently in
>> development [1].  Sorry if this should be a question for the user
>> list, but I am not sure if what I am seeing is something not working
>> yet or if I am misunderstanding the API.  If I use
>> KafkaConsumer#subscribe to subscribe to a topic and then try to use
>> seek(TopicPartition, offset) to position the consumer, I get an
>> IllegalStateException with message "No current assignment for
>> partition ...."  If I use assign instead to connect to the topic,
>> things work fine.  I can see why this is by looking at the
>> SubscriptionState code which is throwing the ISE because
>> SubscriptionState#seek expects to find an assignment, but
>> KafkaConsumer#subscribe does not make any.
>>
>> I know this is unreleased code and I am not looking for help here -
>> actually more like looking *to* help but just learning the code.
>> Happy to open a ticket with a test case if that will help or a patch
>> to the javadoc if I am misunderstanding the API and it can be made
>> clearer.
>>
>> Thanks!
>>
>> Phil
>>
>> [1] ff189fa05ccdacac100f3d15d167dcbe561f57a6
>>
>>
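For anyone reading this thread later, the ordering problem discussed above can be reproduced without a broker. What follows is only a toy sketch: ToySubscriptionState and SeekAfterAssignDemo are hypothetical names, partitions are plain strings, and none of this is the actual Kafka SubscriptionState code. It assumes just what the thread describes: subscribe() registers interest and a callback but makes no assignment, seek() throws IllegalStateException for an unassigned partition, and the callback runs once a rebalance has installed an assignment.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Toy stand-in for the consumer's subscription bookkeeping (hypothetical, not a Kafka class).
class ToySubscriptionState {
    // Assigned partition name -> current position. Empty until a rebalance completes.
    private final Map<String, Long> positions = new HashMap<>();
    // Callback invoked with the newly assigned partitions (java.util.function.Consumer).
    private Consumer<List<String>> onAssigned = partitions -> { };

    // Like subscribe(): remembers the callback but assigns nothing yet.
    void subscribe(Consumer<List<String>> onAssigned) {
        this.onAssigned = onAssigned;
    }

    // Like a completed rebalance: install the assignment, then notify the callback.
    void completeRebalance(List<String> partitions) {
        positions.clear();
        for (String p : partitions) {
            positions.put(p, 0L);
        }
        onAssigned.accept(new ArrayList<>(partitions));
    }

    // Seeking is only legal for partitions that are currently assigned.
    void seek(String partition, long offset) {
        if (!positions.containsKey(partition)) {
            throw new IllegalStateException("No current assignment for partition " + partition);
        }
        positions.put(partition, offset);
    }

    long position(String partition) {
        Long pos = positions.get(partition);
        if (pos == null) {
            throw new IllegalStateException("No current assignment for partition " + partition);
        }
        return pos;
    }
}

class SeekAfterAssignDemo {
    // Seeking right after subscribe() fails because nothing has been assigned yet.
    static boolean seekBeforeAssignmentFails() {
        ToySubscriptionState state = new ToySubscriptionState();
        state.subscribe(partitions -> { });
        try {
            state.seek("topic-0", 42L);
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    // Seeking inside the assignment callback works because the assignment exists by then.
    static long seekInCallback(long offset) {
        ToySubscriptionState state = new ToySubscriptionState();
        state.subscribe(partitions -> {
            for (String p : partitions) {
                state.seek(p, offset);
            }
        });
        state.completeRebalance(Arrays.asList("topic-0"));
        return state.position("topic-0");
    }

    public static void main(String[] args) {
        System.out.println("seek before assignment fails: " + seekBeforeAssignmentFails());
        System.out.println("position after callback seek: " + seekInCallback(42L));
    }
}
```

The same shape should carry over to the real client: do the seek inside the assignment callback rather than immediately after subscribe(), since the assignment can also change again on any later rebalance.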

