[ 
https://issues.apache.org/jira/browse/KAFKA-4226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15530851#comment-15530851
 ] 

Vahid Hashemian commented on KAFKA-4226:
----------------------------------------

[~hachikuji] The following code does not produce any errors for me. Am I 
missing something?
{code}
    val consumerConfig = new Properties()
    consumerConfig.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"localhost:9092")
    consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
    consumerConfig.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringDeserializer")
    consumerConfig.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringDeserializer");
    val consumer = new KafkaConsumer(consumerConfig)
    val partition = new TopicPartition("foo", 0)
    consumer.assign(Collections.singleton(partition))
    consumer.pause(Collections.singleton(partition))
    consumer.poll(0)
{code}

> Surprising NoOffsetForPartitionException for paused partition with no reset 
> policy
> ----------------------------------------------------------------------------------
>
>                 Key: KAFKA-4226
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4226
>             Project: Kafka
>          Issue Type: Improvement
>          Components: consumer
>            Reporter: Jason Gustafson
>            Priority: Minor
>
> If the user has no reset policy defined (i.e. auto.offset.reset is "none"), 
> then the consumer raises {{NoOffsetForPartitionException}} if it ever 
> encounters a situation in which it needs to reset the offset for that 
> partition. For example, this can happen when the consumer needs to set the 
> partition's initial position or if it encounters an out of range offset error 
> from a fetch. This option is helpful when you need direct control over the 
> behavior in these cases.
> I was a little surprised that the consumer currently raises this exception 
> even if the partition is in a paused state. So the following code raises the 
> exception:
> {code}
> consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
> val consumer = new KafkaConsumer(consumerConfig)
> consumer.assign(singleton(partition))
> consumer.pause(singleton(partition))
> consumer.poll(0)
> {code}
> Since we do not send any fetches when the partition is paused, it seems like 
> we could delay setting the offset for the partition until it is resumed. In 
> that case, the poll(0) would not raise in the example above. This would be a 
> relatively easy change, but I'm not sure if there are any downsides.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to