Right, if there is constant activity on the cursor, the persisting frequency is dictated by the rate limiter. If the forced flush is rate-limited here it will not be a problem because the next ack operation (not rate-limited) will eventually flush everything.
The problem the `flush()` is solving is when the the activity on the cursor suddenly stops. Without the forced flush there will be no guarantee that the latest acks are persisted in a timely manner and, if the broker crashes, it was leading to re-delivery of very old messages. But, if there is no activity, the rate limiter will not limit the forced flush of the cursor. -- Matteo Merli <matteo.me...@gmail.com> On Thu, Jun 16, 2022 at 2:08 AM Asaf Mesika <asaf.mes...@gmail.com> wrote: > > If I understand correctly: > > this.flushCursorsTask = > scheduledExecutor.scheduleAtFixedRate(catchingAndLoggingThrowables(this::flushCursors), > config.getCursorPositionFlushSeconds(), > config.getCursorPositionFlushSeconds(), TimeUnit.SECONDS); > > which calls > > ml.getCursors().forEach(c -> ((ManagedCursorImpl) c).flush()); > > > which calls > > asyncMarkDelete(lastMarkDeleteEntry.newPosition, > lastMarkDeleteEntry.properties, new MarkDeleteCallback() { > > > which gets to here > > // Apply rate limiting to mark-delete operations > if (markDeleteLimiter != null && !markDeleteLimiter.tryAcquire()) { > isDirty = true; > updateLastMarkDeleteEntryToLatest(newPosition, properties); > callback.markDeleteComplete(ctx); > return; > } > internalAsyncMarkDelete(newPosition, properties, callback, ctx); > > > So you end up in the rate limiter check even if you come via the scheduled > route, no? > > > On Wed, Jun 15, 2022 at 9:11 PM Matteo Merli <matteo.me...@gmail.com> wrote: > > > There is a background flush process that was added for this precise reason. > > > > In broker.conf > > ---- > > # How frequently to flush the cursor positions that were accumulated > > due to rate limiting. (seconds). > > # Default is 60 seconds > > managedLedgerCursorPositionFlushSeconds=60 > > > > -- > > Matteo Merli > > <matteo.me...@gmail.com> > > > > On Wed, Jun 15, 2022 at 8:45 AM Asaf Mesika <asaf.mes...@gmail.com> wrote: > > > > > > Hi, > > > > > > In the Managed Cursor implementation, I saw that the position state is > > > persisted in the ledger whenever a user calls acknowledge, be it > > individual > > > or cumulative. > > > > > > For performance reasons there is a rate limiter, defaulting to 1 persist > > > action per second. > > > > > > The bug: > > > 17:00:00.003 - user ack a single message --> state it written to ledger > > > 17:00:00.004 - 17:00:00.400 - use acks a single message 100 times. > > > nothing happens after that on that subscription. > > > > > > 18:00 - machine crashed. > > > You "lost" the state hence 100 messages have to be redelivered. > > > > > > I was wondering if it makes sense to add a timeout - say 10sec. If > > nothing > > > happens to the subscription, flush the state after 10sec if it's dirty. > > > > > > WDYT? > >