Hi Zike,

> What about using the Pattern type to store the compiled Pattern in the
TopicConsumerConfigurationData?

Thanks for the suggestion - yes that's a good idea. I'll update the doc.

- Dave


On Mon, Jul 11, 2022 at 12:04 PM Zike Yang <z...@apache.org> wrote:

> Hi, Dave
>
> Thanks for your proposals. Overall looks good to me. Just a minor comment:
>
> What about using the Pattern type to store the compiled Pattern in the
> TopicConsumerConfigurationData? Like below:
> ```
> public class TopicConsumerConfigurationData implements Serializable {
>     private static final long serialVersionUID = 1L;
>
>     private Pattern topicsPattern;
>     private int priorityLevel;
> }
> ```
> Just like what we did for the topicsPattern in the
> ConsumerConfigurationData. [0]
>
>
> [0]
> https://github.com/apache/pulsar/blob/bbf2a47867ff54327c1f2940e72f08a44a5dc5f7/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java#L58
>
> Zike Yang
>
>
>
> Zike Yang
>
>
> On Sat, Jul 9, 2022 at 12:10 AM Dave Maughan
> <dave.maug...@streamnative.io.invalid> wrote:
> >
> > Hi Enrico,
> >
> > Why can't you create multiple independent Consumers ?
> > > They will share the same resources (memory pools, thread pools)
> >
> >
> > You can do that but it means the user has to manage the consumption from
> > each of the consumer instances manually. This is already solved in the
> > existing multi topic consumer. This change adds user convenience - to
> > continue consuming from multiple topics in a single consumer instance,
> but
> > allowing them to customise the configuration.
> >
> > - Dave
> >
> > On Fri, Jul 8, 2022 at 4:26 PM Enrico Olivelli <eolive...@gmail.com>
> wrote:
> >
> > > Dave,
> > > Why can't you create multiple independent Consumers ?
> > > They will share the same resources (memory pools, thread pools)
> > >
> > >
> > >
> > >
> > > Enrico
> > >
> > > Il giorno ven 8 lug 2022 alle ore 15:00 Dave Maughan
> > > <dave.maug...@streamnative.io.invalid> ha scritto:
> > > >
> > > > Hi Pulsar community,
> > > >
> > > > I've opened a PIP to discuss a new Java client option to allow
> setting
> > > > topic specific consumer priorityLevel.
> > > >
> > > > Proposal Link: https://github.com/apache/pulsar/issues/16481
> > > >
> > > > ---
> > > >
> > > > ## Motivation
> > > >
> > > > The Pulsar Java consumer supports setting a priority level for
> priority
> > > > message
> > > > dispatch in shared subscription consumers and priority assignment in
> > > > failover
> > > > subscription consumers. See the
> [ConsumerBuilder.html#priorityLevel(int)
> > > > Javadoc](
> > > >
> > >
> https://javadoc.io/static/org.apache.pulsar/pulsar-client-api/2.10.1/org/apache/pulsar/client/api/ConsumerBuilder.html#priorityLevel(int)
> > > > )
> > > > for a detailed functional description. The Pulsar Java consumer also
> > > > supports
> > > > consuming from multiple topics. However, it is not possible to set a
> > > > different
> > > > priority level for different topics in the same Consumer instance.
> > > >
> > > > This behaviour is desirable in some use cases. For example, a
> consumer
> > > > processing region specific topics might wish to configure region
> > > stickiness
> > > > - A
> > > > multi-region application might be consuming from topics
> events-us-east-1
> > > and
> > > > events-eu-west-1. Consumers in all regions should be configured to
> > > consume
> > > > all
> > > > topics to ensure data completeness. However, to ensure low latency,
> the
> > > > us-east-1 consumer would need to set a higher priority level for the
> > > > us-east-1
> > > > topic. Similarly, the eu-west-1 consumer would need to set a higher
> > > priority
> > > > level for the eu-west-1 topic.
> > > >
> > > > ## Goal
> > > >
> > > > Update the Java client API to allow the configuration of different
> > > priority
> > > > levels for different topics.
> > > >
> > > > Do so in such a way that supports the addition of other topic
> specific
> > > > configuration options or overrides in the future.
> > > >
> > > > Issues will be created to track feature parity in the other client
> > > > implementations for this PIP.
> > > >
> > > > ## API Changes
> > > >
> > > > In pulsar-client-api, update `ConsumerBuilder` to include two new
> > > methods:
> > > >
> > > > ```java
> > > > public interface ConsumerBuilder<T> extends Cloneable {
> > > >     ...
> > > >
> > > >     TopicConsumerBuilder<T> topicConfiguration(String
> > > topicNameOrPattern);
> > > >
> > > >     ConsumerBuilder<T> topicConfiguration(String topicNameOrPattern,
> > > >             java.util.function.Consumer<TopicConsumerBuilder<T>>
> > > > builderConsumer);
> > > > }
> > > > ```
> > > >
> > > > Create a new interface:
> > > >
> > > > ```java
> > > > public interface TopicConsumerBuilder<T> {
> > > >     TopicConsumerBuilder<T> priorityLevel(int priorityLevel);
> > > >
> > > >     ConsumerBuilder<T> build();
> > > > }
> > > > ```
> > > >
> > > > In pulsar-client-original, update `ConsumerConfigurationData` to
> include
> > > a
> > > > new field:
> > > >
> > > > ```java
> > > > @Data
> > > > public class ConsumerConfigurationData<T> implements Serializable,
> > > > Cloneable {
> > > >     ...
> > > >
> > > >     private List<TopicConsumerConfigurationData> topicConfigurations
> =
> > > new
> > > > ArrayList<>();
> > > > }
> > > > ```
> > > >
> > > > Create a topic configuration class:
> > > >
> > > > ```java
> > > > @Data
> > > > public class TopicConsumerConfigurationData implements Serializable {
> > > >     private static final long serialVersionUID = 1L;
> > > >
> > > >     private String topicNameOrPattern;
> > > >     private int priorityLevel;
> > > >
> > > >     public boolean matchesTopicName(String topicName) {
> > > >         return
> > > > Pattern.compile(topicNameOrPattern).matcher(topicName).matches();
> > > >     }
> > > >
> > > >     public static TopicConsumerConfigurationData of(String
> > > > topicNameOrPattern,
> > > >             ConsumerConfigurationData<?> conf) {
> > > >         return new TopicConsumerConfigurationData(topicNameOrPattern,
> > > > conf.getPriorityLevel());
> > > >     }
> > > > }
> > > > ```
> > > >
> > > > Then, in `ConsumerImpl` the appropriate topic configuration can be
> > > selected
> > > > based on the topic being subscribed to. Since the topic
> configuration is
> > > > effectively keyed by a topic name or pattern, it’s possible for the
> user
> > > to
> > > > be
> > > > able configure multiple topic configurations that match the same
> concrete
> > > > topic
> > > > name. In this case the first topic name match should be selected.
> > > >
> > > > ```java
> > > > TopicConsumerConfigurationData getMatchingTopicConfiguration(String
> > > > topicName,
> > > >         ConsumerConfigurationData conf) {
> > > >     return topicConfigurations.stream()
> > > >         .filter(topicConf -> topicConf.matchesTopicName(topicName))
> > > >         .findFirst()
> > > >         .orElseGet(() -> TopicConsumerConfigurationData.of(topicName,
> > > > conf));
> > > > }
> > > > ```
> > > >
> > > > Example Usage:
> > > >
> > > > ```java
> > > > pulsarClient.newConsumer()
> > > >     .topicsPattern("events.*")
> > > >     .priorityLevel(1)
> > > >     .topicConfiguration(".*us-east-1", b -> b.priorityLevel(0))
> > > >     .subscribe();
> > > > ```
> > > >
> > > > or
> > > >
> > > > ```java
> > > > pulsarClient.newConsumer()
> > > >     .topicsPattern("events.*")
> > > >     .priorityLevel(1)
> > > >     .topicConfiguration(".*us-east-1")
> > > >         .priorityLevel(0)
> > > >         .build()
> > > >     .subscribe();
> > > > ```
> > > >
> > > > ## Rejected Alternatives
> > > >
> > > > * Extend the existing `ConsumerBuilder` rather than introducing a
> nested,
> > > > topic specific builder class.
> > > >
> > > > Rejection reason: Does not provide a clear API to discover and extend
> > > other
> > > > topic specific configuration options and overrides.
> > > >
> > > > ```java
> > > > public interface ConsumerBuilder<T> extends Cloneable {
> > > >     ...
> > > >
> > > >     ConsumerBuilder<T> topicPriorityLevel(String topicNameOrPattern,
> int
> > > > priorityLevel);
> > > > }
> > > > ```
> > > >
> > > > Example usage:
> > > >
> > > > ```java
> > > > pulsarClient.newConsumer()
> > > >     .topicsPattern("events.*")
> > > >     .priorityLevel(1)
> > > >     .topicPriorityLevel(".*us-east-1", 0)
> > > >     .subscribe();
> > > > ```
> > > >
> > > > * Provide a configurator interface to configure options and
> overrides at
> > > > runtime
> > > >
> > > > Rejection reason: Not compatible with `ConsumerBuilder.loadConf`.
> > > >
> > > > ```java
> > > > @Data
> > > > class TopicConsumerConfigurationData {
> > > >     private int priorityLevel;
> > > > }
> > > > ```
> > > >
> > > > ```java
> > > > interface TopicConsumerConfigurator extends Serializable {
> > > >    void configure(String topicName, TopicConsumerConfigurationData
> > > > topicConf);
> > > > }
> > > > ```
> > > >
> > > > ```java
> > > > public interface ConsumerBuilder<T> extends Cloneable {
> > > >     ...
> > > >
> > > >     ConsumerBuilder<T> topicConfigurator(TopicConsumerConfigurator
> > > > configurator);
> > > > }
> > > > ```
> > > >
> > > > Example usage:
> > > >
> > > > ```java
> > > > pulsarClient.newConsumer()
> > > >     .topicsPattern("events.*")
> > > >     .priorityLevel(1)
> > > >     .topicConfigurator((topicName, topicConf) -> {
> > > >         if (topicName.endsWith("us-east-1") {
> > > >             topicConf.setPriorityLevel(0);
> > > >         }
> > > >     })
> > > >     .subscribe();
> > > > ```
> > >
>

Reply via email to