Right,

if there is constant activity on the cursor, the persisting frequency
is dictated by the rate limiter. If the forced flush is rate-limited
here it will not be a problem because the next ack operation (not
rate-limited) will eventually flush everything.

The problem the `flush()` is solving is when the the activity on the
cursor suddenly stops. Without the forced flush there will be no
guarantee that the latest acks are persisted in a timely manner and,
if the broker crashes, it was leading to re-delivery of very old
messages.
But, if there is no activity, the rate limiter will not limit the
forced flush of the cursor.




--
Matteo Merli
<matteo.me...@gmail.com>

On Thu, Jun 16, 2022 at 2:08 AM Asaf Mesika <asaf.mes...@gmail.com> wrote:
>
> If I understand correctly:
>
> this.flushCursorsTask =
> scheduledExecutor.scheduleAtFixedRate(catchingAndLoggingThrowables(this::flushCursors),
>         config.getCursorPositionFlushSeconds(),
> config.getCursorPositionFlushSeconds(), TimeUnit.SECONDS);
>
> which calls
>
> ml.getCursors().forEach(c -> ((ManagedCursorImpl) c).flush());
>
>
> which calls
>
> asyncMarkDelete(lastMarkDeleteEntry.newPosition,
> lastMarkDeleteEntry.properties, new MarkDeleteCallback() {
>
>
> which gets to here
>
> // Apply rate limiting to mark-delete operations
> if (markDeleteLimiter != null && !markDeleteLimiter.tryAcquire()) {
>     isDirty = true;
>     updateLastMarkDeleteEntryToLatest(newPosition, properties);
>     callback.markDeleteComplete(ctx);
>     return;
> }
> internalAsyncMarkDelete(newPosition, properties, callback, ctx);
>
>
> So you end up in the rate limiter check even if you come via the scheduled
> route, no?
>
>
> On Wed, Jun 15, 2022 at 9:11 PM Matteo Merli <matteo.me...@gmail.com> wrote:
>
> > There is a background flush process that was added for this precise reason.
> >
> > In broker.conf
> > ----
> > # How frequently to flush the cursor positions that were accumulated
> > due to rate limiting. (seconds).
> > # Default is 60 seconds
> > managedLedgerCursorPositionFlushSeconds=60
> >
> > --
> > Matteo Merli
> > <matteo.me...@gmail.com>
> >
> > On Wed, Jun 15, 2022 at 8:45 AM Asaf Mesika <asaf.mes...@gmail.com> wrote:
> > >
> > > Hi,
> > >
> > > In the Managed Cursor implementation, I saw that the position state is
> > > persisted in the ledger whenever a user calls acknowledge, be it
> > individual
> > > or cumulative.
> > >
> > > For performance reasons there is a rate limiter, defaulting to 1 persist
> > > action per second.
> > >
> > > The bug:
> > > 17:00:00.003 - user ack a single message --> state it written to ledger
> > > 17:00:00.004 - 17:00:00.400 - use acks a single message 100 times.
> > > nothing happens after that on that subscription.
> > >
> > > 18:00 - machine crashed.
> > > You "lost" the state hence 100 messages have to be redelivered.
> > >
> > > I was wondering if it makes sense to add a timeout - say 10sec. If
> > nothing
> > > happens to the subscription, flush the state after 10sec if it's dirty.
> > >
> > > WDYT?
> >

Reply via email to