I would like to momentarily set aside the issue of topic processing speed
disparities causing bottlenecks and instead focus on enhancing our dynamic
subscription management capabilities.


Currently, when a consumer subscribes to topics using a pattern, any
modification to this pattern necessitates shutting down the existing
consumer and initializing a new one with the updated pattern.


1. There are instances where we might want to temporarily or permanently
unsubscribe from specific topics:



        • For temporary adjustments, our dynamic subscription management
API can facilitate on-the-fly removal of selected topics.


        • For permanent changes, the user can record the modified pattern
without closing and restarting the consumer. The new pattern is then used
whenever the user creates new consumers or restarts this one, so modifying
the pattern does not interrupt the consumption of unrelated topics.


2. Additionally, this functionality allows us to add and consume topics
with varying name prefixes, with the option to make these adjustments
temporary or permanent as described above.

The current pull request <https://github.com/apache/pulsar/pull/23794>
implements the capability to dynamically add or remove subscriptions within
`PatternMultiTopicsConsumerImpl`.


We should define a public API for this. It could be structured as follows:


```java

/**
 * Asynchronously unsubscribe from a set of topics.
 *
 * This method allows dynamically removing a specified set of topics from
 * the consumer's subscription list at runtime.
 * The operation is non-blocking and the returned CompletableFuture can be
 * used to track the completion status.
 *
 * @param topics the set of topics to unsubscribe from
 * @return {@link CompletableFuture} to track the operation
 */
CompletableFuture<Void> dynamicUnsubscribe(Set<String> topics);

/**
 * Asynchronously subscribe to a set of topics.
 *
 * This method enables dynamically adding a specified set of topics to the
 * consumer's existing subscription list at runtime.
 * The operation is non-blocking and the returned CompletableFuture can be
 * used to track the completion status.
 *
 * @param topics the set of topics to subscribe to
 * @return {@link CompletableFuture} to track the operation
 */
CompletableFuture<Void> dynamicSubscribe(Set<String> topics);

```
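
To illustrate how an application might call these two methods, here is a
minimal sketch. Only the two method signatures come from the proposal;
`DynamicSubscriptions`, `InMemorySubscriptions`, and the topic names are
hypothetical stand-ins that just track topic names in memory and never talk
to a broker.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical interface holding only the two proposed methods.
interface DynamicSubscriptions {
    CompletableFuture<Void> dynamicSubscribe(Set<String> topics);

    CompletableFuture<Void> dynamicUnsubscribe(Set<String> topics);
}

// Illustration-only stand-in: it records topic names and completes immediately.
// A real consumer would create or close per-topic consumers asynchronously.
class InMemorySubscriptions implements DynamicSubscriptions {
    private final Set<String> topics = ConcurrentHashMap.newKeySet();

    @Override
    public CompletableFuture<Void> dynamicSubscribe(Set<String> toAdd) {
        topics.addAll(toAdd);
        return CompletableFuture.completedFuture(null);
    }

    @Override
    public CompletableFuture<Void> dynamicUnsubscribe(Set<String> toRemove) {
        topics.removeAll(toRemove);
        return CompletableFuture.completedFuture(null);
    }

    // Snapshot of the currently tracked topics.
    Set<String> currentTopics() {
        return Set.copyOf(topics);
    }
}

class DynamicSubscriptionDemo {
    public static void main(String[] args) {
        InMemorySubscriptions consumer = new InMemorySubscriptions();

        // Add two topics at runtime and wait for the operation to finish.
        consumer.dynamicSubscribe(Set.of(
                "persistent://public/default/orders-a",
                "persistent://public/default/orders-b")).join();

        // Drop one topic without closing the consumer.
        consumer.dynamicUnsubscribe(Set.of("persistent://public/default/orders-a"))
                .thenRun(() -> System.out.println("Remaining: " + consumer.currentTopics()))
                .join();
    }
}
```

With this shape the caller decides whether to block on the returned future
(`join()`) or to chain further work onto it.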


These methods would be implemented in MultiTopicsConsumerImpl and
PatternMultiTopicsConsumerImpl; other implementation classes can simply
throw an UnsupportedOperationException or do nothing at all.
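
One possible way to cover the other implementation classes is a pair of
default methods that fail the returned future. This is only a sketch under
that assumption: `ConsumerSketch` and `SingleTopicConsumerSketch` are
hypothetical names and do not mirror the real `Consumer` interface.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Hypothetical interface: the default bodies report "unsupported" through the
// future, so implementations that cannot change topics need no extra code.
interface ConsumerSketch {
    default CompletableFuture<Void> dynamicSubscribe(Set<String> topics) {
        return CompletableFuture.failedFuture(
                new UnsupportedOperationException("dynamicSubscribe is not supported"));
    }

    default CompletableFuture<Void> dynamicUnsubscribe(Set<String> topics) {
        return CompletableFuture.failedFuture(
                new UnsupportedOperationException("dynamicUnsubscribe is not supported"));
    }
}

// Hypothetical single-topic consumer that simply inherits the defaults.
class SingleTopicConsumerSketch implements ConsumerSketch {
}
```

Failing the future keeps the asynchronous contract intact, whereas the
"do nothing" option would leave callers unable to tell that the request was
ignored.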



Thanks,
sinan


On Fri, Jan 3, 2025 at 23:15, Lari Hotari <lhot...@apache.org> wrote:

> Before proceeding with implementation details, it would be helpful to
> better understand the underlying use cases and requirements.
>
> Could you provide more details in this Pulsar dev mailing list thread
> about:
> 1. The specific scenarios where topic processing speed differences create
> bottlenecks - what metrics or indicators suggest a topic is "slow" and
> blocking others?
> 2. An example of how you envision applications would use this dynamic
> management capability - perhaps some code examples showing the desired API
> usage?
> 3. The specific performance impacts you're seeing in production that
> motivated this proposal
>
> While MultiTopicsConsumerImpl does provide unsubscribe functionality, we
> should be cautious about depending on implementation classes rather than
> stable public APIs.
> Instead of directly extending the implementation approach, we might want
> to consider:
>
> - Defining a clear interface that abstracts the dynamic subscription
> management capabilities (or some other way to address the use case and the
> requirements)
> - Ensuring the solution works consistently across different consumer
> implementations (e.g. MultiTopicsConsumerImpl and
> PatternMultiTopicsConsumerImpl)
> - Exploring how this functionality could be provided in the Pulsar client
> API
>
> Would you be able to share a minimal reproducer that demonstrates the
> performance issues? This would help the community better understand the
> scope of the problem and evaluate potential solutions.
> Looking forward to learning more about your use case and collaborating on
> finding the right approach.
>
> -Lari
>
> On 2025/01/03 13:53:28 SiNan Liu wrote:
> > Hi, Pulsar Community.
> >
> > I added a new PR that aims to enhance our management of topic
> > subscriptions within PatternMultiTopicsConsumer in Pulsar client. This PR
> > introduces the ability to dynamically add or remove topics from a regex
> > pattern subscription, which can significantly improve our system’s
> > responsiveness and efficiency.
> >
> > When subscribing to topics using regex patterns, it’s possible to
> > encounter scenarios where some topics process messages slower than
> > others. This can lead to significant performance bottlenecks, as slower
> > topics might block others. There’s a need to manage such situations
> > dynamically to ensure efficient processing and fair resource allocation
> > among topics.
> >
> > Related discussions can be found in [this Reddit thread](https://www.reddit.com/r/ApachePulsar/comments/1fssbbn/roundrobin_between_wildcard_topics/),
> > which highlights issues similar to what we aim to address.
> >
> > Currently, MultiTopicsConsumerImpl allows unsubscribing from specific
> > topics manually, as demonstrated here
> > <https://github.com/apache/pulsar/blob/5a3a1f169a7f90181bd5c213c8e9f479bc74f0f2/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java#L1246-L1258>.
> >
> >
> > *This proposal seeks to integrate similar functionality into
> > PatternMultiTopicsConsumerImpl, allowing specified topics to be
> > dynamically added or removed.*
> >
> >
> >
> > Thanks,
> > sinan
> >
>
