[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14636319#comment-14636319
 ] 

Jiangjie Qin commented on KAFKA-2350:
-------------------------------------

[~jkreps][~hachikuji], I actually was not proposing reuse
{code}
void subscribe(String topic)
void unsubscribe(String topic)
{code}
So I was thinking we follow the current convention which is:
1. If you are subscribing/unsubscribing to a partition explicitly, you are on 
your own
2. If you are subscribing/unsubscribing to a topic, we use consumer coordinator 
for partition assignment.

I assume the only use case we are trying to address is when user is using 
consumer coordinator and want to temporarily stop consuming from a topic 
without triggering a consumer rebalance.

If so, to unsubscribe from a topic we can do something like fowllowing
{code}
...
for(TopicPartition tp : consumer.assignedTopicPartitions.get(topic)) {
    unsubscribe(tp);
}
{code}
To resubscribe, we can do the similar but just call subscribe(tp) instead

This approach might need to expose an interface of assignedTopicPartitions(), 
but I can see that useful in quite a few use cases.

> Add KafkaConsumer pause capability
> ----------------------------------
>
>                 Key: KAFKA-2350
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2350
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Jason Gustafson
>            Assignee: Jason Gustafson
>
> There are some use cases in stream processing where it is helpful to be able 
> to pause consumption of a topic. For example, when joining two topics, you 
> may need to delay processing of one topic while you wait for the consumer of 
> the other topic to catch up. The new consumer currently doesn't provide a 
> nice way to do this. If you skip poll() or if you unsubscribe, then a 
> rebalance will be triggered and your partitions will be reassigned.
> One way to achieve this would be to add two new methods to KafkaConsumer:
> {code}
> void pause(String... topics);
> void unpause(String... topics);
> {code}
> When a topic is paused, a call to KafkaConsumer.poll will not initiate any 
> new fetches for that topic. After it is unpaused, fetches will begin again.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to