Hi, Dave

Thanks for your proposals. Overall looks good to me. Just a minor comment:

What about using the Pattern type to store the compiled Pattern in the
TopicConsumerConfigurationData? Like below:
```
public class TopicConsumerConfigurationData implements Serializable {
    private static final long serialVersionUID = 1L;

    private Pattern topicsPattern;
    private int priorityLevel;
}
```
Just like what we did for the topicsPattern in the
ConsumerConfigurationData. [0]


[0] 
https://github.com/apache/pulsar/blob/bbf2a47867ff54327c1f2940e72f08a44a5dc5f7/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java#L58

Zike Yang



Zike Yang


On Sat, Jul 9, 2022 at 12:10 AM Dave Maughan
<dave.maug...@streamnative.io.invalid> wrote:
>
> Hi Enrico,
>
> Why can't you create multiple independent Consumers ?
> > They will share the same resources (memory pools, thread pools)
>
>
> You can do that but it means the user has to manage the consumption from
> each of the consumer instances manually. This is already solved in the
> existing multi topic consumer. This change adds user convenience - to
> continue consuming from multiple topics in a single consumer instance, but
> allowing them to customise the configuration.
>
> - Dave
>
> On Fri, Jul 8, 2022 at 4:26 PM Enrico Olivelli <eolive...@gmail.com> wrote:
>
> > Dave,
> > Why can't you create multiple independent Consumers ?
> > They will share the same resources (memory pools, thread pools)
> >
> >
> >
> >
> > Enrico
> >
> > Il giorno ven 8 lug 2022 alle ore 15:00 Dave Maughan
> > <dave.maug...@streamnative.io.invalid> ha scritto:
> > >
> > > Hi Pulsar community,
> > >
> > > I've opened a PIP to discuss a new Java client option to allow setting
> > > topic specific consumer priorityLevel.
> > >
> > > Proposal Link: https://github.com/apache/pulsar/issues/16481
> > >
> > > ---
> > >
> > > ## Motivation
> > >
> > > The Pulsar Java consumer supports setting a priority level for priority
> > > message
> > > dispatch in shared subscription consumers and priority assignment in
> > > failover
> > > subscription consumers. See the [ConsumerBuilder.html#priorityLevel(int)
> > > Javadoc](
> > >
> > https://javadoc.io/static/org.apache.pulsar/pulsar-client-api/2.10.1/org/apache/pulsar/client/api/ConsumerBuilder.html#priorityLevel(int)
> > > )
> > > for a detailed functional description. The Pulsar Java consumer also
> > > supports
> > > consuming from multiple topics. However, it is not possible to set a
> > > different
> > > priority level for different topics in the same Consumer instance.
> > >
> > > This behaviour is desirable in some use cases. For example, a consumer
> > > processing region specific topics might wish to configure region
> > stickiness
> > > - A
> > > multi-region application might be consuming from topics events-us-east-1
> > and
> > > events-eu-west-1. Consumers in all regions should be configured to
> > consume
> > > all
> > > topics to ensure data completeness. However, to ensure low latency, the
> > > us-east-1 consumer would need to set a higher priority level for the
> > > us-east-1
> > > topic. Similarly, the eu-west-1 consumer would need to set a higher
> > priority
> > > level for the eu-west-1 topic.
> > >
> > > ## Goal
> > >
> > > Update the Java client API to allow the configuration of different
> > priority
> > > levels for different topics.
> > >
> > > Do so in such a way that supports the addition of other topic specific
> > > configuration options or overrides in the future.
> > >
> > > Issues will be created to track feature parity in the other client
> > > implementations for this PIP.
> > >
> > > ## API Changes
> > >
> > > In pulsar-client-api, update `ConsumerBuilder` to include two new
> > methods:
> > >
> > > ```java
> > > public interface ConsumerBuilder<T> extends Cloneable {
> > >     ...
> > >
> > >     TopicConsumerBuilder<T> topicConfiguration(String
> > topicNameOrPattern);
> > >
> > >     ConsumerBuilder<T> topicConfiguration(String topicNameOrPattern,
> > >             java.util.function.Consumer<TopicConsumerBuilder<T>>
> > > builderConsumer);
> > > }
> > > ```
> > >
> > > Create a new interface:
> > >
> > > ```java
> > > public interface TopicConsumerBuilder<T> {
> > >     TopicConsumerBuilder<T> priorityLevel(int priorityLevel);
> > >
> > >     ConsumerBuilder<T> build();
> > > }
> > > ```
> > >
> > > In pulsar-client-original, update `ConsumerConfigurationData` to include
> > a
> > > new field:
> > >
> > > ```java
> > > @Data
> > > public class ConsumerConfigurationData<T> implements Serializable,
> > > Cloneable {
> > >     ...
> > >
> > >     private List<TopicConsumerConfigurationData> topicConfigurations =
> > new
> > > ArrayList<>();
> > > }
> > > ```
> > >
> > > Create a topic configuration class:
> > >
> > > ```java
> > > @Data
> > > public class TopicConsumerConfigurationData implements Serializable {
> > >     private static final long serialVersionUID = 1L;
> > >
> > >     private String topicNameOrPattern;
> > >     private int priorityLevel;
> > >
> > >     public boolean matchesTopicName(String topicName) {
> > >         return
> > > Pattern.compile(topicNameOrPattern).matcher(topicName).matches();
> > >     }
> > >
> > >     public static TopicConsumerConfigurationData of(String
> > > topicNameOrPattern,
> > >             ConsumerConfigurationData<?> conf) {
> > >         return new TopicConsumerConfigurationData(topicNameOrPattern,
> > > conf.getPriorityLevel());
> > >     }
> > > }
> > > ```
> > >
> > > Then, in `ConsumerImpl` the appropriate topic configuration can be
> > selected
> > > based on the topic being subscribed to. Since the topic configuration is
> > > effectively keyed by a topic name or pattern, it’s possible for the user
> > to
> > > be
> > > able configure multiple topic configurations that match the same concrete
> > > topic
> > > name. In this case the first topic name match should be selected.
> > >
> > > ```java
> > > TopicConsumerConfigurationData getMatchingTopicConfiguration(String
> > > topicName,
> > >         ConsumerConfigurationData conf) {
> > >     return topicConfigurations.stream()
> > >         .filter(topicConf -> topicConf.matchesTopicName(topicName))
> > >         .findFirst()
> > >         .orElseGet(() -> TopicConsumerConfigurationData.of(topicName,
> > > conf));
> > > }
> > > ```
> > >
> > > Example Usage:
> > >
> > > ```java
> > > pulsarClient.newConsumer()
> > >     .topicsPattern("events.*")
> > >     .priorityLevel(1)
> > >     .topicConfiguration(".*us-east-1", b -> b.priorityLevel(0))
> > >     .subscribe();
> > > ```
> > >
> > > or
> > >
> > > ```java
> > > pulsarClient.newConsumer()
> > >     .topicsPattern("events.*")
> > >     .priorityLevel(1)
> > >     .topicConfiguration(".*us-east-1")
> > >         .priorityLevel(0)
> > >         .build()
> > >     .subscribe();
> > > ```
> > >
> > > ## Rejected Alternatives
> > >
> > > * Extend the existing `ConsumerBuilder` rather than introducing a nested,
> > > topic specific builder class.
> > >
> > > Rejection reason: Does not provide a clear API to discover and extend
> > other
> > > topic specific configuration options and overrides.
> > >
> > > ```java
> > > public interface ConsumerBuilder<T> extends Cloneable {
> > >     ...
> > >
> > >     ConsumerBuilder<T> topicPriorityLevel(String topicNameOrPattern, int
> > > priorityLevel);
> > > }
> > > ```
> > >
> > > Example usage:
> > >
> > > ```java
> > > pulsarClient.newConsumer()
> > >     .topicsPattern("events.*")
> > >     .priorityLevel(1)
> > >     .topicPriorityLevel(".*us-east-1", 0)
> > >     .subscribe();
> > > ```
> > >
> > > * Provide a configurator interface to configure options and overrides at
> > > runtime
> > >
> > > Rejection reason: Not compatible with `ConsumerBuilder.loadConf`.
> > >
> > > ```java
> > > @Data
> > > class TopicConsumerConfigurationData {
> > >     private int priorityLevel;
> > > }
> > > ```
> > >
> > > ```java
> > > interface TopicConsumerConfigurator extends Serializable {
> > >    void configure(String topicName, TopicConsumerConfigurationData
> > > topicConf);
> > > }
> > > ```
> > >
> > > ```java
> > > public interface ConsumerBuilder<T> extends Cloneable {
> > >     ...
> > >
> > >     ConsumerBuilder<T> topicConfigurator(TopicConsumerConfigurator
> > > configurator);
> > > }
> > > ```
> > >
> > > Example usage:
> > >
> > > ```java
> > > pulsarClient.newConsumer()
> > >     .topicsPattern("events.*")
> > >     .priorityLevel(1)
> > >     .topicConfigurator((topicName, topicConf) -> {
> > >         if (topicName.endsWith("us-east-1") {
> > >             topicConf.setPriorityLevel(0);
> > >         }
> > >     })
> > >     .subscribe();
> > > ```
> >

Reply via email to