We are trying to use the Kafka 0.9 consumer API to poll specific
partitions. We consume partitions based on our own logic instead of
delegating that to Kafka. One of our use cases is handling a change in the
partitions that we consume. This means that sometimes we need to consume
additional partitions and other times we need to stop consuming (not pause
but stop entirely) some of the partitions that we are currently polling.
The semantics of the assign() call documented at
http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
are that we need to provide the entire list of partitions each time. So when we
want to add or remove partitions we call the assignment() method to get the
existing set of TopicPartitions being polled, and then modify this set and
pass it back to the assign() call. However, it seems weird that the assign()
call takes a List<TopicPartition> whereas the assignment() call returns a
Set<TopicPartition>. Further, the Set returned by the method is an
unmodifiable set, which means that to change this set we need to create a new
List/Set from it and then modify the new collection. Looking at the code
for the assignment() method further shows that a copy of the underlying set
is made and then wrapped in an unmodifiable set. The wrapping seems
unnecessary given that a copy is already being made. Excerpt here:

    public Set<TopicPartition> assignment() {
        acquire();
        try {
            return Collections.unmodifiableSet(
                    new HashSet<>(this.subscriptions.assignedPartitions()));
        } finally {
            release();
        }
    }

Ideally the API would take and return a Set instead of taking in a List and
returning a Set. Further, given that the Set returned is already a copy of
the existing assignments, wrapping it in an unmodifiable set seems like
overkill: it forces the user of the API to make yet another copy just to
modify what is already a copy.
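
For reference, the copy-modify-reassign workaround described above could
look roughly like the sketch below. AssignmentUpdate and updated() are
hypothetical names of our own, not part of the Kafka client, and the helper
is generic so that it compiles without the Kafka jar; in real use T would
be TopicPartition.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper (not part of the Kafka client): builds the full
// partition list that assign() expects from the current assignment plus
// a set of additions and removals.
final class AssignmentUpdate {
    private AssignmentUpdate() {}

    static <T> List<T> updated(Set<T> current,
                               Collection<T> toAdd,
                               Collection<T> toRemove) {
        // assignment() returns an unmodifiable set, so copy it first.
        Set<T> next = new LinkedHashSet<>(current);
        next.addAll(toAdd);
        next.removeAll(toRemove);
        // The 0.9 assign() takes a List, so convert the set back.
        return new ArrayList<>(next);
    }
}
```

With the 0.9 consumer this would be called as
consumer.assign(AssignmentUpdate.updated(consumer.assignment(), added, removed)),
where added and removed are our own collections of TopicPartition. Note
that this is two copies of the assignment per change (one inside
assignment(), one in the helper), plus the conversion to a List.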

Thanks,
Rajiv
