Hi Pulsar community, I've opened a PIP to discuss a new Java client option that allows setting a topic-specific consumer `priorityLevel`.
Proposal Link: https://github.com/apache/pulsar/issues/16481

---

## Motivation

The Pulsar Java consumer supports setting a priority level for priority message dispatch in shared subscription consumers and priority assignment in failover subscription consumers. See the [ConsumerBuilder#priorityLevel(int) Javadoc](https://javadoc.io/static/org.apache.pulsar/pulsar-client-api/2.10.1/org/apache/pulsar/client/api/ConsumerBuilder.html#priorityLevel(int)) for a detailed functional description.

The Pulsar Java consumer also supports consuming from multiple topics. However, it is not possible to set a different priority level for different topics in the same `Consumer` instance.

This behaviour is desirable in some use cases. For example, a consumer processing region-specific topics might wish to configure region stickiness. A multi-region application might be consuming from topics events-us-east-1 and events-eu-west-1. Consumers in all regions should be configured to consume all topics to ensure data completeness. However, to ensure low latency, the us-east-1 consumer would need to set a higher priority level for the us-east-1 topic. Similarly, the eu-west-1 consumer would need to set a higher priority level for the eu-west-1 topic.

## Goal

Update the Java client API to allow the configuration of different priority levels for different topics.

Do so in a way that supports the addition of other topic-specific configuration options or overrides in the future.

Issues will be created to track feature parity in the other client implementations for this PIP.

## API Changes

In pulsar-client-api, update `ConsumerBuilder` to include two new methods:

```java
public interface ConsumerBuilder<T> extends Cloneable {
    ...
    TopicConsumerBuilder<T> topicConfiguration(String topicNameOrPattern);

    ConsumerBuilder<T> topicConfiguration(String topicNameOrPattern, java.util.function.Consumer<TopicConsumerBuilder<T>> builderConsumer);
}
```

Create a new interface:

```java
public interface TopicConsumerBuilder<T> {
    TopicConsumerBuilder<T> priorityLevel(int priorityLevel);

    ConsumerBuilder<T> build();
}
```

In pulsar-client-original, update `ConsumerConfigurationData` to include a new field:

```java
@Data
public class ConsumerConfigurationData<T> implements Serializable, Cloneable {
    ...
    private List<TopicConsumerConfigurationData> topicConfigurations = new ArrayList<>();
}
```

Create a topic configuration class:

```java
@Data
@NoArgsConstructor
@AllArgsConstructor // needed for the two-argument constructor used in of(); @Data alone does not generate it for non-final fields
public class TopicConsumerConfigurationData implements Serializable {
    private static final long serialVersionUID = 1L;

    private String topicNameOrPattern;
    private int priorityLevel;

    // The whole topic name must match the pattern.
    public boolean matchesTopicName(String topicName) {
        return Pattern.compile(topicNameOrPattern).matcher(topicName).matches();
    }

    // Default topic configuration that inherits the consumer-level priority.
    public static TopicConsumerConfigurationData of(String topicNameOrPattern, ConsumerConfigurationData<?> conf) {
        return new TopicConsumerConfigurationData(topicNameOrPattern, conf.getPriorityLevel());
    }
}
```

Then, in `ConsumerImpl`, the appropriate topic configuration can be selected based on the topic being subscribed to. Since the topic configuration is effectively keyed by a topic name or pattern, it’s possible for a user to define multiple topic configurations that match the same concrete topic name. In this case, the first matching topic configuration should be selected.

```java
TopicConsumerConfigurationData getMatchingTopicConfiguration(String topicName, ConsumerConfigurationData<?> conf) {
    // The first configuration whose pattern matches wins; otherwise fall back
    // to a configuration derived from the consumer-level settings.
    return conf.getTopicConfigurations().stream()
            .filter(topicConf -> topicConf.matchesTopicName(topicName))
            .findFirst()
            .orElseGet(() -> TopicConsumerConfigurationData.of(topicName, conf));
}
```

Example usage:

```java
pulsarClient.newConsumer()
        .topicsPattern("events.*")
        .priorityLevel(1)
        .topicConfiguration(".*us-east-1", b -> b.priorityLevel(0))
        .subscribe();
```

or

```java
pulsarClient.newConsumer()
        .topicsPattern("events.*")
        .priorityLevel(1)
        .topicConfiguration(".*us-east-1")
            .priorityLevel(0)
            .build()
        .subscribe();
```

## Rejected Alternatives

* Extend the existing `ConsumerBuilder` rather than introducing a nested, topic-specific builder class.

  Rejection reason: Does not provide a clear API to discover and extend other topic-specific configuration options and overrides.

  ```java
  public interface ConsumerBuilder<T> extends Cloneable {
      ...
      ConsumerBuilder<T> topicPriorityLevel(String topicNameOrPattern, int priorityLevel);
  }
  ```

  Example usage:

  ```java
  pulsarClient.newConsumer()
          .topicsPattern("events.*")
          .priorityLevel(1)
          .topicPriorityLevel(".*us-east-1", 0)
          .subscribe();
  ```

* Provide a configurator interface to configure options and overrides at runtime.

  Rejection reason: Not compatible with `ConsumerBuilder.loadConf`.

  ```java
  @Data
  class TopicConsumerConfigurationData {
      private int priorityLevel;
  }
  ```

  ```java
  interface TopicConsumerConfigurator extends Serializable {
      void configure(String topicName, TopicConsumerConfigurationData topicConf);
  }
  ```

  ```java
  public interface ConsumerBuilder<T> extends Cloneable {
      ...
      ConsumerBuilder<T> topicConfigurator(TopicConsumerConfigurator configurator);
  }
  ```

  Example usage:

  ```java
  pulsarClient.newConsumer()
          .topicsPattern("events.*")
          .priorityLevel(1)
          .topicConfigurator((topicName, topicConf) -> {
              if (topicName.endsWith("us-east-1")) {
                  topicConf.setPriorityLevel(0);
              }
          })
          .subscribe();
  ```
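
Returning to the proposed API, the first-match rule can be tried out without a Pulsar dependency. The following is a minimal sketch, not the client implementation: `TopicPriorityDemo` and `resolvePriority` are hypothetical names, and a `LinkedHashMap` stands in for the ordered `topicConfigurations` list. It assumes, as in `matchesTopicName`, that a pattern must match the whole topic name.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

public class TopicPriorityDemo {

    // Hypothetical helper (not part of the Pulsar client): walks the overrides
    // in insertion order and returns the priority of the first pattern that
    // matches the whole topic name, or the consumer-level default otherwise.
    public static int resolvePriority(String topicName,
                                      Map<String, Integer> overrides,
                                      int defaultPriority) {
        for (Map.Entry<String, Integer> override : overrides.entrySet()) {
            if (Pattern.compile(override.getKey()).matcher(topicName).matches()) {
                return override.getValue();
            }
        }
        return defaultPriority;
    }

    public static void main(String[] args) {
        // Insertion order matters: the first matching pattern wins.
        Map<String, Integer> overrides = new LinkedHashMap<>();
        overrides.put(".*us-east-1", 0);
        overrides.put("events.*", 2);

        // Matches both patterns; the first one (".*us-east-1") is selected.
        System.out.println(resolvePriority("events-us-east-1", overrides, 1)); // prints 0
        // Matches only "events.*".
        System.out.println(resolvePriority("events-eu-west-1", overrides, 1)); // prints 2
        // Matches nothing, so the consumer-level default applies.
        System.out.println(resolvePriority("audit-log", overrides, 1));        // prints 1
    }
}
```

Because `matches()` anchors the pattern at both ends, an override such as `us-east-1` without the leading `.*` would not match `events-us-east-1` and the default priority would be used instead.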