Thank you all.
I am closing this VOTE as Passed.
Let's follow up on the PR for detailed feedback about the implementation.
Enrico

On Mon, Jul 25, 2022 at 07:21, mattison chao <mattisonc...@apache.org> wrote:
>
> +1
>
> Best,
> Mattison
>
> On Sun, 24 Jul 2022 at 14:51, Haiting Jiang <jianghait...@apache.org> wrote:
> >
> > +1
> >
> > Thanks,
> > Haiting
> >
> > On 2022/07/23 02:00:32 PengHui Li wrote:
> > > +1
> > >
> > > Penghui
> > >
> > > On Wed, Jul 20, 2022 at 9:41 PM Asaf Mesika <asaf.mes...@gmail.com> wrote:
> > > >
> > > > Sorry to barge in the vote - I forgot to send my reply on the
> > > > discussion 2 days ago :)
> > > >
> > > > On Tue, Jul 19, 2022 at 11:22 PM Nicolò Boschi <boschi1...@gmail.com> wrote:
> > > > >
> > > > > +1, thanks
> > > > >
> > > > > Nicolò Boschi
> > > > >
> > > > > On Tue, Jul 19, 2022 at 22:16, Christophe Bornet <bornet.ch...@gmail.com> wrote:
> > > > > >
> > > > > > +1
> > > > > >
> > > > > > On Tue, Jul 19, 2022 at 20:01, Andrey Yegorov <andrey.yego...@datastax.com> wrote:
> > > > > > >
> > > > > > > +1
> > > > > > >
> > > > > > > On Tue, Jul 19, 2022 at 10:51 AM Dave Fisher <w...@apache.org> wrote:
> > > > > > > >
> > > > > > > > +1 (binding)
> > > > > > > >
> > > > > > > > I support this enhancement for when a user occasionally requires
> > > > > > > > accurate backlog stats. Once we bring this into service we can see
> > > > > > > > if further guardrails are required.
> > > > > > > >
> > > > > > > > Regards,
> > > > > > > > Dave
> > > > > > > >
> > > > > > > > > On Jul 19, 2022, at 10:02 AM, Enrico Olivelli <eolive...@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > > This is the VOTE thread for PIP-187
> > > > > > > > >
> > > > > > > > > This is the GH issue: https://github.com/apache/pulsar/issues/16597
> > > > > > > > > This is the PR: https://github.com/apache/pulsar/pull/16545
> > > > > > > > >
> > > > > > > > > The vote is open for at least 48 hours
> > > > > > > > >
> > > > > > > > > Below you can find a copy of the text of the PIP
> > > > > > > > >
> > > > > > > > > Best regards
> > > > > > > > > Enrico
> > > > > > > > >
> > > > > > > > > Motivation
> > > > > > > > >
> > > > > > > > > Currently there is no way to have an accurate backlog for a subscription:
> > > > > > > > >
> > > > > > > > > you have only the number of "entries", not messages
> > > > > > > > > server-side filters (PIP-105) may filter out some messages
> > > > > > > > >
> > > > > > > > > Having the number of entries is sometimes not enough, because with
> > > > > > > > > batch messages the amount of work on the Consumers is proportional to
> > > > > > > > > the number of messages, which may vary from entry to entry.
> > > > > > > > >
> > > > > > > > > Goal
> > > > > > > > >
> > > > > > > > > The idea of this patch is to provide a dedicated API (REST,
> > > > > > > > > pulsar-admin, and Java PulsarAdmin) to "analyze" a subscription and
> > > > > > > > > provide detailed information about what is expected to be delivered
> > > > > > > > > to Consumers.
> > > > > > > > >
> > > > > > > > > The operation will be quite expensive because we have to load the
> > > > > > > > > messages from storage and pass them to the filters, but due to the
> > > > > > > > > dynamic nature of Pulsar subscriptions there is no other way to have
> > > > > > > > > this value.
> > > > > > > > >
> > > > > > > > > One good strategy for monitoring/alerting is to set up alerts on the
> > > > > > > > > usual "stats" and use this new API to inspect the subscription more
> > > > > > > > > deeply, typically by issuing a manual command.
> > > > > > > > >
> > > > > > > > > API Changes
> > > > > > > > >
> > > > > > > > > Internal ManagedCursor API:
> > > > > > > > >
> > > > > > > > > CompletableFuture<ScanOutcome> scan(Predicate<Entry> condition,
> > > > > > > > >         long maxEntries, long timeOutMs);
> > > > > > > > >
> > > > > > > > > This method scans the Cursor from the lastMarkDelete position to the
> > > > > > > > > tail. There is a time limit and a maxEntries limit; these are needed
> > > > > > > > > in order to prevent huge (and useless) scans.
> > > > > > > > > The Predicate can stop the scan if it doesn't want to continue the
> > > > > > > > > processing for some reason.
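> > > > > > > > >
> > > > > > > > > For illustration only, here is a minimal sketch (not part of the
> > > > > > > > > proposal) of how a caller inside the broker might use the proposed
> > > > > > > > > scan method to count entries and logical messages. It assumes the
> > > > > > > > > existing Commands.parseMessageMetadata helper for reading the batch
> > > > > > > > > size, and "cursor" stands for the ManagedCursor of the subscription:
> > > > > > > > >
> > > > > > > > > import java.util.concurrent.atomic.AtomicLong;
> > > > > > > > > import java.util.function.Predicate;
> > > > > > > > > import org.apache.bookkeeper.mledger.Entry;
> > > > > > > > > import org.apache.pulsar.common.api.proto.MessageMetadata;
> > > > > > > > > import org.apache.pulsar.common.protocol.Commands;
> > > > > > > > >
> > > > > > > > > AtomicLong entries = new AtomicLong();
> > > > > > > > > AtomicLong messages = new AtomicLong();
> > > > > > > > > Predicate<Entry> condition = entry -> {
> > > > > > > > >     // parseMessageMetadata moves the reader index past the metadata
> > > > > > > > >     MessageMetadata metadata = Commands.parseMessageMetadata(entry.getDataBuffer());
> > > > > > > > >     entries.incrementAndGet();
> > > > > > > > >     int batchSize = metadata.hasNumMessagesInBatch()
> > > > > > > > >             ? metadata.getNumMessagesInBatch() : 1;
> > > > > > > > >     messages.addAndGet(batchSize);
> > > > > > > > >     return true; // keep scanning; maxEntries and timeOutMs bound the work
> > > > > > > > > };
> > > > > > > > > cursor.scan(condition, 10_000, 2 * 60 * 1000)
> > > > > > > > >         .thenAccept(outcome -> System.out.println(
> > > > > > > > >                 "entries=" + entries + ", messages=" + messages
> > > > > > > > >                         + ", outcome=" + outcome));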
> > > > > > > > >
> > > > > > > > > New REST API:
> > > > > > > > >
> > > > > > > > > @GET
> > > > > > > > > @Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/analyzeBacklog")
> > > > > > > > > @ApiOperation(value = "Analyze a subscription, by scanning all the unprocessed messages")
> > > > > > > > > public void analyzeSubscriptionBacklog(
> > > > > > > > >         @Suspended final AsyncResponse asyncResponse,
> > > > > > > > >         @ApiParam(value = "Specify the tenant", required = true)
> > > > > > > > >         @PathParam("tenant") String tenant,
> > > > > > > > >         @ApiParam(value = "Specify the namespace", required = true)
> > > > > > > > >         @PathParam("namespace") String namespace,
> > > > > > > > >         @ApiParam(value = "Specify topic name", required = true)
> > > > > > > > >         @PathParam("topic") @Encoded String encodedTopic,
> > > > > > > > >         @ApiParam(value = "Subscription", required = true)
> > > > > > > > >         @PathParam("subName") String encodedSubName,
> > > > > > > > >         @ApiParam(value = "Is authentication required to perform this operation")
> > > > > > > > >         @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
> > > > > > > > >
> > > > > > > > > API response model:
> > > > > > > > >
> > > > > > > > > public class AnalyzeSubscriptionBacklogResult {
> > > > > > > > >     private long entries;
> > > > > > > > >     private long messages;
> > > > > > > > >
> > > > > > > > >     private long filterRejectedEntries;
> > > > > > > > >     private long filterAcceptedEntries;
> > > > > > > > >     private long filterRescheduledEntries;
> > > > > > > > >
> > > > > > > > >     private long filterRejectedMessages;
> > > > > > > > >     private long filterAcceptedMessages;
> > > > > > > > >     private long filterRescheduledMessages;
> > > > > > > > >
> > > > > > > > >     private boolean aborted;
> > > > > > > > > }
> > > > > > > > >
> > > > > > > > > The response contains "aborted=true" if the request has been aborted
> > > > > > > > > by some internal limitation, like a timeout or the scan hitting the
> > > > > > > > > max number of entries.
> > > > > > > > > We are not going to provide more details about the reason for the
> > > > > > > > > stop; that would make the API too detailed and harder to maintain.
> > > > > > > > > Also, you will find the details in the logs of the broker.
> > > > > > > > >
> > > > > > > > > New PulsarAdmin API:
> > > > > > > > >
> > > > > > > > > /**
> > > > > > > > >  * Analyze subscription backlog.
> > > > > > > > >  * This is a potentially expensive operation, as it requires
> > > > > > > > >  * to read the messages from storage.
> > > > > > > > >  * This function takes into consideration batch messages
> > > > > > > > >  * and also Subscription filters.
> > > > > > > > >  * @param topic
> > > > > > > > >  *            Topic name
> > > > > > > > >  * @param subscriptionName
> > > > > > > > >  *            the subscription
> > > > > > > > >  * @return an accurate analysis of the backlog
> > > > > > > > >  * @throws PulsarAdminException
> > > > > > > > >  *            Unexpected error
> > > > > > > > >  */
> > > > > > > > > AnalyzeSubscriptionBacklogResult analyzeSubscriptionBacklog(String topic,
> > > > > > > > >         String subscriptionName) throws PulsarAdminException;
> > > > > > > > >
> > > > > > > > > /**
> > > > > > > > >  * Analyze subscription backlog asynchronously.
> > > > > > > > >  * This is a potentially expensive operation, as it requires
> > > > > > > > >  * to read the messages from storage.
> > > > > > > > >  * This function takes into consideration batch messages
> > > > > > > > >  * and also Subscription filters.
> > > > > > > > >  * @param topic
> > > > > > > > >  *            Topic name
> > > > > > > > >  * @param subscriptionName
> > > > > > > > >  *            the subscription
> > > > > > > > >  * @return an accurate analysis of the backlog
> > > > > > > > >  */
> > > > > > > > > CompletableFuture<AnalyzeSubscriptionBacklogResult> analyzeSubscriptionBacklogAsync(
> > > > > > > > >         String topic, String subscriptionName);
> > > > > > > > >
> > > > > > > > > A pulsar-admin command will be added as well, as usual.
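> > > > > > > > >
> > > > > > > > > For reference, a hypothetical usage of the new Java admin API (not
> > > > > > > > > part of the proposal). Where the method is exposed (here assumed to
> > > > > > > > > be on topics()), the admin URL, and the topic/subscription names are
> > > > > > > > > all illustrative; the getters on AnalyzeSubscriptionBacklogResult are
> > > > > > > > > assumed to follow the usual bean conventions:
> > > > > > > > >
> > > > > > > > > import org.apache.pulsar.client.admin.PulsarAdmin;
> > > > > > > > >
> > > > > > > > > // inside a method that declares the checked Pulsar client/admin exceptions
> > > > > > > > > try (PulsarAdmin admin = PulsarAdmin.builder()
> > > > > > > > >         .serviceHttpUrl("http://localhost:8080")
> > > > > > > > >         .build()) {
> > > > > > > > >     AnalyzeSubscriptionBacklogResult result = admin.topics()
> > > > > > > > >             .analyzeSubscriptionBacklog("persistent://public/default/my-topic", "my-sub");
> > > > > > > > >     System.out.println("entries=" + result.getEntries()
> > > > > > > > >             + ", messages=" + result.getMessages()
> > > > > > > > >             + ", aborted=" + result.isAborted());
> > > > > > > > > }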
> > > > > > > > >
> > > > > > > > > New configuration entries in broker.conf:
> > > > > > > > >
> > > > > > > > > @FieldContext(
> > > > > > > > >     category = CATEGORY_POLICIES,
> > > > > > > > >     doc = "Maximum time to spend while scanning a subscription to calculate the accurate backlog"
> > > > > > > > > )
> > > > > > > > > private long subscriptionBacklogScanMaxTimeMs = 1000 * 60 * 2L;
> > > > > > > > >
> > > > > > > > > @FieldContext(
> > > > > > > > >     category = CATEGORY_POLICIES,
> > > > > > > > >     doc = "Maximum number of entries to process while scanning a subscription to calculate the accurate backlog"
> > > > > > > > > )
> > > > > > > > > private long subscriptionBacklogScanMaxEntries = 10_000;
> > > > > > > > >
> > > > > > > > > Implementation
> > > > > > > > >
> > > > > > > > > The implementation is pretty straightforward:
> > > > > > > > >
> > > > > > > > > add a new API in ManagedCursor to do the scan
> > > > > > > > > add the REST API
> > > > > > > > > implement in PersistentSubscription an analyzeBacklog method that does the scan
> > > > > > > > >
> > > > > > > > > When the PersistentSubscription runs the scan:
> > > > > > > > >
> > > > > > > > > it applies the filters if they are present
> > > > > > > > > it takes individuallyDeletedMessages into account
> > > > > > > > >
> > > > > > > > > Rejected Alternatives
> > > > > > > > >
> > > > > > > > > We could somehow store a counter of the number of logical messages
> > > > > > > > > during writes. But that does not work, for a few reasons:
> > > > > > > > >
> > > > > > > > > you cannot know which subscriptions will be created in a topic
> > > > > > > > > a subscription can be created from the past (Earliest)
> > > > > > > > > subscription filters may change over time: they are usually configured
> > > > > > > > > using Subscription Properties, and those properties are dynamic
> > > > > > > > > doing computations on the write path (like running the filters) kills
> > > > > > > > > latency and throughput
> > > > > > > > >
> > > > > > > > > Another rejected alternative is to use a client to clone the
> > > > > > > > > subscription and consume the data. This doesn't work because you have
> > > > > > > > > to transfer the data to the client, which is possibly a huge amount of
> > > > > > > > > work and a waste of resources.
> > > > > > >
> > > > > > > --
> > > > > > > Andrey Yegorov