+1

It lets users keep a single consumer while still setting different
priority levels for different topics.
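
For anyone skimming the thread, here is a minimal sketch of how the per-topic lookup could behave, assuming the "first match wins" rule from the proposal and the compiled Pattern that Zike suggested. The names `TopicPriorityDemo`, `TopicOverride` and `resolvePriority` are hypothetical stand-ins for illustration only; they are not part of the Pulsar client.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

/** Hypothetical stand-in for the proposed per-topic overrides; not Pulsar client code. */
final class TopicPriorityDemo {

    /** One override: a regex compiled once, plus the priority level applied on a match. */
    static final class TopicOverride {
        final Pattern topicsPattern;
        final int priorityLevel;

        TopicOverride(String regex, int priorityLevel) {
            this.topicsPattern = Pattern.compile(regex);
            this.priorityLevel = priorityLevel;
        }
    }

    /** Returns the priority of the first matching override, else the consumer-wide default. */
    static int resolvePriority(List<TopicOverride> overrides, int defaultPriority, String topicName) {
        for (TopicOverride override : overrides) {
            // matches() requires the whole topic name to match the pattern.
            if (override.topicsPattern.matcher(topicName).matches()) {
                return override.priorityLevel;
            }
        }
        return defaultPriority;
    }

    public static void main(String[] args) {
        List<TopicOverride> overrides = new ArrayList<>();
        overrides.add(new TopicOverride(".*us-east-1", 0));
        // Also matches events-us-east-1, but is listed second, so it loses there.
        overrides.add(new TopicOverride("events.*", 2));

        System.out.println(resolvePriority(overrides, 1, "events-us-east-1"));
        System.out.println(resolvePriority(overrides, 1, "events-eu-west-1"));
        System.out.println(resolvePriority(overrides, 1, "audit-log"));
    }
}
```

Compiling the pattern once in the constructor avoids recompiling it for every topic lookup, and the order in which overrides are registered decides which one applies when several match.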
Penghui

On Tue, Jul 12, 2022 at 4:18 PM Dave Maughan
<dave.maug...@streamnative.io.invalid> wrote:

> Hi Zike,
>
> > What about using the Pattern type to store the compiled Pattern in the
> > TopicConsumerConfigurationData?
>
> Thanks for the suggestion - yes that's a good idea. I'll update the doc.
>
> - Dave
>
>
> On Mon, Jul 11, 2022 at 12:04 PM Zike Yang <z...@apache.org> wrote:
>
> > Hi, Dave
> >
> > Thanks for your proposal. Overall it looks good to me. Just a minor
> > comment:
> >
> > What about using the Pattern type to store the compiled Pattern in the
> > TopicConsumerConfigurationData? Like below:
> > ```
> > public class TopicConsumerConfigurationData implements Serializable {
> >     private static final long serialVersionUID = 1L;
> >
> >     private Pattern topicsPattern;
> >     private int priorityLevel;
> > }
> > ```
> > Just like what we did for the topicsPattern in the
> > ConsumerConfigurationData. [0]
> >
> >
> > [0] https://github.com/apache/pulsar/blob/bbf2a47867ff54327c1f2940e72f08a44a5dc5f7/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java#L58
> >
> > Zike Yang
> >
> >
> > On Sat, Jul 9, 2022 at 12:10 AM Dave Maughan
> > <dave.maug...@streamnative.io.invalid> wrote:
> > >
> > > Hi Enrico,
> > >
> > > > Why can't you create multiple independent Consumers ?
> > > > They will share the same resources (memory pools, thread pools)
> > >
> > >
> > > You can do that, but it means the user has to manage consumption from
> > > each of the consumer instances manually. This is already solved in the
> > > existing multi-topic consumer. This change adds user convenience: users
> > > can continue consuming from multiple topics in a single consumer
> > > instance while customising the configuration per topic.
> > >
> > > - Dave
> > >
> > > On Fri, Jul 8, 2022 at 4:26 PM Enrico Olivelli <eolive...@gmail.com>
> > > wrote:
> > >
> > > > Dave,
> > > > Why can't you create multiple independent Consumers ?
> > > > They will share the same resources (memory pools, thread pools)
> > > >
> > > >
> > > >
> > > >
> > > > Enrico
> > > >
> > > > On Fri, Jul 8, 2022 at 15:00 Dave Maughan
> > > > <dave.maug...@streamnative.io.invalid> wrote:
> > > > >
> > > > > Hi Pulsar community,
> > > > >
> > > > > I've opened a PIP to discuss a new Java client option to allow setting
> > > > > a topic-specific consumer priorityLevel.
> > > > >
> > > > > Proposal Link: https://github.com/apache/pulsar/issues/16481
> > > > >
> > > > > ---
> > > > >
> > > > > ## Motivation
> > > > >
> > > > > The Pulsar Java consumer supports setting a priority level for priority
> > > > > message dispatch in shared subscription consumers and priority
> > > > > assignment in failover subscription consumers. See the
> > > > > [ConsumerBuilder.html#priorityLevel(int) Javadoc](https://javadoc.io/static/org.apache.pulsar/pulsar-client-api/2.10.1/org/apache/pulsar/client/api/ConsumerBuilder.html#priorityLevel(int))
> > > > > for a detailed functional description. The Pulsar Java consumer also
> > > > > supports consuming from multiple topics. However, it is not possible to
> > > > > set a different priority level for different topics in the same
> > > > > Consumer instance.
> > > > >
> > > > > This behaviour is desirable in some use cases. For example, a consumer
> > > > > processing region specific topics might wish to configure region
> > > > > stickiness: a multi-region application might be consuming from topics
> > > > > events-us-east-1 and events-eu-west-1. Consumers in all regions should
> > > > > be configured to consume all topics to ensure data completeness.
> > > > > However, to ensure low latency, the us-east-1 consumer would need to set
> > > > > a higher priority level for the us-east-1 topic. Similarly, the
> > > > > eu-west-1 consumer would need to set a higher priority level for the
> > > > > eu-west-1 topic.
> > > > >
> > > > > ## Goal
> > > > >
> > > > > Update the Java client API to allow the configuration of different
> > > > > priority levels for different topics.
> > > > >
> > > > > Do so in a way that supports the addition of other topic-specific
> > > > > configuration options or overrides in the future.
> > > > >
> > > > > Issues will be created to track feature parity in the other client
> > > > > implementations for this PIP.
> > > > >
> > > > > ## API Changes
> > > > >
> > > > > In pulsar-client-api, update `ConsumerBuilder` to include two new methods:
> > > > >
> > > > > ```java
> > > > > public interface ConsumerBuilder<T> extends Cloneable {
> > > > >     ...
> > > > >
> > > > >     TopicConsumerBuilder<T> topicConfiguration(String topicNameOrPattern);
> > > > >
> > > > >     ConsumerBuilder<T> topicConfiguration(String topicNameOrPattern,
> > > > >             java.util.function.Consumer<TopicConsumerBuilder<T>> builderConsumer);
> > > > > }
> > > > > ```
> > > > >
> > > > > Create a new interface:
> > > > >
> > > > > ```java
> > > > > public interface TopicConsumerBuilder<T> {
> > > > >     TopicConsumerBuilder<T> priorityLevel(int priorityLevel);
> > > > >
> > > > >     ConsumerBuilder<T> build();
> > > > > }
> > > > > ```
> > > > >
> > > > > In pulsar-client-original, update `ConsumerConfigurationData` to include
> > > > > a new field:
> > > > >
> > > > > ```java
> > > > > @Data
> > > > > public class ConsumerConfigurationData<T> implements Serializable, Cloneable {
> > > > >     ...
> > > > >
> > > > >     private List<TopicConsumerConfigurationData> topicConfigurations = new ArrayList<>();
> > > > > }
> > > > > ```
> > > > >
> > > > > Create a topic configuration class:
> > > > >
> > > > > ```java
> > > > > @Data
> > > > > public class TopicConsumerConfigurationData implements Serializable {
> > > > >     private static final long serialVersionUID = 1L;
> > > > >
> > > > >     private String topicNameOrPattern;
> > > > >     private int priorityLevel;
> > > > >
> > > > >     public boolean matchesTopicName(String topicName) {
> > > > >         return Pattern.compile(topicNameOrPattern).matcher(topicName).matches();
> > > > >     }
> > > > >
> > > > >     public static TopicConsumerConfigurationData of(String topicNameOrPattern,
> > > > >             ConsumerConfigurationData<?> conf) {
> > > > >         return new TopicConsumerConfigurationData(topicNameOrPattern,
> > > > >                 conf.getPriorityLevel());
> > > > >     }
> > > > > }
> > > > > ```
> > > > >
> > > > > Then, in `ConsumerImpl`, the appropriate topic configuration can be
> > > > > selected based on the topic being subscribed to. Since the topic
> > > > > configuration is effectively keyed by a topic name or pattern, it's
> > > > > possible for the user to configure multiple topic configurations that
> > > > > match the same concrete topic name. In this case, the first matching
> > > > > topic configuration should be selected.
> > > > >
> > > > > ```java
> > > > > TopicConsumerConfigurationData getMatchingTopicConfiguration(String topicName,
> > > > >         ConsumerConfigurationData<?> conf) {
> > > > >     // Read the per-topic overrides from the consumer configuration.
> > > > >     return conf.getTopicConfigurations().stream()
> > > > >         .filter(topicConf -> topicConf.matchesTopicName(topicName))
> > > > >         .findFirst()
> > > > >         .orElseGet(() -> TopicConsumerConfigurationData.of(topicName, conf));
> > > > > }
> > > > > ```
> > > > >
> > > > > Example Usage:
> > > > >
> > > > > ```java
> > > > > pulsarClient.newConsumer()
> > > > >     .topicsPattern("events.*")
> > > > >     .priorityLevel(1)
> > > > >     .topicConfiguration(".*us-east-1", b -> b.priorityLevel(0))
> > > > >     .subscribe();
> > > > > ```
> > > > >
> > > > > or
> > > > >
> > > > > ```java
> > > > > pulsarClient.newConsumer()
> > > > >     .topicsPattern("events.*")
> > > > >     .priorityLevel(1)
> > > > >     .topicConfiguration(".*us-east-1")
> > > > >         .priorityLevel(0)
> > > > >         .build()
> > > > >     .subscribe();
> > > > > ```
> > > > >
> > > > > ## Rejected Alternatives
> > > > >
> > > > > * Extend the existing `ConsumerBuilder` rather than introducing a nested,
> > > > > topic specific builder class.
> > > > >
> > > > > Rejection reason: Does not provide a clear API to discover and extend
> > > > > other topic specific configuration options and overrides.
> > > > >
> > > > > ```java
> > > > > public interface ConsumerBuilder<T> extends Cloneable {
> > > > >     ...
> > > > >
> > > > >     ConsumerBuilder<T> topicPriorityLevel(String topicNameOrPattern,
> > > > >             int priorityLevel);
> > > > > }
> > > > > ```
> > > > >
> > > > > Example usage:
> > > > >
> > > > > ```java
> > > > > pulsarClient.newConsumer()
> > > > >     .topicsPattern("events.*")
> > > > >     .priorityLevel(1)
> > > > >     .topicPriorityLevel(".*us-east-1", 0)
> > > > >     .subscribe();
> > > > > ```
> > > > >
> > > > > * Provide a configurator interface to configure options and overrides at
> > > > > runtime
> > > > >
> > > > > Rejection reason: Not compatible with `ConsumerBuilder.loadConf`.
> > > > >
> > > > > ```java
> > > > > @Data
> > > > > class TopicConsumerConfigurationData {
> > > > >     private int priorityLevel;
> > > > > }
> > > > > ```
> > > > >
> > > > > ```java
> > > > > interface TopicConsumerConfigurator extends Serializable {
> > > > >     void configure(String topicName, TopicConsumerConfigurationData topicConf);
> > > > > }
> > > > > ```
> > > > >
> > > > > ```java
> > > > > public interface ConsumerBuilder<T> extends Cloneable {
> > > > >     ...
> > > > >
> > > > >     ConsumerBuilder<T> topicConfigurator(TopicConsumerConfigurator configurator);
> > > > > }
> > > > > ```
> > > > >
> > > > > Example usage:
> > > > >
> > > > > ```java
> > > > > pulsarClient.newConsumer()
> > > > >     .topicsPattern("events.*")
> > > > >     .priorityLevel(1)
> > > > >     .topicConfigurator((topicName, topicConf) -> {
> > > > >         if (topicName.endsWith("us-east-1")) {
> > > > >             topicConf.setPriorityLevel(0);
> > > > >         }
> > > > >     })
> > > > >     .subscribe();
> > > > > ```
> > > >
> >
>
