[ https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14643418#comment-14643418 ]
Jiangjie Qin edited comment on KAFKA-2350 at 7/27/15 9:45 PM:
--------------------------------------------------------------

[~hachikuji], I am with [~guozhang] that it is much clearer if we support either auto partition assignment or manual partition assignment, but not a mixed mode. I assumed pause/unpause would only be used with auto partition assignment, so we can check the assigned partition set. Under manual partition assignment, for seek(), I assume that you will start consuming after the seek? If so, it might be the same as:
{code}
subscribe(tp)
seek(tp, offset)
poll()
{code}

[~guozhang], I see your point. I am convinced that for people who are using auto partition assignment, pause/unpause is more intuitive than partition-level sub/unsub. I am not really opposed to having them. What worries me is that we are adding methods that are intuitive for one particular use case, but potentially opening the door to APIs that differ only subtly from each other. If we take a closer look at our API, there are other cases where one could argue for a new API. For example, a user might want auto commit turned on for some, but not all, of the partitions they subscribed to. A user might want to find the list of offsets of a partition within a time range. These are all different use cases, but they can likely be solved with some lower-level API calls instead of a dedicated intuitive API for each of them.

I kind of feel the dilemma we are facing is that in the new consumer we try to address both the high-level consumer and the low-level consumer use cases. pause/unpause looks to me like a medium-to-low-level use case. As the higher-level requirements can vary a lot and differ subtly from one another, the question to be answered is: should we expose a high-level interface for each high-level use case, or should we just ask users to use a lower-level API as long as we support the functionality?
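To make the "lower-level API" option concrete, here is a minimal sketch of pausing expressed only through partition-level subscribe/unsubscribe plus seek, mirroring the subscribe(tp) / seek(tp, offset) / poll() sequence above. Everything in it is an assumption for illustration: `ManualConsumerModel` and `SubscribeBasedPause` are hypothetical toy classes, partitions are plain strings, and none of this is the real KafkaConsumer.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy stand-in for a consumer under manual partition assignment.
// This is NOT the real KafkaConsumer; partitions are plain strings.
class ManualConsumerModel {
    // subscribed partition -> next offset to fetch
    private final Map<String, Long> positions = new TreeMap<>();

    void subscribe(String tp) {
        positions.putIfAbsent(tp, 0L);
    }

    void unsubscribe(String tp) {
        if (positions.remove(tp) == null) {
            throw new IllegalStateException(
                "Cannot unsubscribe " + tp + " because it is not subscribed");
        }
    }

    void seek(String tp, long offset) {
        requireSubscribed(tp);
        positions.put(tp, offset);
    }

    long position(String tp) {
        requireSubscribed(tp);
        return positions.get(tp);
    }

    // Pretends to fetch one record from every subscribed partition and
    // returns the partitions that were fetched from.
    List<String> poll() {
        List<String> fetched = new ArrayList<>();
        for (Map.Entry<String, Long> e : positions.entrySet()) {
            fetched.add(e.getKey());
            e.setValue(e.getValue() + 1);
        }
        return fetched;
    }

    private void requireSubscribed(String tp) {
        if (!positions.containsKey(tp)) {
            throw new IllegalStateException(tp + " is not subscribed");
        }
    }
}

// Hypothetical helper: "pause" expressed with the lower-level calls only.
class SubscribeBasedPause {
    private final ManualConsumerModel consumer;
    private final Map<String, Long> savedOffsets = new HashMap<>();

    SubscribeBasedPause(ManualConsumerModel consumer) {
        this.consumer = consumer;
    }

    void pause(String tp) {
        savedOffsets.put(tp, consumer.position(tp)); // remember where we stopped
        consumer.unsubscribe(tp);
    }

    void resume(String tp) {
        Long offset = savedOffsets.remove(tp);
        if (offset == null) {
            throw new IllegalStateException(tp + " is not paused");
        }
        consumer.subscribe(tp);
        consumer.seek(tp, offset); // continue from the saved position
    }
}
```

One consequence of this approach in the sketch: while a partition is "paused" this way, seek() and position() on it fail until it is subscribed again, so the caller has to do the bookkeeping that a dedicated pause API would hide.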
My understanding is that for high-level consumer use cases, hopefully the user does not need to care too much about the underlying mechanism. For people who want to deal with lower-level concepts such as offsets, partition assignment, and temporary suspension of consumption, letting them use a lower-level API makes more sense to me than writing one high-level API for each of the use cases.

In terms of the examples you mentioned, can we solve them by throwing appropriate exceptions?
{code}
// Auto partition assignment
subscribe(topic1) // assuming only topic1-partition0 is assigned to this consumer.
subscribe(topic1-partition1) // throw IllegalStateException(Cannot subscribe to topic1-partition1 because topic1 is managed by consumer coordinator)
unsubscribe(topic1-partition1) // throw IllegalStateException(topic1-partition1 is managed by consumer coordinator, and topic1-partition1 is not assigned to this consumer.)
{code}
{code}
// Manual partition assignment
subscribe(topic1-partition0)
subscribe(topic1) // throw IllegalStateException(Cannot subscribe to topic1 because the assignment of topic1 is manually managed)
unsubscribe(topic1-partition1) // throw IllegalStateException(Cannot unsubscribe topic1-partition1 because it is not subscribed)
{code}

[~nehanarkhede], what you said makes a lot of sense. I guess I'm just looking at the problem from a different angle: at any given point the user either wants to consume from a topic or not, whether temporarily or permanently. So the only states I see are CONSUME/NOT_CONSUME. The PAUSE state for the consumer would be pretty much the same as NOT_CONSUME. I might have missed some use case like the one [~hachikuji] mentioned, seek() on a PAUSE partition, but that can be solved by calling subscribe() first.

[~gwenshap], good point about the heartbeat. We actually got some feedback from users at LinkedIn and found that putting the responsibility for sending heartbeats on the user might be a problem in the first place...
We may have pause/unpause as a workaround, but the ultimate issue is that maybe we are asking too much of the user in having them maintain the heartbeat...

> Add KafkaConsumer pause capability
> ----------------------------------
>
>                 Key: KAFKA-2350
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2350
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Jason Gustafson
>            Assignee: Jason Gustafson
>
> There are some use cases in stream processing where it is helpful to be able
> to pause consumption of a topic. For example, when joining two topics, you
> may need to delay processing of one topic while you wait for the consumer of
> the other topic to catch up. The new consumer currently doesn't provide a
> nice way to do this. If you skip calls to poll() or if you unsubscribe, then
> a rebalance will be triggered and your partitions will be reassigned to
> another consumer. The desired behavior is instead that you keep the partition
> assigned and simply
> One way to achieve this would be to add two new methods to KafkaConsumer:
> {code}
> void pause(TopicPartition... partitions);
> void resume(TopicPartition... partitions);
> {code}
> Here is the expected behavior of pause/resume:
> * When a partition is paused, calls to KafkaConsumer.poll will not initiate
> any new fetches for that partition.
> * After the partition is resumed, fetches will begin again.
> * While a partition is paused, seek() and position() can still be used to
> advance or query the current position.
> * Rebalance does not preserve pause/resume state.
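
The expected pause/resume behavior listed in the issue description can be sketched as a small in-memory model. This is only an illustration under stated assumptions: `PausableConsumerModel` is a hypothetical toy class, partitions are plain strings instead of TopicPartition, `assign` stands in for a rebalance, and rejecting pause() on an unassigned partition is an assumption that the description does not specify.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Toy model of the proposed semantics; NOT the real KafkaConsumer.
class PausableConsumerModel {
    // assigned partition -> next offset to fetch
    private final Map<String, Long> positions = new TreeMap<>();
    private final Set<String> paused = new HashSet<>();

    // Stands in for a rebalance handing this consumer a new assignment.
    void assign(Set<String> partitions) {
        positions.keySet().retainAll(partitions);
        for (String tp : partitions) {
            positions.putIfAbsent(tp, 0L);
        }
        paused.clear(); // rebalance does not preserve pause/resume state
    }

    void pause(String... partitions) {
        for (String tp : partitions) {
            requireAssigned(tp); // assumption: pausing an unassigned partition is an error
            paused.add(tp);
        }
    }

    void resume(String... partitions) {
        for (String tp : partitions) {
            paused.remove(tp);
        }
    }

    // seek() and position() keep working while a partition is paused.
    void seek(String tp, long offset) {
        requireAssigned(tp);
        positions.put(tp, offset);
    }

    long position(String tp) {
        requireAssigned(tp);
        return positions.get(tp);
    }

    // Pretends to fetch one record from every partition that is not paused
    // and returns the partitions that were fetched from.
    List<String> poll() {
        List<String> fetched = new ArrayList<>();
        for (Map.Entry<String, Long> e : positions.entrySet()) {
            if (!paused.contains(e.getKey())) {
                fetched.add(e.getKey());
                e.setValue(e.getValue() + 1);
            }
        }
        return fetched;
    }

    private void requireAssigned(String tp) {
        if (!positions.containsKey(tp)) {
            throw new IllegalStateException(tp + " is not assigned to this consumer");
        }
    }
}
```

In this model a paused partition stays assigned, which is the point of the proposal: the consumer keeps calling poll() for its other partitions, so no rebalance is needed just to stop fetching from one of them.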