Thanks for sharing details about the broader context of the use case. I think 
we could start drafting ideas for what the API solution could be. One of the 
first things to address when designing an API is the naming of concepts. 
Names should be clear and should not conflict with other names or concepts 
used in the API. Once things have names, it's easier to discuss the design 
and move it forward.

You proposed calling the new methods "dynamicUnsubscribe" and 
"dynamicSubscribe". One of the problems with "dynamicUnsubscribe" is that it's 
most likely not an "unsubscribe", but more like closing the underlying consumer 
in the multi-topic consumer for a single topic.
Perhaps calling the methods "closeTopicConsumer" and "addTopicConsumer" would 
be better names to address this conflict in concepts?

There would also need to be a way to find out which topics are being consumed. 
Besides an explicit query for the currently consumed topics, an event 
interface (listener) might also be needed.
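
As a rough sketch of how a query method and a listener could sit side by side, 
the following uses hypothetical names (ConsumedTopicsListener, 
ConsumedTopicsTracker) that are assumptions for illustration and not part of 
the Pulsar client API:

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical names for illustration only; not part of the Pulsar client API.
class ConsumedTopicsSketch {

    /** Event interface: notified when a topic starts or stops being consumed. */
    interface ConsumedTopicsListener {
        void onTopicAdded(String topic);

        void onTopicRemoved(String topic);
    }

    /** Tracks consumed topics and supports both explicit queries and listeners. */
    static class ConsumedTopicsTracker {
        private final Set<String> topics = ConcurrentHashMap.newKeySet();
        private final List<ConsumedTopicsListener> listeners = new CopyOnWriteArrayList<>();

        public void addListener(ConsumedTopicsListener listener) {
            listeners.add(listener);
        }

        /** Explicit query: a read-only snapshot of the topics consumed right now. */
        public Set<String> getConsumedTopics() {
            return Collections.unmodifiableSet(new HashSet<>(topics));
        }

        /** Records a topic and notifies listeners only if it was not already tracked. */
        public void topicAdded(String topic) {
            if (topics.add(topic)) {
                listeners.forEach(listener -> listener.onTopicAdded(topic));
            }
        }

        /** Forgets a topic and notifies listeners only if it was actually tracked. */
        public void topicRemoved(String topic) {
            if (topics.remove(topic)) {
                listeners.forEach(listener -> listener.onTopicRemoved(topic));
            }
        }
    }

    public static void main(String[] args) {
        ConsumedTopicsTracker tracker = new ConsumedTopicsTracker();
        tracker.addListener(new ConsumedTopicsListener() {
            @Override
            public void onTopicAdded(String topic) {
                System.out.println("added: " + topic);
            }

            @Override
            public void onTopicRemoved(String topic) {
                System.out.println("removed: " + topic);
            }
        });
        tracker.topicAdded("persistent://public/default/orders");
        System.out.println("consumed now: " + tracker.getConsumedTopics());
        tracker.topicRemoved("persistent://public/default/orders");
    }
}
```

The listener covers code that wants to react to changes, while the snapshot 
query covers code that only needs the current state at a given moment.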

There are multiple patterns of how this interface could be accessed. One way 
would be to make the multi-topic consumer instances implement this interface 
and the client code could use instanceof to check if the consumer implements 
the interface. Another choice is to have a separate method for getting the 
interface handle.

This interface would also need a name. I wonder if 
MultiTopicConsumerHandle would make sense. It's definitely a low-level 
interface. That's why I wouldn't make the consumer directly implement the 
interface, but instead have a separate method for requesting an "extension 
interface". One possible generic method signature could be 
public <T> Optional<T> getExtension(Class<T> extensionInterface)
For example, this could be called in this way:
Optional<MultiTopicConsumerHandle> multiTopicConsumerHandle = 
consumer.getExtension(MultiTopicConsumerHandle.class);
This would return an `Optional.empty()` if the "extension interface" isn't 
available for the consumer.
The benefit of the "extension interface" approach is that it could be easier to 
add experimental features in this way without breaking or cluttering the main 
interface for users who don't need the functionality at all.
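
To make the "extension interface" idea more concrete, here is a minimal sketch. 
Everything in it is an assumption for illustration: SketchConsumer stands in 
for the real consumer interface, and the methods on MultiTopicConsumerHandle 
are only a guess at what such a handle might offer. None of this exists in the 
Pulsar client API.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// All types are hypothetical stand-ins for illustration; not the Pulsar client API.
class ExtensionInterfaceSketch {

    /** Low-level handle discussed in this thread (name and methods are assumptions). */
    interface MultiTopicConsumerHandle {
        CompletableFuture<Void> addTopicConsumer(String topic);

        CompletableFuture<Void> closeTopicConsumer(String topic);

        Set<String> getTopics();
    }

    /** Stand-in for the main consumer interface, which stays free of low-level methods. */
    interface SketchConsumer {
        <T> Optional<T> getExtension(Class<T> extensionInterface);
    }

    /** A single-topic consumer offers no extensions. */
    static class SingleTopicConsumer implements SketchConsumer {
        @Override
        public <T> Optional<T> getExtension(Class<T> extensionInterface) {
            return Optional.empty();
        }
    }

    /** A multi-topic consumer exposes the handle without implementing it directly. */
    static class MultiTopicConsumer implements SketchConsumer {
        private final Set<String> topics = ConcurrentHashMap.newKeySet();

        private final MultiTopicConsumerHandle handle = new MultiTopicConsumerHandle() {
            @Override
            public CompletableFuture<Void> addTopicConsumer(String topic) {
                topics.add(topic); // a real client would open an internal consumer here
                return CompletableFuture.completedFuture(null);
            }

            @Override
            public CompletableFuture<Void> closeTopicConsumer(String topic) {
                topics.remove(topic); // a real client would close the internal consumer here
                return CompletableFuture.completedFuture(null);
            }

            @Override
            public Set<String> getTopics() {
                return Collections.unmodifiableSet(new HashSet<>(topics));
            }
        };

        @Override
        public <T> Optional<T> getExtension(Class<T> extensionInterface) {
            // Class.isInstance and Class.cast keep the lookup free of unchecked casts.
            return extensionInterface.isInstance(handle)
                    ? Optional.of(extensionInterface.cast(handle))
                    : Optional.empty();
        }
    }

    public static void main(String[] args) {
        SketchConsumer consumer = new MultiTopicConsumer();
        consumer.getExtension(MultiTopicConsumerHandle.class).ifPresent(handle -> {
            handle.addTopicConsumer("persistent://public/default/orders").join();
            System.out.println("topics: " + handle.getTopics());
        });
        // A consumer without the extension simply yields an empty Optional.
        System.out.println(new SingleTopicConsumer()
                .getExtension(MultiTopicConsumerHandle.class).isPresent());
    }
}
```

Callers that never ask for the extension see no change in the main interface, 
and callers that do ask have to handle the empty case explicitly.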

Although we are thinking far ahead about the solution, it's also good to 
continue learning more about the problem space so that we could eventually 
decide whether this is functionality that we would be willing to support in the 
Pulsar Java client API in the future.

-Lari

On 2025/01/04 09:52:07 SiNan Liu wrote:
> I would like to momentarily set aside the issue of topic processing speed
> disparities causing bottlenecks and instead focus on enhancing our dynamic
> subscription management capabilities.
> 
> 
> Currently, when a consumer subscribes to topics using a pattern, any
> modification to this pattern necessitates shutting down the existing
> consumer and initializing a new one with the updated pattern.
> 
> 
> 1. There are instances where we might want to temporarily or permanently
> unsubscribe from specific topics:
> 
> 
> 
>         • For temporary adjustments, our dynamic subscription management
> API can facilitate on-the-fly removal of selected topics.
> 
> 
>         • If it is permanent, the user can temporarily record the modified
> pattern but not remove it by closing and restarting. When the user creates
> other new consumers or restarts this consumer, the new pattern will be
> used. This will not affect the consumption of unrelated topics due to
> pattern modification.
> 
> 
> 2. Additionally, this functionality allows us to add and consume topics
> with varying name prefixes, with the option to make these adjustments
> temporary or permanent as described above.
> 
> The current pull request <https://github.com/apache/pulsar/pull/23794>
> implements the capability to dynamically add or remove subscriptions within
> `PatternMultiTopicsConsumerImpl`.
> 
> 
> We should proceed to define a public API for this. It could be structured
> as follows:
> 
> 
> ```java
> /**
>  * Asynchronously unsubscribe from a set of topics.
>  *
>  * This method allows dynamically removing a specified set of topics from
>  * the consumer's subscription list at runtime.
>  * The operation is non-blocking and the returned CompletableFuture can be
>  * used to track the completion status.
>  *
>  * @param topics the set of topics to unsubscribe from
>  * @return {@link CompletableFuture} to track the operation
>  */
> CompletableFuture<Void> dynamicUnsubscribe(Set<String> topics);
>
> /**
>  * Asynchronously subscribe to a set of topics.
>  *
>  * This method enables dynamically adding a specified set of topics to the
>  * consumer's existing subscription list at runtime.
>  * The operation is non-blocking and the returned CompletableFuture can be
>  * used to track the completion status.
>  *
>  * @param topics the set of topics to subscribe to
>  * @return {@link CompletableFuture} to track the operation
>  */
> CompletableFuture<Void> dynamicSubscribe(Set<String> topics);
> ```
> 
> 
> Implement it in MultiTopicsConsumerImpl and PatternMultiTopicsConsumerImpl;
> other implementation classes can simply throw an
> UnsupportedOperationException or do nothing at all.
> 
> 
> 
> Thanks,
> sinan
> 
> 
> Lari Hotari <lhot...@apache.org> wrote on Fri, Jan 3, 2025 at 23:15:
> 
> > Before proceeding with implementation details, it would be helpful to
> > better understand the underlying use cases and requirements.
> >
> > Could you provide more details in this Pulsar dev mailing list thread
> > about:
> > 1. The specific scenarios where topic processing speed differences create
> > bottlenecks - what metrics or indicators suggest a topic is "slow" and
> > blocking others?
> > 2. An example of how you envision applications would use this dynamic
> > management capability - perhaps some code examples showing the desired API
> > usage?
> > 3. The specific performance impacts you're seeing in production that
> > motivated this proposal
> >
> > While MultiTopicsConsumerImpl does provide unsubscribe functionality, we
> > should be cautious about depending on implementation classes rather than
> > stable public APIs.
> > Instead of directly extending the implementation approach, we might want
> > to consider:
> >
> > - Defining a clear interface that abstracts the dynamic subscription
> > management capabilities (or some other way to address the use case and the
> > requirements)
> > - Ensuring the solution works consistently across different consumer
> > implementations (e.g. MultiTopicsConsumerImpl and
> > PatternMultiTopicsConsumerImpl)
> > - Exploring how this functionality could be provided in the Pulsar client
> > API
> >
> > Would you be able to share a minimal reproducer that demonstrates the
> > performance issues? This would help the community better understand the
> > scope of the problem and evaluate potential solutions.
> > Looking forward to learning more about your use case and collaborating on
> > finding the right approach.
> >
> > -Lari
> >
> > On 2025/01/03 13:53:28 SiNan Liu wrote:
> > > Hi, Pulsar Community.
> > >
> > > > I added a new PR that aims to enhance our management of topic
> > > > subscriptions within PatternMultiTopicsConsumer in Pulsar client. This
> > > > PR introduces the ability to dynamically add or remove topics from a
> > > > regex pattern subscription, which can significantly improve our system’s
> > > > responsiveness and efficiency.
> > > >
> > > > When subscribing to topics using regex patterns, it’s possible to
> > > > encounter scenarios where some topics process messages slower than
> > > > others. This can lead to significant performance bottlenecks, as slower
> > > > topics might block others. There’s a need to manage such situations
> > > > dynamically to ensure efficient processing and fair resource allocation
> > > > among topics.
> > >
> > > > Related discussions can be found in [this Reddit thread](
> > > > https://www.reddit.com/r/ApachePulsar/comments/1fssbbn/roundrobin_between_wildcard_topics/),
> > > > which highlights issues similar to what we aim to address.
> > >
> > > > Currently, MultiTopicsConsumerImpl allows unsubscribing from specific
> > > > topics manually, as demonstrated here
> > > > <https://github.com/apache/pulsar/blob/5a3a1f169a7f90181bd5c213c8e9f479bc74f0f2/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java#L1246-L1258>.
> > >
> > >
> > > > *This proposal seeks to integrate similar functionality into
> > > > PatternMultiTopicsConsumerImpl, allowing specified topics to be
> > > > dynamically added or removed.*
> > >
> > >
> > >
> > > Thanks,
> > > sinan
> > >
> >
> 
