+1

On Tue, Jul 19, 2022 at 10:51 AM Dave Fisher <w...@apache.org> wrote:

> +1 (binding)
>
> I support this enhancement for when a user occasionally requires accurate
> backlog stats. Once we bring this into service we can see if further
> guardrails are required.
>
> Regards,
> Dave
>
> > On Jul 19, 2022, at 10:02 AM, Enrico Olivelli <eolive...@gmail.com>
> wrote:
> >
> > This is the VOTE thread for PIP-187
> >
> > This is the GH issue: https://github.com/apache/pulsar/issues/16597
> > This is the PR: https://github.com/apache/pulsar/pull/16545
> >
> > The vote is open for at least 48 hours
> >
> > Below you can find a copy of the text of the PIP
> >
> > Best regards
> > Enrico
> >
> >
> > Motivation
> >
> > Currently there is no way to have an accurate backlog for a subscription:
> >
> > - you have only the number of "entries", not messages
> > - server-side filters (PIP-105) may filter out some messages
> >
> > Having the number of entries is sometimes not enough because with
> > batch messages the amount of work on the Consumers is proportional to
> > the number of messages, which may vary from entry to entry.
> >
> > Goal
> >
> > The idea of this patch is to provide a dedicated API (REST,
> > pulsar-admin, and Java PulsarAdmin) to "analyze" a subscription and
> > provide detailed information about what is expected to be delivered to
> > Consumers.
> >
> > The operation will be quite expensive because we have to load the
> > messages from storage and pass them to the filters, but due to the
> > dynamic nature of Pulsar subscriptions there is no other way to have
> > this value.
> >
> > One good strategy for monitoring/alerting is to set up alerts on the
> > usual "stats" and use this new API to inspect the subscription deeper,
> > typically by issuing a manual command.
> >
> > API Changes
> >
> > internal ManagedCursor API:
> >
> > CompletableFuture<ScanOutcome> scan(Predicate<Entry> condition,
> >         long maxEntries, long timeOutMs);
> >
> > This method scans the Cursor from the lastMarkDelete position to the tail.
> > There is a time limit and a maxEntries limit; these are needed in
> > order to prevent huge (and useless) scans.
> > The Predicate can stop the scan if it doesn't want to continue the
> > processing for some reason.
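
The scan contract described above can be sketched in a few lines. This is only an illustration, not the ManagedCursor implementation: it is synchronous rather than returning a CompletableFuture, it walks an in-memory list instead of a cursor, and the class name and the three ScanOutcome values are assumptions (the PIP text names the type but does not list its values).

```java
import java.util.List;
import java.util.function.Predicate;

public class BoundedScanSketch {

    /** Assumed outcome values; the PIP only names the type ScanOutcome. */
    public enum ScanOutcome { COMPLETED, USER_INTERRUPTED, ABORTED }

    /**
     * Visits entries in order until the list ends, the predicate returns false,
     * maxEntries entries have been visited, or timeOutMs has elapsed.
     */
    public static <T> ScanOutcome scan(List<T> entries, Predicate<T> condition,
                                       long maxEntries, long timeOutMs) {
        long deadline = System.currentTimeMillis() + timeOutMs;
        long visited = 0;
        for (T entry : entries) {
            // Guardrails: stop before reading more than allowed.
            if (visited >= maxEntries || System.currentTimeMillis() > deadline) {
                return ScanOutcome.ABORTED;
            }
            visited++;
            // The predicate can ask to stop the scan by returning false.
            if (!condition.test(entry)) {
                return ScanOutcome.USER_INTERRUPTED;
            }
        }
        return ScanOutcome.COMPLETED; // reached the tail
    }
}
```

In this sketch a caller would treat ABORTED as "the counters are only a lower bound", which matches the "aborted" flag in the response model.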
> >
> > New REST API:
> >
> >    @GET
> >    @Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/analyzeBacklog")
> >    @ApiOperation(value = "Analyze a subscription, by scanning all the unprocessed messages")
> >
> >    public void analyzeSubscriptionBacklog(
> >           @Suspended final AsyncResponse asyncResponse,
> >            @ApiParam(value = "Specify the tenant", required = true)
> >            @PathParam("tenant") String tenant,
> >            @ApiParam(value = "Specify the namespace", required = true)
> >            @PathParam("namespace") String namespace,
> >            @ApiParam(value = "Specify topic name", required = true)
> >            @PathParam("topic") @Encoded String encodedTopic,
> >            @ApiParam(value = "Subscription", required = true)
> >            @PathParam("subName") String encodedSubName,
> >            @ApiParam(value = "Is authentication required to perform this operation")
> >            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
> >
> > API response model:
> >
> > public class AnalyzeSubscriptionBacklogResult {
> >    private long entries;
> >    private long messages;
> >
> >    private long filterRejectedEntries;
> >    private long filterAcceptedEntries;
> >    private long filterRescheduledEntries;
> >
> >    private long filterRejectedMessages;
> >    private long filterAcceptedMessages;
> >    private long filterRescheduledMessages;
> >
> >    private boolean aborted;
> > }
> >
> > The response contains "aborted=true" if the request has been aborted
> > because of some internal limit, like a timeout or the scan hitting the max
> > number of entries.
> > We are not going to provide more details about the reason for the stop.
> > It would make the API too detailed and harder to maintain. Also, the
> > broker logs contain the details.
> >
> > New PulsarAdmin API:
> >
> >    /**
> >     * Analyze subscription backlog.
> >     * This is a potentially expensive operation, as it requires
> >     * to read the messages from storage.
> >     * This function takes into consideration batch messages
> >     * and also Subscription filters.
> >     * @param topic
> >     *            Topic name
> >     * @param subscriptionName
> >     *            the subscription
> >     * @return an accurate analysis of the backlog
> >     * @throws PulsarAdminException
> >     *            Unexpected error
> >     */
> >    AnalyzeSubscriptionBacklogResult analyzeSubscriptionBacklog(
> >            String topic, String subscriptionName)
> >            throws PulsarAdminException;
> >
> >    /**
> >     * Analyze subscription backlog.
> >     * This is a potentially expensive operation, as it requires
> >     * to read the messages from storage.
> >     * This function takes into consideration batch messages
> >     * and also Subscription filters.
> >     * @param topic
> >     *            Topic name
> >     * @param subscriptionName
> >     *            the subscription
> >     * @return an accurate analysis of the backlog
> >     * @throws PulsarAdminException
> >     *            Unexpected error
> >     */
> >    CompletableFuture<AnalyzeSubscriptionBacklogResult> analyzeSubscriptionBacklogAsync(
> >            String topic, String subscriptionName);
> >
> > A pulsar-admin command will be added as well, as usual.
> >
> > New configuration entries in broker.conf:
> >
> > @FieldContext(
> >         category = CATEGORY_POLICIES,
> >         doc = "Maximum time to spend while scanning a subscription to "
> >                 + "calculate the accurate backlog"
> > )
> > private long subscriptionBacklogScanMaxTimeMs = 1000 * 60 * 2L;
> > @FieldContext(
> >         category = CATEGORY_POLICIES,
> >         doc = "Maximum number of entries to process while scanning a "
> >                 + "subscription to calculate the accurate backlog"
> > )
> > private long subscriptionBacklogScanMaxEntries = 10_000;
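
Assuming these fields are exposed in broker.conf under keys of the same name (an assumption, the PIP text does not show the file itself), the defaults above would correspond to the fragment below. The time limit is 1000 * 60 * 2 = 120000 ms, i.e. two minutes.

```properties
# Maximum time (ms) to spend scanning a subscription for an accurate backlog
subscriptionBacklogScanMaxTimeMs=120000
# Maximum number of entries to process in a single scan
subscriptionBacklogScanMaxEntries=10000
```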
> >
> > Implementation
> >
> > The implementation is pretty straightforward:
> >
> > - add a new API in ManagedCursor to do the scan
> > - add the REST API
> > - implement in PersistentSubscription an analyzeBacklog method that does
> >   the scan
> >
> > Then the PersistentSubscription runs the scan:
> >
> > - it applies the filters if they are present
> > - it considers individuallyDeletedMessages
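
A rough sketch of how such a scan could fill the counters of the response model is shown here. Everything except the counter names is hypothetical: the Entry stand-in, the FilterResult enum, modelling individuallyDeletedMessages as a set of entry ids, and the assumption that one filter verdict applies to every message of a batch entry.

```java
import java.util.List;
import java.util.Set;

public class BacklogAnalysisSketch {

    /** Hypothetical filter verdicts, named after the counters in the response model. */
    public enum FilterResult { ACCEPT, REJECT, RESCHEDULE }

    /** Hypothetical stand-in for a stored entry: id, batch size, and filter verdict. */
    public static final class Entry {
        final long entryId;
        final int numMessages;
        final FilterResult verdict;

        public Entry(long entryId, int numMessages, FilterResult verdict) {
            this.entryId = entryId;
            this.numMessages = numMessages;
            this.verdict = verdict;
        }
    }

    /** Counters mirroring the numeric fields of AnalyzeSubscriptionBacklogResult. */
    public static final class Result {
        public long entries;
        public long messages;
        public long filterRejectedEntries;
        public long filterAcceptedEntries;
        public long filterRescheduledEntries;
        public long filterRejectedMessages;
        public long filterAcceptedMessages;
        public long filterRescheduledMessages;
    }

    public static Result analyze(List<Entry> backlog, Set<Long> individuallyDeleted) {
        Result r = new Result();
        for (Entry e : backlog) {
            if (individuallyDeleted.contains(e.entryId)) {
                continue; // already acknowledged, so not part of the backlog
            }
            r.entries++;
            r.messages += e.numMessages;
            switch (e.verdict) {
                case ACCEPT:
                    r.filterAcceptedEntries++;
                    r.filterAcceptedMessages += e.numMessages;
                    break;
                case REJECT:
                    r.filterRejectedEntries++;
                    r.filterRejectedMessages += e.numMessages;
                    break;
                default: // RESCHEDULE
                    r.filterRescheduledEntries++;
                    r.filterRescheduledMessages += e.numMessages;
                    break;
            }
        }
        return r;
    }
}
```

The point of the sketch is that the entry count and the message count diverge as soon as entries carry batches of different sizes, which is why the result reports both.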
> >
> > Rejected Alternatives
> >
> > We could somehow store a counter of the number of logical
> > messages during writes. But that does not work for a few reasons:
> >
> > - you cannot know which subscriptions will be created in a topic
> > - a subscription can be created from the past (Earliest)
> > - subscription filters may change over time: they are usually configured
> >   using Subscription Properties, and those properties are dynamic
> > - doing computations on the write path (like running filters) kills
> >   latency and throughput
> >
> > Use a client to clone the subscription and consume data.
> > This doesn't work because you have to transfer the data to the client,
> > and this is possibly a huge amount of work and a waste of resources.
>
>

-- 
Andrey Yegorov
