Hi Michael,

> is there a reason that we couldn't also use this to improve PIP 145?
The protocol described in this PIP could also be used to improve PIP-145. However, I don't think that is a reason to implement the partition update watcher on top of the regex subscription watcher, given the other reasons we mentioned above.

> Since we know we're using a TCP connection, is it possible to rely on
> pulsar's keep alive timeout (the broker and the client each have their
> own) to close a connection that isn't responsive?

I think it could still fail at the application layer even while the connection stays alive; for example, the partitions update listener could fail unexpectedly. Currently, if the poll task in the partition auto-update timer task encounters an error, another task is scheduled [0].

> Regarding the connection, which connection should the client use to send the watch requests?

The `PartitionUpdateWatcher` will call `connectionHandler.grabCnx()` to open a connection, analogous to `TopicListWatcher` [1].

> do we plan on using metadata store notifications to trigger the callbacks that trigger notifications sent
> to the clients

Yes. We will look up the metadata store to fetch the partition count and register a watcher on the metadata store so that a change in the count triggers the notification.

> One nit on the protobuf for CommandWatchPartitionUpdateSuccess:
>
> repeated string topics = 3;
> repeated uint32 partitions = 4;
>
> What do you think about using a repeated message that represents a
> pair of a topic and its partition count instead of using two lists?

Great, a repeated message looks better. I will update the protobuf.

> How will we handle the case where a watched topic does not exist?

1. When `PulsarClient` calls `create()` to create a producer or `subscribe()` to create a consumer, the client first gets the partitioned-topic metadata from the broker [2]. If the topic doesn't exist and `isAllowAutoTopicCreation=true` on the broker, the partitioned-topic ZooKeeper node is created automatically with the default number of partitions.
2. After the client gets the partitioned-topic metadata successfully, a `PartitionedProducerImpl` is created if `meta.partition > 0`. The `PartitionUpdateWatcher` is initialized in the `PartitionedProducerImpl` constructor and sends a command to the broker to register a watcher. If any topic in the topic list doesn't exist, the broker sends an error to the client and the `PartitionedProducerImpl` fails to start. `MultiTopicsConsumerImpl` works the same way.

> I want to touch on authorization. A role should have "lookup"
> permission to watch for updates on each partitioned topic that it
> watches. As a result, if we allow for a request to watch multiple
> topics, some might succeed while others fail. How do we handle partial
> success?

If authorization fails for any topic in the topic list, the broker will send an error to the client. The following reasons support this behavior:
1. Before sending the command to register a partition update watcher, the client should already have sent `CommandPartitionedTopicMetadata`, which requires the `lookup` permission [3] [4].
2. Currently, if subscribing to any topic fails, the consumer fails to start.
   [5]

[0] https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java#L1453-L1461
[1] https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java#L67-L81
[2] https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java#L365-L371
[3] https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java#L903-L923
[4] https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java#L558-L560
[5] https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java#L171-L193

Thanks,
Xiaoyu Hou

On Tue, Mar 7, 2023 at 15:43, Michael Marshall <mmarsh...@apache.org> wrote:
> Thanks for the context Xiaoyu Hou and Asaf. I appreciate the
> efficiencies that we can gain by creating a specific implementation
> for the partitioned topic use case. I agree that this new notification
> system makes sense based on Pulsar's current features, and I have some
> implementation questions.
>
> > - If the broker sends notification and it's lost due to network issues,
> > you'll only know about it due to the client doing constant polling, using
> > its hash to minimize response.
>
> I see that we implemented an ack mechanism to get around this. I
> haven't looked closely, but is there a reason that we couldn't also
> use this to improve PIP 145?
>
> Since we know we're using a TCP connection, is it possible to rely on
> pulsar's keep alive timeout (the broker and the client each have their
> own) to close a connection that isn't responsive? Then, when the
> connection is re-established, the client would get the latest topic
> partition count.
>
> Regarding the connection, which connection should the client use to
> send the watch requests? At the moment, the "parent" partitioned topic
> does not have an owner, but perhaps it would help this design to make
> a single owner for a given partitioned topic. This could trivially be
> done using the existing bundle mapping. Then, all watchers for a given
> partitioned topic would be hosted on the same broker, which should be
> more efficient. I don't think we currently redirect clients to any
> specific bundle when creating the metadata for a partitioned topic,
> but if we did, then we might be able to remove some edge cases for
> notification delivery because a single broker would update the
> metadata store and then trigger the notifications to the clients. If
> we don't use this implementation, do we plan on using metadata store
> notifications to trigger the callbacks that trigger notifications sent
> to the clients?
>
> > - Each time meta-update you'll need to run it through regular
> > expression, on all topics hosted on the broker, for any given client.
> > That's a lot of CPU.
> > - Suggested mechanism mainly cares about the count of partitions, so
> > it's a lot more efficient.
>
> I forgot the partition count was its own piece of metadata that the
> broker can watch for. That part definitely makes sense to me.
>
> One nit on the protobuf for CommandWatchPartitionUpdateSuccess:
>
> repeated string topics = 3;
> repeated uint32 partitions = 4;
>
> What do you think about using a repeated message that represents a
> pair of a topic and its partition count instead of using two lists?
>
> How will we handle the case where a watched topic does not exist?
>
> I want to touch on authorization. A role should have "lookup"
> permission to watch for updates on each partitioned topic that it
> watches. As a result, if we allow for a request to watch multiple
> topics, some might succeed while others fail. How do we handle partial
> success?
>
> One interesting detail is that this PIP is essentially aligned with
> notifying clients when topic metadata changes while PIP 145 was
> related to topic creation itself. An analogous proposal could request
> a notification for any topic that gets a new metadata label. I do not
> think it is worth considering that case in this design.
>
> Thanks,
> Michael
>
> [0] https://lists.apache.org/thread/t4cwht08d4mhp3qzoxmqh6tht8l0728r
>
> On Sun, Mar 5, 2023 at 8:01 PM houxiaoyu <anonhx...@gmail.com> wrote:
> >
> > Bump. Are there any other concerns or suggestions about this PIP? :)
> > Ping @Michael @Joe @Enrico
> >
> > Thanks
> > Xiaoyu Hou
> >
> > On Mon, Feb 27, 2023 at 14:10, houxiaoyu <anonhx...@gmail.com> wrote:
> > >
> > > Hi Joe and Michael,
> > >
> > > I think I misunderstood your earlier reply. Now I understand, so let
> > > me explain again.
> > >
> > > Besides the reasons Asaf mentioned above, there are also some limits
> > > on using the topic list watcher. For example, `topicsPattern.pattern`
> > > must be shorter than `maxSubscriptionPatternLeng` [0]. If the consumer
> > > subscribes to multiple partitioned topics, `topicsPattern.pattern` may
> > > be very long.
> > >
> > > So I think that it's better to have a separate notification
> > > implementation for partition updates.
> > >
> > > [0] https://github.com/apache/pulsar/blob/5d6932137d76d544f939bef27df25f61b4a4d00d/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java#L115-L126
> > >
> > > Thanks,
> > > Xiaoyu Hou
> > >
> > > On Mon, Feb 27, 2023 at 10:56, houxiaoyu <anonhx...@gmail.com> wrote:
> > >
> > >> Hi Michael,
> > >>
> > >> > I think we just need the client to "subscribe" to a topic notification for
> > >> > "<topic-name>-partition-[0-9]+" to eliminate the polling
> > >>
> > >> If Pulsar users want to pub/sub a partitioned topic, I think most
> > >> users would create a simple producer or consumer like the following:
> > >> ```
> > >> Producer<byte[]> producer = client.newProducer().topic(topic).create();
> > >> producer.sendAsync(msg);
> > >> ```
> > >> ```
> > >> Consumer<byte[]> consumer = client.newConsumer()
> > >>         .topic(topic)
> > >>         .subscriptionName(subscription)
> > >>         .subscribe();
> > >> ```
> > >> I think there is no reason for users to use `topicsPattern` if a Pulsar
> > >> user just wants to subscribe to a partitioned topic. In addition,
> > >> `topicsPattern` cannot be used for producers.
> > >>
> > >> So I think PIP-145 [0] benefits regex subscriptions, and this
> > >> PIP [1] benefits the common partitioned-topic pub/sub scenario.
> > >>
> > >> [0] https://github.com/apache/pulsar/issues/14505
> > >> [1] https://github.com/apache/pulsar/issues/19596
> > >>
> > >> Thanks
> > >> Xiaoyu Hou
> > >>
> > >> On Sat, Feb 25, 2023 at 01:29, Michael Marshall <mmarsh...@apache.org> wrote:
> > >>
> > >>> > Just the way to implement partitioned-topic metadata
> > >>> > notification mechanism is much like notifications on regex sub changes
> > >>>
> > >>> Why do we need a separate notification implementation? The regex
> > >>> subscription feature is about discovering topics (not subscriptions)
> > >>> that match a regular expression.
> > >>> As Joe mentioned, I think we just
> > >>> need the client to "subscribe" to a topic notification for
> > >>> "<topic-name>-partition-[0-9]+" to eliminate the polling.
> > >>>
> > >>> Building on PIP 145, the work for this PIP would be in implementing a
> > >>> different `TopicsChangedListener` [1] so that the result of an added
> > >>> topic is to add a producer/consumer to the new partition.
> > >>>
> > >>> I support removing polling in our streaming platform, but I'd prefer
> > >>> to limit the number of notification systems we implement.
> > >>>
> > >>> Thanks,
> > >>> Michael
> > >>>
> > >>> [0] https://github.com/apache/pulsar/pull/16062
> > >>> [1] https://github.com/apache/pulsar/blob/82237d3684fe506bcb6426b3b23f413422e6e4fb/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java#L169-L175
> > >>>
> > >>> On Fri, Feb 24, 2023 at 1:57 AM houxiaoyu <anonhx...@gmail.com> wrote:
> > >>> >
> > >>> > Hi Joe,
> > >>> >
> > >>> > When we use PartitionedProducerImpl or MultiTopicsConsumerImpl, there is a
> > >>> > poll task that regularly fetches the partitioned-topic metadata to detect
> > >>> > whether the number of partitions has been updated. This PIP wants to use a
> > >>> > notification mechanism to replace the metadata poll task.
> > >>> >
> > >>> > Just the way to implement partitioned-topic metadata
> > >>> > notification mechanism is much like notifications on regex sub changes
> > >>> >
> > >>> > On Fri, Feb 24, 2023 at 13:37, Joe F <joefranc...@gmail.com> wrote:
> > >>> >
> > >>> > > Why is this needed when we have notifications on regex sub changes? Aren't
> > >>> > > the partition names a well-defined regex?
> > >>> > >
> > >>> > > Joe
> > >>> > >
> > >>> > > On Thu, Feb 23, 2023 at 8:52 PM houxiaoyu <anonhx...@gmail.com> wrote:
> > >>> > >
> > >>> > > > Hi Asaf,
> > >>> > > > thanks for your reminder.
> > >>> > > >
> > >>> > > > ## Changing
> > >>> > > > I have made the following changes to make sure the notification
> > >>> > > > arrives successfully:
> > >>> > > > 1. The watch success response `CommandWatchPartitionUpdateSuccess` will
> > >>> > > > contain all the concerned topics of this watcher.
> > >>> > > > 2. The notification `CommandPartitionUpdate` will always contain all the
> > >>> > > > concerned topics of this watcher.
> > >>> > > > 3. The notification `CommandPartitionUpdate` contains a monotonically
> > >>> > > > increasing version.
> > >>> > > > 4. A map `PartitonUpdateWatcherService#inFlightUpdate<long/*watchID*/,
> > >>> > > > Pair<version, long/*timestamp*/>>` will keep track of in-flight updates.
> > >>> > > > 5. A timer will check for update timeouts through `inFlightUpdate`.
> > >>> > > > 6. The client acks `CommandPartitionUpdateResult` to the broker when it
> > >>> > > > finishes updating.
> > >>> > > >
> > >>> > > > ## Details
> > >>> > > >
> > >>> > > > The following mechanism makes sure the newest notification arrives
> > >>> > > > successfully (copying the description from GH):
> > >>> > > >
> > >>> > > > A new class, `org.apache.pulsar.PartitonUpdateWatcherService`, will keep
> > >>> > > > track of watchers and will listen to changes in the metadata. Whenever
> > >>> > > > a topic's partition count is updated, it checks whether any watchers should
> > >>> > > > be notified and sends an update for all topics the watcher is concerned
> > >>> > > > with through the `ServerCnx`. Then we will record this request in a map,
> > >>> > > > `PartitonUpdateWatcherService#inFlightUpdate<long/*watchID*/, Pair<version,
> > >>> > > > long/*timestamp*/>>`. A timer will check this update for timeouts through
> > >>> > > > `inFlightUpdate`.
> > >>> > > > If an update sent to this watcher times out, we will query the partitions
> > >>> > > > of all the concerned topics and resend it.
> > >>> > > >
> > >>> > > > The client acks `CommandPartitionUpdateResult` to the broker when it
> > >>> > > > finishes updating. The broker handles the `CommandPartitionUpdateResult`
> > >>> > > > request as follows:
> > >>> > > > - If `CommandPartitionUpdateResult#version` <
> > >>> > > > `PartitonUpdateWatcherService#inFlightUpdate.get(watcherID).version`, the
> > >>> > > > broker ignores this ack.
> > >>> > > > - If `CommandPartitionUpdateResult#version` ==
> > >>> > > > `PartitonUpdateWatcherService#inFlightUpdate.get(watcherID).version`:
> > >>> > > >   - If `CommandPartitionUpdateResult#success` is true, the broker just
> > >>> > > >     removes the watcherID from `inFlightUpdate`.
> > >>> > > >   - If `CommandPartitionUpdateResult#success` is false, the broker removes
> > >>> > > >     the watcherID from `inFlightUpdate`, queries the partitions of all the
> > >>> > > >     concerned topics, and resends.
> > >>> > > > - If `CommandPartitionUpdateResult#version` >
> > >>> > > > `PartitonUpdateWatcherService#inFlightUpdate.get(watcherID).version`, this
> > >>> > > > should not happen.
> > >>> > > >
> > >>> > > > ## Edge cases
> > >>> > > > - Broker restarts or crashes
> > >>> > > >   The client reconnects to another broker, which responds with
> > >>> > > >   `CommandWatchPartitionUpdateSuccess` containing the partitions of the
> > >>> > > >   watcher's concerned topics. We will call `PartitionsUpdateListener` when
> > >>> > > >   the connection opens.
> > >>> > > > - Client ack fails or times out
> > >>> > > >   The broker will resend the partitions of the watcher's concerned topics
> > >>> > > >   whether the client's ack fails or times out.
> > >>> > > > - Partitions update before the client acks
> > >>> > > >   `CommandPartitionUpdate#version` monotonically increases every time it is
> > >>> > > >   updated.
> > >>> > > >   If the partitions update before the client acks, a greater version will
> > >>> > > >   be put into `PartitonUpdateWatcherService#inFlightUpdate`. The previous
> > >>> > > >   acks will be ignored because their version is less than the current
> > >>> > > >   version.
> > >>> > > >
> > >>> > > > On Wed, Feb 22, 2023 at 21:33, Asaf Mesika <asaf.mes...@gmail.com> wrote:
> > >>> > > >
> > >>> > > > > How about edge cases?
> > >>> > > > > In Andra's PIP he took into account cases where updates were lost, so he
> > >>> > > > > created a secondary poll. Not saying it's the best situation for your
> > >>> > > > > case, of course.
> > >>> > > > > I'm saying that when a broker sends an update CommandPartitionUpdate, how
> > >>> > > > > do you know it arrived successfully? From my memory, there is no ACK in
> > >>> > > > > the protocol saying "I'm the client, I got the update successfully", and
> > >>> > > > > only then it removed the "dirty" flag for that topic, for this watcher ID.
> > >>> > > > >
> > >>> > > > > Are there any other edge cases we can have? Let's be exhaustive.
> > >>> > > > >
> > >>> > > > > On Wed, Feb 22, 2023 at 1:14 PM houxiaoyu <anonhx...@gmail.com> wrote:
> > >>> > > > >
> > >>> > > > > > Thanks for your great suggestion, Enrico.
> > >>> > > > > >
> > >>> > > > > > I agree with you. It's more reasonable to add a
> > >>> > > > > > `supports_partition_update_watchers` flag in `FeatureFlags` to detect
> > >>> > > > > > whether the connected broker supports this feature, and to add a new
> > >>> > > > > > broker configuration property `enableNotificationForPartitionUpdate`
> > >>> > > > > > with default value true, which is much like PIP-145.
> > >>> > > > > >
> > >>> > > > > > I have updated the descriptions.
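
The client-side negotiation described just above (a `supports_partition_update_watchers` feature flag in `FeatureFlags`, enabled by default with a way for users to opt out, and polling at `autoUpdatePartitionsInterval` as the fallback) could be modeled roughly as the minimal sketch below. `PartitionUpdateMode`, `PartitionUpdateModeSelector`, and the `clientWatcherEnabled` switch are hypothetical names, not Pulsar client APIs, and the sketch assumes that a broker with `enableNotificationForPartitionUpdate=false` simply does not advertise the feature flag.

```java
// Hypothetical sketch: how a client might choose a partition-update strategy.
// None of these names are real Pulsar client APIs.
enum PartitionUpdateMode { DISABLED, WATCHER, POLLING }

final class PartitionUpdateModeSelector {
    private PartitionUpdateModeSelector() {}

    // autoUpdatePartitions:   the existing producer/consumer option
    // clientWatcherEnabled:   hypothetical client opt-out switch, default true
    // brokerSupportsWatchers: the supports_partition_update_watchers feature flag
    //                         (assumed false when the broker disables notifications)
    static PartitionUpdateMode select(boolean autoUpdatePartitions,
                                      boolean clientWatcherEnabled,
                                      boolean brokerSupportsWatchers) {
        if (!autoUpdatePartitions) {
            return PartitionUpdateMode.DISABLED; // partition count is never refreshed
        }
        if (clientWatcherEnabled && brokerSupportsWatchers) {
            return PartitionUpdateMode.WATCHER;  // register a watcher instead of polling
        }
        return PartitionUpdateMode.POLLING;      // poll every autoUpdatePartitionsInterval
    }
}
```

Keeping polling as the fallback means an older broker, or a client that opts out, keeps today's behavior.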
> > >>> > > > > >
> > >>> > > > > > On Wed, Feb 22, 2023 at 17:26, Enrico Olivelli <eolive...@gmail.com> wrote:
> > >>> > > > > >
> > >>> > > > > > > I support this proposal.
> > >>> > > > > > > Copying my comments from GH here:
> > >>> > > > > > >
> > >>> > > > > > > Can't we enable this by default in case we detect that the connected
> > >>> > > > > > > broker supports it?
> > >>> > > > > > > I can't find any reason for not using this mechanism if it is available.
> > >>> > > > > > >
> > >>> > > > > > > Maybe we can set the default to "true" and allow users to disable it
> > >>> > > > > > > in case it impacts their systems in an unwanted way.
> > >>> > > > > > >
> > >>> > > > > > > Maybe it would be useful to have a way to disable the mechanism on the
> > >>> > > > > > > broker side as well.
> > >>> > > > > > >
> > >>> > > > > > > Enrico
> > >>> > > > > > >
> > >>> > > > > > > On Wed, Feb 22, 2023 at 10:22, houxiaoyu
> > >>> > > > > > > <anonhx...@gmail.com> wrote:
> > >>> > > > > > > >
> > >>> > > > > > > > Hi Pulsar community:
> > >>> > > > > > > >
> > >>> > > > > > > > I opened a PIP to discuss "Notifications for partitions update"
> > >>> > > > > > > >
> > >>> > > > > > > > ### Motivation
> > >>> > > > > > > >
> > >>> > > > > > > > The Pulsar client polls brokers at a fixed interval to check for
> > >>> > > > > > > > partition updates if we publish/subscribe to partitioned topics with
> > >>> > > > > > > > `autoUpdatePartitions` set to true. This causes unnecessary load for
> > >>> > > > > > > > both clients and brokers, since most of the time the number of
> > >>> > > > > > > > partitions will not change. In addition, polling delays the detection
> > >>> > > > > > > > of a partition update by up to `autoUpdatePartitionsInterval`.
> > >>> > > > > > > > This PIP would like to introduce a notification mechanism for
> > >>> > > > > > > > partition updates, which is much like PIP-145 for regex subscriptions
> > >>> > > > > > > > https://github.com/apache/pulsar/issues/14505.
> > >>> > > > > > > >
> > >>> > > > > > > > For more details, please read the PIP at:
> > >>> > > > > > > > https://github.com/apache/pulsar/issues/19596
> > >>> > > > > > > > Looking forward to hearing your thoughts.
> > >>> > > > > > > >
> > >>> > > > > > > > Thanks,
> > >>> > > > > > > > Xiaoyu Hou
> > >>> > > > > > > > ----
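
The version-based acknowledgement rules from the Feb 23 message above (the `inFlightUpdate` map keyed by watcher ID, and the comparison of `CommandPartitionUpdateResult#version` against the in-flight version) can be modeled with the minimal sketch below. `InFlightUpdateTracker`, `AckOutcome`, and the method names are hypothetical, not Pulsar broker code; time is passed in explicitly so the logic is easy to test, and the actual re-query of partitions and resend is left to the caller.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of the proposal's inFlightUpdate map:
// watcherId -> (version, timestamp). Not actual Pulsar code.
final class InFlightUpdateTracker {

    enum AckOutcome {
        IGNORE,     // stale ack (older version) or nothing in flight
        DONE,       // current version acked successfully
        RESEND,     // client failed to apply the current version: re-query and resend
        UNEXPECTED  // ack version newer than anything sent ("should not happen")
    }

    private static final class InFlight {
        final long version;
        final long sentAtMillis;

        InFlight(long version, long sentAtMillis) {
            this.version = version;
            this.sentAtMillis = sentAtMillis;
        }
    }

    private final Map<Long, InFlight> inFlightUpdate = new ConcurrentHashMap<>();

    // Called each time a CommandPartitionUpdate is sent; a newer send replaces the older entry.
    void recordSent(long watcherId, long version, long nowMillis) {
        inFlightUpdate.put(watcherId, new InFlight(version, nowMillis));
    }

    // Applies the version comparison rules to a CommandPartitionUpdateResult.
    AckOutcome onAck(long watcherId, long ackVersion, boolean success) {
        InFlight current = inFlightUpdate.get(watcherId);
        if (current == null || ackVersion < current.version) {
            return AckOutcome.IGNORE;
        }
        if (ackVersion > current.version) {
            return AckOutcome.UNEXPECTED;
        }
        // Versions match: the update is no longer in flight either way.
        inFlightUpdate.remove(watcherId, current);
        return success ? AckOutcome.DONE : AckOutcome.RESEND;
    }

    // Polled by a timer: watchers whose update has been unacknowledged for at least timeoutMillis.
    List<Long> timedOut(long nowMillis, long timeoutMillis) {
        List<Long> expired = new ArrayList<>();
        for (Map.Entry<Long, InFlight> e : inFlightUpdate.entrySet()) {
            if (nowMillis - e.getValue().sentAtMillis >= timeoutMillis) {
                expired.add(e.getKey());
            }
        }
        return expired;
    }
}
```

Because a newer `recordSent` replaces the older entry, an ack for the older version becomes stale and is ignored, which matches the "partitions update before the client acks" edge case.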