Hi Andras Beni,

Thanks for your detailed proposal. I agree that the regex consumer is a good candidate for optimization. Were there any alternative solutions you considered when looking to optimize the regex consumer, and if so, will you describe them?

One of my concerns is that this feature only solves the problem of topic discovery for regex consumers. Topic discovery is a generic problem in Pulsar, and any solution we implement for the regex consumer should benefit user applications that also need to discover topics.

> A new class, org.apache.pulsar.TopicsListService will keep track
> of watchers and will listen to changes in the metadata.

I think we should avoid creating a new service to distribute notifications to consumers. Instead, we should consider using a compacted topic to store and distribute topic name information. We could have a system topic in each namespace that contains all of the non-system topics in the namespace. This solution would not expand the Pulsar protocol and would rely on core Pulsar features that are already hardened. Note that the implementation for the producer to the compacted topic of topic names would be nearly identical to the `TopicsListService` class. The main difference would be how changes in metadata are distributed.

I concede that my solution does not support broker side message filtering. Given that Pulsar (intentionally) does not support broker side message filtering at this time, I think it is acceptable to skip this optimization in favor of a more generic feature.

Do you agree that a compacted topic of topic names could meet the underlying motivation for this feature?

Let me know what you think.

Thanks,
Michael

On Tue, Mar 1, 2022 at 4:37 AM Andras Beni <andras.b...@streamnative.io.invalid> wrote:
>
> Hi everyone,
>
> I've just created a proposal for improving the performance of regex
> subscriptions.
> It's available at https://github.com/apache/pulsar/issues/14505 and is
> copied below.
> Let me know what you think.
>
> Thanks,
> Andras
>
> Motivation
>
> Pulsar allows consumers to subscribe to multiple topics by a pattern.
> When using this feature, consumers poll brokers for the list of all topics
> in a namespace and filter the list on the client side based on the pattern.
> This causes unnecessary network load since most of the time only a small
> fraction of returned topics match the pattern. In addition polling
> introduces latency in processing messages produced to a newly created topic.
> Goal
>
> This PIP proposes three changes to improve performance and decrease network
> utilization:
>
> - Server side filtering of topic list: clients will supply the broker
>   with a regex pattern when requesting the list of topics for a namespace.
>   Brokers will apply this pattern when present and only include matching
>   topics in the response.
> - Hashing topic list to skip updates: the feature will introduce a hash
>   similar in usage to the HTTP ETag header. The broker computes a hash of the
>   topic list and checks it against the hash the client sends along its
>   request. This allows for short responses that simply mean "Nothing has
>   changed".
> - Notifications for faster discovery: clients will be able to register
>   with brokers as observers of the topic list. Brokers will send events to
>   clients whenever there's a change in the list of matching topics.
>
> To help compatibility of new clients with older brokers, a new feature flag
> will be introduced for this feature. Brokers will return FeatureFlags as
> part of the CommandConnected message to let clients know what features they
> support.
>
> First, the feature will be implemented in the broker and the Java client,
> but later other clients can also make use of the capability.
> API Changes
> Protocol Changes
>
> New fields will be added to existing commands CommandGetTopicsOfNamespace
> and CommandGetTopicsOfNamespaceResponse.
>
> message CommandGetTopicsOfNamespace {
>     enum Mode {
>         PERSISTENT = 0;
>         NON_PERSISTENT = 1;
>         ALL = 2;
>     }
>     required uint64 request_id = 1;
>     required string namespace = 2;
>     optional Mode mode = 3 [default = PERSISTENT];
>     optional string topics_pattern = 4;
>     optional string topics_hash = 5;
> }
>
> message CommandGetTopicsOfNamespaceResponse {
>     required uint64 request_id = 1;
>     repeated string topics = 2;
>     // true iff the topic list was filtered by the pattern supplied by the client
>     optional bool filtered = 3 [default = false];
>     // hash computed from the names of matching topics
>     optional string topics_hash = 4;
>     // if false, topics is empty and the list of matching topics has not changed
>     optional bool changed = 5 [default = true];
> }
>
> Clients can register as topic list observers by sending the command
> CommandWatchTopicList:
>
> message CommandWatchTopicList {
>     required uint64 watcher_id = 1;
>     required string namespace = 2;
>     required string topics_pattern = 3;
>     // Only present when the client reconnects:
>     optional string topics_hash = 4;
> }
>
> Brokers will respond with a success message containing the watcher ID and
> the initial list of topics.
>
> message CommandWatchTopicListSuccess {
>     required uint64 watcher_id = 1;
>     repeated string topic = 2;
>     required string topics_hash = 3;
> }
>
> When new matching topics are added or deleted, the broker sends an update
> along with the hash computed from the whole list of matching topics (i.e.
> not just those that are listed in this message).
>
> message CommandWatchTopicUpdate {
>     required uint64 watcher_id = 1;
>     repeated string new_topics = 2;
>     repeated string deleted_topics = 3;
>     required string topics_hash = 4;
> }
>
> Clients can unsubscribe the watcher by sending a CommandUnwatchTopicList
> message, to which the response is a CommandWatchTopicListSuccess without
> any topics.
>
> message CommandUnwatchTopicList {
>     required uint64 watcher_id = 1;
> }
>
> When a client connects to a broker it is notified if the broker supports
> topic watchers. If not, it will not send CommandWatchTopicList message and
> continues to rely on polling.
>
> message FeatureFlags {
>     optional bool supports_auth_refresh = 1 [default = false];
>     optional bool supports_broker_entry_metadata = 2 [default = false];
>     optional bool supports_partial_producer = 3 [default = false];
>     optional bool supports_topic_watchers = 4 [default = false];
> }
>
> message CommandConnected {
>     required string server_version = 1;
>     optional int32 protocol_version = 2 [default = 0];
>     optional int32 max_message_size = 3;
>     optional FeatureFlags feature_flags = 4;
> }
>
> HTTP clients will not be changed and will continue using the current
> polling behaviour.
> Configuration Changes
>
> A new broker configuration property enableBrokerSideTopicFiltering will be
> added with default value true. Setting this to false will disable the
> feature.
> Implementation
> Polling with pattern
>
> Pulsar clients will poll for topics with the pattern included in the
> command. Initially the client doesn't have a topicsHash, but once the
> broker has responded, clients will retain the hash and send it with the
> next command. If the response from the server contains no hash, the client
> will perform client side filtering. Otherwise, clients will consider the
> returned list as already filtered.
>
> The Pulsar broker will check the command for topicsPattern. If there's no
> pattern in the message, the broker will respond with all topics of the
> namespace. If a pattern is present, the list of topics is filtered and a
> hash is computed from the list. If the request contains a topicsHash and it
> equals the current hash the response will not contain the list of topics,
> only the changed flag is set to false. The pattern in topicsPattern will be
> evaluated using java.util.regex.Pattern.
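
To check my reading of the broker-side polling flow quoted above, here is a minimal sketch in Java. It is not taken from the proposal or from the Pulsar code base: the class name TopicListFilterSketch, the PollResult holder, and the poll method are hypothetical, and SHA-256 over the sorted matching names is only an assumption, since the proposal does not name a hash algorithm.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical sketch of the broker-side "filter, hash, compare" step.
final class TopicListFilterSketch {

    // Mirrors the topics, topics_hash and changed fields of the proposed response.
    static final class PollResult {
        final List<String> topics;
        final String topicsHash; // null when no pattern was supplied
        final boolean changed;

        PollResult(List<String> topics, String topicsHash, boolean changed) {
            this.topics = topics;
            this.topicsHash = topicsHash;
            this.changed = changed;
        }
    }

    // Keep only the topics whose full name matches the pattern, in sorted order
    // so that the hash does not depend on listing order.
    static List<String> filter(List<String> allTopics, String topicsPattern) {
        Pattern pattern = Pattern.compile(topicsPattern);
        return allTopics.stream()
                .filter(topic -> pattern.matcher(topic).matches())
                .sorted()
                .collect(Collectors.toList());
    }

    // Assumption: SHA-256 over the names, each followed by a zero byte, as hex.
    static String hash(List<String> sortedTopics) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            for (String topic : sortedTopics) {
                digest.update(topic.getBytes(StandardCharsets.UTF_8));
                digest.update((byte) 0);
            }
            StringBuilder hex = new StringBuilder();
            for (byte b : digest.digest()) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 not available", e);
        }
    }

    static PollResult poll(List<String> allTopics, String topicsPattern, String clientHash) {
        if (topicsPattern == null) {
            // No pattern in the request: return the whole list, unfiltered and unhashed.
            return new PollResult(allTopics, null, true);
        }
        List<String> matching = filter(allTopics, topicsPattern);
        String currentHash = hash(matching);
        if (currentHash.equals(clientHash)) {
            // Same hash as the client already holds: short "nothing has changed" response.
            return new PollResult(List.of(), currentHash, false);
        }
        return new PollResult(matching, currentHash, true);
    }
}
```

In this sketch a client that sends back the hash from its previous response gets an empty list with changed set to false, which is the short response the ETag comparison in the proposal is aiming for.
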
> Notifications
>
> If the broker supports topic list watchers, the client will create such a
> watcher by sending CommandWatchTopicList. A new class,
> org.apache.pulsar.TopicsListService will keep track of watchers and will
> listen to changes in the metadata. Whenever a topic is created it checks if
> any watchers should be notified and sends an update through the ServerCnx.
> To prevent memory leaks, all watchers will be removed from the
> TopicsListService when the ServerCnx's channel becomes inactive.
> Compatibility
> Old clients with new servers
>
> When the server receives a message without the new fields, it will not
> filter the messages but sends the whole list of topics. These clients will
> ignore the new fields in the response.
> New clients with old servers
>
> New fields in the client request will be ignored by the protobuf parser.
> The server will send the unfiltered list and omit the new fields (as it
> does not know about them). The client will check the response for the new
> fields. If the hash is not present, the client filters the result as it
> does now.
>
> The introduction of FeatureFlags in CommandConnected will prevent the
> client from sending CommandWatchTopicList messages to brokers that don't
> yet support it.
>
> The same is true when broker-side topic filtering is switched off.
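
For the new-client/old-server case, the client-side handling might look roughly like the sketch below. This is only my reading of the compatibility rules quoted above, not the actual Java client code: RegexConsumerPollSketch, Response, onResponse, and hashForNextRequest are hypothetical names, and Response simply stands in for the topics, topics_hash and changed fields of the proposed response message.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical sketch of how a client could handle old and new brokers.
final class RegexConsumerPollSketch {

    // Stand-in for the relevant fields of CommandGetTopicsOfNamespaceResponse.
    static final class Response {
        final List<String> topics;
        final String topicsHash; // null when the broker did not send a hash
        final boolean changed;   // defaults to true in the proposed protocol

        Response(List<String> topics, String topicsHash, boolean changed) {
            this.topics = topics;
            this.topicsHash = topicsHash;
            this.changed = changed;
        }
    }

    private final Pattern pattern;
    private List<String> matchingTopics = List.of();
    private String lastHash; // stays null until a broker returns a hash

    RegexConsumerPollSketch(String topicsPattern) {
        this.pattern = Pattern.compile(topicsPattern);
    }

    List<String> onResponse(Response response) {
        if (response.topicsHash == null) {
            // Old broker, or filtering switched off: filter on the client as today.
            matchingTopics = response.topics.stream()
                    .filter(topic -> pattern.matcher(topic).matches())
                    .collect(Collectors.toList());
            lastHash = null;
        } else if (response.changed) {
            // New broker: the list is already filtered, so take it as is.
            matchingTopics = List.copyOf(response.topics);
            lastHash = response.topicsHash;
        }
        // Hash present and changed == false: keep the list we already have.
        return matchingTopics;
    }

    // Value to send as topics_hash in the next poll, or null to omit the field.
    String hashForNextRequest() {
        return lastHash;
    }
}
```

The point of the sketch is that the presence of the hash alone is enough for the client to tell the two cases apart, so polling would not need the feature flag.
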