[ https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14992208#comment-14992208 ]
Jiangjie Qin commented on KAFKA-2350:
-------------------------------------

[~hachikuji] [~guozhang] I noticed that currently, if we pause a partition, we only pause data fetching, not offset commits. Should we also pause offset commits for paused partitions?

The motivation can be explained with the following example:
In mirror maker, if a message send fails for a reason such as the message size being too large, today we need to stop the entire mirror maker to ensure no data loss. With pause, we could simply pause consumption from that partition without shutting down the whole pipeline. The problem is that the failed message has already been delivered to the consumer, so when we do producer.flush and then commit, that message will be committed. Later on, if a rebalance occurs and another consumer takes over this partition, the failed message will be lost.

The workaround today is to:
1. Disable auto commit.
2. Maintain an external offset map and commit offsets explicitly with that map.
3. Implement a consumer rebalance listener to clean up the external map once a rebalance occurs.

This is very involved, and we probably don't want to force users to keep an external map to solve this issue, because consume-process-write_to_downstream could be a very common pattern.

I suggest we do the following:
1. Stop committing offsets for a paused partition.
2. When pause() is called, if auto commit is turned on, we commit the offset for the partition being paused.

This way, if users see an error and want to pause a partition without losing messages, they can simply do
{code}
commitSync(Map(partition->offset_of_failed_message));
pause(partition)
{code}
When a rebalance occurs, another consumer will see the failed message again and will again pause this partition.

The only difference here is that users need to commit the offset for a paused partition first if they are not using auto commit. But users who disable auto commit probably do care about data loss.
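For comparison, the bookkeeping that the external-map workaround (steps 1 to 3 above) requires could be sketched roughly as follows. This is only an illustration under stated assumptions: `ExternalOffsetMap` and its method names are hypothetical, partitions are plain strings rather than `TopicPartition`, and no Kafka client is involved. In a real mirror maker, with auto commit disabled, the snapshot would be converted and passed to the consumer's explicit commit call, and `onPartitionsRevoked` would be invoked from a consumer rebalance listener.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical helper for illustration; not part of the Kafka client API.
class ExternalOffsetMap {
    // Partition name -> offset to commit (the next offset that should be consumed).
    private final Map<String, Long> offsets = new HashMap<>();
    // Partitions with a failed send; their commit offset must not advance.
    private final Set<String> blocked = new HashSet<>();

    // A message was written downstream successfully: commit point moves past it,
    // unless an earlier message on the same partition has already failed.
    void recordSuccess(String partition, long offset) {
        if (!blocked.contains(partition)) {
            offsets.put(partition, offset + 1);
        }
    }

    // A send failed: pin the commit point at the failed message so that
    // whoever owns the partition next will see that message again.
    void recordFailure(String partition, long offset) {
        offsets.put(partition, offset);
        blocked.add(partition);
    }

    // Copy of the offsets that an explicit commit would use.
    Map<String, Long> snapshot() {
        return new HashMap<>(offsets);
    }

    // Step 3 of the workaround: forget partitions this consumer no longer owns.
    void onPartitionsRevoked(Collection<String> partitions) {
        for (String p : partitions) {
            offsets.remove(p);
            blocked.remove(p);
        }
    }

    public static void main(String[] args) {
        ExternalOffsetMap map = new ExternalOffsetMap();
        map.recordSuccess("topic-0", 40L);
        map.recordFailure("topic-0", 41L);  // e.g. message too large
        map.recordSuccess("topic-0", 42L);  // ignored: partition is blocked
        System.out.println("commit topic-0 at " + map.snapshot().get("topic-0"));
        map.onPartitionsRevoked(Arrays.asList("topic-0"));
        System.out.println("still tracked: " + map.snapshot().containsKey("topic-0"));
    }
}
```

Even in this reduced form, the user has to track successes, failures, and rebalances per partition by hand.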
So it is much simpler than asking them to maintain an external offset map.

Thoughts?

> Add KafkaConsumer pause capability
> ----------------------------------
>
>                 Key: KAFKA-2350
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2350
>             Project: Kafka
>          Issue Type: Improvement
>          Components: consumer
>            Reporter: Jason Gustafson
>            Assignee: Jason Gustafson
>             Fix For: 0.9.0.0
>
>
> There are some use cases in stream processing where it is helpful to be able
> to pause consumption of a topic. For example, when joining two topics, you
> may need to delay processing of one topic while you wait for the consumer of
> the other topic to catch up. The new consumer currently doesn't provide a
> nice way to do this. If you skip calls to poll() or if you unsubscribe, then
> a rebalance will be triggered and your partitions will be reassigned to
> another consumer. The desired behavior is instead that you keep the partition
> assigned and simply
> One way to achieve this would be to add two new methods to KafkaConsumer:
> {code}
> void pause(TopicPartition... partitions);
> void resume(TopicPartition... partitions);
> {code}
> Here is the expected behavior of pause/resume:
> * When a partition is paused, calls to KafkaConsumer.poll will not initiate
> any new fetches for that partition.
> * After the partition is resumed, fetches will begin again.
> * While a partition is paused, seek() and position() can still be used to
> advance or query the current position.
> * Rebalance does not preserve pause/resume state.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)