Dave,
Why can't you create multiple independent Consumers? They will share the same resources (memory pools, thread pools).

Enrico

On Fri, 8 Jul 2022 at 15:00, Dave Maughan <dave.maug...@streamnative.io.invalid> wrote:
>
> Hi Pulsar community,
>
> I've opened a PIP to discuss a new Java client option to allow setting
> topic specific consumer priorityLevel.
>
> Proposal Link: https://github.com/apache/pulsar/issues/16481
>
> ---
>
> ## Motivation
>
> The Pulsar Java consumer supports setting a priority level for priority
> message dispatch in shared subscription consumers and priority assignment
> in failover subscription consumers. See the
> [ConsumerBuilder.html#priorityLevel(int) Javadoc](https://javadoc.io/static/org.apache.pulsar/pulsar-client-api/2.10.1/org/apache/pulsar/client/api/ConsumerBuilder.html#priorityLevel(int))
> for a detailed functional description. The Pulsar Java consumer also
> supports consuming from multiple topics. However, it is not possible to
> set a different priority level for different topics in the same Consumer
> instance.
>
> This behaviour is desirable in some use cases. For example, a consumer
> processing region specific topics might wish to configure region
> stickiness: a multi-region application might be consuming from topics
> events-us-east-1 and events-eu-west-1. Consumers in all regions should be
> configured to consume all topics to ensure data completeness. However, to
> ensure low latency, the us-east-1 consumer would need to set a higher
> priority level for the us-east-1 topic. Similarly, the eu-west-1 consumer
> would need to set a higher priority level for the eu-west-1 topic.
>
> ## Goal
>
> Update the Java client API to allow the configuration of different
> priority levels for different topics.
>
> Do so in such a way that supports the addition of other topic specific
> configuration options or overrides in the future.
>
> Issues will be created to track feature parity in the other client
> implementations for this PIP.
>
> ## API Changes
>
> In pulsar-client-api, update `ConsumerBuilder` to include two new methods:
>
> ```java
> public interface ConsumerBuilder<T> extends Cloneable {
>     ...
>
>     TopicConsumerBuilder<T> topicConfiguration(String topicNameOrPattern);
>
>     ConsumerBuilder<T> topicConfiguration(String topicNameOrPattern,
>         java.util.function.Consumer<TopicConsumerBuilder<T>> builderConsumer);
> }
> ```
>
> Create a new interface:
>
> ```java
> public interface TopicConsumerBuilder<T> {
>     TopicConsumerBuilder<T> priorityLevel(int priorityLevel);
>
>     ConsumerBuilder<T> build();
> }
> ```
>
> In pulsar-client-original, update `ConsumerConfigurationData` to include a
> new field:
>
> ```java
> @Data
> public class ConsumerConfigurationData<T> implements Serializable, Cloneable {
>     ...
>
>     private List<TopicConsumerConfigurationData> topicConfigurations = new ArrayList<>();
> }
> ```
>
> Create a topic configuration class:
>
> ```java
> @Data
> public class TopicConsumerConfigurationData implements Serializable {
>     private static final long serialVersionUID = 1L;
>
>     private String topicNameOrPattern;
>     private int priorityLevel;
>
>     public boolean matchesTopicName(String topicName) {
>         return Pattern.compile(topicNameOrPattern).matcher(topicName).matches();
>     }
>
>     public static TopicConsumerConfigurationData of(String topicNameOrPattern,
>             ConsumerConfigurationData<?> conf) {
>         return new TopicConsumerConfigurationData(topicNameOrPattern,
>             conf.getPriorityLevel());
>     }
> }
> ```
>
> Then, in `ConsumerImpl` the appropriate topic configuration can be selected
> based on the topic being subscribed to. Since the topic configuration is
> effectively keyed by a topic name or pattern, it’s possible for the user to
> configure multiple topic configurations that match the same concrete topic
> name. In this case the first topic name match should be selected.
>
> ```java
> TopicConsumerConfigurationData getMatchingTopicConfiguration(String topicName,
>         ConsumerConfigurationData conf) {
>     return topicConfigurations.stream()
>         .filter(topicConf -> topicConf.matchesTopicName(topicName))
>         .findFirst()
>         .orElseGet(() -> TopicConsumerConfigurationData.of(topicName, conf));
> }
> ```
>
> Example Usage:
>
> ```java
> pulsarClient.newConsumer()
>     .topicsPattern("events.*")
>     .priorityLevel(1)
>     .topicConfiguration(".*us-east-1", b -> b.priorityLevel(0))
>     .subscribe();
> ```
>
> or
>
> ```java
> pulsarClient.newConsumer()
>     .topicsPattern("events.*")
>     .priorityLevel(1)
>     .topicConfiguration(".*us-east-1")
>         .priorityLevel(0)
>         .build()
>     .subscribe();
> ```
>
> ## Rejected Alternatives
>
> * Extend the existing `ConsumerBuilder` rather than introducing a nested,
>   topic specific builder class.
>
> Rejection reason: Does not provide a clear API to discover and extend other
> topic specific configuration options and overrides.
>
> ```java
> public interface ConsumerBuilder<T> extends Cloneable {
>     ...
>
>     ConsumerBuilder<T> topicPriorityLevel(String topicNameOrPattern, int priorityLevel);
> }
> ```
>
> Example usage:
>
> ```java
> pulsarClient.newConsumer()
>     .topicsPattern("events.*")
>     .priorityLevel(1)
>     .topicPriorityLevel(".*us-east-1", 0)
>     .subscribe();
> ```
>
> * Provide a configurator interface to configure options and overrides at
>   runtime
>
> Rejection reason: Not compatible with `ConsumerBuilder.loadConf`.
>
> ```java
> @Data
> class TopicConsumerConfigurationData {
>     private int priorityLevel;
> }
> ```
>
> ```java
> interface TopicConsumerConfigurator extends Serializable {
>     void configure(String topicName, TopicConsumerConfigurationData topicConf);
> }
> ```
>
> ```java
> public interface ConsumerBuilder<T> extends Cloneable {
>     ...
>
>     ConsumerBuilder<T> topicConfigurator(TopicConsumerConfigurator configurator);
> }
> ```
>
> Example usage:
>
> ```java
> pulsarClient.newConsumer()
>     .topicsPattern("events.*")
>     .priorityLevel(1)
>     .topicConfigurator((topicName, topicConf) -> {
>         if (topicName.endsWith("us-east-1")) {
>             topicConf.setPriorityLevel(0);
>         }
>     })
>     .subscribe();
> ```
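
For anyone following the thread, the first-match rule described in the quoted proposal can be tried out without a broker. The sketch below is only an illustration under assumptions: `TopicPriorityResolver`, its `topicConfiguration(String, int)` method and `priorityLevelFor` are hypothetical names that are not part of the Pulsar client, and the only behaviour borrowed from the proposal is full-string regex matching plus "first configured match wins, otherwise fall back to the consumer-level priority".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper, NOT part of the Pulsar client API. It mimics the
// first-match selection described in the proposal using plain JDK classes.
class TopicPriorityResolver {

    // One per-topic override: a compiled name-or-pattern and its priority.
    private static final class Rule {
        final Pattern pattern;
        final int priorityLevel;

        Rule(String topicNameOrPattern, int priorityLevel) {
            this.pattern = Pattern.compile(topicNameOrPattern);
            this.priorityLevel = priorityLevel;
        }
    }

    // A list keeps configuration order, so "first match" is well defined.
    private final List<Rule> rules = new ArrayList<>();
    private final int defaultPriorityLevel;

    TopicPriorityResolver(int defaultPriorityLevel) {
        this.defaultPriorityLevel = defaultPriorityLevel;
    }

    // Registers an override; returns this so calls can be chained.
    TopicPriorityResolver topicConfiguration(String topicNameOrPattern, int priorityLevel) {
        rules.add(new Rule(topicNameOrPattern, priorityLevel));
        return this;
    }

    // Returns the priority of the first rule whose pattern matches the whole
    // topic name, or the consumer-level default when no rule matches.
    int priorityLevelFor(String topicName) {
        for (Rule rule : rules) {
            if (rule.pattern.matcher(topicName).matches()) {
                return rule.priorityLevel;
            }
        }
        return defaultPriorityLevel;
    }

    public static void main(String[] args) {
        TopicPriorityResolver resolver = new TopicPriorityResolver(1)
            .topicConfiguration(".*us-east-1", 0)
            .topicConfiguration("events.*", 2);

        // Both rules match this topic; the first configured one wins.
        System.out.println(resolver.priorityLevelFor("events-us-east-1")); // prints 0
        // Only the second rule matches.
        System.out.println(resolver.priorityLevelFor("events-eu-west-1")); // prints 2
        // No rule matches, so the default applies.
        System.out.println(resolver.priorityLevelFor("audit-log"));        // prints 1
    }
}
```

The point this makes visible is that the order of overlapping overrides matters: swapping the two `topicConfiguration` calls above would give `events-us-east-1` priority 2 instead of 0.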