[ https://issues.apache.org/jira/browse/KAFKA-4226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15530986#comment-15530986 ]
Jason Gustafson commented on KAFKA-4226: ---------------------------------------- [~vahid] Are you working on a clean cluster? I was able to reproduce this with the following test case in {{PlaintextConsumerTest}}: {code} @Test(expected = classOf[NoOffsetForPartitionException]) def testNoOffsetForPausedPartition(): Unit = { this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none") val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer()) consumers += consumer0 consumer0.assign(List(tp).asJava) val partition = new TopicPartition("foo", 0) consumer0.assign(Collections.singleton(partition)) consumer0.pause(Collections.singleton(partition)) consumer0.poll(0) } {code} > Surprising NoOffsetForPartitionException for paused partition with no reset > policy > ---------------------------------------------------------------------------------- > > Key: KAFKA-4226 > URL: https://issues.apache.org/jira/browse/KAFKA-4226 > Project: Kafka > Issue Type: Improvement > Components: consumer > Reporter: Jason Gustafson > Priority: Minor > > If the user has no reset policy defined (i.e. auto.offset.reset is "none"), > then the consumer raises {{NoOffsetForPartitionException}} if it ever > encounters a situation in which it needs to reset the offset for that > partition. For example, this can happen when the consumer needs to set the > partition's initial position or if it encounters an out of range offset error > from a fetch. This option is helpful when you need direct control over the > behavior in these cases. > I was a little surprised that the consumer currently raises this exception > even if the partition is in a paused state. So the following code raises the > exception: > {code} > consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none") > val consumer = new KafkaConsumer(consumerConfig) > consumer.assign(singleton(partition)) > consumer.pause(singleton(partition)) > consumer.poll(0) > {code} > Since we do not send any fetches when the partition is paused, it seems like > we could delay setting the offset for the partition until it is resumed. In > that case, the poll(0) would not raise in the example above. This would be a > relatively easy change, but I'm not sure if there are any downsides. -- This message was sent by Atlassian JIRA (v6.3.4#6332)