[ https://issues.apache.org/jira/browse/KAFKA-8279?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kevin updated KAFKA-8279:
-------------------------
    Description: 
Per 
[https://kafka.apache.org/22/javadoc/index.html?org/apache/kafka/streams/KafkaStreams.html],
the "position" offset is $latest_offset_consumer_has_seen_from_poll + 1; i.e., 
it is the offset of the next message the consumer will see. If this is the case, 
though, there would be a bug: both KafkaConsumer and MockConsumer in v2.2 and 
v2.0.1 (my version) have 'commitSync' methods that call:

'commitOffsetsSync(subscriptions.allConsumed(), timeout.toMillis())'

This would mean that, for every owned partition, each commit actually commits 
the offset of one message the consumer has yet to see. This wouldn't be a 
problem under normal operation, as the consumer would still get that message on 
its next poll, but if the consumer were to die between the commitSync and the 
next poll, it would restart and skip that message. This would be a pretty 
serious bug!
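Whether a record is actually skipped depends on how a restarted consumer interprets the committed value. The sketch below is a hypothetical single-partition model (the class CommitSkipModel and its methods are illustrative assumptions, not Kafka client code). It assumes commitSync stores position = lastSeen + 1, then compares a restart that resumes at committed + 1 ("committed" read as last consumed) with one that resumes at the committed value itself ("committed" read as next offset to read).

```java
// Hypothetical single-partition model of the crash scenario; not Kafka client code.
final class CommitSkipModel {

    /**
     * lastSeen: offset of the last record poll() returned before the crash.
     * resumeAfterCommitted: true if the restarted consumer treats the committed
     * value as "last consumed" and resumes at committed + 1; false if it treats
     * the value as "next offset to read" and resumes at committed.
     */
    static long firstOffsetAfterRestart(long lastSeen, boolean resumeAfterCommitted) {
        long position = lastSeen + 1;  // "position" under the javadoc reading
        long committed = position;     // assumed: commitSync(allConsumed()) stores the position
        return resumeAfterCommitted ? committed + 1 : committed;
    }

    /** Number of offsets after lastSeen that the restarted consumer never receives. */
    static long skippedCount(long lastSeen, boolean resumeAfterCommitted) {
        return firstOffsetAfterRestart(lastSeen, resumeAfterCommitted) - (lastSeen + 1);
    }

    public static void main(String[] args) {
        long lastSeen = 41;
        System.out.println("resume at committed + 1: skipped " + skippedCount(lastSeen, true));
        System.out.println("resume at committed:     skipped " + skippedCount(lastSeen, false));
    }
}
```

With lastSeen = 41, the first case resumes at 43 and skips offset 42, while the second resumes at 42 and skips nothing. For reference, the KafkaConsumer javadoc for commitSync(Map) describes the committed offset as the next message the application will consume (lastProcessedMessageOffset + 1), which corresponds to the second case.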

However, digging into SubscriptionState itself, I see:
{code:java}
* Once assigned, the partition is not considered "fetchable" until its initial position has
* been set with {@link #seek(TopicPartition, long)}. Fetchable partitions track a fetch
* position which is used to set the offset of the next fetch, and a consumed position
* which is the last offset that has been returned to the user{code}
This implies that the "position" offset is actually the last offset returned 
to the user, which would mean the bug is only in MockConsumer, since it sets its 
subscriptions.position to $highest_offset_seen + 1.
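The two readings of a "consumed position" differ by exactly one per partition, and that difference is what commitSync would send. The sketch below is hypothetical: PositionTracker, its Convention enum, and the string partition keys are illustrative assumptions, not Kafka's SubscriptionState or TopicPartition. LAST_RETURNED follows the SubscriptionState comment quoted above, and NEXT_TO_RETURN follows what the report says MockConsumer does ($highest_offset_seen + 1).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of two readings of a "consumed position"; not Kafka's
// SubscriptionState, and partitions are plain strings rather than TopicPartition.
final class PositionTracker {
    enum Convention { LAST_RETURNED, NEXT_TO_RETURN }

    private final Convention convention;
    private final Map<String, Long> consumed = new HashMap<>();

    PositionTracker(Convention convention) {
        this.convention = convention;
    }

    /** Called once for each record poll() hands to the user, in offset order. */
    void recordReturned(String partition, long offset) {
        long value = (convention == Convention.LAST_RETURNED) ? offset : offset + 1;
        consumed.put(partition, value);
    }

    /** Hypothetical analogue of allConsumed(): the per-partition values a commit would send. */
    Map<String, Long> allConsumed() {
        return new HashMap<>(consumed);
    }

    public static void main(String[] args) {
        PositionTracker lastReturned = new PositionTracker(Convention.LAST_RETURNED);
        PositionTracker nextToReturn = new PositionTracker(Convention.NEXT_TO_RETURN);
        for (long offset = 0; offset < 10; offset++) {  // records 0..9 on one partition
            lastReturned.recordReturned("topic-0", offset);
            nextToReturn.recordReturned("topic-0", offset);
        }
        System.out.println("LAST_RETURNED:  " + lastReturned.allConsumed());
        System.out.println("NEXT_TO_RETURN: " + nextToReturn.allConsumed());
    }
}
```

Both trackers see the same ten records, yet the value a commit would send for "topic-0" is 9 under LAST_RETURNED and 10 under NEXT_TO_RETURN. Whether that one-offset gap causes a skip depends on which value a restarted consumer expects.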

Which meaning is "position" expected to have? And could any other bugs be 
lurking because of this confusion?

  was:
Per 
[https://kafka.apache.org/22/javadoc/index.html?org/apache/kafka/streams/KafkaStreams.html],
the "position" offset is $latest_offset_consumer_has_seen_from_poll + 1; i.e., 
it is the offset of the next message the consumer will see. If this is the case, 
though, there would be a bug: both KafkaConsumer and MockConsumer in v2.2 and 
v2.0.1 (my version) have 'commitSync' methods that call:

'commitOffsetsSync(subscriptions.allConsumed(), timeout.toMillis())'

This would mean that, for every owned partition, each commit actually commits 
the offset of one message the consumer has yet to see. This wouldn't be a 
problem under normal operation, as the consumer would still get that message on 
its next poll, but if the consumer were to die between the commitSync and the 
next poll, it would restart and skip that message. This would be a pretty 
serious bug!

However, digging into SubscriptionState itself, I see:
{code:java}
* Once assigned, the partition is not considered "fetchable" until its initial position has
* been set with {@link #seek(TopicPartition, long)}. Fetchable partitions track a fetch
* position which is used to set the offset of the next fetch, and a consumed position
* which is the last offset that has been returned to the user{code}
This implies that the "position" offset is actually the last offset returned 
to the user, which would mean the bug is only in MockConsumer, since it sets its 
subscriptions.position to $highest_offset_seen + 1.

Which meaning is "position" expected to have?


> Discrepancy around what SubscriptionState.allConsumed returns
> -------------------------------------------------------------
>
>                 Key: KAFKA-8279
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8279
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 2.0.1
>            Reporter: Kevin
>            Priority: Major
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
