Dave,
Why can't you create multiple independent Consumers?
They will share the same resources (memory pools, thread pools).




Enrico

On Fri, 8 Jul 2022 at 15:00, Dave Maughan
<dave.maug...@streamnative.io.invalid> wrote:
>
> Hi Pulsar community,
>
> I've opened a PIP to discuss a new Java client option to allow setting
> topic specific consumer priorityLevel.
>
> Proposal Link: https://github.com/apache/pulsar/issues/16481
>
> ---
>
> ## Motivation
>
> The Pulsar Java consumer supports setting a priority level for priority
> message dispatch in shared subscription consumers and priority assignment
> in failover subscription consumers. See the
> [ConsumerBuilder.html#priorityLevel(int) Javadoc](https://javadoc.io/static/org.apache.pulsar/pulsar-client-api/2.10.1/org/apache/pulsar/client/api/ConsumerBuilder.html#priorityLevel(int))
> for a detailed functional description. The Pulsar Java consumer also
> supports consuming from multiple topics. However, it is not possible to
> set a different priority level for different topics in the same Consumer
> instance.
>
> This behaviour is desirable in some use cases. For example, a consumer
> processing region-specific topics might wish to configure region
> stickiness. A multi-region application might be consuming from topics
> events-us-east-1 and events-eu-west-1. Consumers in all regions should be
> configured to consume all topics to ensure data completeness. However, to
> ensure low latency, the us-east-1 consumer would need to set a higher
> priority level for the us-east-1 topic. Similarly, the eu-west-1 consumer
> would need to set a higher priority level for the eu-west-1 topic.
>
> ## Goal
>
> Update the Java client API to allow the configuration of different priority
> levels for different topics.
>
> Do so in such a way that supports the addition of other topic specific
> configuration options or overrides in the future.
>
> Issues will be created to track feature parity in the other client
> implementations for this PIP.
>
> ## API Changes
>
> In pulsar-client-api, update `ConsumerBuilder` to include two new methods:
>
> ```java
> public interface ConsumerBuilder<T> extends Cloneable {
>     ...
>
>     TopicConsumerBuilder<T> topicConfiguration(String topicNameOrPattern);
>
>     ConsumerBuilder<T> topicConfiguration(String topicNameOrPattern,
>             java.util.function.Consumer<TopicConsumerBuilder<T>> builderConsumer);
> }
> ```
>
> Create a new interface:
>
> ```java
> public interface TopicConsumerBuilder<T> {
>     TopicConsumerBuilder<T> priorityLevel(int priorityLevel);
>
>     ConsumerBuilder<T> build();
> }
> ```
>
> In pulsar-client-original, update `ConsumerConfigurationData` to include a
> new field:
>
> ```java
> @Data
> public class ConsumerConfigurationData<T> implements Serializable, Cloneable {
>     ...
>
>     private List<TopicConsumerConfigurationData> topicConfigurations = new ArrayList<>();
> }
> ```
>
> Create a topic configuration class:
>
> ```java
> @Data
> @NoArgsConstructor
> @AllArgsConstructor // needed by of(...): @Data alone generates no all-args constructor
> public class TopicConsumerConfigurationData implements Serializable {
>     private static final long serialVersionUID = 1L;
>
>     private String topicNameOrPattern;
>     private int priorityLevel;
>
>     // True when the whole topic name matches the configured name or regex.
>     public boolean matchesTopicName(String topicName) {
>         return Pattern.compile(topicNameOrPattern).matcher(topicName).matches();
>     }
>
>     // Default configuration for a topic, inheriting the consumer-level priority.
>     public static TopicConsumerConfigurationData of(String topicNameOrPattern,
>             ConsumerConfigurationData<?> conf) {
>         return new TopicConsumerConfigurationData(topicNameOrPattern,
>                 conf.getPriorityLevel());
>     }
> }
> ```
>
> Then, in `ConsumerImpl` the appropriate topic configuration can be selected
> based on the topic being subscribed to. Since the topic configuration is
> effectively keyed by a topic name or pattern, it’s possible for the user to
> configure multiple topic configurations that match the same concrete topic
> name. In this case the first matching topic configuration should be
> selected.
>
> ```java
> TopicConsumerConfigurationData getMatchingTopicConfiguration(String topicName,
>         ConsumerConfigurationData<?> conf) {
>     // First match wins; fall back to the consumer-level settings otherwise.
>     return conf.getTopicConfigurations().stream()
>         .filter(topicConf -> topicConf.matchesTopicName(topicName))
>         .findFirst()
>         .orElseGet(() -> TopicConsumerConfigurationData.of(topicName, conf));
> }
> ```
>
> Example Usage:
>
> ```java
> pulsarClient.newConsumer()
>     .topicsPattern("events.*")
>     .priorityLevel(1)
>     .topicConfiguration(".*us-east-1", b -> b.priorityLevel(0))
>     .subscribe();
> ```
>
> or
>
> ```java
> pulsarClient.newConsumer()
>     .topicsPattern("events.*")
>     .priorityLevel(1)
>     .topicConfiguration(".*us-east-1")
>         .priorityLevel(0)
>         .build()
>     .subscribe();
> ```
>
> ## Rejected Alternatives
>
> * Extend the existing `ConsumerBuilder` rather than introducing a nested,
> topic specific builder class.
>
> Rejection reason: Does not provide a clear API to discover and extend other
> topic specific configuration options and overrides.
>
> ```java
> public interface ConsumerBuilder<T> extends Cloneable {
>     ...
>
>     ConsumerBuilder<T> topicPriorityLevel(String topicNameOrPattern, int priorityLevel);
> }
> ```
>
> Example usage:
>
> ```java
> pulsarClient.newConsumer()
>     .topicsPattern("events.*")
>     .priorityLevel(1)
>     .topicPriorityLevel(".*us-east-1", 0)
>     .subscribe();
> ```
>
> * Provide a configurator interface to configure options and overrides at
> runtime.
>
> Rejection reason: Not compatible with `ConsumerBuilder.loadConf`.
>
> ```java
> @Data
> class TopicConsumerConfigurationData {
>     private int priorityLevel;
> }
> ```
>
> ```java
> interface TopicConsumerConfigurator extends Serializable {
>     void configure(String topicName, TopicConsumerConfigurationData topicConf);
> }
> ```
>
> ```java
> public interface ConsumerBuilder<T> extends Cloneable {
>     ...
>
>     ConsumerBuilder<T> topicConfigurator(TopicConsumerConfigurator configurator);
> }
> ```
>
> Example usage:
>
> ```java
> pulsarClient.newConsumer()
>     .topicsPattern("events.*")
>     .priorityLevel(1)
>     .topicConfigurator((topicName, topicConf) -> {
>         if (topicName.endsWith("us-east-1")) {
>             topicConf.setPriorityLevel(0);
>         }
>     })
>     .subscribe();
> ```
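
For reference, the first-match rule described in the proposal can be tried out in plain Java without the Pulsar client. This is only a rough sketch under stated assumptions: `TopicPriorityResolver`, `Rule`, `add` and `resolve` are hypothetical names that are not part of the PIP or of any Pulsar API, and the short topic names are simplified stand-ins for real, fully qualified topic names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical stand-in for the per-topic lookup in the proposal; not Pulsar code.
public class TopicPriorityResolver {

    // One override: a topic name or regex, compiled once, plus its priority level.
    private static final class Rule {
        final Pattern pattern;
        final int priorityLevel;

        Rule(String topicNameOrPattern, int priorityLevel) {
            this.pattern = Pattern.compile(topicNameOrPattern);
            this.priorityLevel = priorityLevel;
        }
    }

    private final List<Rule> rules = new ArrayList<>();
    private final int defaultPriorityLevel;

    public TopicPriorityResolver(int defaultPriorityLevel) {
        this.defaultPriorityLevel = defaultPriorityLevel;
    }

    // Rules are kept in insertion order, so earlier rules take precedence.
    public TopicPriorityResolver add(String topicNameOrPattern, int priorityLevel) {
        rules.add(new Rule(topicNameOrPattern, priorityLevel));
        return this;
    }

    // Returns the priority of the first rule whose pattern matches the whole
    // topic name, or the consumer-level default when no rule matches.
    public int resolve(String topicName) {
        for (Rule rule : rules) {
            if (rule.pattern.matcher(topicName).matches()) {
                return rule.priorityLevel;
            }
        }
        return defaultPriorityLevel;
    }

    public static void main(String[] args) {
        TopicPriorityResolver resolver = new TopicPriorityResolver(1)
            .add(".*us-east-1", 0)
            .add("events.*", 5);

        System.out.println(resolver.resolve("events-us-east-1")); // matches both rules, first wins
        System.out.println(resolver.resolve("events-eu-west-1")); // matches only the second rule
        System.out.println(resolver.resolve("audit-log"));        // no match, default applies
    }
}
```

Because `matches()` requires the whole name to match, a plain topic name used as a pattern only matches itself as long as it contains no regex metacharacters. Compiling each pattern once in the constructor, rather than on every lookup as in the quoted `matchesTopicName`, is a choice made for this sketch and not something the proposal specifies.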
