Hi Andrey,

Thanks for submitting the PR. We have been facing this issue for a long time now, and we also have a PR that solves it in a simple and fundamental way, with proven performance results.
PR: https://github.com/apache/pulsar/pull/9292

But again, I am not sure why some folks blocked this PR without giving a reason, even after I asked multiple times, and the PR has been blocked since then. This could be a good time to revive it and try to find the right solution to address this issue.

Thanks,
Rajan

On Fri, Sep 20, 2024 at 4:40 PM Andrey Yegorov <ayego...@apache.org> wrote:
> Hello,
>
> I created a PIP for handling a large PositionInfo state (a large number of
> unacked ranges in the cursor).
>
> PIP PR: https://github.com/apache/pulsar/pull/23328
> Proposed implementation: https://github.com/apache/pulsar/pull/22799
>
> Relevant excerpts from the PIP:
> -----------
> Background knowledge
> <https://github.com/apache/pulsar/blob/124255a82d145160d6d729a6aebd6aad47fa051e/pip/pip-381-large-positioninfo.md#background-knowledge>
>
> In the case of a KEY_SHARED subscription and out-of-order acknowledgments,
> the PositionInfo state can be persisted to preserve the state, with a
> configurable maximum number of ranges to persist:
>
> # Max number of "acknowledgment holes" that are going to be persistently stored.
> # When acknowledging out of order, a consumer will leave holes that are supposed
> # to be quickly filled by acking all the messages. The information of which
> # messages are acknowledged is persisted by compressing in "ranges" of messages
> # that were acknowledged. After the max number of ranges is reached, the information
> # will only be tracked in memory and messages will be redelivered in case of
> # crashes.
> managedLedgerMaxUnackedRangesToPersist=10000
>
> The PositionInfo state is stored in BookKeeper as a single entry, and it
> can grow large if the number of ranges is large. Currently, this means
> that BookKeeper can fail to persist a PositionInfo state that is too large,
> e.g. over 1MB by default, and ManagedCursor recovery on topic reload might
> not succeed.
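To illustrate the range compression that managedLedgerMaxUnackedRangesToPersist caps, here is a minimal Java sketch. It is not Pulsar's implementation. It simplifies positions to entry ids within a single ledger, whereas real positions are ledger id and entry id pairs. It assumes the lowest ranges are the ones persisted when the cap is hit. The names AckRangesSketch, toRanges and rangesToPersist are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration, NOT Pulsar's ManagedCursorImpl code: collapse
// acknowledged entry ids into contiguous [start, end] ranges, then keep only
// the first maxRanges of them for persistence (assumption: lowest ranges win).
final class AckRangesSketch {

    /** A closed range of acknowledged entry ids within one ledger (simplification). */
    record Range(long start, long end) { }

    /** Collapses sorted, distinct acknowledged entry ids into contiguous ranges. */
    static List<Range> toRanges(long[] sortedAckedIds) {
        List<Range> ranges = new ArrayList<>();
        if (sortedAckedIds.length == 0) {
            return ranges;
        }
        long start = sortedAckedIds[0];
        long prev = start;
        for (int i = 1; i < sortedAckedIds.length; i++) {
            long id = sortedAckedIds[i];
            if (id != prev + 1) {          // gap: an "acknowledgment hole" closes the range
                ranges.add(new Range(start, prev));
                start = id;
            }
            prev = id;
        }
        ranges.add(new Range(start, prev));
        return ranges;
    }

    /** Keeps at most maxRanges ranges; in this sketch the rest would live only in memory. */
    static List<Range> rangesToPersist(List<Range> all, int maxRanges) {
        return all.size() <= maxRanges ? all : all.subList(0, maxRanges);
    }

    public static void main(String[] args) {
        // Acks 1,2,3 then 5,6 then 9: holes at 4, 7 and 8, so three ranges.
        long[] acked = {1, 2, 3, 5, 6, 9};
        List<Range> ranges = toRanges(acked);
        System.out.println(ranges);
        System.out.println(rangesToPersist(ranges, 2)); // only the first two ranges
    }
}
```

Each hole between acknowledged entries starts a new range. As a result, the number of ranges, and with it the size of the persisted PositionInfo entry, grows with the number of holes rather than with the number of acknowledged messages.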
> Motivation
>
> While keeping the number of ranges low to prevent such problems is a
> common-sense solution, there are cases where a higher number of ranges is
> required. For example, in the case of the JMS protocol handler, JMS consumers
> with filters may end up processing data out of order and/or at different
> speeds, and the number of ranges can grow large.
>
> Goals
> <https://github.com/apache/pulsar/blob/124255a82d145160d6d729a6aebd6aad47fa051e/pip/pip-381-large-positioninfo.md#goals>
>
> Store the PositionInfo state in a BookKeeper ledger as multiple entries if
> the state grows too large to be stored as a single entry.
>
> In Scope
> <https://github.com/apache/pulsar/blob/124255a82d145160d6d729a6aebd6aad47fa051e/pip/pip-381-large-positioninfo.md#in-scope>
>
> Transparent backwards compatibility if the PositionInfo state is small
> enough.
>
> Out of Scope
> <https://github.com/apache/pulsar/blob/124255a82d145160d6d729a6aebd6aad47fa051e/pip/pip-381-large-positioninfo.md#out-of-scope>
>
> Backwards compatibility in case the PositionInfo state is too large to
> be stored as a single entry.
>
> High Level Design
> <https://github.com/apache/pulsar/blob/124255a82d145160d6d729a6aebd6aad47fa051e/pip/pip-381-large-positioninfo.md#high-level-design>
>
> Cursor state writes and reads happen in the same cases as they do
> currently, without changes.
>
> Write path:
>
> 1. Serialize the PositionInfo state to a byte array.
> 2. If the byte array is smaller than the threshold, store it as a single
>    entry, as now. Done.
> 3. If the byte array is larger than the threshold, split it into smaller
>    chunks and store the chunks in a BookKeeper ledger.
> 4. Write the "footer" into the metadata store as the last entry.
>
> See persistPositionToLedger() in ManagedCursorImpl for the implementation.
>
> The footer is a JSON representation of
>
>     public static final class ChunkSequenceFooter {
>         private int numParts;
>         private int length;
>     }
>
> Read path:
>
> 1. Read the last entry from the metadata store.
> 2. If the entry does not appear to be JSON, treat it as a serialized
>    PositionInfo state and use it as is. Done.
> 3. If the entry is a JSON footer, parse the number of chunks and the length
>    from the JSON.
> 4. Read the chunks from the BookKeeper ledger (entries from startPos =
>    footerPosition - chunkSequenceFooter.numParts to footerPosition - 1) and
>    merge them.
> 5. Parse the merged byte array as a PositionInfo state.
>
> See recoverFromLedgerByEntryId() in ManagedCursorImpl for the
> implementation.
>
> ---
> Andrey
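The write and read paths quoted above can be illustrated with a minimal, self-contained Java sketch. It is not the code in ManagedCursorImpl or in PR 22799. The sketch makes several assumptions:

- The "ledger" is an in-memory list of entries, and the chunks and the footer are consecutive entries in it. This is consistent with the footerPosition - numParts arithmetic in read step 4.
- The exact footer JSON layout and the "starts with '{'" test for JSON are assumptions. A real implementation would use a JSON library.
- ChunkedStateSketch, write(), read() and the threshold parameter are hypothetical names.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch of the chunked write/read paths. The "ledger" is just an
// in-memory append-only list of entries; footer format and names are assumptions.
final class ChunkedStateSketch {

    // Assumed footer layout, mirroring ChunkSequenceFooter's numParts and length fields.
    private static final Pattern FOOTER =
            Pattern.compile("\\{\"numParts\":(\\d+),\"length\":(\\d+)\\}");

    /** Appends the state to the ledger and returns the position of the last entry written. */
    static int write(List<byte[]> ledger, byte[] state, int threshold) {
        if (state.length <= threshold) {
            ledger.add(state);                       // small state: a single entry, as today
            return ledger.size() - 1;
        }
        int numParts = 0;
        for (int off = 0; off < state.length; off += threshold) {
            int end = Math.min(off + threshold, state.length);
            ledger.add(Arrays.copyOfRange(state, off, end));  // one chunk per entry
            numParts++;
        }
        String footer = "{\"numParts\":" + numParts + ",\"length\":" + state.length + "}";
        ledger.add(footer.getBytes(StandardCharsets.UTF_8)); // footer is the last entry
        return ledger.size() - 1;
    }

    /** Recovers the state whose last entry sits at footerPosition. */
    static byte[] read(List<byte[]> ledger, int footerPosition) {
        byte[] last = ledger.get(footerPosition);
        // Heuristic (assumption): a footer is UTF-8 JSON starting with '{'.
        if (last.length == 0 || last[0] != '{') {
            return last;                             // not JSON: a plain serialized state
        }
        Matcher m = FOOTER.matcher(new String(last, StandardCharsets.UTF_8));
        if (!m.matches()) {
            return last;                             // looked like JSON but is not a footer
        }
        int numParts = Integer.parseInt(m.group(1));
        int length = Integer.parseInt(m.group(2));
        ByteArrayOutputStream merged = new ByteArrayOutputStream(length);
        // Chunks occupy entries footerPosition - numParts .. footerPosition - 1.
        for (int pos = footerPosition - numParts; pos < footerPosition; pos++) {
            byte[] chunk = ledger.get(pos);
            merged.write(chunk, 0, chunk.length);
        }
        if (merged.size() != length) {
            throw new IllegalStateException("expected " + length + " bytes, got " + merged.size());
        }
        return merged.toByteArray();
    }

    public static void main(String[] args) {
        List<byte[]> ledger = new ArrayList<>();
        byte[] big = new byte[10];
        for (int i = 0; i < big.length; i++) {
            big[i] = (byte) i;
        }
        int footerPos = write(ledger, big, 4);       // chunks of 4, 4 and 2 bytes plus a footer
        System.out.println(new String(ledger.get(footerPos), StandardCharsets.UTF_8));
        System.out.println(Arrays.equals(big, read(ledger, footerPos)));
    }
}
```

Running main writes a 10-byte state with a 4-byte threshold as three chunks plus a footer, then reads it back intact. A state at or below the threshold stays a single, non-JSON entry, which is what keeps small states readable by older code. The excerpt does not say which path handles a state exactly equal to the threshold, so `<=` here is a choice made for the sketch.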