Hello, this is a PIP to implement a tool to analyse the subscription backlog
Link: https://github.com/apache/pulsar/issues/16597
Prototype: https://github.com/apache/pulsar/pull/16545

Below you can find the proposal (I will amend the GH issue while we discuss, as usual).

Enrico

Motivation

Currently there is no way to get an accurate backlog for a subscription:
- you have only the number of "entries", not messages
- server-side filters (PIP-105) may filter out some messages

Having the number of entries is sometimes not enough: with batch messages the amount of work on the Consumers is proportional to the number of messages, which may vary from entry to entry.

Goal

The idea of this patch is to provide a dedicated API (REST, pulsar-admin, and Java PulsarAdmin) to "analyse" a subscription and provide detailed information about what is expected to be delivered to Consumers.

The operation will be quite expensive because we have to load the messages from storage and pass them to the filters, but due to the dynamic nature of Pulsar subscriptions there is no other way to obtain this value.

One good strategy for monitoring/alerting is to set up alerts on the usual "stats" and use this new API to inspect the subscription more deeply, typically by issuing a manual command.

API Changes

Internal ManagedCursor API:

    CompletableFuture<ScanOutcome> scan(Predicate<Entry> condition, long maxEntries, long timeOutMs);

This method scans the Cursor from the lastMarkDelete position to the tail. There is a time limit and a maxEntries limit; both are needed to prevent huge (and useless) scans. The Predicate can stop the scan if it doesn't want to continue the processing for some reason.
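
To illustrate the contract of scan, here is a minimal, self-contained sketch. It is not the prototype's code: the in-memory list standing in for the cursor, the Entry stand-in class, the three ScanOutcome values and the convention that the Predicate stops the scan by returning false are all assumptions made for the example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;

class ScanSketch {
    /** Hypothetical outcome values; the proposal does not list them. */
    enum ScanOutcome { COMPLETED, ABORTED, USER_INTERRUPTED }

    /** Stand-in for a stored entry; numMessages models a batch. */
    static final class Entry {
        final long entryId;
        final int numMessages;

        Entry(long entryId, int numMessages) {
            this.entryId = entryId;
            this.numMessages = numMessages;
        }
    }

    /**
     * Walks the backlog from the oldest unacknowledged entry to the tail.
     * The list parameter replaces the real cursor state (assumption).
     */
    static CompletableFuture<ScanOutcome> scan(List<Entry> backlog,
                                               Predicate<Entry> condition,
                                               long maxEntries,
                                               long timeOutMs) {
        final long start = System.nanoTime();
        final long budgetNanos = TimeUnit.MILLISECONDS.toNanos(timeOutMs);
        long scanned = 0;
        for (Entry entry : backlog) {
            // Both limits are checked before reading one more entry.
            if (scanned >= maxEntries || System.nanoTime() - start > budgetNanos) {
                return CompletableFuture.completedFuture(ScanOutcome.ABORTED);
            }
            scanned++;
            // Assumption: returning false means "stop the scan".
            if (!condition.test(entry)) {
                return CompletableFuture.completedFuture(ScanOutcome.USER_INTERRUPTED);
            }
        }
        return CompletableFuture.completedFuture(ScanOutcome.COMPLETED);
    }

    public static void main(String[] args) {
        List<Entry> backlog = Arrays.asList(
                new Entry(0, 10), new Entry(1, 1), new Entry(2, 5));
        AtomicLong messages = new AtomicLong();
        ScanOutcome outcome = scan(backlog, e -> {
            messages.addAndGet(e.numMessages); // count messages, not entries
            return true;                       // keep scanning
        }, 10_000, 120_000).join();
        System.out.println(outcome + " messages=" + messages.get());
    }
}
```

In this sketch a caller can tell a complete result from a partial one only through the returned outcome, which is how the limits would surface to the layer that builds the final response.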

New REST API:

    @GET
    @Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/analiseBacklog")
    @ApiOperation(value = "Analyse a subscription, by scanning all the unprocessed messages")
    public void analiseSubscriptionBacklog(
            @Suspended final AsyncResponse asyncResponse,
            @ApiParam(value = "Specify the tenant", required = true)
            @PathParam("tenant") String tenant,
            @ApiParam(value = "Specify the namespace", required = true)
            @PathParam("namespace") String namespace,
            @ApiParam(value = "Specify topic name", required = true)
            @PathParam("topic") @Encoded String encodedTopic,
            @ApiParam(value = "Subscription", required = true)
            @PathParam("subName") String encodedSubName,
            @ApiParam(value = "Is authentication required to perform this operation")
            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {

API response model:

    public class AnaliseSubscriptionBacklogResult {
        private long entries;
        private long messages;

        private long filterRejectedEntries;
        private long filterAcceptedEntries;
        private long filterRescheduledEntries;

        private long filterRejectedMessages;
        private long filterAcceptedMessages;
        private long filterRescheduledMessages;

        private boolean aborted;

The response contains "aborted=true" if the request has been aborted because of some internal limit, like a timeout or the scan hitting the max number of entries. We are not going to provide more details about the reason for the stop: it would make the API too detailed and harder to maintain. Also, the details will be available in the broker logs.

New PulsarAdmin API:

    /**
     * Analise subscription backlog.
     * This is a potentially expensive operation, as it requires
     * to read the messages from storage.
     * This function takes into consideration batch messages
     * and also Subscription filters.
     * @param topic
     *            Topic name
     * @param subscriptionName
     *            the subscription
     * @return an accurate analysis of the backlog
     * @throws PulsarAdminException
     *            Unexpected error
     */
    AnaliseSubscriptionBacklogResult analiseSubscriptionBacklog(String topic, String subscriptionName)
            throws PulsarAdminException;

    /**
     * Analise subscription backlog.
     * This is a potentially expensive operation, as it requires
     * to read the messages from storage.
     * This function takes into consideration batch messages
     * and also Subscription filters.
     * @param topic
     *            Topic name
     * @param subscriptionName
     *            the subscription
     * @return an accurate analysis of the backlog
     * @throws PulsarAdminException
     *            Unexpected error
     */
    CompletableFuture<AnaliseSubscriptionBacklogResult> analiseSubscriptionBacklogAsync(String topic,
            String subscriptionName);

A pulsar-admin command will be added as well, as usual.

New configuration entries in broker.conf:

    @FieldContext(
        category = CATEGORY_POLICIES,
        doc = "Maximum time to spend while scanning a subscription to calculate the accurate backlog"
    )
    private long subscriptionBacklogScanMaxTimeMs = 1000 * 60 * 2L;

    @FieldContext(
        category = CATEGORY_POLICIES,
        doc = "Maximum number of entries to process while scanning a subscription to calculate the accurate backlog"
    )
    private long subscriptionBacklogScanMaxEntries = 10_000;

Implementation

The implementation is pretty straightforward:
- add a new API in ManagedCursor to do the scan
- add the REST API
- implement in PersistentSubscription an analiseBacklog method that does the scan

The PersistentSubscription runs the scan:
- it applies the filters if they are present
- it considers individuallyDeletedMessages

Non-trivial problem regarding the Dispatcher:

The Filters are loaded by an AbstractBaseDispatcher, but PersistentSubscription starts a Dispatcher only when the first consumer is connected. This happens because the Subscription itself doesn't have a type (Failover, Exclusive, Shared...) and KeySharedMetadata; all of this is decided by the first consumer that connects (after the load of the topic, so the subscription type may change after a topic unload).

This PIP won't fix this "problem", so in case of a missing Dispatcher we are going to use an ephemeral Dispatcher without a Type. Maybe in the future it will be better to persist the Subscription Type and other metadata; this way we can create the Dispatcher while instantiating the Subscription.

Rejected Alternatives

We could somehow store a counter of the number of logical messages during writes. But that does not work for a few reasons:
- you cannot know which subscriptions will be created in a topic
- a subscription can be created from the past (Earliest)
- subscription filters may change over time: they are usually configured using Subscription Properties, and those properties are dynamic
- doing computations on the write path (like running filters) kills latency and throughput

Use a client to clone the subscription and consume data. This doesn't work because you have to transfer the data to the client, and this is possibly a huge amount of work and a waste of resources.
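
For reference, the counters of the response model described under API Changes could be filled in roughly as follows. This is only a sketch under stated assumptions: the Entry stand-in, the FilterResult enum with its three values, the filter modelled as a plain Function, and the choice to set aborted when the entry limit is reached are all hypothetical and not taken from the prototype.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

class BacklogTallySketch {
    /** Hypothetical filter verdicts, mirroring the three counter families. */
    enum FilterResult { ACCEPT, REJECT, RESCHEDULE }

    /** Stand-in for a stored entry; numMessages models a batch. */
    static final class Entry {
        final int numMessages;

        Entry(int numMessages) {
            this.numMessages = numMessages;
        }
    }

    /** Same field names as the proposed AnaliseSubscriptionBacklogResult. */
    static final class Result {
        long entries;
        long messages;
        long filterRejectedEntries;
        long filterAcceptedEntries;
        long filterRescheduledEntries;
        long filterRejectedMessages;
        long filterAcceptedMessages;
        long filterRescheduledMessages;
        boolean aborted;
    }

    static Result analyse(List<Entry> backlog,
                          Function<Entry, FilterResult> filter,
                          long maxEntries) {
        Result r = new Result();
        for (Entry e : backlog) {
            if (r.entries >= maxEntries) {
                r.aborted = true; // partial result: the limit was hit
                break;
            }
            r.entries++;
            r.messages += e.numMessages;
            // Each verdict updates one entry counter and one message counter.
            switch (filter.apply(e)) {
                case REJECT:
                    r.filterRejectedEntries++;
                    r.filterRejectedMessages += e.numMessages;
                    break;
                case RESCHEDULE:
                    r.filterRescheduledEntries++;
                    r.filterRescheduledMessages += e.numMessages;
                    break;
                default:
                    r.filterAcceptedEntries++;
                    r.filterAcceptedMessages += e.numMessages;
                    break;
            }
        }
        return r;
    }

    public static void main(String[] args) {
        List<Entry> backlog = Arrays.asList(new Entry(10), new Entry(1), new Entry(5));
        // Toy filter: reject single-message entries, accept everything else.
        Result r = analyse(backlog,
                e -> e.numMessages == 1 ? FilterResult.REJECT : FilterResult.ACCEPT,
                10_000);
        System.out.println("entries=" + r.entries + " messages=" + r.messages
                + " acceptedMessages=" + r.filterAcceptedMessages);
    }
}
```

With a batch of 10, a single message and a batch of 5, the entry count is 3 while the message count is 16, which is the gap between "entries" and "messages" that motivates the proposal.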