[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14992208#comment-14992208
 ] 

Jiangjie Qin commented on KAFKA-2350:
-------------------------------------

[~hachikuji] [~guozhang] I noticed that currently, pausing a partition only 
pauses data fetching; it does not pause offset commits. Should we also stop 
committing offsets for paused partitions?

The motivation can be explained with the following example:
In mirror maker, if a message send fails (for example, because the message is 
too large), today we have to stop the entire mirror maker to avoid data loss. 
With pause, we could instead pause consumption from that partition without 
shutting down the whole pipeline. The problem is that the failed message has 
already been delivered to the application by poll(), so the consumer's position 
is already past it. When we call producer.flush() and then commit, that 
position will be committed. If a rebalance later occurs and another consumer 
takes over this partition, the failed message will be lost.

The workaround today is to:
1. Disable auto commit
2. Maintain an external offset map and commit offsets explicitly from that map.
3. Implement a consumer rebalance listener that cleans up the external map when 
a rebalance occurs.
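The three workaround steps can be sketched roughly as follows. This is a Kafka-free toy, assuming partitions are plain strings such as "topic-0" rather than TopicPartition; the class ExternalOffsetMap and its methods are hypothetical. In a real consumer, offsetsToCommit() would feed commitSync(Map<TopicPartition, OffsetAndMetadata>), and onPartitionsRevoked() would be called from a ConsumerRebalanceListener.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical external offset map for the workaround (auto commit disabled).
// Partitions are plain strings like "topic-0", not Kafka's TopicPartition.
class ExternalOffsetMap {
    // partition -> offset to commit (the next offset a consumer should read)
    private final Map<String, Long> safeOffsets = new HashMap<>();
    // partitions whose downstream send failed; their commit point is frozen
    private final Set<String> failed = new HashSet<>();

    // Step 2: call after a message has been written downstream successfully.
    void markProcessed(String partition, long offset) {
        if (!failed.contains(partition)) {
            safeOffsets.put(partition, offset + 1);
        }
    }

    // Call when a send fails: pin the commit point at the first failed message
    // so whoever owns the partition next will re-read it.
    void markFailed(String partition, long offset) {
        if (failed.add(partition)) {
            safeOffsets.put(partition, offset);
        }
    }

    // Snapshot that would be passed to an explicit commitSync(Map) call.
    Map<String, Long> offsetsToCommit() {
        return new HashMap<>(safeOffsets);
    }

    // Step 3: what an onPartitionsRevoked callback would do, after committing
    // offsetsToCommit() one last time for the revoked partitions.
    void onPartitionsRevoked(Collection<String> revoked) {
        safeOffsets.keySet().removeAll(revoked);
        failed.removeAll(revoked);
    }
}
```

Even in this stripped-down form, the application has to track per-partition state and keep it in sync with rebalances, which is the bookkeeping the proposal below tries to remove.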

This is quite involved, and we probably don't want to force users to keep an 
external map to solve this issue, because consumer-process-write_to_downstream 
could be a very common pattern.

I suggest we do the following:
1. Stop committing offsets for paused partitions.
2. When pause() is called with auto commit turned on, commit the offset for 
the partition being paused.
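A minimal model of the two proposed rules, under the assumption (our reading of rule 2) that pause() commits the partition's current position. PauseAwareCommitter and the string partition names are hypothetical; the class just maintains a map of committed offsets and is not how KafkaConsumer is implemented.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the proposal, not the real KafkaConsumer internals.
class PauseAwareCommitter {
    private final boolean autoCommitEnabled;
    private final Map<String, Long> positions = new HashMap<>();        // partition -> current position
    private final Set<String> paused = new HashSet<>();
    private final Map<String, Long> committedOffsets = new HashMap<>(); // stands in for broker-side commits

    PauseAwareCommitter(boolean autoCommitEnabled) {
        this.autoCommitEnabled = autoCommitEnabled;
    }

    void setPosition(String partition, long offset) {
        positions.put(partition, offset);
    }

    // Rule 2: with auto commit on, commit the partition once, then freeze it.
    void pause(String partition) {
        if (autoCommitEnabled && positions.containsKey(partition)) {
            committedOffsets.put(partition, positions.get(partition));
        }
        paused.add(partition);
    }

    void resume(String partition) {
        paused.remove(partition);
    }

    // Rule 1: the periodic auto commit skips paused partitions.
    void autoCommit() {
        if (!autoCommitEnabled) {
            return;
        }
        for (Map.Entry<String, Long> e : positions.entrySet()) {
            if (!paused.contains(e.getKey())) {
                committedOffsets.put(e.getKey(), e.getValue());
            }
        }
    }

    Long committed(String partition) {
        return committedOffsets.get(partition);
    }
}
```

For example, pausing "t-0" at position 5 and then moving its position to 9 (say, via seek() while paused) leaves the committed offset at 5 after the next autoCommit(), while unpaused partitions keep advancing.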

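This way, if a user sees an error and wants to pause a partition without 
losing messages, they can simply do
{code}
// Commit the failed message's offset so the next owner re-reads it, then pause.
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(failedMessageOffset)));
consumer.pause(partition);
{code}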

When a rebalance occurs, the consumer that takes over the partition will see 
the failed message again and will pause the partition as well.
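To make the failure scenario concrete, here is a toy simulation with no Kafka involved: one partition is a list indexed by offset, and the hypothetical message "BIG" is one the producer rejects. It compares today's behavior (the consumer's position gets committed) with committing the failed message's offset before pausing. FailedMessageHandoff and its methods are illustrative names only.

```java
import java.util.Arrays;
import java.util.List;

// Toy, Kafka-free simulation of the mirror maker scenario.
class FailedMessageHandoff {
    // One partition; list index == offset. "BIG" is rejected by the producer.
    static final List<String> LOG = Arrays.asList("a", "b", "BIG", "c");

    // Consumer A: poll() has returned the whole batch, so its position is past
    // every message. It forwards messages until a send fails, then pauses.
    // Returns the offset that ends up committed for the partition.
    static long consumerA(boolean commitFailedOffsetBeforePause) {
        long position = LOG.size(); // position after poll() returned everything
        for (int offset = 0; offset < LOG.size(); offset++) {
            if (LOG.get(offset).equals("BIG")) {
                // Proposed: commitSync(failed offset) then pause(partition).
                // Today: the partition is paused, but flush + commit still
                // commits the current position.
                return commitFailedOffsetBeforePause ? offset : position;
            }
            // any other message is sent downstream successfully
        }
        return position;
    }

    // Consumer B takes over after a rebalance and resumes from the committed
    // offset. Returns the first message it sees, or null if there is none.
    static String firstMessageSeenByConsumerB(long committed) {
        return committed < LOG.size() ? LOG.get((int) committed) : null;
    }

    public static void main(String[] args) {
        System.out.println(firstMessageSeenByConsumerB(consumerA(false))); // prints "null"
        System.out.println(firstMessageSeenByConsumerB(consumerA(true)));  // prints "BIG"
    }
}
```

With today's behavior the next owner starts past "BIG", so both it and the never-sent "c" are skipped; with the explicit commit, the next owner re-reads "BIG" and would pause the partition again.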

The only difference is that users who do not use auto commit need to commit 
the offset of a partition themselves before pausing it. But users who disable 
auto commit probably care about data loss anyway, and this is much simpler 
than asking them to maintain an external offset map.

Thoughts?

> Add KafkaConsumer pause capability
> ----------------------------------
>
>                 Key: KAFKA-2350
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2350
>             Project: Kafka
>          Issue Type: Improvement
>          Components: consumer
>            Reporter: Jason Gustafson
>            Assignee: Jason Gustafson
>             Fix For: 0.9.0.0
>
>
> There are some use cases in stream processing where it is helpful to be able 
> to pause consumption of a topic. For example, when joining two topics, you 
> may need to delay processing of one topic while you wait for the consumer of 
> the other topic to catch up. The new consumer currently doesn't provide a 
> nice way to do this. If you skip calls to poll() or if you unsubscribe, then 
> a rebalance will be triggered and your partitions will be reassigned to 
> another consumer. The desired behavior is instead that you keep the partition 
> assigned and simply stop fetching from it.
> One way to achieve this would be to add two new methods to KafkaConsumer:
> {code}
> void pause(TopicPartition... partitions);
> void resume(TopicPartition... partitions);
> {code}
> Here is the expected behavior of pause/resume:
> * When a partition is paused, calls to KafkaConsumer.poll will not initiate 
> any new fetches for that partition.
> * After the partition is resumed, fetches will begin again. 
> * While a partition is paused, seek() and position() can still be used to 
> advance or query the current position.
> * Rebalance does not preserve pause/resume state.
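The expected pause/resume behavior listed above can be modelled as a small piece of state. This is a hedged sketch, not KafkaConsumer itself: PausableAssignment and its methods are hypothetical, partitions are strings, and assign() stands in for the assignment a rebalance hands out (positions start at 0 here purely for simplicity).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the pause/resume contract in the issue description.
class PausableAssignment {
    private final Map<String, Long> positions = new LinkedHashMap<>(); // keeps assignment order
    private final Set<String> paused = new HashSet<>();

    // A new assignment (e.g. after a rebalance) does not preserve pause state.
    void assign(Collection<String> partitions) {
        positions.clear();
        paused.clear();
        for (String p : partitions) {
            positions.put(p, 0L);
        }
    }

    void pause(String... partitions) {
        paused.addAll(Arrays.asList(partitions));
    }

    void resume(String... partitions) {
        paused.removeAll(Arrays.asList(partitions));
    }

    // seek() and position() keep working while a partition is paused.
    void seek(String partition, long offset) {
        positions.put(partition, offset);
    }

    long position(String partition) {
        return positions.get(partition);
    }

    // Partitions a poll() would start new fetches for: assigned and not paused.
    List<String> fetchablePartitions() {
        List<String> out = new ArrayList<>();
        for (String p : positions.keySet()) {
            if (!paused.contains(p)) {
                out.add(p);
            }
        }
        return out;
    }
}
```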



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
