[ https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14643349#comment-14643349 ]
Jason Gustafson commented on KAFKA-2350:
----------------------------------------

There's one interesting implementation note that [~yasuhiro.matsuda] and [~guozhang] brought up. When a partition is unpaused, there may be an active fetch which is parked on the broker. In the current implementation, the consumer will not initiate any new fetches until that fetch has completed. This means that the consumer will not be able to immediately process messages from the unpaused partition, even if the broker has some available.

There are a couple of ways this could be handled. We could issue the new fetch from a different socket on the client, or we could implement a way to cancel or override the active fetch on the broker. Since either of these would make this patch significantly more complex, I think we should just note this limitation in the documentation and address it later if it becomes a larger problem.

> Add KafkaConsumer pause capability
> ----------------------------------
>
>                 Key: KAFKA-2350
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2350
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Jason Gustafson
>            Assignee: Jason Gustafson
>
> There are some use cases in stream processing where it is helpful to be able
> to pause consumption of a topic. For example, when joining two topics, you
> may need to delay processing of one topic while you wait for the consumer of
> the other topic to catch up. The new consumer currently doesn't provide a
> nice way to do this. If you skip calls to poll() or if you unsubscribe, then
> a rebalance will be triggered and your partitions will be reassigned to
> another consumer. The desired behavior is instead that you keep the partition
> assigned and simply
>
> One way to achieve this would be to add two new methods to KafkaConsumer:
> {code}
> void pause(TopicPartition... partitions);
> void resume(TopicPartition... partitions);
> {code}
>
> Here is the expected behavior of pause/resume:
> * When a partition is paused, calls to KafkaConsumer.poll will not initiate
>   any new fetches for that partition.
> * After the partition is resumed, fetches will begin again.
> * While a partition is paused, seek() and position() can still be used to
>   advance or query the current position.
> * Rebalance does not preserve pause/resume state.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
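To make the four pause/resume rules from the issue concrete, here is a toy in-memory model in Java. It is only a sketch under stated assumptions, not KafkaConsumer code: partitions are plain strings instead of TopicPartition, and the class PauseResumeModel and its assign(), poll(int) and onRebalance() methods are hypothetical names chosen for illustration. There is no broker or network fetch, so the parked-fetch limitation described in the comment is deliberately not modeled.

```java
import java.util.*;

// Hypothetical toy model of the pause/resume semantics proposed in KAFKA-2350.
// Partitions are plain strings and "fetching" reads from an in-memory list;
// this is NOT how KafkaConsumer is implemented.
class PauseResumeModel {
    private final Map<String, List<String>> logs = new HashMap<>();  // partition -> records
    private final Map<String, Long> positions = new HashMap<>();     // partition -> next offset
    private final Set<String> paused = new HashSet<>();              // currently paused partitions

    // Hypothetical helper: assign a partition backed by an in-memory log.
    public void assign(String partition, List<String> records) {
        logs.put(partition, new ArrayList<>(records));
        positions.put(partition, 0L);
    }

    public void pause(String... partitions) { paused.addAll(Arrays.asList(partitions)); }

    public void resume(String... partitions) { paused.removeAll(Arrays.asList(partitions)); }

    public boolean isPaused(String partition) { return paused.contains(partition); }

    // seek() and position() keep working while a partition is paused.
    public void seek(String partition, long offset) { positions.put(partition, offset); }

    public long position(String partition) { return positions.get(partition); }

    // poll() skips paused partitions entirely; every other partition returns
    // at most maxPerPartition records and advances its position.
    public Map<String, List<String>> poll(int maxPerPartition) {
        Map<String, List<String>> result = new TreeMap<>();  // sorted for stable output
        for (Map.Entry<String, List<String>> e : logs.entrySet()) {
            String tp = e.getKey();
            if (paused.contains(tp)) {
                continue;  // no new fetch for a paused partition
            }
            List<String> log = e.getValue();
            long pos = positions.get(tp);
            int start = (int) Math.min(pos, (long) log.size());
            int end = Math.min(start + maxPerPartition, log.size());
            if (start < end) {
                result.put(tp, new ArrayList<>(log.subList(start, end)));
                positions.put(tp, (long) end);
            }
        }
        return result;
    }

    // A rebalance does not preserve pause state: every partition fetches again.
    public void onRebalance() { paused.clear(); }

    public static void main(String[] args) {
        PauseResumeModel c = new PauseResumeModel();
        c.assign("left-0", Arrays.asList("a", "b", "c"));
        c.assign("right-0", Arrays.asList("x", "y", "z"));

        c.pause("left-0");                          // e.g. let the other side of a join catch up
        System.out.println(c.poll(2));              // {right-0=[x, y]}
        c.seek("left-0", 1);                        // seek still works while paused
        System.out.println(c.position("left-0"));   // 1
        c.resume("left-0");
        System.out.println(c.poll(2));              // {left-0=[b, c], right-0=[z]}
    }
}
```

Keeping the paused set separate from the position map is what lets seek() and position() behave normally while a partition is paused, and clearing that set in onRebalance() mirrors the last rule: after a rebalance, previously paused partitions are fetched again unless the application pauses them once more.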