[ https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14639080#comment-14639080 ]
Jay Kreps commented on KAFKA-2350:
----------------------------------

[~becket_qin] I think there are three proposals:
1. Add pause/unpause
2. Allow subscribe(topic) followed by unsubscribe(partition) to subscribe to a topic but suppress a partition
3. Either of the above, but making the use of group management explicit using an enable.group.management flag

I'm in favor of (2) if it is possible to work out the corner cases and the implementation doesn't cause us to go crazy. I think you are saying that having unsubscribe mean "suppress" is actually somewhat sensible. I'm in favor of (1) otherwise. I'm not in favor of (3) because of the two different modes.

Let me point out a few of the weird bits of (2), though. Since we now allow mingling of subscribe(topic) and subscribe(partition), we have to work out all the combinations.

The case where you do
  subscribe(topic)
  unsubscribe(partition)
is really clear: it means give me a set of partitions but suppress this particular partition.

Likewise, I think
  subscribe(partition)
  subscribe(topic)
also makes sense. You are saying give me this particular partition plus whatever else the coordinator assigns me.

But what about
  unsubscribe(partition)
  subscribe(topic)
Do you still get the same suppression effect?

But now this is a bit weird:
  subscribe(partition)
  subscribe(topic)
  unsubscribe(partition)
Does the unsubscribe call suppress the partition or not? The first two calls normally mean subscribe me to whatever the coordinator gives me plus a given partition. The last two calls normally mean subscribe me to whatever the coordinator gives me except this given partition. But is the result of combining these two the same as
  subscribe(partition)
  unsubscribe(partition)
  subscribe(topic)

In other words, I think all this implies that there are now three states for a partition: SUBSCRIBED, NOT_SUBSCRIBED, SUPPRESSED? This is what I think we'd have to work out to make your proposal feasible.
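To make the three-state question concrete, here is a minimal sketch of one possible rule set. Everything in it is hypothetical: the class PartitionStateSketch, its methods, and the rule that unsubscribe(partition) only suppresses while a topic subscription is active are assumptions for illustration, not the KafkaConsumer API and not a proposed design. The boolean assignedByCoordinator stands in for whatever the coordinator would hand out.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model for discussion only; none of these names exist in KafkaConsumer.
class PartitionStateSketch {
    enum State { SUBSCRIBED, NOT_SUBSCRIBED, SUPPRESSED }

    private boolean topicSubscribed = false;
    // Explicit per-partition decisions made by the caller.
    private final Map<Integer, State> explicit = new HashMap<>();

    void subscribeTopic() {
        topicSubscribed = true;
    }

    void subscribePartition(int partition) {
        explicit.put(partition, State.SUBSCRIBED);
    }

    // Assumed rule: unsubscribe means "suppress" only when a topic
    // subscription is already active; otherwise it just drops any
    // explicit subscription for the partition.
    void unsubscribePartition(int partition) {
        if (topicSubscribed) {
            explicit.put(partition, State.SUPPRESSED);
        } else {
            explicit.remove(partition);
        }
    }

    // Effective state, given whether the coordinator assigned the partition.
    State state(int partition, boolean assignedByCoordinator) {
        State s = explicit.get(partition);
        if (s != null) {
            return s;
        }
        return (topicSubscribed && assignedByCoordinator)
                ? State.SUBSCRIBED
                : State.NOT_SUBSCRIBED;
    }
}
```

Under this assumed rule, subscribe(partition) subscribe(topic) unsubscribe(partition) leaves the partition SUPPRESSED, while subscribe(partition) unsubscribe(partition) subscribe(topic) leaves it to the coordinator, and unsubscribe(partition) subscribe(topic) has no suppression effect at all. A different rule would give different answers, which is exactly the ambiguity that would need to be pinned down.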

> Add KafkaConsumer pause capability
> ----------------------------------
>
>                 Key: KAFKA-2350
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2350
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Jason Gustafson
>            Assignee: Jason Gustafson
>
> There are some use cases in stream processing where it is helpful to be able
> to pause consumption of a topic. For example, when joining two topics, you
> may need to delay processing of one topic while you wait for the consumer of
> the other topic to catch up. The new consumer currently doesn't provide a
> nice way to do this. If you skip poll() or if you unsubscribe, then a
> rebalance will be triggered and your partitions will be reassigned.
> One way to achieve this would be to add two new methods to KafkaConsumer:
> {code}
> void pause(String... topics);
> void unpause(String... topics);
> {code}
> When a topic is paused, a call to KafkaConsumer.poll will not initiate any
> new fetches for that topic. After it is unpaused, fetches will begin again.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)