If I understand correctly: this.flushCursorsTask = scheduledExecutor.scheduleAtFixedRate(catchingAndLoggingThrowables(this::flushCursors), config.getCursorPositionFlushSeconds(), config.getCursorPositionFlushSeconds(), TimeUnit.SECONDS);
which calls ml.getCursors().forEach(c -> ((ManagedCursorImpl) c).flush()); which calls asyncMarkDelete(lastMarkDeleteEntry.newPosition, lastMarkDeleteEntry.properties, new MarkDeleteCallback() { which gets to here // Apply rate limiting to mark-delete operations if (markDeleteLimiter != null && !markDeleteLimiter.tryAcquire()) { isDirty = true; updateLastMarkDeleteEntryToLatest(newPosition, properties); callback.markDeleteComplete(ctx); return; } internalAsyncMarkDelete(newPosition, properties, callback, ctx); So you end up in the rate limiter check even if you come via the scheduled route, no? On Wed, Jun 15, 2022 at 9:11 PM Matteo Merli <matteo.me...@gmail.com> wrote: > There is a background flush process that was added for this precise reason. > > In broker.conf > ---- > # How frequently to flush the cursor positions that were accumulated > due to rate limiting. (seconds). > # Default is 60 seconds > managedLedgerCursorPositionFlushSeconds=60 > > -- > Matteo Merli > <matteo.me...@gmail.com> > > On Wed, Jun 15, 2022 at 8:45 AM Asaf Mesika <asaf.mes...@gmail.com> wrote: > > > > Hi, > > > > In the Managed Cursor implementation, I saw that the position state is > > persisted in the ledger whenever a user calls acknowledge, be it > individual > > or cumulative. > > > > For performance reasons there is a rate limiter, defaulting to 1 persist > > action per second. > > > > The bug: > > 17:00:00.003 - user ack a single message --> state it written to ledger > > 17:00:00.004 - 17:00:00.400 - use acks a single message 100 times. > > nothing happens after that on that subscription. > > > > 18:00 - machine crashed. > > You "lost" the state hence 100 messages have to be redelivered. > > > > I was wondering if it makes sense to add a timeout - say 10sec. If > nothing > > happens to the subscription, flush the state after 10sec if it's dirty. > > > > WDYT? >