+1,
This feature is very useful for users.

Best,
Mattison

On Mon, 18 Jul 2022 at 18:11, Nicolò Boschi <boschi1...@gmail.com> wrote:

> +1, good work, I think it's very valuable for users for monitoring
> purposes.
>
> Nicolò Boschi
>
>
> On Mon, 18 Jul 2022 at 11:47, Lothruin Mirwen <
> lothruin.mir...@gmail.com> wrote:
>
> > This should be really helpful for monitoring the real backlog in production
> > environments.
> > Currently we are stuck with the "entries" count, which doesn't describe well
> > how much real message lag we have.
> >
> > +1 excited to see it work on our production environments
> >
> > Diego Salvi
> >
> > On Mon, 18 Jul 2022 at 11:18, Enrico Olivelli <
> > eolive...@gmail.com> wrote:
> >
> > > Any comments?
> > >
> > > FYI I have updated the PIP after addressing some feedback on the PR:
> > > - no more need to create a dummy Dispatcher
> > > - now we are reading entries in batches
> > >
> > > I would like to see this in 2.11 please
> > >
> > > Enrico
> > >
> > > On Thu, 14 Jul 2022 at 09:34, Enrico Olivelli
> > > <eolive...@gmail.com> wrote:
> > > >
> > > > Hello,
> > > > this is a PIP to implement a tool to analyse the subscription backlog
> > > >
> > > > Link: https://github.com/apache/pulsar/issues/16597
> > > > Prototype: https://github.com/apache/pulsar/pull/16545
> > > >
> > > > Below you can find the proposal (I will amend the GH issue while we
> > > > discuss, as usual)
> > > >
> > > > Enrico
> > > >
> > > > Motivation
> > > >
> > > > Currently there is no way to have an accurate backlog for a subscription:
> > > >
> > > > - you have only the number of "entries", not messages
> > > > - server side filters (PIP-105) may filter out some messages
> > > >
> > > > Having the number of entries is sometimes not enough because with
> > > > batch messages the amount of work on the Consumers is proportional to
> > > > the number of messages, which may vary from entry to entry.
> > > >
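To make the entries-versus-messages gap concrete, here is a minimal sketch. The class and method names are hypothetical, and it assumes that each entry simply carries a known number of batched messages.

```java
import java.util.List;

// Hypothetical illustration only; not part of the proposal or of Pulsar.
final class EntriesVsMessagesSketch {

    /** Sums the per-entry batch sizes to obtain the number of messages. */
    static long countMessages(List<Integer> messagesPerEntry) {
        long total = 0;
        for (int batchSize : messagesPerEntry) {
            total += batchSize;
        }
        return total;
    }
}
```

With batch sizes of 1, 100 and 20, the "entries" stat reports 3 while the Consumers actually have 121 messages to process.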
> > > > Goal
> > > >
> > > > The idea of this patch is to provide a dedicated API (REST,
> > > > pulsar-admin, and Java PulsarAdmin) to "analise" a subscription and
> > > > provide detailed information about what is expected to be delivered
> > > > to Consumers.
> > > >
> > > > The operation will be quite expensive because we have to load the
> > > > messages from storage and pass them to the filters, but due to the
> > > > dynamic nature of Pulsar subscriptions there is no other way to have
> > > > this value.
> > > >
> > > > One good strategy to do monitoring/alerting is to set up alerts on the
> > > > usual "stats" and use this new API to inspect the subscription deeper,
> > > > typically by issuing a manual command.
> > > >
> > > > API Changes
> > > >
> > > > internal ManagedCursor API:
> > > >
> > > > CompletableFuture<ScanOutcome> scan(Predicate<Entry> condition,
> > > >         long maxEntries, long timeOutMs);
> > > >
> > > > This method scans the Cursor from the lastMarkDelete position to the
> > > > tail.
> > > > There is a time limit and a maxEntries limit; these are needed in
> > > > order to prevent huge (and useless) scans.
> > > > The Predicate can stop the scan if it doesn't want to continue the
> > > > processing for some reason.
> > > >
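The scan semantics described above can be sketched roughly as follows. This is a simplified, synchronous model over an in-memory list, not the ManagedCursor implementation. The class name, the three outcome values and the generic entry type are assumptions made for illustration, since the proposal only gives the method signature.

```java
import java.util.List;
import java.util.function.Predicate;

// Hypothetical, simplified model of the proposed scan; not Pulsar code.
final class CursorScanSketch {

    /** Why the scan stopped. The value names are assumptions. */
    enum ScanOutcome { COMPLETED, ABORTED, USER_INTERRUPTED }

    /**
     * Walks the entries in order and hands each one to the condition.
     * Stops early when the condition returns false, when maxEntries
     * entries have already been visited, or when timeOutMs has elapsed.
     */
    static <E> ScanOutcome scan(List<E> entries, Predicate<E> condition,
                                long maxEntries, long timeOutMs) {
        long deadline = System.nanoTime() + timeOutMs * 1_000_000L;
        long visited = 0;
        for (E entry : entries) {
            // Internal limits: too many entries or too much time spent.
            if (visited >= maxEntries || System.nanoTime() - deadline >= 0) {
                return ScanOutcome.ABORTED;
            }
            visited++;
            // The caller's predicate may ask to stop the scan.
            if (!condition.test(entry)) {
                return ScanOutcome.USER_INTERRUPTED;
            }
        }
        return ScanOutcome.COMPLETED;
    }
}
```

In this model, a predicate that always returns true over three entries completes with limits of 10 entries, but is aborted with a limit of 2 entries.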
> > > > New REST API:
> > > >
> > > >     @GET
> > > >     @Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/analiseBacklog")
> > > >     @ApiOperation(value = "Analyse a subscription, by scanning all the unprocessed messages")
> > > >     public void analiseSubscriptionBacklog(
> > > >             @Suspended final AsyncResponse asyncResponse,
> > > >             @ApiParam(value = "Specify the tenant", required = true)
> > > >             @PathParam("tenant") String tenant,
> > > >             @ApiParam(value = "Specify the namespace", required = true)
> > > >             @PathParam("namespace") String namespace,
> > > >             @ApiParam(value = "Specify topic name", required = true)
> > > >             @PathParam("topic") @Encoded String encodedTopic,
> > > >             @ApiParam(value = "Subscription", required = true)
> > > >             @PathParam("subName") String encodedSubName,
> > > >             @ApiParam(value = "Is authentication required to perform this operation")
> > > >             @QueryParam("authoritative") @DefaultValue("false")
> > > >             boolean authoritative) {
> > > >
> > > > API response model:
> > > >
> > > > public class AnaliseSubscriptionBacklogResult {
> > > >     private long entries;
> > > >     private long messages;
> > > >
> > > >     private long filterRejectedEntries;
> > > >     private long filterAcceptedEntries;
> > > >     private long filterRescheduledEntries;
> > > >
> > > >     private long filterRejectedMessages;
> > > >     private long filterAcceptedMessages;
> > > >     private long filterRescheduledMessages;
> > > >
> > > >     private boolean aborted;
> > > >
> > > > The response contains "aborted=true" if the request has been aborted
> > > > by some internal limitation, like a timeout or the scan hitting the max
> > > > number of entries.
> > > > We are not going to provide more details about the reason for the stop.
> > > > It would make the API too detailed and harder to maintain. Also, you
> > > > will find the details in the logs of the broker.
> > > >
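As a rough illustration of how a caller might read this response, here is a minimal sketch. The Result class is a hypothetical stand-in that keeps only three of the proposed fields. Treating the counts as a lower bound when aborted=true is an assumption, based on the scan stopping before it reaches the tail.

```java
import java.util.Locale;

// Hypothetical helper; only the field names come from the proposal.
final class BacklogResultSketch {

    /** Minimal stand-in for the proposed response model (subset of fields). */
    static final class Result {
        final long entries;
        final long messages;
        final boolean aborted;

        Result(long entries, long messages, boolean aborted) {
            this.entries = entries;
            this.messages = messages;
            this.aborted = aborted;
        }
    }

    /** Builds a one-line summary; an aborted scan is reported as partial. */
    static String describe(Result r) {
        double perEntry = r.entries == 0 ? 0.0 : (double) r.messages / r.entries;
        String prefix = r.aborted ? "at least " : "";
        return String.format(Locale.ROOT, "%s%d messages in %d entries (%.1f per entry)",
                prefix, r.messages, r.entries, perEntry);
    }
}
```

For 10 entries holding 250 messages and aborted=false, describe returns "250 messages in 10 entries (25.0 per entry)".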
> > > > New PulsarAdmin API:
> > > >
> > > >     /**
> > > >      * Analise subscription backlog.
> > > >      * This is a potentially expensive operation, as it requires
> > > >      * to read the messages from storage.
> > > >      * This function takes into consideration batch messages
> > > >      * and also Subscription filters.
> > > >      * @param topic
> > > >      *            Topic name
> > > >      * @param subscriptionName
> > > >      *            the subscription
> > > >      * @return an accurate analysis of the backlog
> > > >      * @throws PulsarAdminException
> > > >      *            Unexpected error
> > > >      */
> > > >     AnaliseSubscriptionBacklogResult analiseSubscriptionBacklog(String topic,
> > > >             String subscriptionName)
> > > >             throws PulsarAdminException;
> > > >
> > > >     /**
> > > >      * Analise subscription backlog.
> > > >      * This is a potentially expensive operation, as it requires
> > > >      * to read the messages from storage.
> > > >      * This function takes into consideration batch messages
> > > >      * and also Subscription filters.
> > > >      * @param topic
> > > >      *            Topic name
> > > >      * @param subscriptionName
> > > >      *            the subscription
> > > >      * @return an accurate analysis of the backlog
> > > >      * @throws PulsarAdminException
> > > >      *            Unexpected error
> > > >      */
> > > >     CompletableFuture<AnaliseSubscriptionBacklogResult> analiseSubscriptionBacklogAsync(
> > > >             String topic, String subscriptionName);
> > > >
> > > > A pulsar-admin command will be added as well, as usual.
> > > >
> > > > New configuration entries in broker.conf:
> > > >
> > > > @FieldContext(
> > > >         category = CATEGORY_POLICIES,
> > > >         doc = "Maximum time to spend while scanning a subscription to calculate the accurate backlog"
> > > > )
> > > > private long subscriptionBacklogScanMaxTimeMs = 1000 * 60 * 2L;
> > > > @FieldContext(
> > > >         category = CATEGORY_POLICIES,
> > > >         doc = "Maximum number of entries to process while scanning a subscription to calculate the accurate backlog"
> > > > )
> > > > private long subscriptionBacklogScanMaxEntries = 10_000;
> > > >
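In broker.conf the two settings would presumably look like the fragment below. This assumes the usual convention that the configuration key equals the field name. The values are the defaults from the fields above, with 1000 * 60 * 2 written out as 120000 ms.

```properties
# Maximum time to spend while scanning a subscription (2 minutes)
subscriptionBacklogScanMaxTimeMs=120000
# Maximum number of entries to process while scanning a subscription
subscriptionBacklogScanMaxEntries=10000
```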
> > > > Implementation
> > > >
> > > > The implementation is pretty straightforward:
> > > >
> > > > - add a new API in ManagedCursor to do the Scan
> > > > - add the REST API
> > > > - implement in PersistentSubscription an analiseBacklog method that does
> > > >   the scan
> > > >
> > > > The PersistentSubscription runs the scan:
> > > >
> > > > - it applies the filters if they are present
> > > > - it considers individuallyDeletedMessages
> > > >
> > > > Non-trivial problem regarding the Dispatcher:
> > > > The Filters are loaded by an AbstractBaseDispatcher, but
> > > > PersistentSubscription starts a Dispatcher only when the first
> > > > consumer is connected.
> > > > This happens because the Subscription itself doesn't have a type
> > > > (Failover, Exclusive, Shared...) and KeySharedMetadata; all this stuff
> > > > is decided by the first consumer coming (after the load of the topic,
> > > > so the subscription type may change after a topic unload).
> > > > This PIP won't fix this "problem", and so in case of a missing
> > > > Dispatcher we are going to use an ephemeral Dispatcher without Type.
> > > > Maybe in the future it will be better to persist the Subscription Type
> > > > and other metadata; this way we can create the Dispatcher while
> > > > instantiating the Subscription.
> > > >
> > > > Rejected Alternatives
> > > >
> > > > We could somehow store a counter of the number of logical
> > > > messages during writes. But that does not work for a few reasons:
> > > >
> > > > - you cannot know which subscriptions will be created in a topic
> > > > - subscriptions can be created from the past (Earliest)
> > > > - subscription filters may change over time: they are usually configured
> > > >   using Subscription Properties, and those properties are dynamic
> > > > - doing computations on the write path (like running filters) kills
> > > >   latency and throughput
> > > >
> > > > Use a client to clone the subscription and consume data.
> > > > This doesn't work because you have to transfer the data to the client,
> > > > and this is possibly a huge amount of work and a waste of resources.
> > >
> >
>
