> What if the topic owner creates an internal subscription, consumes the messages, and updates a count per filter.
I agree with this approach. Scanning the whole backlog to compute an accurate count for every operation is very expensive and hard to apply in a production environment. Keeping a counter per filter (subscription) and only re-scanning the data after a filter changes would remove most of that overhead. Exposing accurate backlogs through the Prometheus endpoint would otherwise be almost impossible.

Thanks,
Penghui

On Wed, Jul 20, 2022 at 11:23 PM Asaf Mesika <asaf.mes...@gmail.com> wrote:

> On Wed, Jul 20, 2022 at 5:46 PM Enrico Olivelli <eolive...@gmail.com> wrote:
>
> > Asaf,
> >
> > On Wed, Jul 20, 2022 at 3:40 PM Asaf Mesika <asaf.mes...@gmail.com> wrote:
> > >
> > > I'm not sure I understand the context exactly:
> > >
> > > You say today we can only know the number of entries, hence we'll have a wrong backlog number for a subscription, since:
> > > 1. One entry contains multiple messages (batch message)
> > > 2. A subscription may contain a filter, which requires you to read the entire backlog to know the count
> >
> > correct
> >
> > > There are two things I don't understand:
> > >
> > > 1. We're adding an observability API for which you need to pay the full read cost just to know the count. I presume people would want to run this more than once, so they will read the same data multiple times - why would a user be willing to pay such a hefty price?
> >
> > sometimes it is the case, because processing a message may have a high cost.
> > So having 10 entries of 100 messages does not correctly represent the amount of work that must be done by the consumers,
> > and so the user may wish to have an exact count.
> >
> > Having the filters adds more complexity because you cannot predict how many entries will be filtered out
> >
>
> So it mainly serves the specific use case in which reading the entire set of messages over and over (every interval) is an order of magnitude less expensive than the processing itself.
>
> > > 2. If the user needs to know an accurate backlog, can't they use the ability to create a very large number of topics, so that they will know an accurate backlog without the huge cost?
> >
> > I can't understand why creating many topics would help.
> > Instead, with filters it is very likely that you have only a few topics with many subscriptions with different filters.
> >
> > As you don't know the filters while writing, you cannot route the messages to some topic.
> > Also, you would need to write the message to potentially multiple topics, and that would be a huge write amplification
> > (think about a topic with 100 subscriptions)
> >
>
> Yes, I hadn't thought about that.
> What I was thinking is that those filters are mutually exclusive and therefore map to topics, but in your case, if you have 100 different filters and they overlap, yes, it would be way more expensive to write them 100 times.
>
> > > I have an idea, if that's ok:
> > >
> > > What if you can keep, as you said in your document, a metric counting messages per filter upon write.
> > This is not possible as described above
> >
>
> You wrote above that:
> ---
> you cannot know which subscriptions will be created in a topic
> subscriptions can be created from the past (Earliest)
> subscription filters may change over time: they are usually configured using Subscription Properties, and those properties are dynamic
> doing computations on the write path (like running filters) kills latency and throughput
>
> Use a client to clone the subscription and consume data.
> This doesn't work because you have to transfer the data to the client, and this is possibly a huge amount of work and a waste of resources.
> ---
>
> What if we don't do it directly on the write path?
> What if the topic owner creates an internal subscription, consumes the messages, and updates a count per filter?
> Those computations would then have less direct effect on the write path.
>
> I'm trying to compare that cost of computation with consuming all the messages again and again, running the filter computation for them, every interval (say 1 min).
> The amount of computation in the latter would be more costly, no?
>
> > > When you update a filter / add a filter by adding a new subscription, you can run code that reads from the beginning of the subscription (first unacked message) to catch up and then continues. This may be done async, so the metric will take some time to catch up.
> > > Amortized, it has less cost on the system overall, compared to reading all the messages multiple times to get a periodic size of the subscription.
> > > Both solutions are expensive compared to doing nothing, of course. Both have to be a well documented, conscious choice.
> > > WDYT?
> >
> > Enrico
> >
> > >
> > > Asaf
> > >
> > > On Thu, Jul 14, 2022 at 10:34 AM Enrico Olivelli <eolive...@gmail.com> wrote:
> > >
> > > > Hello,
> > > > this is a PIP to implement a tool to analyse the subscription backlog
> > > >
> > > > Link: https://github.com/apache/pulsar/issues/16597
> > > > Prototype: https://github.com/apache/pulsar/pull/16545
> > > >
> > > > Below you can find the proposal (I will amend the GH issue while we discuss, as usual)
> > > >
> > > > Enrico
> > > >
> > > > Motivation
> > > >
> > > > Currently there is no way to have an accurate backlog for a subscription:
> > > >
> > > > you have only the number of "entries", not messages
> > > > server side filters (PIP-105) may filter out some messages
> > > >
> > > > Having the number of entries is sometimes not enough, because with batch messages the amount of work on the Consumers is proportional to the number of messages, which may vary from entry to entry.
> > > >
> > > > Goal
> > > >
> > > > The idea of this patch is to provide a dedicated API (REST, pulsar-admin, and Java PulsarAdmin) to "analise" a subscription and provide detailed information about what is expected to be delivered to Consumers.
> > > >
> > > > The operation will be quite expensive because we have to load the messages from storage and pass them to the filters, but due to the dynamic nature of Pulsar subscriptions there is no other way to have this value.
> > > >
> > > > One good strategy for monitoring/alerting is to set up alerts on the usual "stats" and use this new API to inspect the subscription more deeply, typically by issuing a manual command.
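[Editor's note: to make the "manual command" usage above concrete, a minimal sketch of how an operator could call the Java PulsarAdmin method defined later in this proposal. This is purely illustrative: the proposal does not say on which admin interface the method will live, so the topics() accessor, the getters on the result object, and the result class's package are assumptions.]

    // Illustrative only: assumes the proposed analiseSubscriptionBacklog method
    // ends up on the Topics admin interface and that the result class has getters.
    import org.apache.pulsar.client.admin.PulsarAdmin;

    public class InspectBacklog {
        public static void main(String[] args) throws Exception {
            try (PulsarAdmin admin = PulsarAdmin.builder()
                    .serviceHttpUrl("http://localhost:8080") // broker HTTP service
                    .build()) {
                // One-off, potentially expensive call: the broker scans the backlog
                // and applies the subscription filters server-side.
                var result = admin.topics()
                        .analiseSubscriptionBacklog("persistent://public/default/orders", "filtered-sub");
                System.out.println("entries=" + result.getEntries()
                        + " messages=" + result.getMessages()
                        + " filterAcceptedMessages=" + result.getFilterAcceptedMessages()
                        + " aborted=" + result.isAborted());
            }
        }
    }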
> > > > API Changes
> > > >
> > > > internal ManagedCursor API:
> > > >
> > > >     CompletableFuture<ScanOutcome> scan(Predicate<Entry> condition, long maxEntries, long timeOutMs);
> > > >
> > > > This method scans the Cursor from the lastMarkDelete position to the tail.
> > > > There is a time limit and a maxEntries limit; these are needed in order to prevent huge (and useless) scans.
> > > > The Predicate can stop the scan if it doesn't want to continue the processing for some reason.
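[Editor's note: purely as an illustration, a rough sketch of how broker-side code could drive the proposed scan method, counting entries through the predicate. Only the scan(...) signature above comes from the proposal; everything else here, including the treatment of ScanOutcome, is an assumption.]

    // Sketch only: assumes the PIP's new scan(...) method has been added to ManagedCursor.
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.atomic.AtomicLong;
    import org.apache.bookkeeper.mledger.Entry;
    import org.apache.bookkeeper.mledger.ManagedCursor;

    public class BacklogScanExample {
        public static CompletableFuture<Long> countBacklogEntries(ManagedCursor cursor) {
            AtomicLong entries = new AtomicLong();
            // The predicate is invoked for every entry between lastMarkDelete and the tail.
            // Returning true keeps the scan going; returning false stops it early.
            return cursor.scan((Entry entry) -> {
                        entries.incrementAndGet();
                        // A real implementation would also peek at the batch header and
                        // run the subscription filters to count logical messages.
                        return true;
                    }, 10_000 /* maxEntries */, 120_000 /* timeOutMs */)
                    .thenApply(outcome -> entries.get());
        }
    }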
> > > > New REST API:
> > > >
> > > >     @GET
> > > >     @Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/analiseBacklog")
> > > >     @ApiOperation(value = "Analyse a subscription, by scanning all the unprocessed messages")
> > > >     public void analiseSubscriptionBacklog(
> > > >             @Suspended final AsyncResponse asyncResponse,
> > > >             @ApiParam(value = "Specify the tenant", required = true)
> > > >             @PathParam("tenant") String tenant,
> > > >             @ApiParam(value = "Specify the namespace", required = true)
> > > >             @PathParam("namespace") String namespace,
> > > >             @ApiParam(value = "Specify topic name", required = true)
> > > >             @PathParam("topic") @Encoded String encodedTopic,
> > > >             @ApiParam(value = "Subscription", required = true)
> > > >             @PathParam("subName") String encodedSubName,
> > > >             @ApiParam(value = "Is authentication required to perform this operation")
> > > >             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
> > > >
> > > > API response model:
> > > >
> > > >     public class AnaliseSubscriptionBacklogResult {
> > > >         private long entries;
> > > >         private long messages;
> > > >
> > > >         private long filterRejectedEntries;
> > > >         private long filterAcceptedEntries;
> > > >         private long filterRescheduledEntries;
> > > >
> > > >         private long filterRejectedMessages;
> > > >         private long filterAcceptedMessages;
> > > >         private long filterRescheduledMessages;
> > > >
> > > >         private boolean aborted;
> > > >
> > > > The response contains "aborted=true" if the request has been aborted by some internal limitation, like a timeout or the scan hitting the maximum number of entries.
> > > > We are not going to provide more details about the reason of the stop: it would make the API too detailed and harder to maintain. Also, you will find the details in the logs of the broker.
> > > >
> > > > New PulsarAdmin API:
> > > >
> > > >     /**
> > > >      * Analyse subscription backlog.
> > > >      * This is a potentially expensive operation, as it requires
> > > >      * to read the messages from storage.
> > > >      * This function takes into consideration batch messages
> > > >      * and also Subscription filters.
> > > >      * @param topic
> > > >      *            Topic name
> > > >      * @param subscriptionName
> > > >      *            the subscription
> > > >      * @return an accurate analysis of the backlog
> > > >      * @throws PulsarAdminException
> > > >      *            Unexpected error
> > > >      */
> > > >     AnaliseSubscriptionBacklogResult analiseSubscriptionBacklog(String topic, String subscriptionName)
> > > >             throws PulsarAdminException;
> > > >
> > > >     /**
> > > >      * Analyse subscription backlog.
> > > >      * This is a potentially expensive operation, as it requires
> > > >      * to read the messages from storage.
> > > >      * This function takes into consideration batch messages
> > > >      * and also Subscription filters.
> > > >      * @param topic
> > > >      *            Topic name
> > > >      * @param subscriptionName
> > > >      *            the subscription
> > > >      * @return an accurate analysis of the backlog
> > > >      * @throws PulsarAdminException
> > > >      *            Unexpected error
> > > >      */
> > > >     CompletableFuture<AnaliseSubscriptionBacklogResult> analiseSubscriptionBacklogAsync(String topic, String subscriptionName);
> > > >
> > > > A pulsar-admin command will be added as well, as usual.
> > > >
> > > > New configuration entries in broker.conf:
> > > >
> > > >     @FieldContext(
> > > >             category = CATEGORY_POLICIES,
> > > >             doc = "Maximum time to spend while scanning a subscription to calculate the accurate backlog"
> > > >     )
> > > >     private long subscriptionBacklogScanMaxTimeMs = 1000 * 60 * 2L;
> > > >     @FieldContext(
> > > >             category = CATEGORY_POLICIES,
> > > >             doc = "Maximum number of entries to process while scanning a subscription to calculate the accurate backlog"
> > > >     )
> > > >     private long subscriptionBacklogScanMaxEntries = 10_000;
> > > >
> > > > Implementation
> > > >
> > > > The implementation is pretty straightforward:
> > > >
> > > > add a new API in ManagedCursor to do the scan
> > > > add the REST API
> > > > implement in PersistentSubscription an analiseBacklog method that does the scan
> > > >
> > > > Then the PersistentSubscription runs the scan:
> > > >
> > > > it applies the filters if they are present
> > > > it takes individuallyDeletedMessages into account
> > > >
> > > > Non-trivial problem regarding the Dispatcher:
> > > > The Filters are loaded by an AbstractBaseDispatcher, but PersistentSubscription starts a Dispatcher only when the first consumer is connected.
> > > > This happens because the Subscription itself doesn't have a type (Failover, Exclusive, Shared...) or KeySharedMetadata; all of this is decided by the first consumer that connects (after the load of the topic, so the subscription type may change after a topic unload).
> > > > This PIP won't fix this "problem", so in case of a missing Dispatcher we are going to use an ephemeral Dispatcher without a Type.
> > > > Maybe in the future it will be better to persist the Subscription Type and other metadata; this way we can create the Dispatcher while instantiating the Subscription.
> > > >
> > > > Rejected Alternatives
> > > >
> > > > We could somehow store a counter of the number of logical messages during writes. But that does not work, for a few reasons:
> > > >
> > > > you cannot know which subscriptions will be created in a topic
> > > > subscriptions can be created from the past (Earliest)
> > > > subscription filters may change over time: they are usually configured using Subscription Properties, and those properties are dynamic
> > > > doing computations on the write path (like running filters) kills latency and throughput
> > > >
> > > > Use a client to clone the subscription and consume data.
> > > > This doesn't work because you have to transfer the data to the client, and this is possibly a huge amount of work and a waste of resources.
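[Editor's note: the alternative discussed at the top of this thread is a variant of the first rejected alternative above: instead of counting on the write path, an internal subscription consumes the messages and maintains a counter per filter, rebuilding the counter asynchronously when a filter changes. A purely illustrative sketch of that idea, under the assumption that such a component existed; none of these classes or names are part of Pulsar or of this PIP, and the sketch ignores decrementing on acknowledgment and persisting the counters.]

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.LongAdder;
    import java.util.function.Predicate;

    public class PerFilterBacklogCounter {
        // One counter per filter (i.e. per subscription that has a filter).
        private final Map<String, LongAdder> messagesPerFilter = new ConcurrentHashMap<>();
        private final Map<String, Predicate<RawMessage>> filters = new ConcurrentHashMap<>();

        // Hypothetical message view: just enough to run a filter against.
        public interface RawMessage {
            Map<String, String> properties();
        }

        // Called by the internal subscription for every message it consumes,
        // off the write path.
        public void onMessage(RawMessage msg) {
            filters.forEach((subscription, filter) -> {
                if (filter.test(msg)) {
                    messagesPerFilter.computeIfAbsent(subscription, s -> new LongAdder()).increment();
                }
            });
        }

        // When a filter is added or changed, the counter has to be rebuilt by
        // re-reading from the first unacked message of that subscription (async).
        public void onFilterChanged(String subscription, Predicate<RawMessage> newFilter) {
            filters.put(subscription, newFilter);
            messagesPerFilter.put(subscription, new LongAdder());
            // ... trigger an asynchronous catch-up scan here ...
        }

        public long estimatedBacklog(String subscription) {
            LongAdder counter = messagesPerFilter.get(subscription);
            return counter == null ? 0L : counter.sum();
        }
    }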