[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14639307#comment-14639307
 ] 

Jiangjie Qin commented on KAFKA-2350:
-------------------------------------

Ah, I see your concern. Originally I was thinking that we would still make 
manual partition control and consumer coordinator partition assignment 
mutually exclusive, so if subscribe(partition) has been called, 
subscribe(topic) will throw an exception.

Now I think allowing mixed mode might be an interesting feature to support. Let 
me take a shot to address the use cases you raised.

We maintain the following two data structures:
{code}
Map<Topic, Set<TopicPartition>> subscribedPartitions  - A
Map<Topic, Set<TopicPartition>> assignedPartitions    - B
{code}
The only rule we have is that you cannot do both MANUAL and AUTO partition 
subscription on the same topic. 
The validity of sub/unsub to a topic is as follows:
* In A, in B - partition is assigned by the coordinator; sub/unsub works at both 
topic and partition level.
* In A, not in B - topic is subscribed manually; sub/unsub at topic level will 
throw an exception, sub/unsub at partition level works.
* Not in A, in B - topic is assigned by the coordinator but suppressed by the user; 
sub/unsub at topic level works, sub/unsub at partition level only works when 
the partition is in B.
* Not in A, not in B - brand new topic; sub/unsub at both topic and partition 
level works.
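
The four rules above can be sketched as a small membership check. This is only an illustration under my own assumptions: the class MixedModeRules and its methods are hypothetical and not part of KafkaConsumer, membership in A and B is tracked per topic, and partitions are plain integer ids.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the validity rules; none of these names exist in KafkaConsumer.
class MixedModeRules {
    // Structures "A" and "B" from the list above: topic name -> partition ids.
    private final Map<String, Set<Integer>> a = new HashMap<>();
    private final Map<String, Set<Integer>> b = new HashMap<>();

    void addToA(String topic, int partition) {
        a.computeIfAbsent(topic, t -> new HashSet<>()).add(partition);
    }

    void addToB(String topic, int partition) {
        b.computeIfAbsent(topic, t -> new HashSet<>()).add(partition);
    }

    // Topic-level sub/unsub is rejected only when the topic is in A but not in B.
    boolean topicLevelAllowed(String topic) {
        return !(a.containsKey(topic) && !b.containsKey(topic));
    }

    // Partition-level sub/unsub is restricted only when the topic is in B but
    // not in A: in that state the partition itself must be in B.
    boolean partitionLevelAllowed(String topic, int partition) {
        boolean inA = a.containsKey(topic);
        boolean inB = b.containsKey(topic);
        if (!inA && inB) {
            return b.get(topic).contains(partition);
        }
        return true;
    }

    public static void main(String[] args) {
        MixedModeRules rules = new MixedModeRules();
        rules.addToA("manual", 0);      // in A, not in B
        rules.addToB("suppressed", 1);  // not in A, in B
        System.out.println("topic-level on manual topic allowed: "
                + rules.topicLevelAllowed("manual"));
        System.out.println("partition-level on suppressed topic, partition 2 allowed: "
                + rules.partitionLevelAllowed("suppressed", 2));
    }
}
```

A real implementation would throw an exception instead of returning false; returning booleans just keeps the sketch easy to check against each bullet.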

We can discuss whether it is legitimate to unsubscribe from a partition it is 
not assigned/subscribed to, but that is an orthogonal issue.

So, for the cases you mentioned:
* case 1
{code}
subscribe(topic)
unsubscribe(partition)
{code}
means give me a set of partitions and suppress one of them - just like what you 
said.

* case 2
{code}
subscribe(partition)
subscribe(topic)
{code}
means give me this partition, plus whatever the coordinator gives me.
However, if the topic the user is trying to subscribe to happens to be the 
topic of the partition already subscribed to in the first line, that is 
an exception - you cannot do manual and automatic partition subscription on 
the same topic at the same time. (In this case, the topic would be in 
subscribedPartitions but not in assignedPartitions.)

* case 3
{code}
unsubscribe(partition)
subscribe(topic)
{code}
is a slightly odd sequence in the first place. How could you suppress a 
partition when you don't even know whether it will be assigned to you?
Anyway, here is the behavior. The unsubscribe from the partition will go through, 
but after subscribe(topic) is called, the assigned topic partition will not be 
suppressed.
However, if the user is currently subscribed manually to a partition from that 
topic (in subscribedPartitions, not in assignedPartitions), subscribe(topic) 
will throw an exception, per the rule above.

* case 4
{code}
subscribe(partition)
subscribe(topic)
unsubscribe(partition)
{code}
The first two subscribe calls are described in case 2. The partition will be 
suppressed after the unsubscribe call.

* case 5
{code}
subscribe(partition)
unsubscribe(partition)
subscribe(topic)
{code}
If the first two lines are called for the same partition, it is equivalent to 
just calling subscribe(topic) - so the partition won't be suppressed. If the first 
two lines are called for different partitions, it is again a little bit 
weird for the user to suppress a partition without even knowing whether it 
will be assigned. But the partition won't be suppressed.
 


> Add KafkaConsumer pause capability
> ----------------------------------
>
>                 Key: KAFKA-2350
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2350
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Jason Gustafson
>            Assignee: Jason Gustafson
>
> There are some use cases in stream processing where it is helpful to be able 
> to pause consumption of a topic. For example, when joining two topics, you 
> may need to delay processing of one topic while you wait for the consumer of 
> the other topic to catch up. The new consumer currently doesn't provide a 
> nice way to do this. If you skip poll() or if you unsubscribe, then a 
> rebalance will be triggered and your partitions will be reassigned.
> One way to achieve this would be to add two new methods to KafkaConsumer:
> {code}
> void pause(String... topics);
> void unpause(String... topics);
> {code}
> When a topic is paused, a call to KafkaConsumer.poll will not initiate any 
> new fetches for that topic. After it is unpaused, fetches will begin again.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
