> The way to implement the partitioned-topic metadata
> notification mechanism is much like notifications on regex sub changes
Why do we need a separate notification implementation? The regex
subscription feature is about discovering topics (not subscriptions)
that match a regular expression. As Joe mentioned, I think we just need
the client to "subscribe" to a topic notification for
"<topic-name>-partition-[0-9]+" to eliminate the polling.

Building on PIP 145, the work for this PIP would be in implementing a
different `TopicsChangedListener` [1] so that the result of an added
topic is to add a producer/consumer to the new partition.

I support removing polling in our streaming platform, but I'd prefer to
limit the number of notification systems we implement.

Thanks,
Michael

[0] https://github.com/apache/pulsar/pull/16062
[1] https://github.com/apache/pulsar/blob/82237d3684fe506bcb6426b3b23f413422e6e4fb/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java#L169-L175

On Fri, Feb 24, 2023 at 1:57 AM houxiaoyu <anonhx...@gmail.com> wrote:
>
> Hi Joe,
>
> When we use PartitionedProducerImpl or MultiTopicsConsumerImpl, there is a
> poll task that regularly fetches the metadata of the partitioned topic to
> detect changes in the number of partitions. This PIP wants to use a
> notification mechanism to replace the metadata poll task.
>
> The way to implement the partitioned-topic metadata
> notification mechanism is much like notifications on regex sub changes
>
> Joe F <joefranc...@gmail.com> wrote on Fri, Feb 24, 2023 at 13:37:
>
> > Why is this needed when we have notifications on regex sub changes? Aren't
> > the partition names a well-defined regex?
> >
> > Joe
> >
> > On Thu, Feb 23, 2023 at 8:52 PM houxiaoyu <anonhx...@gmail.com> wrote:
> >
> > > Hi Asaf,
> > > thanks for your reminder.
> > >
> > > ## Changing
> > > I have made the following changes to make sure the notification
> > > arrives successfully:
> > > 1. The watch success response `CommandWatchPartitionUpdateSuccess` will
> > > contain all the concerned topics of this watcher
> > > 2. 
The notification `CommandPartitionUpdate` will always contain all the
> > > concerned topics of this watcher.
> > > 3. The notification `CommandPartitionUpdate` contains a monotonically
> > > increasing version.
> > > 4. A map `PartitonUpdateWatcherService#inFlightUpdate<long/*watchID*/,
> > > Pair<version, long/*timestamp*/>>` will keep track of in-flight updates
> > > 5. A timer will check for update timeouts through `inFlightUpdate`
> > > 6. The client acks `CommandPartitionUpdateResult` to the broker when it
> > > finishes updating.
> > >
> > > ## Details
> > >
> > > The following mechanism makes sure the newest notification arrives
> > > successfully, copying the description from GH:
> > >
> > > A new class, `org.apache.pulsar.PartitonUpdateWatcherService`, will keep
> > > track of watchers and will listen to changes in the metadata. Whenever
> > > a topic's partitions are updated, it checks whether any watchers should be
> > > notified and sends an update for all topics the watcher is concerned with
> > > through the ServerCnx. Then we record this request in a map,
> > > `PartitonUpdateWatcherService#inFlightUpdate<long/*watchID*/, Pair<version,
> > > long/*timestamp*/>>`. A timer checks for update timeouts through
> > > `inFlightUpdate`. If an update sent to a watcher has timed out, we query
> > > the partitions of all the concerned topics and resend it.
> > >
> > > The client acks `CommandPartitionUpdateResult` to the broker when it finishes
> > > updating. The broker handles the `CommandPartitionUpdateResult` request as
> > > follows:
> > > - If CommandPartitionUpdateResult#version <
> > > PartitonUpdateWatcherService#inFlightUpdate.get(watcherID).version, the
> > > broker ignores this ack.
> > > - If CommandPartitionUpdateResult#version ==
> > > PartitonUpdateWatcherService#inFlightUpdate.get(watcherID).version
> > >   - If CommandPartitionUpdateResult#success is true, the broker just
> > >   removes the watcherID from inFlightUpdate.
> > >   - If CommandPartitionUpdateResult#success is false, the broker removes
> > >   the watcherID from inFlightUpdate, queries the partitions of all the
> > >   concerned topics, and resends.
> > > - If CommandPartitionUpdateResult#version >
> > > PartitonUpdateWatcherService#inFlightUpdate.get(watcherID).version, this
> > > should not happen.
> > >
> > > ## Edge cases
> > > - Broker restarts or crashes
> > > The client will reconnect to another broker, and the broker responds with
> > > `CommandWatchPartitionUpdateSuccess` containing the partitions of the
> > > watcher's concerned topics. We will call `PartitionsUpdateListener` when
> > > the connection opens.
> > > - Client ack fails or times out
> > > The broker will resend the partitions of the watcher's concerned topics
> > > whether the client ack fails or times out.
> > > - Partition updates before client acks
> > > `CommandPartitionUpdate#version` monotonically increases every time it is
> > > updated. If the partitions are updated before the client acks, a greater
> > > version will be put into `PartitonUpdateWatcherService#inFlightUpdate`.
> > > The previous ack will be ignored because its version is less than the
> > > current version.
> > >
> > >
> > > Asaf Mesika <asaf.mes...@gmail.com> wrote on Wed, Feb 22, 2023 at 21:33:
> > >
> > > > How about edge cases?
> > > > In Andra's PIP he took into account cases where updates were lost, so he
> > > > created a secondary poll. Not saying it's the best situation for your
> > > > case of course.
> > > > I'm saying that when a broker sends an update CommandPartitionUpdate, how
> > > > do you know it arrived successfully? From my memory, there is no ACK in
> > > > the protocol, saying "I'm the client, I got the update successfully" and
> > > > only then it removed the "dirty" flag for that topic, for this watcher ID.
> > > >
> > > > Are there any other edge cases we can have? Let's be exhaustive.
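
The ack handling quoted above may be easier to review as code. Below is a minimal Java sketch of the version comparison, not the PIP's implementation. `InFlightUpdateSketch`, `Action`, and `InFlight` are hypothetical names, the map only mirrors the `inFlightUpdate` idea, and treating an ack with no in-flight entry as ignorable is my assumption.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch, not Pulsar code: tracks the latest update sent per watcher.
class InFlightUpdateSketch {
    /** What the broker does after receiving an ack. */
    enum Action { IGNORE, DONE, RESEND, UNEXPECTED }

    /** Version and send time of the latest update sent to one watcher. */
    static final class InFlight {
        final long version;
        final long sentAtMillis;

        InFlight(long version, long sentAtMillis) {
            this.version = version;
            this.sentAtMillis = sentAtMillis;
        }
    }

    private final Map<Long, InFlight> inFlightUpdate = new ConcurrentHashMap<>();

    /** Record a sent update; a newer send replaces the older entry. */
    void recordSent(long watcherId, long version, long nowMillis) {
        inFlightUpdate.put(watcherId, new InFlight(version, nowMillis));
    }

    /** Apply the three version cases from the thread to one ack. */
    Action onAck(long watcherId, long ackVersion, boolean success) {
        InFlight current = inFlightUpdate.get(watcherId);
        // Assumption: an ack with nothing in flight is treated like a stale ack.
        if (current == null || ackVersion < current.version) {
            return Action.IGNORE;
        }
        if (ackVersion > current.version) {
            return Action.UNEXPECTED; // "this should not happen"
        }
        inFlightUpdate.remove(watcherId);
        return success ? Action.DONE : Action.RESEND;
    }

    /** True if an update is still unacked after timeoutMillis. */
    boolean isTimedOut(long watcherId, long nowMillis, long timeoutMillis) {
        InFlight current = inFlightUpdate.get(watcherId);
        return current != null && nowMillis - current.sentAtMillis >= timeoutMillis;
    }

    public static void main(String[] args) {
        InFlightUpdateSketch s = new InFlightUpdateSketch();
        s.recordSent(1L, 1L, 0L);
        s.recordSent(1L, 2L, 10L);                  // partitions changed again before the ack
        System.out.println(s.onAck(1L, 1L, true));  // prints IGNORE (stale version)
        System.out.println(s.onAck(1L, 2L, false)); // prints RESEND (failed ack, current version)
    }
}
```

The sketch is not atomic across `get` and `remove`; a real broker would need to decide how acks and resends are serialized per watcher.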
> > > >
> > > > On Wed, Feb 22, 2023 at 1:14 PM houxiaoyu <anonhx...@gmail.com> wrote:
> > > >
> > > > > Thanks for your great suggestion Enrico.
> > > > >
> > > > > I agree with you. It's more reasonable to add a
> > > > > `supports_partition_update_watchers` flag in `FeatureFlags` to detect
> > > > > that the connected broker supports this feature, and to add a new
> > > > > broker configuration property `enableNotificationForPartitionUpdate`
> > > > > with a default value of true, which is much like PIP-145.
> > > > >
> > > > > I have updated the descriptions.
> > > > >
> > > > > Enrico Olivelli <eolive...@gmail.com> wrote on Wed, Feb 22, 2023 at 17:26:
> > > > >
> > > > > > I support this proposal.
> > > > > > Copying here my comments from GH:
> > > > > >
> > > > > > can't we enable this by default in case we detect that the connected
> > > > > > Broker supports it ?
> > > > > > I can't find any reason for not using this mechanism if it is
> > > > > > available.
> > > > > >
> > > > > > Maybe we can set the default to "true" and allow users to disable it
> > > > > > in case it impacts their systems in an unwanted way.
> > > > > >
> > > > > > Maybe it would be useful to have a way to disable the mechanism on
> > > > > > the broker side as well
> > > > > >
> > > > > > Enrico
> > > > > >
> > > > > > On Wed, Feb 22, 2023 at 10:22, houxiaoyu
> > > > > > <anonhx...@gmail.com> wrote:
> > > > > > >
> > > > > > > Hi Pulsar community:
> > > > > > >
> > > > > > > I opened a PIP to discuss "Notifications for partitions update"
> > > > > > >
> > > > > > > ### Motivation
> > > > > > >
> > > > > > > The Pulsar client polls brokers at a fixed interval to check for
> > > > > > > partition updates if we publish/subscribe to partitioned topics with
> > > > > > > `autoUpdatePartitions` as true. 
This causes unnecessary load for both
> > > > > > > clients and brokers since most of the time the number of partitions
> > > > > > > will not change. In addition, polling introduces latency in partition
> > > > > > > updates, which is determined by `autoUpdatePartitionsInterval`.
> > > > > > > This PIP would like to introduce a notification mechanism for
> > > > > > > partition updates, which is much like PIP-145 for regex subscriptions
> > > > > > > https://github.com/apache/pulsar/issues/14505.
> > > > > > >
> > > > > > > For more details, please read the PIP at:
> > > > > > > https://github.com/apache/pulsar/issues/19596
> > > > > > > Looking forward to hearing your thoughts.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Xiaoyu Hou
> > > > > > > ----
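
To make the "<topic-name>-partition-[0-9]+" suggestion at the top of this message concrete, here is a minimal Java sketch of the matching step only. It uses plain `java.util.regex`, not Pulsar's client classes; `PartitionNameSketch` and its methods are hypothetical names, and how the result would be wired into a `TopicsChangedListener` is left open because I have not checked that interface's exact signature.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch, not Pulsar code: recognizes partitions of one topic by name.
class PartitionNameSketch {
    /** Build "<topic-name>-partition-[0-9]+" with the topic name matched literally. */
    static Pattern partitionPattern(String topicName) {
        return Pattern.compile(Pattern.quote(topicName) + "-partition-([0-9]+)");
    }

    /** Return the partition index, or -1 if candidate is not a partition of topicName. */
    static int partitionIndex(String topicName, String candidate) {
        Matcher m = partitionPattern(topicName).matcher(candidate);
        if (!m.matches()) {
            return -1;
        }
        try {
            return Integer.parseInt(m.group(1));
        } catch (NumberFormatException e) {
            return -1; // digit run too large to be a real partition index
        }
    }

    /** Keep only the added topics that are partitions of topicName, as indexes. */
    static List<Integer> addedPartitionIndexes(String topicName, List<String> addedTopics) {
        List<Integer> indexes = new ArrayList<>();
        for (String topic : addedTopics) {
            int index = partitionIndex(topicName, topic);
            if (index >= 0) {
                indexes.add(index);
            }
        }
        return indexes;
    }

    public static void main(String[] args) {
        String topic = "persistent://public/default/orders";
        System.out.println(partitionIndex(topic, topic + "-partition-3")); // prints 3
        System.out.println(partitionIndex(topic, topic + "2-partition-1")); // prints -1
    }
}
```

A listener built this way would react to an added topic by creating a producer or consumer for each returned index, which is the behavior described in the reply above.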