Hi Joe,

When we use PartitionedProducerImpl or MultiTopicsConsumerImpl, a poll task regularly fetches the metadata of the partitioned topic to check whether the number of partitions has changed. This PIP proposes replacing that metadata poll task with a notification mechanism.
The way we implement the partitioned-topic metadata notification mechanism is much like the notifications on regex subscription changes.

Joe F <joefranc...@gmail.com> wrote on Fri, Feb 24, 2023 at 13:37:

> Why is this needed when we have notifications on regex sub changes? Aren't
> the partition names a well-defined regex?
>
> Joe
>
> On Thu, Feb 23, 2023 at 8:52 PM houxiaoyu <anonhx...@gmail.com> wrote:
>
> > Hi Asaf,
> > thanks for your reminder.
> >
> > ## Changing
> > I have made the following changes to make sure the notification
> > arrives successfully:
> > 1. The watch success response `CommandWatchPartitionUpdateSuccess` will
> >    contain all the concerned topics of this watcher.
> > 2. The notification `CommandPartitionUpdate` will always contain all the
> >    concerned topics of this watcher.
> > 3. The notification `CommandPartitionUpdate` contains a monotonically
> >    increasing version.
> > 4. A map `PartitonUpdateWatcherService#inFlightUpdate<long/*watchID*/,
> >    Pair<version, long/*timestamp*/>>` will keep track of in-flight updates.
> > 5. A timer will check for update timeouts through `inFlightUpdate`.
> > 6. The client acks with `CommandPartitionUpdateResult` to the broker when
> >    it finishes updating.
> >
> > ## Details
> >
> > The following mechanism makes sure the newest notification arrives
> > successfully (copying the description from GH):
> >
> > A new class, `org.apache.pulsar.PartitonUpdateWatcherService`, will keep
> > track of watchers and will listen to changes in the metadata. Whenever
> > a topic's partitions are updated, it checks whether any watchers should be
> > notified and sends an update for all topics the watcher is concerned with
> > through the ServerCnx. Then we will record this request in a map,
> > `PartitonUpdateWatcherService#inFlightUpdate<long/*watchID*/, Pair<version,
> > long/*timestamp*/>>`. A timer will check this update for timeout through
> > `inFlightUpdate`.
> > If the update sent to this watcher has timed out, we will query the
> > partitions of all the concerned topics and resend the update.
> >
> > The client acks with `CommandPartitionUpdateResult` to the broker when it
> > finishes updating. The broker handles the `CommandPartitionUpdateResult`
> > request as follows:
> > - If CommandPartitionUpdateResult#version <
> >   PartitonUpdateWatcherService#inFlightUpdate.get(watcherID).version, the
> >   broker ignores this ack.
> > - If CommandPartitionUpdateResult#version ==
> >   PartitonUpdateWatcherService#inFlightUpdate.get(watcherID).version:
> >   - If CommandPartitionUpdateResult#success is true, the broker just
> >     removes the watcherID from inFlightUpdate.
> >   - If CommandPartitionUpdateResult#success is false, the broker removes
> >     the watcherId from inFlightUpdate, queries the partitions of all the
> >     concerned topics, and resends the update.
> > - If CommandPartitionUpdateResult#version >
> >   PartitonUpdateWatcherService#inFlightUpdate.get(watcherID).version, this
> >   should not happen.
> >
> > ## Edge cases
> > - Broker restarts or crashes
> >   The client will reconnect to another broker, and the broker responds with
> >   `CommandWatchPartitionUpdateSuccess` containing the partitions of the
> >   watcher's concerned topics. We will call `PartitionsUpdateListener` when
> >   the connection opens.
> > - Client acks fail or time out
> >   The broker will resend the partitions of the watcher's concerned topics
> >   when the client ack either fails or times out.
> > - Partition updates before client acks
> >   `CommandPartitionUpdate#version` monotonically increases every time it is
> >   updated. If the partitions are updated before the client acks, a greater
> >   version will be put into `PartitonUpdateWatcherService#inFlightUpdate`.
> >   The previous acks will be ignored because their version is less than the
> >   current version.
> >
> > Asaf Mesika <asaf.mes...@gmail.com> wrote on Wed, Feb 22, 2023 at 21:33:
> >
> > > How about edge cases?
> > > In Andra's PIP he took into account cases where updates were lost, so he
> > > created a secondary poll.
> > > Not saying it's the best situation for your case, of course.
> > > I'm saying that when a broker sends a CommandPartitionUpdate, how do you
> > > know it arrived successfully? From my memory, there is no ACK in the
> > > protocol saying "I'm the client, I got the update successfully", so that
> > > only then the broker removes the "dirty" flag for that topic, for this
> > > watcher ID.
> > >
> > > Are there any other edge cases we can have? Let's be exhaustive.
> > >
> > > On Wed, Feb 22, 2023 at 1:14 PM houxiaoyu <anonhx...@gmail.com> wrote:
> > >
> > > > Thanks for your great suggestion, Enrico.
> > > >
> > > > I agree with you. It's more reasonable to add
> > > > `supports_partition_update_watchers` to `FeatureFlags` to detect whether
> > > > the connected broker supports this feature, and to add a new broker
> > > > configuration property `enableNotificationForPartitionUpdate` with a
> > > > default value of true, which is much like PIP-145.
> > > >
> > > > I have updated the descriptions.
> > > >
> > > > Enrico Olivelli <eolive...@gmail.com> wrote on Wed, Feb 22, 2023 at 17:26:
> > > >
> > > > > I support this proposal.
> > > > > Copying here my comments from GH:
> > > > >
> > > > > Can't we enable this by default in case we detect that the connected
> > > > > broker supports it?
> > > > > I can't find any reason for not using this mechanism if it is
> > > > > available.
> > > > >
> > > > > Maybe we can set the default to "true" and allow users to disable it
> > > > > in case it impacts their systems in an unwanted way.
> > > > >
> > > > > Maybe it would be useful to have a way to disable the mechanism on the
> > > > > broker side as well.
> > > > >
> > > > > Enrico
> > > > >
> > > > > On Wed, Feb 22, 2023 at 10:22, houxiaoyu <anonhx...@gmail.com> wrote:
> > > > > >
> > > > > > Hi Pulsar community:
> > > > > >
> > > > > > I opened a PIP to discuss "Notifications for partitions update".
> > > > > >
> > > > > > ### Motivation
> > > > > >
> > > > > > The Pulsar client polls brokers at a fixed interval to check for
> > > > > > partition updates if we publish to or subscribe to partitioned
> > > > > > topics with `autoUpdatePartitions` set to true. This causes
> > > > > > unnecessary load for both clients and brokers, since most of the
> > > > > > time the number of partitions does not change. In addition, polling
> > > > > > introduces latency in partition updates, which is specified by
> > > > > > `autoUpdatePartitionsInterval`.
> > > > > > This PIP would like to introduce a notification mechanism for
> > > > > > partition updates, which is much like PIP-145 for regex subscriptions:
> > > > > > https://github.com/apache/pulsar/issues/14505.
> > > > > >
> > > > > > For more details, please read the PIP at:
> > > > > > https://github.com/apache/pulsar/issues/19596
> > > > > > Looking forward to hearing your thoughts.
> > > > > >
> > > > > > Thanks,
> > > > > > Xiaoyu Hou
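
The ack-handling rules listed under "## Details" in the Feb 23 message can be sketched roughly as below. This is only a minimal illustration, not the PIP's implementation: the class `InFlightUpdateTracker`, the enum `AckOutcome`, and all method names are hypothetical. Two behaviours are assumptions the thread does not specify: an ack for a watcher with no in-flight entry is treated as stale, and the "should not happen" case throws an exception.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of the in-flight bookkeeping; not the actual Pulsar class. */
class InFlightUpdateTracker {

    /** What the broker should do after receiving an ack. */
    enum AckOutcome { IGNORED, COMPLETED, RESEND }

    /** Version and send time of the latest update sent to one watcher. */
    private static final class InFlight {
        final long version;
        final long sentAtMillis;

        InFlight(long version, long sentAtMillis) {
            this.version = version;
            this.sentAtMillis = sentAtMillis;
        }
    }

    // watcherId -> latest update still waiting for an ack
    private final Map<Long, InFlight> inFlightUpdate = new HashMap<>();

    /** Record that an update with this version was sent; a newer send overwrites an older one. */
    synchronized void recordSent(long watcherId, long version, long nowMillis) {
        inFlightUpdate.put(watcherId, new InFlight(version, nowMillis));
    }

    /** Apply the version comparison rules from the thread to one ack. */
    synchronized AckOutcome onAck(long watcherId, long ackVersion, boolean success) {
        InFlight current = inFlightUpdate.get(watcherId);
        // Assumption: no in-flight entry is handled like a stale ack.
        if (current == null || ackVersion < current.version) {
            return AckOutcome.IGNORED;
        }
        if (ackVersion > current.version) {
            // "This should not happen" in the thread; throwing is this sketch's choice.
            throw new IllegalStateException("ack version ahead of in-flight version");
        }
        inFlightUpdate.remove(watcherId);
        return success ? AckOutcome.COMPLETED : AckOutcome.RESEND;
    }

    /** Watchers whose update has been waiting at least timeoutMillis; the timer would resend to these. */
    synchronized List<Long> timedOut(long nowMillis, long timeoutMillis) {
        List<Long> result = new ArrayList<>();
        for (Map.Entry<Long, InFlight> e : inFlightUpdate.entrySet()) {
            if (nowMillis - e.getValue().sentAtMillis >= timeoutMillis) {
                result.add(e.getKey());
            }
        }
        return result;
    }
}
```

In this sketch a `RESEND` outcome, or a watcher ID returned by `timedOut`, would lead the broker to query the partitions of the concerned topics again and send a fresh `CommandPartitionUpdate` with a new version, which in turn calls `recordSent` again.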