Hi Joe and Michael,

I think I misunderstood your earlier replies. I understand them now, so
let me explain again.

Besides the reasons Asaf mentioned above, the topic list watcher also has
some limits. For example, the length of `topicsPattern.pattern` must be
less than `maxSubscriptionPatternLeng` [0]. If the consumer subscribes to
multiple partitioned topics, `topicsPattern.pattern` may become very long.

So I think it's better to have a separate notification implementation
for partition updates.
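
To illustrate the length concern, here is a minimal sketch. It is not taken from the Pulsar code base: the class name, the helper methods, and the limit of 50 are all hypothetical, and the way the topics are combined into one regex is just an assumption about how a client might do it.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class PatternLengthSketch {
    // Hypothetical limit, for illustration only; the real value comes from broker configuration.
    static final int ASSUMED_MAX_PATTERN_LENGTH = 50;

    // Builds a single regex that matches the partitions of every given partitioned topic.
    // Each topic name is quoted so that characters such as '.' are matched literally.
    public static String partitionPattern(List<String> topics) {
        String alternatives = topics.stream()
                .map(Pattern::quote)
                .collect(Collectors.joining("|"));
        return "(" + alternatives + ")-partition-[0-9]+";
    }

    // Mirrors the kind of length check described above: too-long patterns are rejected.
    public static boolean exceedsLimit(String pattern, int maxLength) {
        return pattern.length() > maxLength;
    }

    public static void main(String[] args) {
        List<String> topics = List.of(
                "persistent://public/default/orders",
                "persistent://public/default/payments",
                "persistent://public/default/shipments");
        // The pattern grows by one full topic name for every topic added.
        for (int n = 1; n <= topics.size(); n++) {
            String pattern = partitionPattern(topics.subList(0, n));
            System.out.println(n + " topic(s): length=" + pattern.length()
                    + ", exceeds assumed limit=" + exceedsLimit(pattern, ASSUMED_MAX_PATTERN_LENGTH));
        }
    }
}
```

The pattern length grows linearly with the number of subscribed topics, so a consumer on many partitioned topics could hit the limit even though each individual topic name is short.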

[0]
https://github.com/apache/pulsar/blob/5d6932137d76d544f939bef27df25f61b4a4d00d/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java#L115-L126

Thanks,
Xiaoyu Hou

houxiaoyu <anonhx...@gmail.com> wrote on Mon, Feb 27, 2023 at 10:56:

> Hi Michael,
>
> >  I think we just need the client to "subscribe" to a topic notification
> >  for "<topic-name>-partition-[0-9]+" to eliminate the polling
>
> If Pulsar users want to publish to or subscribe to a partitioned topic, I
> think most of them would create a simple producer or consumer like the
> following:
> ```
> Producer<byte[]> producer = client.newProducer().topic(topic).create();
> producer.sendAsync(msg);
> ```
> ```
> client.newConsumer()
>         .topic(topic)
>         .subscriptionName(subscription)
>         .subscribe();
> ```
> I think there is no reason for users to use `topicsPattern` if a Pulsar
> user just wants to subscribe to a partitioned topic. In addition,
> `topicsPattern` can't be used for producers.
>
> So I think PIP-145 [0] will benefit regex subscriptions, and this PIP
> [1] will benefit the common partitioned-topic pub/sub scenario.
>
> [0] https://github.com/apache/pulsar/issues/14505
> [1] https://github.com/apache/pulsar/issues/19596
>
> Thanks
> Xiaoyu Hou
>
> Michael Marshall <mmarsh...@apache.org> wrote on Sat, Feb 25, 2023 at 01:29:
>
>> > Just the way to implements partitioned-topic metadata
>> > notification mechanism is much like notifications on regex sub changes
>>
>> Why do we need a separate notification implementation? The regex
>> subscription feature is about discovering topics (not subscriptions)
>> that match a regular expression. As Joe mentioned, I think we just
>> need the client to "subscribe" to a topic notification for
>> "<topic-name>-partition-[0-9]+" to eliminate the polling.
>>
>> Building on PIP 145, the work for this PIP would be in implementing a
>> different `TopicsChangedListener` [1] so that the result of an added
>> topic is to add a producer/consumer to the new partition.
>>
>> I support removing polling in our streaming platform, but I'd prefer
>> to limit the number of notification systems we implement.
>>
>> Thanks,
>> Michael
>>
>> [0] https://github.com/apache/pulsar/pull/16062
>> [1]
>> https://github.com/apache/pulsar/blob/82237d3684fe506bcb6426b3b23f413422e6e4fb/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java#L169-L175
>>
>>
>>
>> On Fri, Feb 24, 2023 at 1:57 AM houxiaoyu <anonhx...@gmail.com> wrote:
>> >
>> > Hi Joe,
>> >
>> > When we use PartitionedProducerImpl or MultiTopicsConsumerImpl, there is
>> > a poll task that regularly fetches the metadata of the partitioned topic
>> > to detect changes in the number of partitions. This PIP wants to use a
>> > notification mechanism to replace the metadata poll task.
>> >
>> > It is just that the way to implement the partitioned-topic metadata
>> > notification mechanism is much like notifications on regex sub changes.
>> >
>> > Joe F <joefranc...@gmail.com> wrote on Fri, Feb 24, 2023 at 13:37:
>> >
>> > > Why is this needed when we have notifications on regex sub changes?
>> > > Aren't the partition names a well-defined regex?
>> > >
>> > > Joe
>> > >
>> > > On Thu, Feb 23, 2023 at 8:52 PM houxiaoyu <anonhx...@gmail.com> wrote:
>> > >
>> > > > Hi Asaf,
>> > > > thanks for your reminder.
>> > > >
>> > > > ## Changes
>> > > > I have made the following changes to make sure the notification
>> > > > arrives successfully:
>> > > > 1. The watch success response `CommandWatchPartitionUpdateSuccess`
>> > > > will contain all the topics this watcher is concerned with.
>> > > > 2. The notification `CommandPartitionUpdate` will always contain all
>> > > > the topics this watcher is concerned with.
>> > > > 3. The notification `CommandPartitionUpdate` contains a monotonically
>> > > > increasing version.
>> > > > 4. A map
>> > > > `PartitonUpdateWatcherService#inFlightUpdate<long/*watchID*/,
>> > > > Pair<version, long/*timestamp*/>>` will keep track of in-flight
>> > > > updates.
>> > > > 5. A timer will check for update timeouts through `inFlightUpdate`.
>> > > > 6. The client acks with `CommandPartitionUpdateResult` to the broker
>> > > > when it finishes updating.
>> > > >
>> > > > ## Details
>> > > >
>> > > > The following mechanism makes sure the newest notification arrives
>> > > > successfully (copying the description from GH):
>> > > >
>> > > > A new class, `org.apache.pulsar.PartitonUpdateWatcherService`, will
>> > > > keep track of watchers and listen to changes in the metadata.
>> > > > Whenever a topic's partitions are updated, it checks whether any
>> > > > watchers should be notified and sends an update for all topics the
>> > > > watcher is concerned with through the ServerCnx. The broker then
>> > > > records this request in a map,
>> > > > `PartitonUpdateWatcherService#inFlightUpdate<long/*watchID*/,
>> > > > Pair<version, long/*timestamp*/>>`. A timer checks for update
>> > > > timeouts through `inFlightUpdate`. If an update sent to a watcher
>> > > > has timed out, the broker queries the partitions of all topics that
>> > > > watcher is concerned with and resends the update.
>> > > >
>> > > > The client acks with `CommandPartitionUpdateResult` to the broker
>> > > > when it finishes updating. The broker handles the
>> > > > `CommandPartitionUpdateResult` request as follows:
>> > > >  - If CommandPartitionUpdateResult#version <
>> > > > PartitonUpdateWatcherService#inFlightUpdate.get(watcherID).version,
>> > > > the broker ignores this ack.
>> > > >  - If CommandPartitionUpdateResult#version ==
>> > > > PartitonUpdateWatcherService#inFlightUpdate.get(watcherID).version:
>> > > >     - If CommandPartitionUpdateResult#success is true, the broker
>> > > > just removes the watcherID from inFlightUpdate.
>> > > >     - If CommandPartitionUpdateResult#success is false, the broker
>> > > > removes the watcherID from inFlightUpdate, queries the partitions of
>> > > > all the concerned topics, and resends the update.
>> > > >  - If CommandPartitionUpdateResult#version >
>> > > > PartitonUpdateWatcherService#inFlightUpdate.get(watcherID).version,
>> > > > this should not happen.
>> > > >
>> > > >  ## Edge cases
>> > > > - Broker restarts or crashes
>> > > > The client will reconnect to another broker, and the broker responds
>> > > > with `CommandWatchPartitionUpdateSuccess` containing the partitions
>> > > > of the topics the watcher is concerned with. We will call
>> > > > `PartitionsUpdateListener` when the connection opens.
>> > > > - Client ack fails or times out
>> > > > The broker will resend the partitions of the watcher's concerned
>> > > > topics if the client ack either fails or times out.
>> > > > - Partitions are updated before the client acks
>> > > > `CommandPartitionUpdate#version` monotonically increases every time
>> > > > it is updated. If the partitions are updated before the client acks,
>> > > > a greater version will be put into
>> > > > `PartitonUpdateWatcherService#inFlightUpdate`. The previous acks
>> > > > will be ignored because their version is less than the current
>> > > > version.
>> > > >
>> > > >
>> > > > Asaf Mesika <asaf.mes...@gmail.com> wrote on Wed, Feb 22, 2023 at 21:33:
>> > > >
>> > > > > How about edge cases?
>> > > > > In Andra's PIP he took into account cases where updates were
>> > > > > lost, so he created a secondary poll. Not saying it's the best
>> > > > > situation for your case of course.
>> > > > > I'm saying that when a broker sends an update
>> > > > > CommandPartitionUpdate, how do you know it arrived successfully?
>> > > > > From my memory, there is no ACK in the protocol, saying "I'm the
>> > > > > client, I got the update successfully" and only then it removed
>> > > > > the "dirty" flag for that topic, for this watcher ID.
>> > > > >
>> > > > > Are there any other edge cases we can have? Let's be exhaustive.
>> > > > >
>> > > > >
>> > > > >
>> > > > > On Wed, Feb 22, 2023 at 1:14 PM houxiaoyu <anonhx...@gmail.com> wrote:
>> > > > >
>> > > > > > Thanks for your great suggestion, Enrico.
>> > > > > >
>> > > > > > I agree with you. It's more reasonable to add a
>> > > > > > `supports_partition_update_watchers` flag to `FeatureFlags` to
>> > > > > > detect whether the connected broker supports this feature, and
>> > > > > > to add a new broker configuration property,
>> > > > > > `enableNotificationForPartitionUpdate`, with a default value of
>> > > > > > true, which is much like PIP-145.
>> > > > > >
>> > > > > > I have updated the descriptions.
>> > > > > >
>> > > > > > Enrico Olivelli <eolive...@gmail.com> wrote on Wed, Feb 22, 2023 at 17:26:
>> > > > > >
>> > > > > > > I support this proposal.
>> > > > > > > Copying here my comments from GH:
>> > > > > > >
>> > > > > > > Can't we enable this by default in case we detect that the
>> > > > > > > connected broker supports it?
>> > > > > > > I can't find any reason for not using this mechanism if it is
>> > > > > > > available.
>> > > > > > >
>> > > > > > > Maybe we can set the default to "true" and allow users to
>> > > > > > > disable it in case it impacts their systems in an unwanted way.
>> > > > > > >
>> > > > > > > Maybe it would be useful to have a way to disable the
>> > > > > > > mechanism on the broker side as well.
>> > > > > > >
>> > > > > > > Enrico
>> > > > > > >
>> > > > > > > On Wed, Feb 22, 2023 at 10:22 houxiaoyu
>> > > > > > > <anonhx...@gmail.com> wrote:
>> > > > > > > >
>> > > > > > > > Hi Pulsar community:
>> > > > > > > >
>> > > > > > > > I opened a PIP to discuss "Notifications for partitions
>> update"
>> > > > > > > >
>> > > > > > > > ### Motivation
>> > > > > > > >
>> > > > > > > > The Pulsar client polls brokers at a fixed interval to check
>> > > > > > > > for partition updates if we publish/subscribe to partitioned
>> > > > > > > > topics with `autoUpdatePartitions` set to true. This causes
>> > > > > > > > unnecessary load for both clients and brokers, since most of
>> > > > > > > > the time the number of partitions does not change. In
>> > > > > > > > addition, polling introduces latency in partition updates,
>> > > > > > > > which is determined by `autoUpdatePartitionsInterval`.
>> > > > > > > > This PIP would like to introduce a notification mechanism for
>> > > > > > > > partition updates, which is much like PIP-145 for regex
>> > > > > > > > subscriptions:
>> > > > > > > > https://github.com/apache/pulsar/issues/14505.
>> > > > > > > >
>> > > > > > > > For more details, please read the PIP at:
>> > > > > > > > https://github.com/apache/pulsar/issues/19596
>> > > > > > > > Looking forward to hearing your thoughts.
>> > > > > > > >
>> > > > > > > > Thanks,
>> > > > > > > > Xiaoyu Hou
>> > > > > > > > ----
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>>
>
