We are trying to use the Kafka 0.9 consumer API to poll specific partitions. We consume partitions based on our own logic instead of delegating that to Kafka. One of our use cases is handling a change in the partitions that we consume. This means that sometimes we need to consume additional partitions and other times we need to stop consuming (not pause but stop entirely) some of the partitions that we are currently polling.
The semantics of the assign() call (http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html) are that we need to provide the complete list of partitions every time. So when we want to add or remove partitions, we call assignment() to get the existing set of TopicPartitions being polled, modify that set, and pass it back to assign().

However, it seems odd that assign() takes a List<TopicPartition> whereas assignment() returns a Set<TopicPartition>. Further, the Set returned by assignment() is unmodifiable, which means that to change it we need to create a new List/Set from it and then modify the new collection.

Looking at the code for assignment() shows that a copy of the underlying set is made and then wrapped in an unmodifiable set. The wrapping seems unnecessary given that a copy is already being made. Excerpt here:

    public Set<TopicPartition> assignment() {
        acquire();
        try {
            return Collections.unmodifiableSet(new HashSet<>(this.subscriptions.assignedPartitions()));
        } finally {
            release();
        }
    }

Ideally the API would both take and return a Set, instead of taking a List and returning a Set. And since the returned Set is already a copy of the existing assignments, wrapping it in an unmodifiable set seems like overkill: it forces the user of the API to make yet another copy just to modify what is already a copy.

Thanks,
Rajiv
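P.S. For concreteness, here is a minimal sketch of the kind of helper the current API pushes us toward. This is an illustration, not our exact code: the names AssignmentUpdater and updatedAssignment are made up, and the element type is left generic (it would be org.apache.kafka.common.TopicPartition in practice) so that the sketch compiles without kafka-clients on the classpath.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper, not part of the Kafka API.
public final class AssignmentUpdater {
    private AssignmentUpdater() {}

    /**
     * Computes the full partition list to hand back to assign().
     *
     * @param current  the (unmodifiable) set returned by assignment()
     * @param toAdd    partitions we want to start consuming
     * @param toRemove partitions we want to stop consuming entirely
     */
    public static <T> List<T> updatedAssignment(Set<T> current,
                                                Collection<T> toAdd,
                                                Collection<T> toRemove) {
        // Extra copy: the set from assignment() cannot be modified in place.
        Set<T> next = new HashSet<>(current);
        next.addAll(toAdd);
        next.removeAll(toRemove);
        // Second conversion: assign() in 0.9 takes a List, not a Set.
        return new ArrayList<>(next);
    }
}
```

With a real consumer the call site would look roughly like consumer.assign(AssignmentUpdater.updatedAssignment(consumer.assignment(), toAdd, toRemove)). If both methods used Set and the returned copy were mutable, this helper would not be needed at all.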