Adding a MultiTopicConsumerHandle is indeed a good approach. Besides adding and removing individual consumers, perhaps it could also expose other operations on individual consumers within the MultiTopicConsumer, such as pause() and resume()? I ask because MultiTopicsConsumerImpl already has pausedConsumers.
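As a rough sketch only, such a handle might look like the following. Nothing here exists in the Pulsar client today: the interface shape, the per-topic pause/resume methods, and the in-memory class are all hypothetical, and the add/close method names are borrowed from the naming suggested in the quoted mail below. The implementation only tracks state and never talks to a broker.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical handle; none of these methods exist in the Pulsar client API. */
interface MultiTopicConsumerHandle {
    CompletableFuture<Void> addTopicConsumer(String topic);
    CompletableFuture<Void> closeTopicConsumer(String topic);
    void pause(String topic);
    void resume(String topic);
    Set<String> getTopics();
    Set<String> getPausedTopics();
}

/** In-memory stand-in that only records which topics are present and paused. */
final class InMemoryMultiTopicConsumerHandle implements MultiTopicConsumerHandle {
    // topic name -> paused flag
    private final Map<String, Boolean> paused = new ConcurrentHashMap<>();

    @Override
    public CompletableFuture<Void> addTopicConsumer(String topic) {
        // Adding a topic that is already present keeps its current paused state.
        paused.putIfAbsent(topic, Boolean.FALSE);
        return CompletableFuture.completedFuture(null);
    }

    @Override
    public CompletableFuture<Void> closeTopicConsumer(String topic) {
        paused.remove(topic);
        return CompletableFuture.completedFuture(null);
    }

    @Override
    public void pause(String topic) {
        // Unknown topics are ignored rather than implicitly added.
        paused.computeIfPresent(topic, (t, p) -> Boolean.TRUE);
    }

    @Override
    public void resume(String topic) {
        paused.computeIfPresent(topic, (t, p) -> Boolean.FALSE);
    }

    @Override
    public Set<String> getTopics() {
        return new TreeSet<>(paused.keySet());
    }

    @Override
    public Set<String> getPausedTopics() {
        Set<String> result = new TreeSet<>();
        paused.forEach((t, p) -> {
            if (p) {
                result.add(t);
            }
        });
        return result;
    }
}
```

Whether pause and resume should silently ignore unknown topics, as in this sketch, or fail is one of the questions a PIP would need to settle.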
The MultiTopicConsumer currently lacks a way to manage individual consumers. A MultiTopicConsumerHandle could greatly expand what can be done with the individual consumers inside a multi-topic consumer. I think I might try proposing a PIP. Let me first explore what other functionality could be added to the handle.

Thanks,
sinan

Lari Hotari <lhot...@apache.org> wrote on Sun, Jan 5, 2025 at 04:33:

> Thanks for sharing details about the broader context of the use case. I
> think we could start drafting ideas of what the API solution could be. One
> of the first things to address in designing APIs is to focus on the naming
> of concepts. Concepts should be clear and not conflicting with other names
> or concepts used in the API. After there are names for things, it's easier
> to discuss and proceed in design.
>
> You proposed calling the new methods "dynamicUnsubscribe" and
> "dynamicSubscribe". One of the problems with "dynamicUnsubscribe" is that
> it's most likely not an "unsubscribe", but more like closing the underlying
> consumer in the multi-topic consumer for a single topic.
> Perhaps calling the methods "closeTopicConsumer" and "addTopicConsumer"
> would be better names to address this conflict in concepts?
>
> There would also need to be ways to find out what topics are being
> consumed. There might need to be a way to have an event interface
> (listener) besides having a way to explicitly ask for the current topics
> that are consumed.
>
> There are multiple patterns of how this interface could be accessed. One
> way would be to make the multi-topic consumer instances implement this
> interface and the client code could use instanceof to check if the consumer
> implements the interface. Another choice is to have a separate method for
> getting the interface handle.
>
> Finding a name for this interface would also be needed. I wonder if
> MultiTopicConsumerHandle would make any sense. It's definitely a low level
> interface.
> That's why I wouldn't make the consumer directly implement the
> interface, but instead have a separate method for requesting an "extension
> interface". One possible generic method signature could be
> public <T> Optional<T> getExtension(Class<T> extensionInterface)
> For example, this could be called in this way:
> Optional<MultiTopicConsumerHandle> multiTopicConsumerHandler =
> consumer.getExtension(MultiTopicConsumerHandle.class);
> This would return an `Optional.empty()` if the "extension interface" isn't
> available for the consumer.
> The benefit of the "extension interface" approach is that it could be
> easier to add experimental features in this way without breaking or
> cluttering the main interface for users who don't need the functionality
> at all.
>
> Although we are thinking far ahead about the solution, it's also good to
> continue learning more about the problem space so that we could eventually
> decide whether this is functionality that we would be willing to support in
> the Pulsar Java client API in the future.
>
> -Lari
>
> On 2025/01/04 09:52:07 SiNan Liu wrote:
> > I would like to momentarily set aside the issue of topic processing speed
> > disparities causing bottlenecks and instead focus on enhancing our dynamic
> > subscription management capabilities.
> >
> > Currently, when a consumer subscribes to topics using a pattern, any
> > modification to this pattern necessitates shutting down the existing
> > consumer and initializing a new one with the updated pattern.
> >
> > 1. There are instances where we might want to temporarily or permanently
> > unsubscribe from specific topics:
> >
> >    • For temporary adjustments, our dynamic subscription management
> >      API can facilitate on-the-fly removal of selected topics.
> >
> >    • If it is permanent, the user can temporarily record the modified
> >      pattern but not remove it by closing and restarting.
> >      When the user creates other new consumers or restarts this consumer,
> >      the new pattern will be used. This will not affect the consumption of
> >      unrelated topics due to pattern modification.
> >
> > 2. Additionally, this functionality allows us to add and consume topics
> > with varying name prefixes, with the option to make these adjustments
> > temporary or permanent as described above.
> >
> > The current pull request <https://github.com/apache/pulsar/pull/23794>
> > implements the capability to dynamically add or remove subscriptions within
> > `PatternMultiTopicsConsumerImpl`.
> >
> > We should proceed to define a public API for this. It could be structured
> > as follows:
> >
> > ```java
> > /**
> >  * Asynchronously unsubscribe from a set of topics.
> >  *
> >  * This method allows dynamically removing a specified set of topics from
> >  * the consumer's subscription list at runtime.
> >  * The operation is non-blocking and the returned CompletableFuture can be
> >  * used to track the completion status.
> >  *
> >  * @param topics the set of topics to unsubscribe from
> >  * @return {@link CompletableFuture} to track the operation
> >  */
> > CompletableFuture<Void> dynamicUnsubscribe(Set<String> topics);
> >
> > /**
> >  * Asynchronously subscribe to a set of topics.
> >  *
> >  * This method enables dynamically adding a specified set of topics to the
> >  * consumer's existing subscription list at runtime.
> >  * The operation is non-blocking and the returned CompletableFuture can be
> >  * used to track the completion status.
> >  *
> >  * @param topics the set of topics to subscribe to
> >  * @return {@link CompletableFuture} to track the operation
> >  */
> > CompletableFuture<Void> dynamicSubscribe(Set<String> topics);
> > ```
> >
> > Implement it in MultiTopicsConsumerImpl and PatternMultiTopicsConsumerImpl;
> > other implementation classes can simply throw an
> > UnsupportedOperationException or do nothing at all.
> >
> > Thanks,
> > sinan
> >
> > Lari Hotari <lhot...@apache.org> wrote on Fri, Jan 3, 2025 at 23:15:
> >
> > > Before proceeding with implementation details, it would be helpful to
> > > better understand the underlying use cases and requirements.
> > >
> > > Could you provide more details in this Pulsar dev mailing list thread
> > > about:
> > > 1. The specific scenarios where topic processing speed differences create
> > > bottlenecks - what metrics or indicators suggest a topic is "slow" and
> > > blocking others?
> > > 2. An example of how you envision applications would use this dynamic
> > > management capability - perhaps some code examples showing the desired API
> > > usage?
> > > 3. The specific performance impacts you're seeing in production that
> > > motivated this proposal
> > >
> > > While MultiTopicsConsumerImpl does provide unsubscribe functionality, we
> > > should be cautious about depending on implementation classes rather than
> > > stable public APIs.
> > > Instead of directly extending the implementation approach, we might want
> > > to consider:
> > >
> > > - Defining a clear interface that abstracts the dynamic subscription
> > >   management capabilities (or some other way to address the use case and the
> > >   requirements)
> > > - Ensuring the solution works consistently across different consumer
> > >   implementations (e.g.
> > >   MultiTopicsConsumerImpl and PatternMultiTopicsConsumerImpl)
> > > - Exploring how this functionality could be provided in the Pulsar client
> > >   API
> > >
> > > Would you be able to share a minimal reproducer that demonstrates the
> > > performance issues? This would help the community better understand the
> > > scope of the problem and evaluate potential solutions.
> > > Looking forward to learning more about your use case and collaborating on
> > > finding the right approach.
> > >
> > > -Lari
> > >
> > > On 2025/01/03 13:53:28 SiNan Liu wrote:
> > > > Hi, Pulsar Community.
> > > >
> > > > I added a new PR that aims to enhance our management of topic subscriptions
> > > > within PatternMultiTopicsConsumer in Pulsar client. This PR introduces the
> > > > ability to dynamically add or remove topics from a regex pattern
> > > > subscription, which can significantly improve our system’s responsiveness
> > > > and efficiency.
> > > >
> > > > When subscribing to topics using regex patterns, it’s possible to encounter
> > > > scenarios where some topics process messages slower than others. This can
> > > > lead to significant performance bottlenecks, as slower topics might block
> > > > others. There’s a need to manage such situations dynamically to ensure
> > > > efficient processing and fair resource allocation among topics.
> > > >
> > > > Related discussions can be found in [this Reddit thread](https://www.reddit.com/r/ApachePulsar/comments/1fssbbn/roundrobin_between_wildcard_topics/),
> > > > which highlights issues similar to what we aim to address.
> > > >
> > > > Currently, MultiTopicsConsumerImpl allows unsubscribing from specific
> > > > topics manually, as demonstrated here
> > > > <https://github.com/apache/pulsar/blob/5a3a1f169a7f90181bd5c213c8e9f479bc74f0f2/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java#L1246-L1258>.
> > > >
> > > > *This proposal seeks to integrate similar functionality into
> > > > PatternMultiTopicsConsumerImpl, allowing specified topics to be added or
> > > > removed dynamically.*
> > > >
> > > > Thanks,
> > > > sinan
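
For reference, the `getExtension` lookup suggested earlier in this thread could be sketched roughly as follows. This is an illustration under assumptions, not Pulsar code: `ExtensibleConsumer`, `FakeMultiTopicConsumer`, and `FakeSingleTopicConsumer` are hypothetical names, the real `Consumer` interface has no `getExtension` method, and `MultiTopicConsumerHandle` is reduced to a single method to keep the example short.

```java
import java.util.Optional;
import java.util.Set;

/** Hypothetical extension interface, reduced to one method for this sketch. */
interface MultiTopicConsumerHandle {
    Set<String> getTopics();
}

/** Hypothetical accessor for optional "extension interfaces". */
interface ExtensibleConsumer {
    <T> Optional<T> getExtension(Class<T> extensionInterface);
}

/** Stand-in for a multi-topic consumer that offers the handle. */
final class FakeMultiTopicConsumer implements ExtensibleConsumer {
    private final MultiTopicConsumerHandle handle;

    FakeMultiTopicConsumer(Set<String> topics) {
        // Take an immutable snapshot so callers cannot mutate internal state.
        Set<String> snapshot = Set.copyOf(topics);
        this.handle = () -> snapshot;
    }

    @Override
    public <T> Optional<T> getExtension(Class<T> extensionInterface) {
        // isInstance/cast keep the lookup type-safe without unchecked casts.
        return extensionInterface.isInstance(handle)
                ? Optional.of(extensionInterface.cast(handle))
                : Optional.empty();
    }
}

/** Stand-in for a consumer that supports no extensions at all. */
final class FakeSingleTopicConsumer implements ExtensibleConsumer {
    @Override
    public <T> Optional<T> getExtension(Class<T> extensionInterface) {
        return Optional.empty();
    }
}
```

Client code would then call `consumer.getExtension(MultiTopicConsumerHandle.class)` and act only when the returned `Optional` is present, so consumers without the capability need no special handling.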