[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14638182#comment-14638182
 ] 

Jiangjie Qin commented on KAFKA-2350:
-------------------------------------

[~yasuhiro.matsuda] [~jkreps] [~hachikuji] [~gwenshap] If we are using 
pause/unpause, does that mean that when users are not using the consumer 
coordinator, they are equivalent to 
subscribe(partitions)/unsubscribe(partitions)?

I still don't understand why this is an "overloading" of the current 
subscribe/unsubscribe API. The way I see the KafkaConsumer API working is that 
we have different methods to set different fields of a fetch request (offsets, 
topics, partitions), and then we do a poll() using those settings. 

To me, the definition of all the subscribe/unsubscribe methods stays unchanged:
* Subscribe/Unsubscribe to a TOPIC means
** Involve consumer coordinator to do partition assignment
** Consumer rebalance will be triggered
* Subscribe/Unsubscribe to a PARTITION means
** Do not involve consumer coordinator
** Consumer rebalance will not be triggered.
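
To make the distinction above concrete, here is a minimal sketch of the two 
kinds of subscription as I read them. It is a toy model, not the real 
KafkaConsumer: the class SubscriptionModel, its method names, and the 
rebalance counter are all made up for illustration.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical toy model of the semantics listed above; NOT the real KafkaConsumer.
class SubscriptionModel {
    final Set<String> topics = new LinkedHashSet<>();
    final Set<String> partitions = new LinkedHashSet<>();
    // Counts how often the (imaginary) coordinator would be asked to rebalance.
    int rebalancesTriggered = 0;

    // TOPIC level: the coordinator is involved, so a change triggers a rebalance.
    void subscribeTopic(String topic) {
        if (topics.add(topic)) {
            rebalancesTriggered++;
        }
    }

    void unsubscribeTopic(String topic) {
        if (topics.remove(topic)) {
            rebalancesTriggered++;
        }
    }

    // PARTITION level: purely local bookkeeping, no coordinator, no rebalance.
    void subscribePartition(String topicPartition) {
        partitions.add(topicPartition);
    }

    void unsubscribePartition(String topicPartition) {
        partitions.remove(topicPartition);
    }
}
```

In this model only the topic-level calls ever change rebalancesTriggered; the 
partition-level calls just edit the local set.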

The only change is that instead of naively rejecting a PARTITION sub/unsub when 
the consumer coordinator is involved, we allow users to decide whether they 
want to change the settings for their next poll() to exclude some topic 
partitions that have been assigned to this consumer.
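
A rough sketch of that change, under my own assumptions: SketchConsumer and 
all of its methods are hypothetical names, onAssignment() stands in for the 
coordinator completing a rebalance, and keeping exclusions across a 
reassignment is just one possible choice, not something decided in this 
ticket.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical toy model of the proposed behavior; NOT the real KafkaConsumer API.
class SketchConsumer {
    // Partitions handed to this consumer by the coordinator (topic-level subscription).
    private final Set<String> assigned = new LinkedHashSet<>();
    // Assigned partitions the user has excluded from the next poll().
    private final Set<String> excluded = new LinkedHashSet<>();
    private int rebalances = 0;

    // Stand-in for the coordinator finishing a rebalance and assigning partitions.
    void onAssignment(Set<String> partitions) {
        assigned.clear();
        assigned.addAll(partitions);
        // Assumption: exclusions survive only for partitions we still own.
        excluded.retainAll(assigned);
        rebalances++;
    }

    // Partition-level unsubscribe is accepted instead of rejected:
    // it only changes the settings used by the next poll().
    void unsubscribePartition(String topicPartition) {
        if (assigned.contains(topicPartition)) {
            excluded.add(topicPartition);
        }
    }

    // Partition-level subscribe puts an excluded partition back into the fetch set.
    void subscribePartition(String topicPartition) {
        excluded.remove(topicPartition);
    }

    // The partitions the next poll() would fetch from.
    Set<String> nextFetchSet() {
        Set<String> fetchSet = new LinkedHashSet<>(assigned);
        fetchSet.removeAll(excluded);
        return fetchSet;
    }

    int rebalanceCount() {
        return rebalances;
    }
}
```

The point of the sketch is that the partition-level calls never touch the 
assignment or the rebalance count; they only narrow or widen what the next 
poll() fetches.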

Therefore I don't see why using subscribe(partitions)/unsubscribe(partitions) 
to pause and unpause consumption is a behavior re-definition. It looks to me 
like pause/unpause does exactly the same thing as partition-level 
subscribe/unsubscribe, and we are adding them simply because we think users 
are using them for a different use case. If so, does that mean we need to add 
yet another pair of methods if people subscribe/unsubscribe partitions for 
some other use case? Then we are going to end up with a bunch of methods 
doing very similar or even exactly the same thing, but with different names 
based on the use case. 

If the reason we don't like to use sub/unsub is that their names sound 
purpose-oriented and indicate a particular use case, we can change the names 
to something like addTopicPartition()/addTopic() (I know I am a terrible name 
picker, but hopefully you get what I wanted to say).

> Add KafkaConsumer pause capability
> ----------------------------------
>
>                 Key: KAFKA-2350
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2350
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Jason Gustafson
>            Assignee: Jason Gustafson
>
> There are some use cases in stream processing where it is helpful to be able 
> to pause consumption of a topic. For example, when joining two topics, you 
> may need to delay processing of one topic while you wait for the consumer of 
> the other topic to catch up. The new consumer currently doesn't provide a 
> nice way to do this. If you skip poll() or if you unsubscribe, then a 
> rebalance will be triggered and your partitions will be reassigned.
> One way to achieve this would be to add two new methods to KafkaConsumer:
> {code}
> void pause(String... topics);
> void unpause(String... topics);
> {code}
> When a topic is paused, a call to KafkaConsumer.poll will not initiate any 
> new fetches for that topic. After it is unpaused, fetches will begin again.



