[ https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14638182#comment-14638182 ]
Jiangjie Qin commented on KAFKA-2350:
-------------------------------------

[~yasuhiro.matsuda] [~jkreps] [~hachikuji] [~gwenshap] If we are using pause/unpause, does that mean that, when users are not using the consumer coordinator, they are equivalent to subscribe(partitions)/unsubscribe(partitions)?

I still don't understand why this is an "overloading" of the current subscribe/unsubscribe API. The way I see it, the KafkaConsumer API works like this: we have different methods to set different fields of a fetch request (offsets, topics, partitions), and then we call poll() using those settings. To me, the definitions of all the subscribe/unsubscribe methods stay unchanged:
* Subscribe/Unsubscribe to a TOPIC means
** Involve the consumer coordinator to do partition assignment
** A consumer rebalance will be triggered
* Subscribe/Unsubscribe to a PARTITION means
** Do not involve the consumer coordinator
** A consumer rebalance will not be triggered

The only change is that, instead of naively rejecting a PARTITION sub/unsub when the consumer coordinator is involved, we allow users to decide whether they want to change the settings for their next poll() to exclude some topic partitions that have been assigned to this consumer. Therefore I don't see why using subscribe(partitions)/unsubscribe(partitions) to pause and unpause consumption is a redefinition of behavior. It looks to me that pause/unpause does exactly the same thing as partition-level subscribe/unsubscribe, and we are adding them simply because we think users will use them for a different use case. If so, does that mean we need to add yet another pair of methods if people subscribe/unsubscribe partitions for some other use case? Then we would end up with a bunch of interfaces doing very similar or even exactly the same things, with different names based on the use case.
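To make the argument above concrete, here is a minimal sketch of the "set fields, then poll()" view of the consumer. It is a hypothetical toy, not the real KafkaConsumer API: the class FetchSetModel, its methods, and string partition names such as "orders-0" are all invented for illustration, and rebalances are only counted. It assumes the semantics proposed in the comment, where a partition-level unsubscribe is allowed on coordinator-assigned partitions and does not trigger a rebalance; under that assumption pause/unpause reduce to the same operation.

```java
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical toy model of the "set fields, then poll()" view of the consumer.
// This is NOT the real KafkaConsumer API: partitions are plain strings such as
// "orders-0", and rebalances are merely counted, never performed.
class FetchSetModel {
    private final Set<String> assigned = new LinkedHashSet<>(); // chosen by the coordinator
    private final Set<String> excluded = new LinkedHashSet<>(); // skipped by the next poll()
    private int rebalances = 0;

    // Topic-level subscribe: the coordinator assigns partitions, which
    // corresponds to a rebalance. (What happens to exclusions across a
    // reassignment is deliberately not modeled here.)
    void coordinatorAssigns(Collection<String> partitions) {
        rebalances++;
        assigned.clear();
        assigned.addAll(partitions);
    }

    // Partition-level sub/unsub as argued in the comment: allowed even when
    // the coordinator is involved, and it only changes what the next poll()
    // fetches. No rebalance is triggered.
    void unsubscribePartition(String tp) { excluded.add(tp); }
    void subscribePartition(String tp) { excluded.remove(tp); }

    // Under those semantics, pause/unpause are just aliases.
    void pause(String tp) { unsubscribePartition(tp); }
    void unpause(String tp) { subscribePartition(tp); }

    // Partitions the next poll() would fetch from: assigned minus excluded.
    Set<String> fetchablePartitions() {
        Set<String> result = new LinkedHashSet<>(assigned);
        result.removeAll(excluded);
        return result;
    }

    int rebalanceCount() { return rebalances; }

    public static void main(String[] args) {
        FetchSetModel c = new FetchSetModel();
        c.coordinatorAssigns(List.of("orders-0", "clicks-0"));
        c.pause("clicks-0");
        System.out.println(c.fetchablePartitions()); // prints [orders-0]
        c.unpause("clicks-0");
        System.out.println(c.fetchablePartitions()); // prints [orders-0, clicks-0]
        System.out.println(c.rebalanceCount());      // prints 1: pausing never rebalanced
    }
}
```

In this sketch both name pairs mutate the same excluded set, so they are literally the same operation; only the method name hints at the intended use case, which is the point the comment raises.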
If the reason we don't want to use sub/unsub is that their names sound purpose-oriented and imply a particular use case, we could rename them to something like addTopicPartition()/addTopic() (I know I am a terrible name picker, but hopefully you get what I mean).

> Add KafkaConsumer pause capability
> ----------------------------------
>
> Key: KAFKA-2350
> URL: https://issues.apache.org/jira/browse/KAFKA-2350
> Project: Kafka
> Issue Type: Improvement
> Reporter: Jason Gustafson
> Assignee: Jason Gustafson
>
> There are some use cases in stream processing where it is helpful to be able
> to pause consumption of a topic. For example, when joining two topics, you
> may need to delay processing of one topic while you wait for the consumer of
> the other topic to catch up. The new consumer currently doesn't provide a
> nice way to do this. If you skip poll() or if you unsubscribe, then a
> rebalance will be triggered and your partitions will be reassigned.
> One way to achieve this would be to add two new methods to KafkaConsumer:
> {code}
> void pause(String... topics);
> void unpause(String... topics);
> {code}
> When a topic is paused, a call to KafkaConsumer.poll will not initiate any
> new fetches for that topic. After it is unpaused, fetches will begin again.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)