> As you can't know when you will need this value, you are going to
waste resources in order to calculate
something that you won't use.

> This API to analyze a subscription is not meant to be used often, but
only to debug (manually) problems.

Oh, I got your point.

One concern is if we are using REST API to get the accurate value,
not sure if all the messages are filtered out within the request timeout
or the max scan entries.

I'm thinking can we add a start message ID for the request? It looks like we
can continue to get the backlogs based on the last scanned message ID.

For example:

 - get backlog from 1:1
 - received 1 backlog and the last scanned message ID is 1:50000, which
does not reach the end of topic
 - get the backlog from 1:50001
 - received 2 backlogs and the last scanned message ID 3:40000, which does
not reach the end of topic
 - get the backlog from 3:40000
 - received 5 backlogs and reached the end of topic

The accurate backlog for this subscription is 8.

Thanks,
Penghui
On Thu, Jul 21, 2022 at 3:36 PM Enrico Olivelli <eolive...@gmail.com> wrote:

> Il giorno gio 21 lug 2022 alle ore 06:25 PengHui Li
> <peng...@apache.org> ha scritto:
> >
> > > What if the topic owner creates an internal subscription, consumes the
> > messages, and updates a count per filter.
> >
> > I agree with this approach. If we need to scan all the backlogs to
> > calculate the
> > accurate backlogs for each operation, it's so expensive and difficult to
> > apply to
> > the production environment. With the counter for each
> filter(subscription)
> > and only
> > re-scan the data after the filter changes will reduce a lot of overhead.
>
> This approach is very expensive when you don't need this feature.
> Because in order to have this value you have to read everything (once
> per subscription),
> especially when you have a subscription without consumers and the
> topic is being written.
>
> As you can't know when you will need this value, you are going to
> waste resources in order to calculate
> something that you won't use.
>
>
> This API to analyze a subscription is not meant to be used often, but
> only to debug (manually) problems.
>
> Also, the filter may be dependent on some other environment variables,
> like the wall clock time,
> if you have a filter that depends on time and you pre-calculate the
> backlog your counter won't be correct.
>
> >
> > If we want to expose the accurate backlogs in the Prometheus endpoint,
> > it's almost impossible.
>
> I don't think this is actually possible if you want to take into
> consideration the filters.
> We are in the case of general purpose filtering (actually we allow
> Java code to be plugged into the browser),
> so pre-calculating the counters won't work well.
>
>
> Enrico
>
> >
> > Thanks,
> > Penghui
> >
> > On Wed, Jul 20, 2022 at 11:23 PM Asaf Mesika <asaf.mes...@gmail.com>
> wrote:
> >
> > > On Wed, Jul 20, 2022 at 5:46 PM Enrico Olivelli <eolive...@gmail.com>
> > > wrote:
> > >
> > > > Asaf,
> > > >
> > > > Il giorno mer 20 lug 2022 alle ore 15:40 Asaf Mesika
> > > > <asaf.mes...@gmail.com> ha scritto:
> > > > >
> > > > > I'm not sure I understand the context exactly:
> > > > >
> > > > > You say today we can only know the number of entries, hence we'll
> have
> > > a
> > > > > wrong number of backlog for subscription since:
> > > > > 1. One entry contains multiple messages (batch message)
> > > > > 2. Subscription may contain a filter, which requires you to read
> the
> > > > entire
> > > > > backlog to know it
> > > >
> > > > correct
> > > >
> > > > >
> > > > > There are two things I don't understand:
> > > > >
> > > > > 1. We're adding an observability API, which you need to pay all the
> > > read
> > > > > cost just to know the count. I presume people would want to run
> this
> > > more
> > > > > than once. So they will read same data multiple times - why would a
> > > user
> > > > be
> > > > > willing to pay such a hefty price?
> > > >
> > > > sometimes it is the case, because processing a message may have a
> high
> > > > cost.
> > > > So having 10 entries of 100 messages is not correctly representing
> the
> > > > amount of work that must be done by the consumers
> > > > and so the user may wish to have an exact count.
> > > >
> > > > Having the filters adds more complexity because you cannot predict
> how
> > > > many entries will be filtered out
> > > >
> > > >
> > > > So it's mainly serving that specific use case of reading the entire
> > > messages over and over (every interval) is an order of magnitude less
> > > expensive than the processing it self.
> > >
> > >
> > > > > 2. If the user needs to know an accurate backlog, can't they use
> the
> > > > > ability to create a very large number of topics, thus they will
> know an
> > > > > accurate backlog without the huge cost?
> > > >
> > > > I can't understand why creating many topics will help.
> > > > instead with filters it is very likely that you have only fewer
> topics
> > > > with many subscriptions with different filters
> > > >
> > > > as you don't know the filters while writing you cannot route the
> > > > messages to some topic
> > > > also you would need to write the message to potentially multiple
> > > > topics, and that would be a huge write amplification
> > > > (think about a topic with 100 subscriptions)
> > > >
> > > > Yes, I haven't thought about that.
> > > What I was thinking is that those filters are mutually exclusive
> therefor
> > > topics, but in your case, if you have 100 different filters, and they
> > > overlap, yes it would be way more expensive to write them 100 times.
> > >
> > > >
> > > > > I have an idea, if that's ok:
> > > > >
> > > > > What if you can keep, as you said in your document, a metric
> counting
> > > > > messages per filter upon write.
> > > > This is not possible as described above
> > > >
> > >
> > > You wrote above that:
> > >
> > > ---
> > > you cannot know which subscriptions will be created in a topic
> > > subscription can be created from the past (Earliest)
> > > subscription filters may change over time: they are usually configured
> > > using Subscription Properties, and those properties are dynamic
> > > doing computations on the write path (like running filters) kills
> > > latency and thoughtput
> > >
> > > Use a client to clone the subscription and consume data.
> > > This doesn't work because you have to transfer the data to the client,
> > > and this is possibly a huge amount of work and a waste of resources.
> > > ---
> > >
> > > What if we don't do it directly on the write path.
> > > What if the topic owner creates an internal subscription, consumes the
> > > messages, and updates a count per filter.
> > > Thus, those computation will have less effect directly on the write
> path.
> > >
> > > I'm trying to compare that cost of compuations, with consuming all the
> > > messages, again and again, running filter computation for them, every
> > > interval (say 1min).
> > > The amount of computation in the latter would be more costly, no?
> > >
> > >
> > > > When you update the filter / add a filter
> > > > > by adding a new subscription, you can run code that reads from the
> > > > > beginning of the subscription (first unacked message) to catch up
> and
> > > > then
> > > > > continues. This may be done async, so the metric will take some
> time to
> > > > > catch up.
> > > > > Amortized, it has less cost on the system overall, if compared to
> > > reading
> > > > > all the messages multiple times to get a period size of the
> > > subscription.
> > > > > Both solutions are expensive as opposed to nothing of course. Both
> has
> > > to
> > > > > be a well documented conscious choice.
> > > > > WDYT?
> > > >
> > > >
> > > > Enrico
> > > > >
> > > > > Asaf
> > > > >
> > > > >
> > > > > On Thu, Jul 14, 2022 at 10:34 AM Enrico Olivelli <
> eolive...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hello,
> > > > > > this is a PIP to implement a tool to analyse the subscription
> backlog
> > > > > >
> > > > > > Link: https://github.com/apache/pulsar/issues/16597
> > > > > > Prototype: https://github.com/apache/pulsar/pull/16545
> > > > > >
> > > > > > Below you can find the proposal (I will amend the GH issue while
> we
> > > > > > discuss, as usual)
> > > > > >
> > > > > > Enrico
> > > > > >
> > > > > > Motivation
> > > > > >
> > > > > > Currently there is no way to have a accurate backlog for a
> > > > subscription:
> > > > > >
> > > > > > you have only the number of "entries", not messages
> > > > > > server side filters (PIP-105) may filter out some messages
> > > > > >
> > > > > > Having the number of entries is sometimes not enough because with
> > > > > > batch messages the amount of work on the Consumers is
> proportional to
> > > > > > the number of messages, that may vary from entry to entry.
> > > > > >
> > > > > > Goal
> > > > > >
> > > > > > The idea of this patch is to provide a dedicate API (REST,
> > > > > > pulsar-admin, and Java PulsarAdmin) to "analise" a subscription
> and
> > > > > > provide detailed information about that is expected to be
> delivered
> > > to
> > > > > > Consumers.
> > > > > >
> > > > > > The operation will be quite expensive because we have to load the
> > > > > > messages from storage and pass them to the filters, but due to
> the
> > > > > > dynamic nature of Pulsar subscriptions there is no other way to
> have
> > > > > > this value.
> > > > > >
> > > > > > One good strategy to do monitoring/alerting is to setup alerts
> on the
> > > > > > usual "stats" and use this new API to inspect the subscription
> > > deeper,
> > > > > > typically be issuing a manual command.
> > > > > >
> > > > > > API Changes
> > > > > >
> > > > > > internal ManagedCursor API:
> > > > > >
> > > > > > CompletableFuture<ScanOutcome> scan(Predicate<Entry> condition,
> long
> > > > > > maxEntries, long timeOutMs);
> > > > > >
> > > > > > This method scans the Cursor from the lastMarkDelete position to
> the
> > > > tail.
> > > > > > There is a time limit and a maxEntries limit, these are needed in
> > > > > > order to prevent huge (and useless) scans.
> > > > > > The Predicate can stop the scan, if it doesn't want to continue
> the
> > > > > > processing for some reasons.
> > > > > >
> > > > > > New REST API:
> > > > > >
> > > > > >     @GET
> > > > > >
> > > > > >
> > > >
> > >
> @Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/analiseBacklog")
> > > > > >     @ApiOperation(value = "Analyse a subscription, by scanning
> all
> > > the
> > > > > > unprocessed messages")
> > > > > >
> > > > > >     public void analiseSubscriptionBacklog(
> > > > > >            @Suspended final AsyncResponse asyncResponse,
> > > > > >             @ApiParam(value = "Specify the tenant", required =
> true)
> > > > > >             @PathParam("tenant") String tenant,
> > > > > >             @ApiParam(value = "Specify the namespace", required =
> > > true)
> > > > > >             @PathParam("namespace") String namespace,
> > > > > >             @ApiParam(value = "Specify topic name", required =
> true)
> > > > > >             @PathParam("topic") @Encoded String encodedTopic,
> > > > > >             @ApiParam(value = "Subscription", required = true)
> > > > > >             @PathParam("subName") String encodedSubName,
> > > > > >             @ApiParam(value = "Is authentication required to
> perform
> > > > > > this operation")
> > > > > >             @QueryParam("authoritative") @DefaultValue("false")
> > > > > > boolean authoritative) {
> > > > > >
> > > > > > API response model:
> > > > > >
> > > > > > public class AnaliseSubscriptionBacklogResult {
> > > > > >     private long entries;
> > > > > >     private long messages;
> > > > > >
> > > > > >     private long filterRejectedEntries;
> > > > > >     private long filterAcceptedEntries;
> > > > > >     private long filterRescheduledEntries;
> > > > > >
> > > > > >     private long filterRejectedMessages;
> > > > > >     private long filterAcceptedMessages;
> > > > > >     private long filterRescheduledMessages;
> > > > > >
> > > > > >     private boolean aborted;
> > > > > >
> > > > > > The response contains "aborted=true" is the request has been
> aborted
> > > > > > by some internal limitations, like a timeout or the scan hit the
> max
> > > > > > number of entries.
> > > > > > We are not going to provide more details about the reason of the
> > > stop.
> > > > > > It will make the API too detailed and harder to maintain. Also,
> in
> > > the
> > > > > > logs of the broker you will find the details.
> > > > > >
> > > > > > New PulsarAdmin API:
> > > > > >
> > > > > > /**
> > > > > >      * Analise subscription backlog.
> > > > > >      * This is a potentially expensive operation, as it requires
> > > > > >      * to read the messages from storage.
> > > > > >      * This function takes into consideration batch messages
> > > > > >      * and also Subscription filters.
> > > > > >      * @param topic
> > > > > >      *            Topic name
> > > > > >      * @param subscriptionName
> > > > > >      *            the subscription
> > > > > >      * @return an accurate analysis of the backlog
> > > > > >      * @throws PulsarAdminException
> > > > > >      *            Unexpected error
> > > > > >      */
> > > > > >     AnaliseSubscriptionBacklogResult
> > > analiseSubscriptionBacklog(String
> > > > > > topic, String subscriptionName)
> > > > > >             throws PulsarAdminException;
> > > > > >
> > > > > >     /**
> > > > > >      * Analise subscription backlog.
> > > > > >      * This is a potentially expensive operation, as it requires
> > > > > >      * to read the messages from storage.
> > > > > >      * This function takes into consideration batch messages
> > > > > >      * and also Subscription filters.
> > > > > >      * @param topic
> > > > > >      *            Topic name
> > > > > >      * @param subscriptionName
> > > > > >      *            the subscription
> > > > > >      * @return an accurate analysis of the backlog
> > > > > >      * @throws PulsarAdminException
> > > > > >      *            Unexpected error
> > > > > >      */
> > > > > >     CompletableFuture<AnaliseSubscriptionBacklogResult>
> > > > > > analiseSubscriptionBacklogAsync(String topic,
> > > > > >
> > > > > >                  String subscriptionName);
> > > > > >
> > > > > > A pulsar-admin command will be added as well as usual.
> > > > > >
> > > > > > New configuration entries in broker.conf:
> > > > > >
> > > > > > @FieldContext(
> > > > > >          category = CATEGORY_POLICIES,
> > > > > >          doc = "Maximum time to spend while scanning a
> subscription
> > > to
> > > > > > calculate the accurate backlog"
> > > > > >  )
> > > > > >  private long subscriptionBacklogScanMaxTimeMs = 1000 * 60 * 2L;
> > > > > >  @FieldContext(
> > > > > >          category = CATEGORY_POLICIES,
> > > > > >          doc = "Maximum number of entries to process while
> scanning a
> > > > > > subscription to calculate the accurate backlog"
> > > > > >  )
> > > > > >  private long subscriptionBacklogScanMaxEntries = 10_000;
> > > > > >
> > > > > > Implementation
> > > > > >
> > > > > > The implementation is pretty straightforward:
> > > > > >
> > > > > > add a new API in ManagedCursor to do the Scan
> > > > > > add the REST API
> > > > > > implement in PersistentSubscription a analiseBacklog method that
> does
> > > > the
> > > > > > scan
> > > > > >
> > > > > > The the PersistentSubscription runs the scan:
> > > > > >
> > > > > > it applies the filters if they are present
> > > > > > it considers individuallyDeletedMessages
> > > > > >
> > > > > > Non trivial problem regarding the Dispatcher:
> > > > > > The Filters are loaded by a AbstractBaseDispatcher, but
> > > > > > PersistentSubscription starts a Dispatcher only when the first
> > > > > > consumer is connecter.
> > > > > > This happens because the Subscription itself doesn't have a type
> > > > > > (Failover,Exclusive,Shared...) and KeySharedMetadata, all this
> stuff
> > > > > > is decided by the first consumer coming (after the load of the
> topic,
> > > > > > so the subscription type may change after a topic unload).
> > > > > > This PIP won't fix this "problem", and so in case of missing
> > > > > > Dispatcher we are going to use a ephemeral Dispatcher without
> Type.
> > > > > > Maybe in the future it will be better to persist the Subscription
> > > Type
> > > > > > and other metadata, this way we can create the Dispatcher while
> > > > > > instantiating the Subscription.
> > > > > >
> > > > > > Reject Alternatives
> > > > > >
> > > > > > We could store somehow some counter about the number of logical
> > > > > > messages during writes. But that does not work for a few reasons:
> > > > > >
> > > > > > you cannot know which subscriptions will be created in a topic
> > > > > > subscription can be created from the past (Earliest)
> > > > > > subscription filters may change over time: they are usually
> > > configured
> > > > > > using Subscription Properties, and those properties are dynamic
> > > > > > doing computations on the write path (like running filters) kills
> > > > > > latency and thoughtput
> > > > > >
> > > > > > Use a client to clone the subscription and consume data.
> > > > > > This doesn't work because you have to transfer the data to the
> > > client,
> > > > > > and this is possibly a huge amount of work and a waste of
> resources.
> > > > > >
> > > >
> > >
>

Reply via email to