Hi Michael,

> I think we just need the client to "subscribe" to a topic notification for
> "<topic-name>-partition-[0-9]+" to eliminate the polling
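
To make the quoted pattern concrete, here is a minimal sketch using plain `java.util.regex`. The class name `PartitionNameMatcher` and the topic name `my-topic` are made up for illustration; nothing here is Pulsar client API.

```java
import java.util.regex.Pattern;

// Hypothetical helper for illustration only; not part of the Pulsar client.
final class PartitionNameMatcher {
    private final Pattern pattern;

    PartitionNameMatcher(String topicName) {
        // Quote the topic name so characters such as '.' are matched literally.
        this.pattern = Pattern.compile(Pattern.quote(topicName) + "-partition-[0-9]+");
    }

    boolean isPartitionOf(String candidate) {
        // matches() requires the whole candidate string to match the pattern.
        return pattern.matcher(candidate).matches();
    }

    public static void main(String[] args) {
        PartitionNameMatcher matcher = new PartitionNameMatcher("my-topic");
        System.out.println(matcher.isPartitionOf("my-topic-partition-3"));    // true
        System.out.println(matcher.isPartitionOf("my-topic-partition-"));     // false
        System.out.println(matcher.isPartitionOf("other-topic-partition-3")); // false
    }
}
```
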
If Pulsar users want to publish to or subscribe to a partitioned topic, I think
most of them would create a simple producer or consumer like the following:

```
Producer<byte[]> producer = client.newProducer().topic(topic).create();
producer.sendAsync(msg);
```

```
client.newConsumer()
        .topic(topic)
        .subscriptionName(subscription)
        .subscribe();
```

I see no reason for users to use `topicsPattern` if they just want to subscribe
to a single partitioned topic. In addition, `topicsPattern` cannot be used for
producers.

So I think PIP-145 [0] benefits regex subscriptions, and this PIP [1] benefits
the common partitioned-topic pub/sub scenario.

[0] https://github.com/apache/pulsar/issues/14505
[1] https://github.com/apache/pulsar/issues/19596

Thanks,
Xiaoyu Hou

Michael Marshall <mmarsh...@apache.org> wrote on Sat, Feb 25, 2023 at 01:29:

> > It is just that the way to implement the partitioned-topic metadata
> > notification mechanism is much like notifications on regex sub changes.
>
> Why do we need a separate notification implementation? The regex
> subscription feature is about discovering topics (not subscriptions)
> that match a regular expression. As Joe mentioned, I think we just
> need the client to "subscribe" to a topic notification for
> "<topic-name>-partition-[0-9]+" to eliminate the polling.
>
> Building on PIP 145, the work for this PIP would be in implementing a
> different `TopicsChangedListener` [1] so that the result of an added
> topic is to add a producer/consumer to the new partition.
>
> I support removing polling in our streaming platform, but I'd prefer
> to limit the number of notification systems we implement.
>
> Thanks,
> Michael
>
> [0] https://github.com/apache/pulsar/pull/16062
> [1] https://github.com/apache/pulsar/blob/82237d3684fe506bcb6426b3b23f413422e6e4fb/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java#L169-L175
>
> On Fri, Feb 24, 2023 at 1:57 AM houxiaoyu <anonhx...@gmail.com> wrote:
>
> > Hi Joe,
> >
> > When we use PartitionedProducerImpl or MultiTopicsConsumerImpl, there is a
> > poll task that regularly fetches the metadata of the partitioned topic to
> > detect changes in the number of partitions. This PIP wants to use a
> > notification mechanism to replace the metadata poll task.
> >
> > It is just that the way to implement the partitioned-topic metadata
> > notification mechanism is much like notifications on regex sub changes.
> >
> > Joe F <joefranc...@gmail.com> wrote on Fri, Feb 24, 2023 at 13:37:
> >
> > > Why is this needed when we have notifications on regex sub changes? Aren't
> > > the partition names a well-defined regex?
> > >
> > > Joe
> > >
> > > On Thu, Feb 23, 2023 at 8:52 PM houxiaoyu <anonhx...@gmail.com> wrote:
> > >
> > > > Hi Asaf,
> > > > thanks for your reminder.
> > > >
> > > > ## Changing
> > > > I have made the following changes to make sure the notification arrives
> > > > successfully:
> > > > 1. The watch success response `CommandWatchPartitionUpdateSuccess` will
> > > > contain all the concerned topics of this watcher.
> > > > 2. The notification `CommandPartitionUpdate` will always contain all the
> > > > concerned topics of this watcher.
> > > > 3. The notification `CommandPartitionUpdate` contains a monotonically
> > > > increasing version.
> > > > 4. A map `PartitonUpdateWatcherService#inFlightUpdate<long/*watchID*/,
> > > > Pair<version, long/*timestamp*/>>` will keep track of in-flight updates.
> > > > 5. A timer will check for update timeouts through `inFlightUpdate`.
> > > > 6. The client acks with `CommandPartitionUpdateResult` to the broker when
> > > > it finishes updating.
> > > >
> > > > ## Details
> > > >
> > > > The following mechanism makes sure the newest notification arrives
> > > > successfully (copying the description from GH):
> > > >
> > > > A new class, `org.apache.pulsar.PartitonUpdateWatcherService`, will keep
> > > > track of watchers and will listen to changes in the metadata. Whenever
> > > > a topic's partitions are updated, it checks whether any watchers should
> > > > be notified and sends an update for all topics the watcher is concerned
> > > > with through the ServerCnx. Then we record this request in a map,
> > > > `PartitonUpdateWatcherService#inFlightUpdate<long/*watchID*/,
> > > > Pair<version, long/*timestamp*/>>`. A timer checks for update timeouts
> > > > through `inFlightUpdate`. If an update sent to a watcher has timed out,
> > > > we query the partitions of all the concerned topics and resend the
> > > > update.
> > > >
> > > > The client acks with `CommandPartitionUpdateResult` to the broker when
> > > > it finishes updating. The broker handles the
> > > > `CommandPartitionUpdateResult` request as follows:
> > > > - If CommandPartitionUpdateResult#version <
> > > > PartitonUpdateWatcherService#inFlightUpdate.get(watcherID).version, the
> > > > broker ignores this ack.
> > > > - If CommandPartitionUpdateResult#version ==
> > > > PartitonUpdateWatcherService#inFlightUpdate.get(watcherID).version:
> > > >   - If CommandPartitionUpdateResult#success is true, the broker just
> > > >     removes the watcherID from inFlightUpdate.
> > > >   - If CommandPartitionUpdateResult#success is false, the broker removes
> > > >     the watcherID from inFlightUpdate, queries the partitions of all the
> > > >     concerned topics, and resends the update.
> > > > - If CommandPartitionUpdateResult#version >
> > > > PartitonUpdateWatcherService#inFlightUpdate.get(watcherID).version, this
> > > > should not happen.
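
The ack rules quoted above can be sketched in plain Java roughly as follows. This is only an illustration under stated assumptions, not the proposed broker code: the names `InFlightUpdateTracker`, `Action`, `onUpdateSent`, `onAck` and `isTimedOut` are hypothetical (only `inFlightUpdate` comes from the proposal), an ack with no in-flight entry is assumed to be treated like a stale ack, the "should not happen" case is assumed to fail loudly, and the map is not thread-safe.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; names other than inFlightUpdate are made up for illustration.
final class InFlightUpdateTracker {

    /** What the broker does after processing an ack. */
    enum Action { IGNORE, DONE, RESEND }

    /** Version and send time of the latest update sent to one watcher. */
    private static final class InFlight {
        final long version;
        final long sentAtMillis;

        InFlight(long version, long sentAtMillis) {
            this.version = version;
            this.sentAtMillis = sentAtMillis;
        }
    }

    // watcherId -> latest in-flight update. Not thread-safe; sketch only.
    private final Map<Long, InFlight> inFlightUpdate = new HashMap<>();

    /** Record an update sent to a watcher; a newer version replaces the older entry. */
    void onUpdateSent(long watcherId, long version, long nowMillis) {
        inFlightUpdate.put(watcherId, new InFlight(version, nowMillis));
    }

    /** Apply the version comparison rules to a client ack. */
    Action onAck(long watcherId, long ackVersion, boolean success) {
        InFlight current = inFlightUpdate.get(watcherId);
        // Assumption: an ack with no in-flight entry is treated like a stale ack.
        if (current == null || ackVersion < current.version) {
            return Action.IGNORE;
        }
        if (ackVersion > current.version) {
            // "Should not happen" in the proposal; failing loudly is an assumption.
            throw new IllegalStateException("ack version is ahead of the in-flight version");
        }
        // Versions are equal: the entry is removed whether the client succeeded or not.
        inFlightUpdate.remove(watcherId);
        return success ? Action.DONE : Action.RESEND;
    }

    /** Timer check: true if the watcher's update has gone unacked for longer than the timeout. */
    boolean isTimedOut(long watcherId, long nowMillis, long timeoutMillis) {
        InFlight current = inFlightUpdate.get(watcherId);
        return current != null && nowMillis - current.sentAtMillis > timeoutMillis;
    }

    public static void main(String[] args) {
        InFlightUpdateTracker tracker = new InFlightUpdateTracker();
        tracker.onUpdateSent(1L, 5L, 0L);
        tracker.onUpdateSent(1L, 6L, 10L); // partitions changed again before the ack
        System.out.println(tracker.onAck(1L, 5L, true));  // IGNORE
        System.out.println(tracker.onAck(1L, 6L, false)); // RESEND
    }
}
```

The `main` method walks through the "partition updates before client acks" case: the ack for version 5 is ignored because version 6 is already in flight.
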
> > > >
> > > > ## Edge cases
> > > > - Broker restarts or crashes
> > > > The client will reconnect to another broker, and the broker responds
> > > > with `CommandWatchPartitionUpdateSuccess` containing the partitions of
> > > > the watcher's concerned topics. We will call `PartitionsUpdateListener`
> > > > when the connection opens.
> > > > - Client ack fails or times out
> > > > The broker will resend the partitions of the watcher's concerned topics
> > > > when either the client ack fails or the ack times out.
> > > > - Partitions are updated before the client acks
> > > > `CommandPartitionUpdate#version` monotonically increases every time it is
> > > > updated. If partitions are updated before the client acks, a greater
> > > > version will be put into `PartitonUpdateWatcherService#inFlightUpdate`.
> > > > The previous acks will be ignored because their version is less than
> > > > the current version.
> > > >
> > > > Asaf Mesika <asaf.mes...@gmail.com> wrote on Wed, Feb 22, 2023 at 21:33:
> > > >
> > > > > How about edge cases?
> > > > > In Andra's PIP he took into account cases where updates were lost, so
> > > > > he created a secondary poll. Not saying it's the best situation for
> > > > > your case of course.
> > > > > I'm saying that when a broker sends an update CommandPartitionUpdate,
> > > > > how do you know it arrived successfully? From my memory, there is no
> > > > > ACK in the protocol, saying "I'm the client, I got the update
> > > > > successfully" and only then it removed the "dirty" flag for that
> > > > > topic, for this watcher ID.
> > > > >
> > > > > Are there any other edge cases we can have? Let's be exhaustive.
> > > > >
> > > > > On Wed, Feb 22, 2023 at 1:14 PM houxiaoyu <anonhx...@gmail.com> wrote:
> > > > >
> > > > > > Thanks for your great suggestion Enrico.
> > > > > >
> > > > > > I agree with you.
> > > > > > It's more reasonable to add a
> > > > > > `supports_partition_update_watchers` flag in `FeatureFlags` to
> > > > > > detect whether the connected broker supports this feature, and to
> > > > > > add a new broker configuration property
> > > > > > `enableNotificationForPartitionUpdate` with a default value of
> > > > > > true, which is much like PIP-145.
> > > > > >
> > > > > > I have updated the descriptions.
> > > > > >
> > > > > > Enrico Olivelli <eolive...@gmail.com> wrote on Wed, Feb 22, 2023 at 17:26:
> > > > > >
> > > > > > > I support this proposal.
> > > > > > > Copying here my comments from GH:
> > > > > > >
> > > > > > > can't we enable this by default in case we detect that the
> > > > > > > connected Broker supports it?
> > > > > > > I can't find any reason for not using this mechanism if it is
> > > > > > > available.
> > > > > > >
> > > > > > > Maybe we can set the default to "true" and allow users to disable
> > > > > > > it in case it impacts their systems in an unwanted way.
> > > > > > >
> > > > > > > Maybe it would be useful to have a way to disable the mechanism on
> > > > > > > the broker side as well
> > > > > > >
> > > > > > > Enrico
> > > > > > >
> > > > > > > On Wed, Feb 22, 2023 at 10:22 houxiaoyu <anonhx...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hi Pulsar community:
> > > > > > > >
> > > > > > > > I opened a PIP to discuss "Notifications for partitions update"
> > > > > > > >
> > > > > > > > ### Motivation
> > > > > > > >
> > > > > > > > The Pulsar client polls brokers at a fixed interval to check for
> > > > > > > > partition updates if we publish to or subscribe to partitioned
> > > > > > > > topics with `autoUpdatePartitions` set to true. This causes
> > > > > > > > unnecessary load for both clients and brokers since most of the
> > > > > > > > time the number of partitions will not change.
> > > > > > > > In addition, polling introduces latency in partition updates,
> > > > > > > > which is determined by `autoUpdatePartitionsInterval`.
> > > > > > > > This PIP would like to introduce a notification mechanism for
> > > > > > > > partition updates, which is much like PIP-145 for regex
> > > > > > > > subscriptions
> > > > > > > > https://github.com/apache/pulsar/issues/14505.
> > > > > > > >
> > > > > > > > For more details, please read the PIP at:
> > > > > > > > https://github.com/apache/pulsar/issues/19596
> > > > > > > > Looking forward to hearing your thoughts.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Xiaoyu Hou