Hello,
this is a PIP to implement a tool to analyse the subscription backlog

Link: https://github.com/apache/pulsar/issues/16597
Prototype: https://github.com/apache/pulsar/pull/16545

Below you can find the proposal (I will amend the GH issue while we
discuss, as usual)

Enrico

Motivation

Currently there is no way to have a accurate backlog for a subscription:

you have only the number of "entries", not messages
server side filters (PIP-105) may filter out some messages

Having the number of entries is sometimes not enough because with
batch messages the amount of work on the Consumers is proportional to
the number of messages, that may vary from entry to entry.

Goal

The idea of this patch is to provide a dedicate API (REST,
pulsar-admin, and Java PulsarAdmin) to "analise" a subscription and
provide detailed information about that is expected to be delivered to
Consumers.

The operation will be quite expensive because we have to load the
messages from storage and pass them to the filters, but due to the
dynamic nature of Pulsar subscriptions there is no other way to have
this value.

One good strategy to do monitoring/alerting is to setup alerts on the
usual "stats" and use this new API to inspect the subscription deeper,
typically be issuing a manual command.

API Changes

internal ManagedCursor API:

CompletableFuture<ScanOutcome> scan(Predicate<Entry> condition, long
maxEntries, long timeOutMs);

This method scans the Cursor from the lastMarkDelete position to the tail.
There is a time limit and a maxEntries limit, these are needed in
order to prevent huge (and useless) scans.
The Predicate can stop the scan, if it doesn't want to continue the
processing for some reasons.

New REST API:

    @GET
    @Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/analiseBacklog")
    @ApiOperation(value = "Analyse a subscription, by scanning all the
unprocessed messages")

    public void analiseSubscriptionBacklog(
           @Suspended final AsyncResponse asyncResponse,
            @ApiParam(value = "Specify the tenant", required = true)
            @PathParam("tenant") String tenant,
            @ApiParam(value = "Specify the namespace", required = true)
            @PathParam("namespace") String namespace,
            @ApiParam(value = "Specify topic name", required = true)
            @PathParam("topic") @Encoded String encodedTopic,
            @ApiParam(value = "Subscription", required = true)
            @PathParam("subName") String encodedSubName,
            @ApiParam(value = "Is authentication required to perform
this operation")
            @QueryParam("authoritative") @DefaultValue("false")
boolean authoritative) {

API response model:

public class AnaliseSubscriptionBacklogResult {
    private long entries;
    private long messages;

    private long filterRejectedEntries;
    private long filterAcceptedEntries;
    private long filterRescheduledEntries;

    private long filterRejectedMessages;
    private long filterAcceptedMessages;
    private long filterRescheduledMessages;

    private boolean aborted;

The response contains "aborted=true" is the request has been aborted
by some internal limitations, like a timeout or the scan hit the max
number of entries.
We are not going to provide more details about the reason of the stop.
It will make the API too detailed and harder to maintain. Also, in the
logs of the broker you will find the details.

New PulsarAdmin API:

/**
     * Analise subscription backlog.
     * This is a potentially expensive operation, as it requires
     * to read the messages from storage.
     * This function takes into consideration batch messages
     * and also Subscription filters.
     * @param topic
     *            Topic name
     * @param subscriptionName
     *            the subscription
     * @return an accurate analysis of the backlog
     * @throws PulsarAdminException
     *            Unexpected error
     */
    AnaliseSubscriptionBacklogResult analiseSubscriptionBacklog(String
topic, String subscriptionName)
            throws PulsarAdminException;

    /**
     * Analise subscription backlog.
     * This is a potentially expensive operation, as it requires
     * to read the messages from storage.
     * This function takes into consideration batch messages
     * and also Subscription filters.
     * @param topic
     *            Topic name
     * @param subscriptionName
     *            the subscription
     * @return an accurate analysis of the backlog
     * @throws PulsarAdminException
     *            Unexpected error
     */
    CompletableFuture<AnaliseSubscriptionBacklogResult>
analiseSubscriptionBacklogAsync(String topic,

                 String subscriptionName);

A pulsar-admin command will be added as well as usual.

New configuration entries in broker.conf:

@FieldContext(
         category = CATEGORY_POLICIES,
         doc = "Maximum time to spend while scanning a subscription to
calculate the accurate backlog"
 )
 private long subscriptionBacklogScanMaxTimeMs = 1000 * 60 * 2L;
 @FieldContext(
         category = CATEGORY_POLICIES,
         doc = "Maximum number of entries to process while scanning a
subscription to calculate the accurate backlog"
 )
 private long subscriptionBacklogScanMaxEntries = 10_000;

Implementation

The implementation is pretty straightforward:

add a new API in ManagedCursor to do the Scan
add the REST API
implement in PersistentSubscription a analiseBacklog method that does the scan

The the PersistentSubscription runs the scan:

it applies the filters if they are present
it considers individuallyDeletedMessages

Non trivial problem regarding the Dispatcher:
The Filters are loaded by a AbstractBaseDispatcher, but
PersistentSubscription starts a Dispatcher only when the first
consumer is connecter.
This happens because the Subscription itself doesn't have a type
(Failover,Exclusive,Shared...) and KeySharedMetadata, all this stuff
is decided by the first consumer coming (after the load of the topic,
so the subscription type may change after a topic unload).
This PIP won't fix this "problem", and so in case of missing
Dispatcher we are going to use a ephemeral Dispatcher without Type.
Maybe in the future it will be better to persist the Subscription Type
and other metadata, this way we can create the Dispatcher while
instantiating the Subscription.

Reject Alternatives

We could store somehow some counter about the number of logical
messages during writes. But that does not work for a few reasons:

you cannot know which subscriptions will be created in a topic
subscription can be created from the past (Earliest)
subscription filters may change over time: they are usually configured
using Subscription Properties, and those properties are dynamic
doing computations on the write path (like running filters) kills
latency and thoughtput

Use a client to clone the subscription and consume data.
This doesn't work because you have to transfer the data to the client,
and this is possibly a huge amount of work and a waste of resources.

Reply via email to