Any comments?

FYI I have updated the PIP after addressing some feedback on the PR:
- no more need to create a dummy Dispatcher
- now we are reading entries in batches
I would like to see this in 2.11 please

Enrico

On Thu, Jul 14, 2022 at 09:34 Enrico Olivelli <eolive...@gmail.com> wrote:
>
> Hello,
> this is a PIP to implement a tool to analyse the subscription backlog
>
> Link: https://github.com/apache/pulsar/issues/16597
> Prototype: https://github.com/apache/pulsar/pull/16545
>
> Below you can find the proposal (I will amend the GH issue while we discuss, as usual)
>
> Enrico
>
> Motivation
>
> Currently there is no way to have an accurate backlog for a subscription:
>
> you have only the number of "entries", not messages
> server side filters (PIP-105) may filter out some messages
>
> Having the number of entries is sometimes not enough, because with batch messages the amount of work on the Consumers is proportional to the number of messages, which may vary from entry to entry.
>
> Goal
>
> The idea of this patch is to provide a dedicated API (REST, pulsar-admin, and Java PulsarAdmin) to "analyse" a subscription and provide detailed information about what is expected to be delivered to Consumers.
>
> The operation will be quite expensive because we have to load the messages from storage and pass them to the filters, but due to the dynamic nature of Pulsar subscriptions there is no other way to compute this value.
>
> One good strategy for monitoring/alerting is to set up alerts on the usual "stats" and use this new API to inspect the subscription more deeply, typically by issuing a manual command.
>
> API Changes
>
> Internal ManagedCursor API:
>
> CompletableFuture<ScanOutcome> scan(Predicate<Entry> condition, long maxEntries, long timeOutMs);
>
> This method scans the Cursor from the lastMarkDelete position to the tail.
> There is a time limit and a maxEntries limit; these are needed in order to prevent huge (and useless) scans.
> The Predicate can stop the scan if it doesn't want to continue the processing for some reason.
>
> New REST API:
>
> @GET
> @Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/analiseBacklog")
> @ApiOperation(value = "Analyse a subscription, by scanning all the unprocessed messages")
> public void analiseSubscriptionBacklog(
>         @Suspended final AsyncResponse asyncResponse,
>         @ApiParam(value = "Specify the tenant", required = true)
>         @PathParam("tenant") String tenant,
>         @ApiParam(value = "Specify the namespace", required = true)
>         @PathParam("namespace") String namespace,
>         @ApiParam(value = "Specify topic name", required = true)
>         @PathParam("topic") @Encoded String encodedTopic,
>         @ApiParam(value = "Subscription", required = true)
>         @PathParam("subName") String encodedSubName,
>         @ApiParam(value = "Is authentication required to perform this operation")
>         @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
>
> API response model:
>
> public class AnaliseSubscriptionBacklogResult {
>     private long entries;
>     private long messages;
>
>     private long filterRejectedEntries;
>     private long filterAcceptedEntries;
>     private long filterRescheduledEntries;
>
>     private long filterRejectedMessages;
>     private long filterAcceptedMessages;
>     private long filterRescheduledMessages;
>
>     private boolean aborted;
> }
>
> The response contains "aborted=true" if the request has been aborted by some internal limitation, like a timeout or the scan hitting the max number of entries.
> We are not going to provide more details about the reason for the stop; that would make the API too detailed and harder to maintain.
> Also, in the logs of the broker you will find the details.
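>
> For illustration only, here is a rough sketch (not the code from the prototype PR) of how the proposed scan(...) could be used to count entries and logical messages. Commands.parseMessageMetadata and MessageMetadata#hasNumMessagesInBatch/getNumMessagesInBatch are existing broker internals; the counters, the "cursor" variable and the rest of the wiring are just assumptions for the example:
>
> import java.util.concurrent.atomic.AtomicLong;
> import java.util.function.Predicate;
> import org.apache.bookkeeper.mledger.Entry;
> import org.apache.bookkeeper.mledger.ManagedCursor;
> import org.apache.pulsar.common.api.proto.MessageMetadata;
> import org.apache.pulsar.common.protocol.Commands;
>
> // counters that would back the "entries" and "messages" fields of the result
> AtomicLong entries = new AtomicLong();
> AtomicLong messages = new AtomicLong();
>
> Predicate<Entry> condition = entry -> {
>     // parse only the metadata, not the payload: a batch entry carries N logical messages
>     MessageMetadata metadata = Commands.parseMessageMetadata(entry.getDataBuffer());
>     int batchSize = metadata.hasNumMessagesInBatch() ? metadata.getNumMessagesInBatch() : 1;
>     entries.incrementAndGet();
>     messages.addAndGet(batchSize);
>     return true; // returning false would stop the scan early
> };
>
> // "cursor" is the subscription's ManagedCursor; the limits here match the
> // proposed broker.conf defaults described below (10_000 entries, 2 minutes)
> cursor.scan(condition, 10_000, 120_000)
>       .thenAccept(outcome -> {
>           // ScanOutcome reports whether the scan completed or was aborted by the limits
>           System.out.println(outcome + " entries=" + entries.get() + " messages=" + messages.get());
>       });
>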
> New PulsarAdmin API:
>
> /**
>  * Analyse subscription backlog.
>  * This is a potentially expensive operation, as it requires
>  * to read the messages from storage.
>  * This function takes into consideration batch messages
>  * and also Subscription filters.
>  * @param topic
>  *            Topic name
>  * @param subscriptionName
>  *            the subscription
>  * @return an accurate analysis of the backlog
>  * @throws PulsarAdminException
>  *            Unexpected error
>  */
> AnaliseSubscriptionBacklogResult analiseSubscriptionBacklog(String topic, String subscriptionName)
>         throws PulsarAdminException;
>
> /**
>  * Analyse subscription backlog.
>  * This is a potentially expensive operation, as it requires
>  * to read the messages from storage.
>  * This function takes into consideration batch messages
>  * and also Subscription filters.
>  * @param topic
>  *            Topic name
>  * @param subscriptionName
>  *            the subscription
>  * @return an accurate analysis of the backlog
>  * @throws PulsarAdminException
>  *            Unexpected error
>  */
> CompletableFuture<AnaliseSubscriptionBacklogResult> analiseSubscriptionBacklogAsync(String topic,
>         String subscriptionName);
>
> A pulsar-admin command will be added as well, as usual (a sketch of calling the Java admin API is at the end of this proposal).
>
> New configuration entries in broker.conf:
>
> @FieldContext(
>         category = CATEGORY_POLICIES,
>         doc = "Maximum time to spend while scanning a subscription to calculate the accurate backlog"
> )
> private long subscriptionBacklogScanMaxTimeMs = 1000 * 60 * 2L;
>
> @FieldContext(
>         category = CATEGORY_POLICIES,
>         doc = "Maximum number of entries to process while scanning a subscription to calculate the accurate backlog"
> )
> private long subscriptionBacklogScanMaxEntries = 10_000;
>
> Implementation
>
> The implementation is pretty straightforward:
>
> add a new API in ManagedCursor to do the scan
> add the REST API
> implement in PersistentSubscription an analiseBacklog method that does the scan
>
> Then the PersistentSubscription runs the scan (a sketch of the per-entry classification follows below):
>
> it applies the filters if they are present
> it considers individuallyDeletedMessages
>
> Non-trivial problem regarding the Dispatcher:
> The Filters are loaded by an AbstractBaseDispatcher, but PersistentSubscription starts a Dispatcher only when the first consumer connects.
> This happens because the Subscription itself doesn't have a type (Failover, Exclusive, Shared...) or KeySharedMetadata; all of this is decided by the first consumer that connects (after the load of the topic, so the subscription type may change after a topic unload).
> This PIP won't fix this "problem", so in case of a missing Dispatcher we are going to use an ephemeral Dispatcher without a Type.
> Maybe in the future it will be better to persist the Subscription Type and other metadata; this way we can create the Dispatcher while instantiating the Subscription.
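>
> To make the scan step concrete, here is a rough sketch (for illustration, not the prototype's code) of how each scanned entry could be classified through the PIP-105 entry filters to fill the filter* counters of the result. EntryFilter.FilterResult (ACCEPT/REJECT/RESCHEDULE) is the existing plugin API; runFilters(...) and the counters are hypothetical, and the imports are as in the earlier sketch plus org.apache.pulsar.broker.service.plugin.EntryFilter:
>
> // hypothetical helper: apply the subscription's EntryFilters (PIP-105) to one entry;
> // in the real code this would delegate to the filters loaded by the Dispatcher
> static EntryFilter.FilterResult runFilters(Entry entry, MessageMetadata metadata) {
>     return EntryFilter.FilterResult.ACCEPT; // placeholder
> }
>
> // one counter per filter* field of AnaliseSubscriptionBacklogResult
> AtomicLong filterAcceptedEntries = new AtomicLong();
> AtomicLong filterRejectedEntries = new AtomicLong();
> AtomicLong filterRescheduledEntries = new AtomicLong();
> AtomicLong filterAcceptedMessages = new AtomicLong();
> AtomicLong filterRejectedMessages = new AtomicLong();
> AtomicLong filterRescheduledMessages = new AtomicLong();
>
> Predicate<Entry> condition = entry -> {
>     MessageMetadata metadata = Commands.parseMessageMetadata(entry.getDataBuffer());
>     int batchSize = metadata.hasNumMessagesInBatch() ? metadata.getNumMessagesInBatch() : 1;
>
>     switch (runFilters(entry, metadata)) {
>         case ACCEPT:
>             filterAcceptedEntries.incrementAndGet();
>             filterAcceptedMessages.addAndGet(batchSize);
>             break;
>         case REJECT:
>             filterRejectedEntries.incrementAndGet();
>             filterRejectedMessages.addAndGet(batchSize);
>             break;
>         case RESCHEDULE:
>             filterRescheduledEntries.incrementAndGet();
>             filterRescheduledMessages.addAndGet(batchSize);
>             break;
>     }
>     // the real implementation would also skip positions already acknowledged
>     // in individuallyDeletedMessages before counting
>     return true;
> };
>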
> Rejected Alternatives
>
> We could somehow store some counter about the number of logical messages during writes. But that does not work, for a few reasons:
>
> you cannot know which subscriptions will be created in a topic
> subscriptions can be created from the past (Earliest)
> subscription filters may change over time: they are usually configured using Subscription Properties, and those properties are dynamic
> doing computations on the write path (like running filters) kills latency and throughput
>
> Use a client to clone the subscription and consume data.
> This doesn't work because you have to transfer the data to the client, and this is possibly a huge amount of work and a waste of resources.
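>
> Finally, for completeness, this is roughly how the new Java admin API could be invoked once merged. The PIP does not say where the methods will be exposed, so the example assumes they land on PulsarAdmin.topics(); the getters are assumed from the field names of AnaliseSubscriptionBacklogResult, and its import is omitted since the PIP does not specify a package:
>
> import org.apache.pulsar.client.admin.PulsarAdmin;
>
> public class BacklogAnalysisExample {
>     public static void main(String[] args) throws Exception {
>         try (PulsarAdmin admin = PulsarAdmin.builder()
>                 .serviceHttpUrl("http://localhost:8080")
>                 .build()) {
>             // assumption: the new method is exposed on topics(); adjust if it lands elsewhere
>             AnaliseSubscriptionBacklogResult result = admin.topics()
>                     .analiseSubscriptionBacklog("persistent://public/default/my-topic", "my-subscription");
>             System.out.println("entries=" + result.getEntries()
>                     + " messages=" + result.getMessages()
>                     + " aborted=" + result.isAborted());
>         }
>     }
> }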