Bump. Are there any other concerns or suggestions about this PIP? :)
Ping @Michael @Joe @Enrico
Thanks
Xiaoyu Hou

On Mon, Feb 27, 2023 at 2:10 PM houxiaoyu <anonhx...@gmail.com> wrote:
> Hi Joe and Michael,
>
> I think I misunderstood your earlier replies. Now that I understand them, let me explain again.
>
> Besides the reasons Asaf mentioned above, there are also some limitations to using the topic list watcher. For example, `topicsPattern.pattern` must be shorter than `maxSubscriptionPatternLeng` [0]. If the consumer subscribes to multiple partitioned topics, `topicsPattern.pattern` may be very long.
>
> So I think it's better to have a separate notification implementation for partition updates.
>
> [0] https://github.com/apache/pulsar/blob/5d6932137d76d544f939bef27df25f61b4a4d00d/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java#L115-L126
>
> Thanks,
> Xiaoyu Hou
>
> On Mon, Feb 27, 2023 at 10:56 AM houxiaoyu <anonhx...@gmail.com> wrote:
>
>> Hi Michael,
>>
>> > I think we just need the client to "subscribe" to a topic notification for "<topic-name>-partition-[0-9]+" to eliminate the polling
>>
>> If Pulsar users want to publish to or subscribe to a partitioned topic, I think most of them would simply create a producer or consumer like the following:
>> ```java
>> Producer<byte[]> producer = client.newProducer().topic(topic).create();
>> producer.sendAsync(msg);
>> ```
>> ```java
>> Consumer<byte[]> consumer = client.newConsumer()
>>         .topic(topic)
>>         .subscriptionName(subscription)
>>         .subscribe();
>> ```
>> I think there is no reason for users to use `topicsPattern` if they just want to subscribe to a partitioned topic. In addition, `topicsPattern` can't be used for producers.
>>
>> So I think PIP-145 [0] benefits regex subscriptions, while this PIP [1] will benefit the common partitioned-topic pub/sub scenario.
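Joe's and Michael's suggestion relies on partition names having the well-defined form `<topic-name>-partition-<index>`, while the concern above is that one `topicsPattern` covering several partitioned topics grows with the number of topics and can hit the broker's pattern-length check [0]. A minimal sketch of both points, using only `java.util.regex`: the class name, the alternation-based way of combining topics, and the limit values in the usage are hypothetical illustrations, not how Pulsar builds or validates patterns.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical helper for illustration only; not part of the Pulsar client API.
final class PartitionPatternSketch {

    private PartitionPatternSketch() {}

    // Builds a single regex matching the partitions of every given topic, e.g.
    // (\Qpersistent://public/default/orders\E|...)-partition-[0-9]+
    static Pattern partitionsOf(List<String> topics) {
        String alternatives = topics.stream()
                .map(Pattern::quote) // treat each topic name literally ('.' etc.)
                .collect(Collectors.joining("|"));
        return Pattern.compile("(" + alternatives + ")-partition-[0-9]+");
    }

    // Mirrors a broker-side "pattern too long" check; the limit is supplied by the caller.
    static boolean fitsLimit(Pattern pattern, int maxLength) {
        return pattern.pattern().length() <= maxLength;
    }
}
```

Each extra topic adds its full quoted name to the pattern, so a consumer of many partitioned topics quickly produces a long pattern, whereas a per-topic partition watcher has no such length to manage.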
>>
>> [0] https://github.com/apache/pulsar/issues/14505
>> [1] https://github.com/apache/pulsar/issues/19596
>>
>> Thanks
>> Xiaoyu Hou
>>
>> On Sat, Feb 25, 2023 at 1:29 AM Michael Marshall <mmarsh...@apache.org> wrote:
>>
>>> > It's just that the partitioned-topic metadata notification mechanism would be implemented much like notifications on regex subscription changes.
>>>
>>> Why do we need a separate notification implementation? The regex subscription feature is about discovering topics (not subscriptions) that match a regular expression. As Joe mentioned, I think we just need the client to "subscribe" to a topic notification for "<topic-name>-partition-[0-9]+" to eliminate the polling.
>>>
>>> Building on PIP 145, the work for this PIP would be in implementing a different `TopicsChangedListener` [1] so that the result of an added topic is to add a producer/consumer to the new partition.
>>>
>>> I support removing polling in our streaming platform, but I'd prefer to limit the number of notification systems we implement.
>>>
>>> Thanks,
>>> Michael
>>>
>>> [0] https://github.com/apache/pulsar/pull/16062
>>> [1] https://github.com/apache/pulsar/blob/82237d3684fe506bcb6426b3b23f413422e6e4fb/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java#L169-L175
>>>
>>> On Fri, Feb 24, 2023 at 1:57 AM houxiaoyu <anonhx...@gmail.com> wrote:
>>> >
>>> > Hi Joe,
>>> >
>>> > When we use PartitionedProducerImpl or MultiTopicsConsumerImpl, there is a poll task that regularly fetches the partitioned-topic metadata to detect changes in the number of partitions. This PIP wants to replace that metadata poll task with a notification mechanism.
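To make the cost of the current approach concrete, here is a deterministic sketch of a periodic partition-metadata poll of the kind described above. It is not the actual `PartitionedProducerImpl`/`MultiTopicsConsumerImpl` code: the `IntSupplier` stands in for a partitioned-topic metadata lookup, the class and method names are hypothetical, and the scheduler is replaced by explicit `tick()` calls. Every tick costs one lookup even when nothing changed, which is the overhead a notification would remove.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntSupplier;

// Hypothetical illustration of metadata polling; not Pulsar's implementation.
final class PartitionPollerSketch {
    private final IntSupplier metadataLookup; // stands in for a broker metadata request
    private final List<Integer> addedPartitions = new ArrayList<>();
    private int knownPartitions;
    private int lookups; // number of metadata requests sent so far

    PartitionPollerSketch(IntSupplier metadataLookup, int initialPartitions) {
        this.metadataLookup = metadataLookup;
        this.knownPartitions = initialPartitions;
    }

    // Called once per poll interval (a real client would use a scheduler for this).
    void tick() {
        lookups++;
        int current = metadataLookup.getAsInt();
        // This sketch only handles growth in the partition count.
        for (int p = knownPartitions; p < current; p++) {
            addedPartitions.add(p); // a real client would create a producer/consumer here
        }
        knownPartitions = Math.max(knownPartitions, current);
    }

    int lookups() { return lookups; }
    int knownPartitions() { return knownPartitions; }
    List<Integer> addedPartitions() { return addedPartitions; }
}
```

If the partition count changes only once across three ticks, two of the three lookups are wasted, and a partition added just after a tick is only noticed one full interval later.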
>>> >
>>> > It's just that the partitioned-topic metadata notification mechanism would be implemented much like notifications on regex subscription changes.
>>> >
>>> > On Fri, Feb 24, 2023 at 1:37 PM Joe F <joefranc...@gmail.com> wrote:
>>> >
>>> > > Why is this needed when we have notifications on regex sub changes? Aren't the partition names a well-defined regex?
>>> > >
>>> > > Joe
>>> > >
>>> > > On Thu, Feb 23, 2023 at 8:52 PM houxiaoyu <anonhx...@gmail.com> wrote:
>>> > >
>>> > > > Hi Asaf,
>>> > > > thanks for the reminder.
>>> > > >
>>> > > > ## Changes
>>> > > > I have made the following changes to make sure the notification arrives successfully:
>>> > > > 1. The watch success response `CommandWatchPartitionUpdateSuccess` will contain all the topics this watcher is concerned with.
>>> > > > 2. The notification `CommandPartitionUpdate` will always contain all the topics this watcher is concerned with.
>>> > > > 3. The notification `CommandPartitionUpdate` contains a monotonically increasing version.
>>> > > > 4. A map `PartitonUpdateWatcherService#inFlightUpdate<long/*watchID*/, Pair<version, long/*timestamp*/>>` will keep track of in-flight updates.
>>> > > > 5. A timer will check for update timeouts through `inFlightUpdate`.
>>> > > > 6. The client acks `CommandPartitionUpdateResult` to the broker when it finishes updating.
>>> > > >
>>> > > > ## Details
>>> > > >
>>> > > > The following mechanism ensures that the newest notification arrives successfully (copying the description from GH):
>>> > > >
>>> > > > A new class, `org.apache.pulsar.PartitonUpdateWatcherService`, will keep track of watchers and will listen to changes in the metadata. Whenever a topic's partitions are updated, it checks whether any watchers should be notified and sends an update for all topics the watcher is concerned with through the ServerCnx.
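The watcher bookkeeping described above (together with items 1 to 3 of the change list) can be sketched as a small in-memory model. This is hypothetical and not the proposed `PartitonUpdateWatcherService`: `PartitionWatcherRegistrySketch` and `Notification` are illustrative names, and sending through `ServerCnx` is replaced by returning the notifications. It shows the two properties the proposal relies on: every notification carries a full snapshot of all topics the watcher is concerned with, and each watcher's version increases monotonically.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical model of broker-side watcher bookkeeping; not Pulsar code.
final class PartitionWatcherRegistrySketch {

    // What would be sent to the client (stands in for CommandPartitionUpdate).
    static final class Notification {
        final long watcherId;
        final long version;
        final Map<String, Integer> partitions;

        Notification(long watcherId, long version, Map<String, Integer> partitions) {
            this.watcherId = watcherId;
            this.version = version;
            this.partitions = partitions;
        }
    }

    private final Map<String, Integer> partitionsByTopic = new HashMap<>();
    private final Map<Long, Set<String>> topicsByWatcher = new HashMap<>();
    private final Map<Long, Long> versionByWatcher = new HashMap<>();

    void setPartitions(String topic, int partitions) {
        partitionsByTopic.put(topic, partitions);
    }

    // Registers a watcher and returns the initial full snapshot
    // (stands in for CommandWatchPartitionUpdateSuccess).
    Map<String, Integer> watch(long watcherId, Set<String> topics) {
        topicsByWatcher.put(watcherId, new HashSet<>(topics));
        versionByWatcher.put(watcherId, 0L);
        return snapshot(watcherId);
    }

    // Called when the metadata store reports a partition change for one topic.
    List<Notification> onPartitionsUpdated(String topic, int partitions) {
        partitionsByTopic.put(topic, partitions);
        List<Notification> out = new ArrayList<>();
        for (Map.Entry<Long, Set<String>> e : topicsByWatcher.entrySet()) {
            if (e.getValue().contains(topic)) {
                long watcherId = e.getKey();
                long version = versionByWatcher.merge(watcherId, 1L, Long::sum);
                // Full snapshot of all concerned topics, not just the changed one.
                out.add(new Notification(watcherId, version, snapshot(watcherId)));
            }
        }
        return out;
    }

    private Map<String, Integer> snapshot(long watcherId) {
        Map<String, Integer> result = new TreeMap<>();
        for (String t : topicsByWatcher.get(watcherId)) {
            result.put(t, partitionsByTopic.getOrDefault(t, 0));
        }
        return result;
    }
}
```

Sending the full snapshot means a client that missed an intermediate notification still converges on the latest state as soon as any later one arrives.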
>>> > > >
>>> > > > Then we will record this request in a map, `PartitonUpdateWatcherService#inFlightUpdate<long/*watchID*/, Pair<version, long/*timestamp*/>>`. A timer will check this update for a timeout through `inFlightUpdate`. If the update sent to this watcher times out, we will query the partitions of all the topics it is concerned with and resend the update.
>>> > > >
>>> > > > The client acks `CommandPartitionUpdateResult` to the broker when it finishes updating. The broker handles a `CommandPartitionUpdateResult` request as follows:
>>> > > > - If CommandPartitionUpdateResult#version < PartitonUpdateWatcherService#inFlightUpdate.get(watcherID).version, the broker ignores this ack.
>>> > > > - If CommandPartitionUpdateResult#version == PartitonUpdateWatcherService#inFlightUpdate.get(watcherID).version:
>>> > > >   - If CommandPartitionUpdateResult#success is true, the broker just removes the watcherID from inFlightUpdate.
>>> > > >   - If CommandPartitionUpdateResult#success is false, the broker removes the watcherID from inFlightUpdate, queries the partitions of all the topics the watcher is concerned with, and resends the update.
>>> > > > - If CommandPartitionUpdateResult#version > PartitonUpdateWatcherService#inFlightUpdate.get(watcherID).version, this should not happen.
>>> > > >
>>> > > > ## Edge cases
>>> > > > - Broker restarts or crashes
>>> > > >   The client will reconnect to another broker, and that broker responds with `CommandWatchPartitionUpdateSuccess` containing the partitions of the topics the watcher is concerned with. We will call `PartitionsUpdateListener` when the connection opens.
>>> > > > - Client ack fails or times out
>>> > > >   The broker will resend the partitions of the topics the watcher is concerned with if the client ack either fails or times out.
>>> > > > - Partitions are updated before the client acks
>>> > > >   `CommandPartitionUpdate#version` increases monotonically every time the partitions are updated. If the partitions are updated before the client acks, a greater version will be put into `PartitonUpdateWatcherService#inFlightUpdate`. The previous ack will be ignored because its version is less than the current version.
>>> > > >
>>> > > > On Wed, Feb 22, 2023 at 9:33 PM Asaf Mesika <asaf.mes...@gmail.com> wrote:
>>> > > >
>>> > > > > How about edge cases?
>>> > > > > In Andra's PIP, he took into account cases where updates were lost, so he created a secondary poll. I'm not saying it's the best approach for your case, of course.
>>> > > > > What I'm saying is: when a broker sends a CommandPartitionUpdate, how do you know it arrived successfully? From memory, there is no ACK in the protocol saying "I'm the client, I got the update successfully", after which the broker would remove the "dirty" flag for that topic, for this watcher ID.
>>> > > > >
>>> > > > > Are there any other edge cases we can have? Let's be exhaustive.
>>> > > > >
>>> > > > > On Wed, Feb 22, 2023 at 1:14 PM houxiaoyu <anonhx...@gmail.com> wrote:
>>> > > > >
>>> > > > > > Thanks for your great suggestion, Enrico.
>>> > > > > >
>>> > > > > > I agree with you. It's more reasonable to add `supports_partition_update_watchers` to `FeatureFlags` to detect whether the connected broker supports this feature, and to add a new broker configuration property `enableNotificationForPartitionUpdate` with a default value of true, much like PIP-145.
>>> > > > > >
>>> > > > > > I have updated the description.
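The ack handling from the Details section, together with the "client ack fails or times out" and "partitions are updated before the client acks" edge cases, can be modelled as a small state machine over `inFlightUpdate`. The sketch below is a hypothetical, single-threaded model: `InFlightSketch`, `AckOutcome`, and the explicit millisecond timestamps are illustrative, "resend" is represented by a return value rather than a real broker round trip, and the timer is replaced by an explicit `expired(...)` call.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the proposed in-flight tracking; not Pulsar code.
final class InFlightSketch {

    enum AckOutcome { IGNORED_STALE, COMPLETED, RESEND, UNEXPECTED }

    // watcherId -> {version, sentAtMillis}; mirrors inFlightUpdate<watchID, Pair<version, timestamp>>.
    private final Map<Long, long[]> inFlight = new HashMap<>();

    // Records a notification that was just sent; a newer version replaces an older one.
    void sent(long watcherId, long version, long nowMillis) {
        inFlight.put(watcherId, new long[] {version, nowMillis});
    }

    // Handles CommandPartitionUpdateResult(version, success) from the client.
    AckOutcome onAck(long watcherId, long version, boolean success) {
        long[] entry = inFlight.get(watcherId);
        if (entry == null || version < entry[0]) {
            return AckOutcome.IGNORED_STALE; // nothing in flight, or superseded by a newer version
        }
        if (version > entry[0]) {
            return AckOutcome.UNEXPECTED;    // "this should not happen"
        }
        inFlight.remove(watcherId);
        // On failure the broker would re-query all concerned topics and resend.
        return success ? AckOutcome.COMPLETED : AckOutcome.RESEND;
    }

    // Timer callback: watchers whose update has not been acked within timeoutMillis.
    List<Long> expired(long nowMillis, long timeoutMillis) {
        List<Long> result = new ArrayList<>();
        for (Map.Entry<Long, long[]> e : inFlight.entrySet()) {
            if (nowMillis - e.getValue()[1] >= timeoutMillis) {
                result.add(e.getKey());
            }
        }
        return result;
    }
}
```

Keying the map by watcher rather than by (watcher, version) is what makes a late ack for an older version harmless: by the time it arrives, the entry already holds the newer version.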
>>> > > > > >
>>> > > > > > On Wed, Feb 22, 2023 at 5:26 PM Enrico Olivelli <eolive...@gmail.com> wrote:
>>> > > > > >
>>> > > > > > > I support this proposal.
>>> > > > > > > Copying my comments from GH here:
>>> > > > > > >
>>> > > > > > > Can't we enable this by default when we detect that the connected broker supports it? I can't find any reason not to use this mechanism if it is available.
>>> > > > > > >
>>> > > > > > > Maybe we can set the default to "true" and allow users to disable it in case it impacts their systems in an unwanted way.
>>> > > > > > >
>>> > > > > > > Maybe it would be useful to have a way to disable the mechanism on the broker side as well.
>>> > > > > > >
>>> > > > > > > Enrico
>>> > > > > > >
>>> > > > > > > On Wed, Feb 22, 2023 at 10:22 AM houxiaoyu <anonhx...@gmail.com> wrote:
>>> > > > > > > >
>>> > > > > > > > Hi Pulsar community:
>>> > > > > > > >
>>> > > > > > > > I opened a PIP to discuss "Notifications for partitions update"
>>> > > > > > > >
>>> > > > > > > > ### Motivation
>>> > > > > > > >
>>> > > > > > > > The Pulsar client polls brokers at a fixed interval to check for partition updates when we publish to or subscribe to partitioned topics with `autoUpdatePartitions` set to true. This causes unnecessary load on both clients and brokers, since most of the time the number of partitions does not change. In addition, polling delays the detection of partition updates by up to `autoUpdatePartitionsInterval`.
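Combining the `autoUpdatePartitions` setting with the enablement discussed earlier in the thread (a `supports_partition_update_watchers` feature flag advertised by the broker, a broker-side `enableNotificationForPartitionUpdate` switch, and a client-side way to opt out), a client might choose its update strategy roughly as below. This is a hypothetical decision helper: the enum, the method name, and the assumption that a broker with the switch turned off simply does not advertise the flag are illustrative, not settled parts of the PIP.

```java
// Hypothetical decision helper; names do not correspond to real Pulsar client options.
final class PartitionUpdateModeSketch {

    enum Mode { DISABLED, NOTIFICATION, POLLING }

    private PartitionUpdateModeSketch() {}

    /**
     * @param autoUpdatePartitions      client setting: track partition changes at all
     * @param clientAllowsNotifications client opt-out, assumed to default to true
     * @param brokerSupportsWatchers    whether the connected broker advertised the feature
     *                                  flag (assumed false when the broker-side switch is off)
     */
    static Mode choose(boolean autoUpdatePartitions,
                       boolean clientAllowsNotifications,
                       boolean brokerSupportsWatchers) {
        if (!autoUpdatePartitions) {
            return Mode.DISABLED;
        }
        if (clientAllowsNotifications && brokerSupportsWatchers) {
            return Mode.NOTIFICATION; // no periodic metadata lookups
        }
        return Mode.POLLING;          // fall back to polling every autoUpdatePartitionsInterval
    }
}
```

Falling back to polling keeps the behaviour of older brokers unchanged, so enabling notifications by default on the client side would only take effect where the broker opts in.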
>>> > > > > > > >
>>> > > > > > > > This PIP would like to introduce a notification mechanism for partition updates, much like PIP-145 for regex subscriptions: https://github.com/apache/pulsar/issues/14505.
>>> > > > > > > >
>>> > > > > > > > For more details, please read the PIP at:
>>> > > > > > > > https://github.com/apache/pulsar/issues/19596
>>> > > > > > > > Looking forward to hearing your thoughts.
>>> > > > > > > >
>>> > > > > > > > Thanks,
>>> > > > > > > > Xiaoyu Hou
>>> > > > > > > > ----