Hi Dave,

Thanks for your proposal. Overall it looks good to me. Just a minor comment:
What about using the Pattern type to store the compiled Pattern in
TopicConsumerConfigurationData? Like below:

```
public class TopicConsumerConfigurationData implements Serializable {
    private static final long serialVersionUID = 1L;

    private Pattern topicsPattern;
    private int priorityLevel;
}
```

This is just like what we did for topicsPattern in ConsumerConfigurationData. [0]

[0] https://github.com/apache/pulsar/blob/bbf2a47867ff54327c1f2940e72f08a44a5dc5f7/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java#L58

Zike Yang

On Sat, Jul 9, 2022 at 12:10 AM Dave Maughan <dave.maug...@streamnative.io.invalid> wrote:
>
> Hi Enrico,
>
> > Why can't you create multiple independent Consumers ?
> > They will share the same resources (memory pools, thread pools)
>
> You can do that but it means the user has to manage the consumption from
> each of the consumer instances manually. This is already solved in the
> existing multi topic consumer. This change adds user convenience - to
> continue consuming from multiple topics in a single consumer instance, but
> allowing them to customise the configuration.
>
> - Dave
>
> On Fri, Jul 8, 2022 at 4:26 PM Enrico Olivelli <eolive...@gmail.com> wrote:
>
> > Dave,
> > Why can't you create multiple independent Consumers ?
> > They will share the same resources (memory pools, thread pools)
> >
> > Enrico
> >
> > On Fri, Jul 8, 2022 at 15:00 Dave Maughan
> > <dave.maug...@streamnative.io.invalid> wrote:
> > >
> > > Hi Pulsar community,
> > >
> > > I've opened a PIP to discuss a new Java client option to allow setting
> > > topic specific consumer priorityLevel.
> > >
> > > Proposal Link: https://github.com/apache/pulsar/issues/16481
> > >
> > > ---
> > >
> > > ## Motivation
> > >
> > > The Pulsar Java consumer supports setting a priority level for priority message
> > > dispatch in shared subscription consumers and priority assignment in failover
> > > subscription consumers. See the
> > > [ConsumerBuilder.html#priorityLevel(int) Javadoc](https://javadoc.io/static/org.apache.pulsar/pulsar-client-api/2.10.1/org/apache/pulsar/client/api/ConsumerBuilder.html#priorityLevel(int))
> > > for a detailed functional description. The Pulsar Java consumer also supports
> > > consuming from multiple topics. However, it is not possible to set a different
> > > priority level for different topics in the same Consumer instance.
> > >
> > > This behaviour is desirable in some use cases. For example, a consumer
> > > processing region specific topics might wish to configure region stickiness - A
> > > multi-region application might be consuming from topics events-us-east-1 and
> > > events-eu-west-1. Consumers in all regions should be configured to consume all
> > > topics to ensure data completeness. However, to ensure low latency, the
> > > us-east-1 consumer would need to set a higher priority level for the us-east-1
> > > topic. Similarly, the eu-west-1 consumer would need to set a higher priority
> > > level for the eu-west-1 topic.
> > >
> > > ## Goal
> > >
> > > Update the Java client API to allow the configuration of different priority
> > > levels for different topics.
> > >
> > > Do so in such a way that supports the addition of other topic specific
> > > configuration options or overrides in the future.
> > >
> > > Issues will be created to track feature parity in the other client
> > > implementations for this PIP.
> > >
> > > ## API Changes
> > >
> > > In pulsar-client-api, update `ConsumerBuilder` to include two new methods:
> > >
> > > ```java
> > > public interface ConsumerBuilder<T> extends Cloneable {
> > >     ...
> > >
> > >     TopicConsumerBuilder<T> topicConfiguration(String topicNameOrPattern);
> > >
> > >     ConsumerBuilder<T> topicConfiguration(String topicNameOrPattern,
> > >         java.util.function.Consumer<TopicConsumerBuilder<T>> builderConsumer);
> > > }
> > > ```
> > >
> > > Create a new interface:
> > >
> > > ```java
> > > public interface TopicConsumerBuilder<T> {
> > >     TopicConsumerBuilder<T> priorityLevel(int priorityLevel);
> > >
> > >     ConsumerBuilder<T> build();
> > > }
> > > ```
> > >
> > > In pulsar-client-original, update `ConsumerConfigurationData` to include a
> > > new field:
> > >
> > > ```java
> > > @Data
> > > public class ConsumerConfigurationData<T> implements Serializable, Cloneable {
> > >     ...
> > >
> > >     private List<TopicConsumerConfigurationData> topicConfigurations = new ArrayList<>();
> > > }
> > > ```
> > >
> > > Create a topic configuration class:
> > >
> > > ```java
> > > @Data
> > > public class TopicConsumerConfigurationData implements Serializable {
> > >     private static final long serialVersionUID = 1L;
> > >
> > >     private String topicNameOrPattern;
> > >     private int priorityLevel;
> > >
> > >     public boolean matchesTopicName(String topicName) {
> > >         return Pattern.compile(topicNameOrPattern).matcher(topicName).matches();
> > >     }
> > >
> > >     public static TopicConsumerConfigurationData of(String topicNameOrPattern,
> > >             ConsumerConfigurationData<?> conf) {
> > >         return new TopicConsumerConfigurationData(topicNameOrPattern,
> > >             conf.getPriorityLevel());
> > >     }
> > > }
> > > ```
> > >
> > > Then, in `ConsumerImpl` the appropriate topic configuration can be selected
> > > based on the topic being subscribed to.
> > > Since the topic configuration is effectively keyed by a topic name or
> > > pattern, the user can configure multiple topic configurations that match
> > > the same concrete topic name. In this case the first topic name match
> > > should be selected.
> > >
> > > ```java
> > > TopicConsumerConfigurationData getMatchingTopicConfiguration(String topicName,
> > >         ConsumerConfigurationData<?> conf) {
> > >     // Configurations are checked in insertion order; the first match wins.
> > >     return conf.getTopicConfigurations().stream()
> > >         .filter(topicConf -> topicConf.matchesTopicName(topicName))
> > >         .findFirst()
> > >         .orElseGet(() -> TopicConsumerConfigurationData.of(topicName, conf));
> > > }
> > > ```
> > >
> > > Example Usage:
> > >
> > > ```java
> > > pulsarClient.newConsumer()
> > >     .topicsPattern("events.*")
> > >     .priorityLevel(1)
> > >     .topicConfiguration(".*us-east-1", b -> b.priorityLevel(0))
> > >     .subscribe();
> > > ```
> > >
> > > or
> > >
> > > ```java
> > > pulsarClient.newConsumer()
> > >     .topicsPattern("events.*")
> > >     .priorityLevel(1)
> > >     .topicConfiguration(".*us-east-1")
> > >         .priorityLevel(0)
> > >         .build()
> > >     .subscribe();
> > > ```
> > >
> > > ## Rejected Alternatives
> > >
> > > * Extend the existing `ConsumerBuilder` rather than introducing a nested,
> > >   topic specific builder class.
> > >
> > > Rejection reason: Does not provide a clear API to discover and extend other
> > > topic specific configuration options and overrides.
> > >
> > > ```java
> > > public interface ConsumerBuilder<T> extends Cloneable {
> > >     ...
> > >
> > >     ConsumerBuilder<T> topicPriorityLevel(String topicNameOrPattern, int priorityLevel);
> > > }
> > > ```
> > >
> > > Example usage:
> > >
> > > ```java
> > > pulsarClient.newConsumer()
> > >     .topicsPattern("events.*")
> > >     .priorityLevel(1)
> > >     .topicPriorityLevel(".*us-east-1", 0)
> > >     .subscribe();
> > > ```
> > >
> > > * Provide a configurator interface to configure options and overrides at
> > >   runtime
> > >
> > > Rejection reason: Not compatible with `ConsumerBuilder.loadConf`.
> > >
> > > ```java
> > > @Data
> > > class TopicConsumerConfigurationData {
> > >     private int priorityLevel;
> > > }
> > > ```
> > >
> > > ```java
> > > interface TopicConsumerConfigurator extends Serializable {
> > >     void configure(String topicName, TopicConsumerConfigurationData topicConf);
> > > }
> > > ```
> > >
> > > ```java
> > > public interface ConsumerBuilder<T> extends Cloneable {
> > >     ...
> > >
> > >     ConsumerBuilder<T> topicConfigurator(TopicConsumerConfigurator configurator);
> > > }
> > > ```
> > >
> > > Example usage:
> > >
> > > ```java
> > > pulsarClient.newConsumer()
> > >     .topicsPattern("events.*")
> > >     .priorityLevel(1)
> > >     .topicConfigurator((topicName, topicConf) -> {
> > >         if (topicName.endsWith("us-east-1")) {
> > >             topicConf.setPriorityLevel(0);
> > >         }
> > >     })
> > >     .subscribe();
> > > ```
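
For reference, the Pattern suggestion at the top of this mail could look roughly like the sketch below. It is only an illustration under a few assumptions, not the PIP's implementation: the constructor, the getter, and the `TopicConfigurationSelector` helper with its `priorityLevelFor` method are hypothetical names, and Lombok is left out so the snippet compiles on its own. The idea is that the regex is compiled once when the configuration is created, while the first-match-wins selection from the proposal stays the same.

```java
import java.io.Serializable;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical variant of the proposed class: holds a compiled Pattern
// instead of recompiling the String on every matchesTopicName call.
class TopicConsumerConfigurationData implements Serializable {
    private static final long serialVersionUID = 1L;

    // java.util.regex.Pattern is itself Serializable.
    private final Pattern topicsPattern;
    private final int priorityLevel;

    TopicConsumerConfigurationData(String topicNameOrPattern, int priorityLevel) {
        this.topicsPattern = Pattern.compile(topicNameOrPattern);
        this.priorityLevel = priorityLevel;
    }

    int getPriorityLevel() {
        return priorityLevel;
    }

    boolean matchesTopicName(String topicName) {
        // matches() requires the whole topic name to match the pattern.
        return topicsPattern.matcher(topicName).matches();
    }
}

// Hypothetical helper mirroring the first-match selection in the proposal.
class TopicConfigurationSelector {
    static int priorityLevelFor(String topicName,
                                List<TopicConsumerConfigurationData> topicConfigurations,
                                int defaultPriorityLevel) {
        return topicConfigurations.stream()
            .filter(topicConf -> topicConf.matchesTopicName(topicName))
            .findFirst() // list order decides which configuration wins
            .map(TopicConsumerConfigurationData::getPriorityLevel)
            .orElse(defaultPriorityLevel); // no match: consumer-level priority
    }
}
```

With configurations for `.*us-east-1` (priority 0) and `events.*` (priority 2) registered in that order, `events-us-east-1` would resolve to 0 because the first match wins, and a topic that matches neither would fall back to the consumer-level priority.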