[ 
https://issues.apache.org/jira/browse/KAFKA-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14091100#comment-14091100
 ] 

Joel Koshy commented on KAFKA-1510:
-----------------------------------

[~nmarasoi] I realized later there is actually a flaw in how we get rid of 
offset commits from old (non-existent) consumers.

As of now, the offset manager does the following: it periodically goes through 
its entire cache (i.e., hashtable of offsets) and extracts those entries that 
have a timestamp earlier than some staleness threshold. It then proceeds to add 
tombstones for those entries in the offsets commit log.
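
To make the sweep concrete, here is a minimal sketch of the logic described 
above. It is not the actual offset manager code: the names 
(expire_stale_offsets, cache, commit_log) and the in-memory data shapes are 
assumptions made purely for illustration.

```python
def expire_stale_offsets(cache, commit_log, now_ms, staleness_threshold_ms):
    """Remove stale entries from the cache and tombstone them in the log.

    cache:      dict mapping a (group, topic, partition) key to a tuple of
                (offset, commit_timestamp_ms). Hypothetical shape.
    commit_log: list standing in for the offsets commit log. A tombstone is
                modeled as (key, None), i.e. a key with a null value.
    """
    # Collect keys first so the dict is not mutated while iterating over it.
    stale = [key for key, (_offset, ts) in cache.items()
             if now_ms - ts > staleness_threshold_ms]
    for key in stale:
        commit_log.append((key, None))  # tombstone for the expired entry
        del cache[key]
    return stale
```

Note that the only input to the decision is the commit timestamp. Nothing in 
the sweep can tell whether the consumer that wrote the entry is still alive.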

The problem with this approach as it stands is similar to the original issue 
that this jira intended to address. A live consumer may be consuming a 
low-volume topic, so its offset may change infrequently, i.e., the offset may 
not move within the staleness threshold. If we delete the offset and a 
consumer rebalance then occurs, the consumer's fetch of that offset finds 
nothing. Depending on the auto.offset.reset configuration, the consumer will 
then pick up either the latest offset of the topic (in which case it could 
lose some messages) or the earliest offset (in which case it will see 
duplicates).
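
As a rough illustration of the two outcomes, the sketch below models what a 
consumer does when its offset fetch comes back empty. The function name and 
parameters are hypothetical, and the policy strings "latest" and "earliest" 
simply mirror the wording above; the literal values accepted by 
auto.offset.reset may differ by consumer version.

```python
def resolve_start_offset(committed, log_start, log_end, reset_policy):
    """Pick the offset a consumer resumes from after a rebalance.

    committed is None when no offset is stored, e.g. because it was expired.
    All names here are illustrative assumptions, not Kafka APIs.
    """
    if committed is not None:
        return committed      # normal case: resume where we left off
    if reset_policy == "latest":
        return log_end        # skips anything between old position and end
    if reset_policy == "earliest":
        return log_start      # re-reads everything already consumed
    raise ValueError("unknown reset policy: %r" % (reset_policy,))
```

For example, suppose a consumer had committed offset 42 on a partition whose 
log spans offsets 0 to 50. If that commit is expired, "latest" resumes at 50 
and the messages from 42 up to 50 are never seen, while "earliest" resumes at 
0 and the first 42 messages are delivered again.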

I think the fix for this is the following; I'm backtracking to what I wrote 
earlier and later (incorrectly) thought was unnecessary:

A consumer implementation can optionally choose to selectively commit only 
offsets that have changed since the last commit. HOWEVER, there should be a 
configurable interval at which the consumer should always commit ALL its 
offsets regardless of whether they have changed or not.
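
A minimal sketch of what such a commit policy could look like follows. This is 
not the consumer's actual implementation: the class name OffsetCommitter and 
the parameter full_commit_interval_ms are hypothetical, and the sketch only 
decides which offsets to send, not how they are sent.

```python
class OffsetCommitter:
    """Decides which offsets to include in each commit (illustrative only)."""

    def __init__(self, full_commit_interval_ms):
        # Hypothetical config: how often ALL offsets must be committed.
        self.full_commit_interval_ms = full_commit_interval_ms
        self.last_committed = {}         # partition -> last committed offset
        self.last_full_commit_ms = None  # None until the first commit

    def offsets_to_commit(self, current_offsets, now_ms):
        full_due = (
            self.last_full_commit_ms is None
            or now_ms - self.last_full_commit_ms >= self.full_commit_interval_ms
        )
        if full_due:
            # Forced commit: include every offset, changed or not, so the
            # commit timestamps of idle partitions are refreshed too.
            to_commit = dict(current_offsets)
            self.last_full_commit_ms = now_ms
        else:
            # Selective commit: only offsets that moved since the last commit.
            to_commit = {p: o for p, o in current_offsets.items()
                         if self.last_committed.get(p) != o}
        self.last_committed.update(to_commit)
        return to_commit
```

For this to protect a live consumer, the full-commit interval would presumably 
need to be comfortably shorter than the offset manager's staleness threshold, 
so that an idle partition's commit is always refreshed before it can expire.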


> Force offset commits when migrating consumer offsets from zookeeper to kafka
> ----------------------------------------------------------------------------
>
>                 Key: KAFKA-1510
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1510
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8.2
>            Reporter: Joel Koshy
>            Assignee: Joel Koshy
>              Labels: newbie
>             Fix For: 0.8.2
>
>         Attachments: kafka-1510.patch
>
>
> When migrating consumer offsets from ZooKeeper to kafka, we have to turn on 
> dual-commit (i.e., the consumers will commit offsets to both zookeeper and 
> kafka) in addition to setting offsets.storage to kafka. However, when we 
> commit offsets we only commit offsets if they have changed (since the last 
> commit). For low-volume topics or for topics that receive data in bursts 
> offsets may not move for a long period of time. Therefore we may want to 
> force the commit (even if offsets have not changed) when migrating (i.e., 
> when dual-commit is enabled) - we can add a minimum interval threshold (say 
> force commit after every 10 auto-commits) as well as on rebalance and 
> shutdown.
> Also, I think it is safe to switch the default for offsets.storage from 
> zookeeper to kafka and set the default to dual-commit (for people who have 
> not migrated yet). We have deployed this to the largest consumers at linkedin 
> and have not seen any issues so far (except for the migration caveat that 
> this jira will resolve).



--
This message was sent by Atlassian JIRA
(v6.2#6252)
