[ https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14636074#comment-14636074 ]

Jiangjie Qin commented on KAFKA-2350:
-------------------------------------

I am thinking that currently we keep two collections of topic partitions in 
KafkaConsumer: one for the user subscription, the other for the coordinator 
assignment. Could we change the existing code so that subscribe/unsubscribe 
support pause/unpause as well?

Maybe we can have one subscription set and one assigned partition validation 
set.
{code}
void subscribe(String topic);
void unsubscribe(String topic);
{code}
will affect both the assigned partition set and the subscription set. If 
Kafka-based partition assignment is not used, the assigned partition set will 
be null.

{code}
void subscribe(TopicPartition... partitions);
void unsubscribe(TopicPartition... partitions);
{code}
will only change the subscription set; calling them won't trigger a rebalance. 
However, the partitions subscribed to have to be in the assigned partition set 
if that set is not null.

In this way, the user can simply use
{code}
void subscribe(TopicPartition... partitions);
void unsubscribe(TopicPartition... partitions);
{code}
to pause and unpause individual partitions.
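The two-set proposal can be sketched as a small, self-contained Java model. This is a hypothetical illustration, not KafkaConsumer's code: `ProposedSubscriptionState`, its `onAssignment` hook, and the local `TopicPartition` record (a stand-in for Kafka's `org.apache.kafka.common.TopicPartition`) are all assumptions. Topic-level `subscribe`/`unsubscribe` flag a rebalance, while partition-level calls only edit the subscription set and are validated against the assigned set when that set is not null.

{code}
import java.util.*;

// Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
record TopicPartition(String topic, int partition) {}

// Hypothetical model of the proposed bookkeeping; not KafkaConsumer's actual code.
class ProposedSubscriptionState {
    // Topics subscribed via subscribe(String); these drive Kafka-based assignment.
    private final Set<String> subscribedTopics = new HashSet<>();
    // Partitions the consumer would actually fetch from.
    private final Set<TopicPartition> subscribedPartitions = new HashSet<>();
    // Partitions handed out by the coordinator; null when Kafka-based assignment is not used.
    private Set<TopicPartition> assignedPartitions = null;
    private boolean rebalanceNeeded = false;

    // Topic-level subscribe: changes the topic subscription and requires a rebalance.
    void subscribe(String topic) {
        if (subscribedTopics.add(topic)) rebalanceNeeded = true;
    }

    // Topic-level unsubscribe: drops the topic from both partition sets and requires a rebalance.
    void unsubscribe(String topic) {
        if (subscribedTopics.remove(topic)) {
            rebalanceNeeded = true;
            subscribedPartitions.removeIf(tp -> tp.topic().equals(topic));
            if (assignedPartitions != null) assignedPartitions.removeIf(tp -> tp.topic().equals(topic));
        }
    }

    // Hypothetical hook for a completed rebalance. Simplification: the new
    // assignment replaces the partition subscription wholesale.
    void onAssignment(Set<TopicPartition> assigned) {
        assignedPartitions = new HashSet<>(assigned);
        subscribedPartitions.clear();
        subscribedPartitions.addAll(assigned);
        rebalanceNeeded = false;
    }

    // Partition-level subscribe: validates every partition first, then edits only
    // the subscription set; never triggers a rebalance.
    void subscribe(TopicPartition... partitions) {
        for (TopicPartition tp : partitions) {
            if (assignedPartitions != null && !assignedPartitions.contains(tp))
                throw new IllegalStateException("Partition " + tp + " is not assigned to this consumer");
        }
        subscribedPartitions.addAll(Arrays.asList(partitions));
    }

    // Partition-level unsubscribe: edits only the subscription set; never triggers a rebalance.
    void unsubscribe(TopicPartition... partitions) {
        subscribedPartitions.removeAll(Arrays.asList(partitions));
    }

    Set<TopicPartition> fetchablePartitions() { return Collections.unmodifiableSet(subscribedPartitions); }

    boolean rebalanceNeeded() { return rebalanceNeeded; }
}
{code}

In this model, pausing a partition is {{state.unsubscribe(tp)}} and unpausing it is {{state.subscribe(tp)}}. Neither call sets {{rebalanceNeeded()}}, and re-subscribing to a partition outside the current assignment is rejected, which is the validation benefit listed below.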

Some other benefits might be:
1. We don't add two more methods to the already somewhat complicated API.
2. We get validation for manual subscription.

> Add KafkaConsumer pause capability
> ----------------------------------
>
>                 Key: KAFKA-2350
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2350
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Jason Gustafson
>            Assignee: Jason Gustafson
>
> There are some use cases in stream processing where it is helpful to be able 
> to pause consumption of a topic. For example, when joining two topics, you 
> may need to delay processing of one topic while you wait for the consumer of 
> the other topic to catch up. The new consumer currently doesn't provide a 
> nice way to do this. If you skip poll() or if you unsubscribe, then a 
> rebalance will be triggered and your partitions will be reassigned.
> One way to achieve this would be to add two new methods to KafkaConsumer:
> {code}
> void pause(String... topics);
> void unpause(String... topics);
> {code}
> When a topic is paused, a call to KafkaConsumer.poll will not initiate any 
> new fetches for that topic. After it is unpaused, fetches will begin again.
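For comparison, the {{pause}}/{{unpause}} semantics described in the issue can be sketched as a filter applied when planning fetches. This is a hypothetical model, not the consumer's fetcher: {{PausableFetchPlanner}}, {{fetchTargets}}, and the local {{TopicPartition}} record (a stand-in for Kafka's {{org.apache.kafka.common.TopicPartition}}) are assumptions made for illustration.

{code}
import java.util.*;
import java.util.stream.Collectors;

// Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
record TopicPartition(String topic, int partition) {}

// Hypothetical model of the pause/unpause proposal. The assignment is only
// filtered, never modified, so pausing cannot cause a rebalance here.
class PausableFetchPlanner {
    private final Set<String> pausedTopics = new HashSet<>();

    void pause(String... topics) { pausedTopics.addAll(Arrays.asList(topics)); }

    void unpause(String... topics) { pausedTopics.removeAll(Arrays.asList(topics)); }

    // Partitions a poll() would start new fetches for: every assigned
    // partition whose topic is not currently paused.
    Set<TopicPartition> fetchTargets(Set<TopicPartition> assigned) {
        return assigned.stream()
                .filter(tp -> !pausedTopics.contains(tp.topic()))
                .collect(Collectors.toSet());
    }
}
{code}

In the join example, the consumer would pause the topic that is ahead, keep calling poll() so it stays in the group, and unpause it once the other topic catches up.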



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
