Hi Joe and Michael, I think I misunderstood what you replied before. Now I understand and explain it again.
Besides the reasons what Asaf mentioned above, there are also some limits for using topic list watcher. For example the `topicsPattern.pattern` must less that `maxSubscriptionPatternLeng` [0]. If the consumer subscribes multi partitioned-topics, the `topicsPattern.pattern` maybe very long. So I think that it's better to have a separate notification implementation for partition update. [0] https://github.com/apache/pulsar/blob/5d6932137d76d544f939bef27df25f61b4a4d00d/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java#L115-L126 Thanks, Xiaoyu Hou houxiaoyu <anonhx...@gmail.com> 于2023年2月27日周一 10:56写道: > Hi Michael, > > > I think we just need the client to "subscribe" to a topic notification > for > > "<topic-name>-partition-[0-9]+" to eliminate the polling > > If pulsar users want to pub/sub a partitioned-topic, I think most of the > users would like to create a simple producer or consumer like following: > ``` > Producer<byte[]> producer = client.newProducer().topic(topic).create(); > producer.sendAsync(msg); > ``` > ``` > client.newConsumer() > .topic(topic) > .subscriptionName(subscription) > .subscribe(); > ``` > I think there is no reason for users to use `topicsPattern` if a pulsar > just wants to subscribe a partitioned-topic. In addition, `topicsPattern` > couldn't be used for producers. > > So I think PIP-145 [0] will benefit for regex subscriptions. And this PIP > [1] will benefit for the common partitioned-topic pub/sub scenario. > > [0] https://github.com/apache/pulsar/issues/14505 > [1] https://github.com/apache/pulsar/issues/19596 > > Thanks > Xiaoyu Hou > > Michael Marshall <mmarsh...@apache.org> 于2023年2月25日周六 01:29写道: > >> > Just the way to implements partitioned-topic metadata >> > notification mechanism is much like notifications on regex sub changes >> >> Why do we need a separate notification implementation? The regex >> subscription feature is about discovering topics (not subscriptions) >> that match a regular expression. As Joe mentioned, I think we just >> need the client to "subscribe" to a topic notification for >> "<topic-name>-partition-[0-9]+" to eliminate the polling. >> >> Building on PIP 145, the work for this PIP would be in implementing a >> different `TopicsChangedListener` [1] so that the result of an added >> topic is to add a producer/consumer to the new partition. >> >> I support removing polling in our streaming platform, but I'd prefer >> to limit the number of notification systems we implement. >> >> Thanks, >> Michael >> >> [0] https://github.com/apache/pulsar/pull/16062 >> [1] >> https://github.com/apache/pulsar/blob/82237d3684fe506bcb6426b3b23f413422e6e4fb/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java#L169-L175 >> >> >> >> On Fri, Feb 24, 2023 at 1:57 AM houxiaoyu <anonhx...@gmail.com> wrote: >> > >> > Hi Joe, >> > >> > When we use PartitionedProducerImpl or MultiTopicsConsumerImpl, there >> is a >> > poll task to fetch the metadata of the partitioned-topic regularly for >> the >> > number of partitions updated. This PIP wants to use a >> > notification mechanism to replace the metadata poll task. >> > >> > Just the way to implements partitioned-topic metadata >> > notification mechanism is much like notifications on regex sub changes >> > >> > Joe F <joefranc...@gmail.com> 于2023年2月24日周五 13:37写道: >> > >> > > Why is this needed when we have notifications on regex sub changes? >> Aren't >> > > the partition names a well-defined regex? >> > > >> > > Joe >> > > >> > > On Thu, Feb 23, 2023 at 8:52 PM houxiaoyu <anonhx...@gmail.com> >> wrote: >> > > >> > > > Hi Asaf, >> > > > thanks for your reminder. >> > > > >> > > > ## Changing >> > > > I have updated the following changes to make sure the notification >> > > arrived >> > > > successfully: >> > > > 1. The watch success response `CommandWatchPartitionUpdateSuccess` >> will >> > > > contain all the concerned topics of this watcher >> > > > 2. The notification `CommandPartitionUpdate` will always contain >> all the >> > > > concerned topics of this watcher. >> > > > 3. The notification `CommandPartitionUpdate`contains a monotonically >> > > > increased version. >> > > > 4. A map >> `PartitonUpdateWatcherService#inFlightUpdate<long/*watchID*/, >> > > > Pair<version, long/*timestamp*/>>` will keep track of the updating >> > > > 5. A timer will check the updating timeout through `inFlightUpdate` >> > > > 6. The client acks `CommandPartitionUpdateResult` to broker when it >> > > > finishes updating. >> > > > >> > > > ## Details >> > > > >> > > > The following mechanism could make sure the newest notification >> arrived >> > > > successfully, copying the description from GH: >> > > > >> > > > A new class, `org.apache.pulsar.PartitonUpdateWatcherService` will >> keep >> > > > track of watchers and will listen to the changes in the metadata. >> > > Whenever >> > > > a topic partition updates it checks if any watchers should be >> notified >> > > and >> > > > sends an update for all topics the watcher concerns through the >> > > ServerCnx. >> > > > Then we will record this request into a map, >> > > > `PartitonUpdateWatcherService#inFlightUpdate<long/*watchID*/, >> > > Pair<version, >> > > > long/*timestamp*/>>`. A timer will check this update timeout >> through >> > > > inFlightUpdate . We will query all the concerned topics's >> partition if >> > > > this watcher has sent an update timeout and will resend it. >> > > > >> > > > The client acks `CommandPartitionUpdateResult` to broker when it >> finishes >> > > > updating. The broker handle `CommandPartitionUpdateResult` request: >> > > > - If CommandPartitionUpdateResult#version < >> > > > PartitonUpdateWatcherService#inFlightUpdate.get(watcherID).version, >> > > broker >> > > > ignores this ack. >> > > > - If CommandPartitionUpdateResult#version == >> > > > PartitonUpdateWatcherService#inFlightUpdate.get(watcherID).version >> > > > - If CommandPartitionUpdateResult#success is true, broker just >> > > removes >> > > > the watcherID from inFlightUpdate. >> > > > - If CommandPartitionUpdateResult#success is false, broker >> removes >> > > the >> > > > watcherId from inFlightUpdate, and queries all the concerned >> topics's >> > > > partition and resend. >> > > > - If CommandPartitionUpdateResult#version > >> > > > PartitonUpdateWatcherService#inFlightUpdate.get(watcherID).version, >> this >> > > > should not happen. >> > > > >> > > > ## Edge cases >> > > > - Broker restarts or crashes >> > > > Client will reconnect to another broker, broker responses >> > > > `CommandWatchPartitionUpdateSuccess` with watcher concerned topics's >> > > > partitions. We will call `PartitionsUpdateListener` if the >> connection >> > > > opens. >> > > > - Client acks fail or timeout >> > > > Broker will resend the watcher concerned topics's partitions either >> > > client >> > > > acks fail or acks timeout. >> > > > - Partition updates before client acks. >> > > > `CommandPartitionUpdate#version` monotonically increases every time >> it is >> > > > updated. If Partition updates before client acks, a greater version >> will >> > > be >> > > > put into `PartitonUpdateWatcherService#inFlightUpdate`. The >> previous >> > > acks >> > > > will be ignored because the version is less than the current >> version. >> > > > >> > > > >> > > > Asaf Mesika <asaf.mes...@gmail.com> 于2023年2月22日周三 21:33写道: >> > > > >> > > > > How about edge cases? >> > > > > In Andra's PIP he took into account cases where updates were >> lost, so >> > > he >> > > > > created a secondary poll. Not saying it's the best situation for >> your >> > > > case >> > > > > of course. >> > > > > I'm saying that when a broker sends an update >> CommandPartitionUpdate, >> > > how >> > > > > do you know it arrived successfully? From my memory, there is no >> ACK in >> > > > the >> > > > > protocol, saying "I'm the client, I got the update successfully" >> and >> > > only >> > > > > then it removed the "dirty" flag for that topic, for this watcher >> ID. >> > > > > >> > > > > Are there any other edge cases we can have? Let's be exhaustive. >> > > > > >> > > > > >> > > > > >> > > > > On Wed, Feb 22, 2023 at 1:14 PM houxiaoyu <anonhx...@gmail.com> >> wrote: >> > > > > >> > > > > > Thanks for your great suggestion Enrico. >> > > > > > >> > > > > > I agreed with you. It's more reasonable to add a >> > > > > > `supports_partition_update_watchers` in `FeatureFlags` to >> detect >> > > that >> > > > > the >> > > > > > connected broker supporting this feature , and add a new broker >> > > > > > configuration property `enableNotificationForPartitionUpdate` >> with >> > > > > default >> > > > > > value true, which is much like PIP-145. >> > > > > > >> > > > > > I have updated the descriptions. >> > > > > > >> > > > > > Enrico Olivelli <eolive...@gmail.com> 于2023年2月22日周三 17:26写道: >> > > > > > >> > > > > > > I support this proposal. >> > > > > > > Coping here my comments from GH: >> > > > > > > >> > > > > > > can't we enable this by default in case we detect that the >> > > connected >> > > > > > > Broker supports it ? >> > > > > > > I can't find any reason for not using this mechanism if it is >> > > > > available. >> > > > > > > >> > > > > > > Maybe we can set the default to "true" and allow users to >> disable >> > > it >> > > > > > > in case it impacts their systems in an unwanted way. >> > > > > > > >> > > > > > > Maybe It would be useful to have a way to disable the >> mechanism on >> > > > the >> > > > > > > broker side as well >> > > > > > > >> > > > > > > Enrico >> > > > > > > >> > > > > > > Il giorno mer 22 feb 2023 alle ore 10:22 houxiaoyu >> > > > > > > <anonhx...@gmail.com> ha scritto: >> > > > > > > > >> > > > > > > > Hi Pulsar community: >> > > > > > > > >> > > > > > > > I opened a PIP to discuss "Notifications for partitions >> update" >> > > > > > > > >> > > > > > > > ### Motivation >> > > > > > > > >> > > > > > > > Pulsar client will poll brokers at fix time for checking the >> > > > > partitions >> > > > > > > > update if we publish/subscribe the partitioned topics with >> > > > > > > > `autoUpdatePartitions` as true. This causes unnecessary >> load for >> > > > > both >> > > > > > > > clients and brokers since most of the time the number of >> > > partitions >> > > > > > will >> > > > > > > > not change. In addition polling introduces latency in >> partitions >> > > > > update >> > > > > > > > which is specified by `autoUpdatePartitionsInterval`. >> > > > > > > > This PIP would like to introduce a notification mechanism >> for >> > > > > partition >> > > > > > > > update, which is much like PIP-145 for regex subscriptions >> > > > > > > > https://github.com/apache/pulsar/issues/14505. >> > > > > > > > >> > > > > > > > For more details, please read the PIP at: >> > > > > > > > https://github.com/apache/pulsar/issues/19596 >> > > > > > > > Looking forward to hearing your thoughts. >> > > > > > > > >> > > > > > > > Thanks, >> > > > > > > > Xiaoyu Hou >> > > > > > > > ---- >> > > > > > > >> > > > > > >> > > > > >> > > > >> > > >> >