[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14637179#comment-14637179
 ] 

Guozhang Wang commented on KAFKA-2350:
--------------------------------------

There is already a function for retrieving the subscribed topic partitions:

{code}
    public Set<TopicPartition> subscriptions() {
        acquire();
        try {
            return Collections.unmodifiableSet(this.subscriptions.assignedPartitions());
        } finally {
            release();
        }
    }
{code}

Its return value changes when, for example, consumer.unsubscribe(partition) is 
called, since that call removes the partition from the set.

I actually think [~becket_qin]'s approach will not cause much confusion 
regarding the APIs. To be more explicit: assuming we add another function, 
"assignment()", that returns the assigned partitions, the semantics of the 
other APIs will be:

{code}
// Will not throw any exception, but will update the assignment as well as
// the subscription in the next poll.
consumer.subscribe(topic);

// Will throw an exception if the topic is not subscribed; otherwise will
// update the assignment and the subscription in the next poll.
consumer.unsubscribe(topic);

// Returns the assigned partitions.
consumer.assignment();

// Returns the subscribed partitions, which are the same as the assigned
// partitions most of the time.
consumer.subscriptions();

// Will throw an exception if the partition is not in assignment(), saying
// "it is not assigned to you".
consumer.subscribe(partition1);

// Will throw an exception if the partition is not in subscriptions(), saying
// "it is not subscribed by yourself".
consumer.unsubscribe(partition2);
{code}
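
To make the partition-level semantics above concrete, here is a minimal sketch. It is not KafkaConsumer code: the class PartitionSemanticsSketch, the onAssignment() hook, the use of plain strings for partitions, and the choice of IllegalStateException are all assumptions made for illustration. Only assignment(), subscriptions(), subscribe(partition), unsubscribe(partition), and the two error messages come from the list above.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for the consumer's partition bookkeeping.
class PartitionSemanticsSketch {
    // Partitions handed out by the coordinator.
    private final Set<String> assigned = new HashSet<>();
    // Partitions we are currently fetching from.
    private final Set<String> subscribed = new HashSet<>();

    // Assumed hook: simulates the outcome of a rebalance. Every newly
    // assigned partition starts out subscribed.
    void onAssignment(Set<String> partitions) {
        assigned.clear();
        assigned.addAll(partitions);
        subscribed.clear();
        subscribed.addAll(partitions);
    }

    Set<String> assignment() {
        return Collections.unmodifiableSet(assigned);
    }

    Set<String> subscriptions() {
        return Collections.unmodifiableSet(subscribed);
    }

    // Re-subscribing is only allowed for partitions in assignment().
    void subscribe(String partition) {
        if (!assigned.contains(partition)) {
            throw new IllegalStateException(partition + " is not assigned to you");
        }
        subscribed.add(partition);
    }

    // Unsubscribing is only allowed for partitions in subscriptions().
    void unsubscribe(String partition) {
        if (!subscribed.remove(partition)) {
            throw new IllegalStateException(partition + " is not subscribed by yourself");
        }
    }
}
```

With this model, unsubscribing an assigned partition shrinks subscriptions() but leaves assignment() untouched, which is the case where the two sets differ.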

My bigger concern with this approach is the client implementation. Since it 
allows a client both to use Kafka partition assignment and to bypass it within 
the same life cycle, it could make the client state more complicated to 
manage. For example:

{code}
// Use Kafka for assignment; say we are assigned topic1-partition1 and
// topic1-partition2.
consumer.subscribe(topic1);
consumer.poll();

// Subscribe to another partition explicitly, without the Kafka coordinator
// being aware of it.
consumer.subscribe(topic2-partition1);

// Now the subscription is topic1-partition2 and topic2-partition1, where the
// first comes from Kafka assignment and the second from explicit subscription.
consumer.unsubscribe(topic1-partition1);
{code}
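
One way to picture the extra state is to record, for each partition, where it came from. The sketch below is purely illustrative: MixedSubscriptionSketch, the Source enum, and every method name are hypothetical, and the rule that a rebalance replaces only coordinator-owned entries is an assumption that this discussion has not settled.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration only; none of these names exist in the Kafka client.
class MixedSubscriptionSketch {
    enum Source { COORDINATOR, EXPLICIT }

    // Partition name mapped to how it entered the subscription.
    private final Map<String, Source> partitions = new LinkedHashMap<>();

    void assignedByCoordinator(String partition) {
        partitions.put(partition, Source.COORDINATOR);
    }

    void subscribedExplicitly(String partition) {
        partitions.put(partition, Source.EXPLICIT);
    }

    void unsubscribe(String partition) {
        partitions.remove(partition);
    }

    // Assumption: a rebalance drops all coordinator-owned entries and adds
    // the new assignment, while explicit entries are left alone.
    void onRebalance(Iterable<String> newAssignment) {
        partitions.values().removeIf(source -> source == Source.COORDINATOR);
        for (String partition : newAssignment) {
            partitions.putIfAbsent(partition, Source.COORDINATOR);
        }
    }

    // Defensive copy of the current state.
    Map<String, Source> snapshot() {
        return new LinkedHashMap<>(partitions);
    }
}
```

Replaying the sequence above leaves topic1-partition2 tagged COORDINATOR and topic2-partition1 tagged EXPLICIT, so any later rebalance or unsubscribe has to consult the tag before deciding what to do.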

> Add KafkaConsumer pause capability
> ----------------------------------
>
>                 Key: KAFKA-2350
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2350
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Jason Gustafson
>            Assignee: Jason Gustafson
>
> There are some use cases in stream processing where it is helpful to be able 
> to pause consumption of a topic. For example, when joining two topics, you 
> may need to delay processing of one topic while you wait for the consumer of 
> the other topic to catch up. The new consumer currently doesn't provide a 
> nice way to do this. If you skip poll() or if you unsubscribe, then a 
> rebalance will be triggered and your partitions will be reassigned.
> One way to achieve this would be to add two new methods to KafkaConsumer:
> {code}
> void pause(String... topics);
> void unpause(String... topics);
> {code}
> When a topic is paused, a call to KafkaConsumer.poll will not initiate any 
> new fetches for that topic. After it is unpaused, fetches will begin again.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
