Any comments?

FYI I have updated the PIP after addressing some feedback on the PR:
- no more need to create a dummy Dispatcher
- now we are reading entries in batches

I would like to see this in 2.11, please

Enrico

On Thu, Jul 14, 2022 at 09:34 Enrico Olivelli
<eolive...@gmail.com> wrote:
>
> Hello,
> this is a PIP to implement a tool to analyse the subscription backlog
>
> Link: https://github.com/apache/pulsar/issues/16597
> Prototype: https://github.com/apache/pulsar/pull/16545
>
> Below you can find the proposal (I will amend the GH issue while we
> discuss, as usual)
>
> Enrico
>
> Motivation
>
> Currently there is no way to get an accurate backlog for a subscription:
>
> - you have only the number of "entries", not messages
> - server side filters (PIP-105) may filter out some messages
>
> Having the number of entries is sometimes not enough, because with
> batch messages the amount of work on the Consumers is proportional to
> the number of messages, which may vary from entry to entry.
>
> Goal
>
> The idea of this patch is to provide a dedicated API (REST,
> pulsar-admin, and Java PulsarAdmin) to "analyse" a subscription and
> provide detailed information about what is expected to be delivered
> to Consumers.
>
> The operation will be quite expensive, because we have to load the
> messages from storage and pass them to the filters, but due to the
> dynamic nature of Pulsar subscriptions there is no other way to
> obtain this value.
>
> One good strategy for monitoring/alerting is to set up alerts on the
> usual "stats" and use this new API to inspect the subscription more
> deeply, typically by issuing a manual command.
>
> API Changes
>
> internal ManagedCursor API:
>
>     CompletableFuture<ScanOutcome> scan(Predicate<Entry> condition,
>             long maxEntries, long timeOutMs);
>
> This method scans the Cursor from the lastMarkDelete position to the tail.
> There are a time limit and a maxEntries limit; they are needed in
> order to prevent huge (and useless) scans.
> The Predicate can stop the scan if it doesn't want to continue the
> processing for some reason.
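>
> Just to make the intent concrete, here is a rough, illustrative
> sketch of how a caller could use the new method (matchesCondition and
> the printed message are placeholders, not part of the proposal):
>
>     // Illustrative only: count the entries that match some condition.
>     // The limits (10_000 entries, 120_000 ms) mirror the defaults proposed below.
>     AtomicLong matching = new AtomicLong();
>     CompletableFuture<ScanOutcome> future = cursor.scan(entry -> {
>         if (matchesCondition(entry)) {   // placeholder for the real check
>             matching.incrementAndGet();
>         }
>         return true;                     // return false to stop the scan early
>     }, 10_000, 120_000);
>     future.thenAccept(outcome ->
>             System.out.println("Scan " + outcome + ", matching entries: " + matching.get()));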
>
> New REST API:
>
>     @GET
>     @Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/analiseBacklog")
>     @ApiOperation(value = "Analyse a subscription, by scanning all the unprocessed messages")
>     public void analiseSubscriptionBacklog(
>             @Suspended final AsyncResponse asyncResponse,
>             @ApiParam(value = "Specify the tenant", required = true)
>             @PathParam("tenant") String tenant,
>             @ApiParam(value = "Specify the namespace", required = true)
>             @PathParam("namespace") String namespace,
>             @ApiParam(value = "Specify topic name", required = true)
>             @PathParam("topic") @Encoded String encodedTopic,
>             @ApiParam(value = "Subscription", required = true)
>             @PathParam("subName") String encodedSubName,
>             @ApiParam(value = "Is authentication required to perform this operation")
>             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
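>
> Assuming the standard admin URL layout, invoking the new endpoint
> could look roughly like this (host, tenant, namespace, topic and
> subscription names are only examples):
>
>     curl "http://localhost:8080/admin/v2/persistent/public/default/my-topic/subscription/my-sub/analiseBacklog"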
>
> API response model:
>
> public class AnaliseSubscriptionBacklogResult {
>     private long entries;
>     private long messages;
>
>     private long filterRejectedEntries;
>     private long filterAcceptedEntries;
>     private long filterRescheduledEntries;
>
>     private long filterRejectedMessages;
>     private long filterAcceptedMessages;
>     private long filterRescheduledMessages;
>
>     private boolean aborted;
> }
>
> The response contains "aborted=true" if the request has been aborted
> by some internal limitation, like a timeout or the scan hitting the
> maximum number of entries.
> We are not going to provide more details about the reason for the
> stop: that would make the API too detailed and harder to maintain.
> The details will be available in the broker logs.
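>
> Purely for illustration, a response could look like this (all values
> are made up):
>
>     {
>       "entries": 100,
>       "messages": 500,
>       "filterRejectedEntries": 20,
>       "filterAcceptedEntries": 80,
>       "filterRescheduledEntries": 0,
>       "filterRejectedMessages": 100,
>       "filterAcceptedMessages": 400,
>       "filterRescheduledMessages": 0,
>       "aborted": false
>     }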
>
> New PulsarAdmin API:
>
>     /**
>      * Analyse subscription backlog.
>      * This is a potentially expensive operation, as it requires
>      * reading the messages from storage.
>      * This function takes into consideration batch messages
>      * and also Subscription filters.
>      * @param topic
>      *            Topic name
>      * @param subscriptionName
>      *            the subscription
>      * @return an accurate analysis of the backlog
>      * @throws PulsarAdminException
>      *            Unexpected error
>      */
>     AnaliseSubscriptionBacklogResult analiseSubscriptionBacklog(
>             String topic, String subscriptionName)
>             throws PulsarAdminException;
>
>     /**
>      * Analyse subscription backlog.
>      * This is a potentially expensive operation, as it requires
>      * reading the messages from storage.
>      * This function takes into consideration batch messages
>      * and also Subscription filters.
>      * @param topic
>      *            Topic name
>      * @param subscriptionName
>      *            the subscription
>      * @return an accurate analysis of the backlog
>      * @throws PulsarAdminException
>      *            Unexpected error
>      */
>     CompletableFuture<AnaliseSubscriptionBacklogResult> analiseSubscriptionBacklogAsync(
>             String topic, String subscriptionName);
>
> A pulsar-admin command will be added as well, as usual.
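>
> For illustration, the Java API could be used roughly like this (the
> accessor on topics() and the getter names are assumptions derived
> from the field names above, not a final API):
>
>     PulsarAdmin admin = PulsarAdmin.builder()
>             .serviceHttpUrl("http://localhost:8080")
>             .build();
>     AnaliseSubscriptionBacklogResult result = admin.topics()
>             .analiseSubscriptionBacklog("persistent://public/default/my-topic", "my-sub");
>     if (result.isAborted()) {
>         // the scan hit the time or entry limit, so the numbers below are partial
>     }
>     System.out.println("entries: " + result.getEntries()
>             + ", messages accepted by filters: " + result.getFilterAcceptedMessages());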
>
> New configuration entries in broker.conf:
>
>  @FieldContext(
>          category = CATEGORY_POLICIES,
>          doc = "Maximum time to spend while scanning a subscription to calculate the accurate backlog"
>  )
>  private long subscriptionBacklogScanMaxTimeMs = 1000 * 60 * 2L;
>
>  @FieldContext(
>          category = CATEGORY_POLICIES,
>          doc = "Maximum number of entries to process while scanning a subscription to calculate the accurate backlog"
>  )
>  private long subscriptionBacklogScanMaxEntries = 10_000;
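>
> For reference, with the defaults above the corresponding broker.conf
> entries would look like this:
>
>     # Maximum time to spend while scanning a subscription to calculate the accurate backlog
>     subscriptionBacklogScanMaxTimeMs=120000
>     # Maximum number of entries to process while scanning a subscription to calculate the accurate backlog
>     subscriptionBacklogScanMaxEntries=10000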
>
> Implementation
>
> The implementation is pretty straightforward:
>
> - add a new API in ManagedCursor to do the scan
> - add the REST API
> - implement in PersistentSubscription an analiseBacklog method that
>   does the scan
>
> Then the PersistentSubscription runs the scan, as sketched below:
>
> - it applies the filters if they are present
> - it considers individuallyDeletedMessages
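>
> A very rough sketch of the per-entry logic (the FilterResult values
> come from the PIP-105 EntryFilter API; the counter methods on the
> result object and the applyFilters helper are placeholders):
>
>     // Called for each entry found by the cursor scan (simplified).
>     boolean handleEntry(Entry entry, FilterContext context,
>                         AnaliseSubscriptionBacklogResult counters) {
>         MessageMetadata metadata = Commands.parseMessageMetadata(entry.getDataBuffer());
>         int numMessages = metadata.getNumMessagesInBatch();
>         counters.addEntries(1);                       // placeholder accumulator
>         counters.addMessages(numMessages);            // placeholder accumulator
>         EntryFilter.FilterResult result = applyFilters(entry, metadata, context); // placeholder
>         switch (result) {
>             case ACCEPT:
>                 counters.addFilterAccepted(1, numMessages);
>                 break;
>             case REJECT:
>                 counters.addFilterRejected(1, numMessages);
>                 break;
>             default:
>                 counters.addFilterRescheduled(1, numMessages);
>                 break;
>         }
>         return true; // keep scanning; time/entry limits are enforced by the cursor
>     }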
>
> Non-trivial problem regarding the Dispatcher:
> The Filters are loaded by an AbstractBaseDispatcher, but
> PersistentSubscription starts a Dispatcher only when the first
> consumer is connected.
> This happens because the Subscription itself doesn't have a type
> (Failover, Exclusive, Shared...) or KeySharedMetadata; all of this is
> decided by the first consumer that connects (after the topic is
> loaded, so the subscription type may change after a topic unload).
> This PIP won't fix this "problem", so in case of a missing Dispatcher
> we are going to use an ephemeral Dispatcher without a Type.
> Maybe in the future it will be better to persist the Subscription Type
> and other metadata; this way we could create the Dispatcher while
> instantiating the Subscription.
>
> Rejected Alternatives
>
> We could somehow store a counter of the number of logical messages
> during writes. But that does not work, for a few reasons:
>
> - you cannot know which subscriptions will be created in a topic
> - subscriptions can be created starting from the past (Earliest)
> - subscription filters may change over time: they are usually
>   configured using Subscription Properties, and those properties are
>   dynamic
> - doing computations on the write path (like running filters) kills
>   latency and throughput
>
> Use a client to clone the subscription and consume the data.
> This doesn't work because you have to transfer the data to the
> client, which is potentially a huge amount of work and a waste of
> resources.
