+1

Penghui

On Wed, Jul 20, 2022 at 9:41 PM Asaf Mesika <asaf.mes...@gmail.com> wrote:

> Sorry to barge in the vote - I forgot to send my reply on the discussion 2
> days ago :)
>
>
> On Tue, Jul 19, 2022 at 11:22 PM Nicolò Boschi <boschi1...@gmail.com>
> wrote:
>
> > +1, thanks
> >
> > Nicolò Boschi
> >
> > On Tue, 19 Jul 2022 at 22:16, Christophe Bornet <bornet.ch...@gmail.com> wrote:
> >
> > > +1
> > >
> > > On Tue, 19 Jul 2022 at 20:01, Andrey Yegorov <andrey.yego...@datastax.com> wrote:
> > >
> > > > +1
> > > >
> > > > > On Tue, Jul 19, 2022 at 10:51 AM Dave Fisher <w...@apache.org> wrote:
> > > >
> > > > > +1 (binding)
> > > > >
> > > > > I support this enhancement for when a user occasionally requires
> > > > > accurate backlog stats. Once we bring this into service we can see
> > > > > if further guardrails are required.
> > > > >
> > > > > Regards,
> > > > > Dave
> > > > >
> > > > > > On Jul 19, 2022, at 10:02 AM, Enrico Olivelli <eolive...@gmail.com> wrote:
> > > > > >
> > > > > > This is the VOTE thread for PIP-187
> > > > > >
> > > > > > This is the GH issue: https://github.com/apache/pulsar/issues/16597
> > > > > > This is the PR: https://github.com/apache/pulsar/pull/16545
> > > > > >
> > > > > > The vote is open for at least 48 hours
> > > > > >
> > > > > > Below you can find a copy of the text of the PIP
> > > > > >
> > > > > > Best regards
> > > > > > Enrico
> > > > > >
> > > > > >
> > > > > > Motivation
> > > > > >
> > > > > > Currently there is no way to have an accurate backlog for a
> > > > > > subscription:
> > > > > >
> > > > > > you have only the number of "entries", not messages
> > > > > > server side filters (PIP-105) may filter out some messages
> > > > > >
> > > > > > Having the number of entries is sometimes not enough because with
> > > > > > batch messages the amount of work on the Consumers is proportional
> > > > > > to the number of messages, which may vary from entry to entry.
> > > > > >
> > > > > > Goal
> > > > > >
> > > > > > The idea of this patch is to provide a dedicated API (REST,
> > > > > > pulsar-admin, and Java PulsarAdmin) to "analyze" a subscription and
> > > > > > provide detailed information about what is expected to be delivered
> > > > > > to Consumers.
> > > > > >
> > > > > > The operation will be quite expensive because we have to load the
> > > > > > messages from storage and pass them to the filters, but due to the
> > > > > > dynamic nature of Pulsar subscriptions there is no other way to
> > > > > > have this value.
> > > > > >
> > > > > > One good strategy for monitoring/alerting is to set up alerts on
> > > > > > the usual "stats" and use this new API to inspect the subscription
> > > > > > more deeply, typically by issuing a manual command.
> > > > > >
> > > > > > API Changes
> > > > > >
> > > > > > internal ManagedCursor API:
> > > > > >
> > > > > > CompletableFuture<ScanOutcome> scan(Predicate<Entry> condition,
> > > > > >         long maxEntries, long timeOutMs);
> > > > > >
> > > > > > This method scans the Cursor from the lastMarkDelete position to
> > > > > > the tail.
> > > > > > There is a time limit and a maxEntries limit; these are needed in
> > > > > > order to prevent huge (and useless) scans.
> > > > > > The Predicate can stop the scan if it doesn't want to continue the
> > > > > > processing for some reason.
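> > > > > >
> > > > > > As a rough illustration only (not part of the PR), a caller holding
> > > > > > a ManagedCursor instance "cursor" and an slf4j "log" could use the
> > > > > > new method like this; the counter and the limits are made up for
> > > > > > the sketch:
> > > > > >
> > > > > > // Count the scanned entries; never stop the scan early.
> > > > > > // java.util.concurrent.atomic.AtomicLong is used for the counter.
> > > > > > AtomicLong entriesSeen = new AtomicLong();
> > > > > > cursor.scan(entry -> {
> > > > > >     entriesSeen.incrementAndGet();
> > > > > >     return true; // returning false would stop the scan early
> > > > > > }, 10_000, 120_000)
> > > > > > .thenAccept(outcome ->
> > > > > >     log.info("Scan finished with {} after {} entries", outcome, entriesSeen.get()));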
> > > > > >
> > > > > > New REST API:
> > > > > >
> > > > > >    @GET
> > > > > >    @Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/analyzeBacklog")
> > > > > >    @ApiOperation(value = "Analyze a subscription, by scanning all the unprocessed messages")
> > > > > >    public void analyzeSubscriptionBacklog(
> > > > > >            @Suspended final AsyncResponse asyncResponse,
> > > > > >            @ApiParam(value = "Specify the tenant", required = true)
> > > > > >            @PathParam("tenant") String tenant,
> > > > > >            @ApiParam(value = "Specify the namespace", required = true)
> > > > > >            @PathParam("namespace") String namespace,
> > > > > >            @ApiParam(value = "Specify topic name", required = true)
> > > > > >            @PathParam("topic") @Encoded String encodedTopic,
> > > > > >            @ApiParam(value = "Subscription", required = true)
> > > > > >            @PathParam("subName") String encodedSubName,
> > > > > >            @ApiParam(value = "Is authentication required to perform this operation")
> > > > > >            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
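> > > > > >
> > > > > > For example, assuming the endpoint is exposed under the usual
> > > > > > /admin/v2/persistent base path for topic resources, a call for a
> > > > > > topic in the public/default namespace could look like this
> > > > > > (illustrative names only):
> > > > > >
> > > > > > GET http://broker:8080/admin/v2/persistent/public/default/my-topic/subscription/my-sub/analyzeBacklog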
> > > > > >
> > > > > > API response model:
> > > > > >
> > > > > > public class AnalyzeSubscriptionBacklogResult {
> > > > > >    private long entries;
> > > > > >    private long messages;
> > > > > >
> > > > > >    private long filterRejectedEntries;
> > > > > >    private long filterAcceptedEntries;
> > > > > >    private long filterRescheduledEntries;
> > > > > >
> > > > > >    private long filterRejectedMessages;
> > > > > >    private long filterAcceptedMessages;
> > > > > >    private long filterRescheduledMessages;
> > > > > >
> > > > > >    private boolean aborted;
> > > > > >
> > > > > > The response contains "aborted=true" if the request has been
> > > > > > aborted by some internal limitation, such as a timeout or the scan
> > > > > > hitting the maximum number of entries.
> > > > > > We are not going to provide more details about the reason for the
> > > > > > stop, as that would make the API too detailed and harder to
> > > > > > maintain. Also, the details can be found in the broker logs.
> > > > > >
> > > > > > New PulsarAdmin API:
> > > > > >
> > > > > > /**
> > > > > >     * Analyze subscription backlog.
> > > > > >     * This is a potentially expensive operation, as it requires
> > > > > >     * reading the messages from storage.
> > > > > >     * This function takes into consideration batch messages
> > > > > >     * and also Subscription filters.
> > > > > >     * @param topic
> > > > > >     *            Topic name
> > > > > >     * @param subscriptionName
> > > > > >     *            the subscription
> > > > > >     * @return an accurate analysis of the backlog
> > > > > >     * @throws PulsarAdminException
> > > > > >     *            Unexpected error
> > > > > >     */
> > > > > >    AnalyzeSubscriptionBacklogResult analyzeSubscriptionBacklog(
> > > > > >            String topic, String subscriptionName)
> > > > > >            throws PulsarAdminException;
> > > > > >
> > > > > >    /**
> > > > > >     * Analyze subscription backlog.
> > > > > >     * This is a potentially expensive operation, as it requires
> > > > > >     * reading the messages from storage.
> > > > > >     * This function takes into consideration batch messages
> > > > > >     * and also Subscription filters.
> > > > > >     * @param topic
> > > > > >     *            Topic name
> > > > > >     * @param subscriptionName
> > > > > >     *            the subscription
> > > > > >     * @return an accurate analysis of the backlog
> > > > > >     * @throws PulsarAdminException
> > > > > >     *            Unexpected error
> > > > > >     */
> > > > > >    CompletableFuture<AnalyzeSubscriptionBacklogResult> analyzeSubscriptionBacklogAsync(
> > > > > >            String topic, String subscriptionName);
> > > > > >
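> > > > > > A rough usage sketch, assuming the new methods are exposed on the
> > > > > > topics() admin client (the PIP does not spell out where they live)
> > > > > > and that the result class has getters matching its fields; the URL,
> > > > > > topic and subscription names are placeholders:
> > > > > >
> > > > > > PulsarAdmin admin = PulsarAdmin.builder()
> > > > > >         .serviceHttpUrl("http://localhost:8080") // assumed admin URL
> > > > > >         .build();
> > > > > > AnalyzeSubscriptionBacklogResult result = admin.topics()
> > > > > >         .analyzeSubscriptionBacklog("persistent://public/default/my-topic", "my-sub");
> > > > > > if (result.isAborted()) {
> > > > > >     // the scan hit the time or entry limit, so the counters are partial
> > > > > > }
> > > > > > System.out.println("entries=" + result.getEntries()
> > > > > >         + ", messages=" + result.getMessages());
> > > > > > admin.close();
> > > > > >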
> > > > > > As usual, a pulsar-admin command will be added as well.
> > > > > >
> > > > > > New configuration entries in broker.conf:
> > > > > >
> > > > > > @FieldContext(
> > > > > >         category = CATEGORY_POLICIES,
> > > > > >         doc = "Maximum time to spend while scanning a subscription to calculate the accurate backlog"
> > > > > > )
> > > > > > private long subscriptionBacklogScanMaxTimeMs = 1000 * 60 * 2L;
> > > > > > @FieldContext(
> > > > > >         category = CATEGORY_POLICIES,
> > > > > >         doc = "Maximum number of entries to process while scanning a subscription to calculate the accurate backlog"
> > > > > > )
> > > > > > private long subscriptionBacklogScanMaxEntries = 10_000;
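> > > > > >
> > > > > > Assuming the broker.conf keys mirror the field names above (the
> > > > > > usual convention for ServiceConfiguration fields), an operator
> > > > > > could tighten the limits like this (example values only):
> > > > > >
> > > > > > subscriptionBacklogScanMaxTimeMs=60000
> > > > > > subscriptionBacklogScanMaxEntries=5000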
> > > > > >
> > > > > > Implementation
> > > > > >
> > > > > > The implementation is pretty straightforward:
> > > > > >
> > > > > > add a new API in ManagedCursor to do the Scan
> > > > > > add the REST API
> > > > > > implement in PersistentSubscription an analyzeBacklog method that
> > > > > > does the scan
> > > > > >
> > > > > > Then the PersistentSubscription runs the scan (see the sketch after this list):
> > > > > >
> > > > > > it applies the filters if they are present
> > > > > > it considers individuallyDeletedMessages
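> > > > > >
> > > > > > A rough sketch of how the scan callback could aggregate the
> > > > > > counters, assuming AtomicLong counters declared beforehand; the
> > > > > > helper names (isIndividuallyDeleted, numMessagesInEntry,
> > > > > > applyFilters, buildResult) are placeholders, not code from the PR:
> > > > > >
> > > > > > cursor.scan(entry -> {
> > > > > >     if (isIndividuallyDeleted(entry.getPosition())) {
> > > > > >         return true; // already acknowledged, skip it
> > > > > >     }
> > > > > >     int batchSize = numMessagesInEntry(entry); // parses the batch header
> > > > > >     entries.incrementAndGet();
> > > > > >     messages.addAndGet(batchSize);
> > > > > >     if (applyFilters(entry)) {                 // PIP-105 filters, when configured
> > > > > >         acceptedMessages.addAndGet(batchSize);
> > > > > >     } else {
> > > > > >         rejectedMessages.addAndGet(batchSize);
> > > > > >     }
> > > > > >     return true;                               // keep scanning
> > > > > > }, maxEntries, timeOutMs)
> > > > > > .thenAccept(outcome -> buildResult(entries, messages,
> > > > > >         acceptedMessages, rejectedMessages, outcome));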
> > > > > >
> > > > > > Rejected Alternatives
> > > > > >
> > > > > > We could somehow store a counter of the number of logical messages
> > > > > > during writes. But that does not work for a few reasons:
> > > > > >
> > > > > > you cannot know which subscriptions will be created in a topic
> > > > > > subscriptions can be created from the past (Earliest)
> > > > > > subscription filters may change over time: they are usually
> > > > > > configured using Subscription Properties, and those properties are
> > > > > > dynamic
> > > > > > doing computations on the write path (like running filters) kills
> > > > > > latency and throughput
> > > > > >
> > > > > > Use a client to clone the subscription and consume data.
> > > > > > This doesn't work because you have to transfer the data to the
> > > > > > client, which is possibly a huge amount of work and a waste of
> > > > > > resources.
> > > > >
> > > > >
> > > >
> > > > --
> > > > Andrey Yegorov
> > > >
> > >
> >
>
