Thank you all for the feedback. My take from this is the feature is needed and the general consensus is to proceed with it. I'll start a vote thread.
Compression of the state (already used if enabled) and a more compact
serialization format (as in Rajan's PR) are, on their own, partial solutions:
they move the limit further but do not solve the problem completely.

@Lari: I think we can simply use managedLedgerMaxUnackedRangesToPersist as a
limiter, as the state is already persisted with the cursor. Adding another
config to allow single-entry/multi-entry storage of the state feels like an
unnecessary complication of the configuration, which is not that easy for
users to understand. persistentUnackedRangesWithMultipleEntriesEnabled is
already used in RangeSetWrapper, and I don't think we should use it for other
purposes.

On Sun, Sep 22, 2024 at 11:52 PM Lari Hotari <lhot...@apache.org> wrote:

> Thanks for driving this, Andrey. This proposal is needed and very useful.
>
> One detail that should be addressed is the fact that there's an earlier
> PIP which wasn't fully implemented. It's "PIP 81: Split the individual
> acknowledgments into multiple entries."
>
> https://github.com/apache/pulsar/wiki/PIP-81%3A-Split-the-individual-acknowledgments-into-multiple-entries
>
> 300 PIPs later, and here we are. :)
>
> There was a larger PR for PIP-81 that was closed:
> https://github.com/apache/pulsar/pull/10729
> Some parts of it were split and merged, such as #15425 and #15607.
> https://github.com/apache/pulsar/pull/15425
> https://github.com/apache/pulsar/pull/15607
>
> After this, the implementation stalled. The explanation is in the comment
> https://github.com/apache/pulsar/pull/22799#issuecomment-2365128854:
> "Since a PR implemented the compression of PositionInfo, the size of
> PositionInfo can be greatly reduced, and the problem of Entry size
> exceeding the threshold will no longer occur, so this PIP was not further
> promoted."
> It seems that the comment refers to "ManagedLedgerInfo compression,"
> https://github.com/apache/pulsar/pull/11490 or "PIP-146:
> ManagedCursorInfo compression,"
> https://github.com/apache/pulsar/issues/14529.
> It seems that the compression resolved it for many use cases, but the
> problem wasn't addressed if the compressed size goes over the threshold.
>
> PR 15607 (one part of PIP-81 implementation) added a configuration option
> "persistentUnackedRangesWithMultipleEntriesEnabled" into broker.conf:
>
> https://github.com/apache/pulsar/blob/9012422bcbaac7b38820ce545cd5a3b4f8b586d0/conf/broker.conf#L1944-L1946
>
> I'd suggest that PIP-381 also resolves this situation where we have
> pending changes from PIP-81 in the codebase. One possibility would be to
> take over the PIP-81 changes and also use
> "persistentUnackedRangesWithMultipleEntriesEnabled" as the feature toggle.
>
> -Lari
>
> On 2024/09/20 23:40:11 Andrey Yegorov wrote:
> > Hello,
> >
> > I created a PIP for handling large PositionInfo state (large number of
> > unacked ranges in cursor.)
> >
> > PIP PR: https://github.com/apache/pulsar/pull/23328
> > Proposed implementation: https://github.com/apache/pulsar/pull/22799
> >
> > Relevant excerpts from PIP:
> > -----------
> > Background knowledge
> > <https://github.com/apache/pulsar/blob/124255a82d145160d6d729a6aebd6aad47fa051e/pip/pip-381-large-positioninfo.md#background-knowledge>
> >
> > In case of KEY_SHARED subscription and out-of-order acknowledgments, the
> > PositionInfo state can be persisted to preserve the state, with
> > configurable maximum number of ranges to persist:
> >
> > # Max number of "acknowledgment holes" that are going to be persistently
> > # stored. When acknowledging out of order, a consumer will leave holes
> > # that are supposed to be quickly filled by acking all the messages.
> > # The information of which messages are acknowledged is persisted by
> > # compressing in "ranges" of messages that were acknowledged. After the
> > # max number of ranges is reached, the information will only be tracked
> > # in memory and messages will be redelivered in case of crashes.
> > managedLedgerMaxUnackedRangesToPersist=10000
> >
> > The PositionInfo state is stored to the BookKeeper as a single entry, and
> > it can grow large if the number of ranges is large. Currently, this means
> > that BookKeeper can fail persisting too large PositionInfo state, e.g. over
> > 1MB by default and the ManagedCursor recovery on topic reload might not
> > succeed.
> > Motivation
> >
> > While keeping the number of ranges low to prevent such problems is a common
> > sense solution, there are cases where the higher number of ranges is
> > required. For example, in case of the JMS protocol handler, JMS consumers
> > with filters may end up processing data out of order and/or at different
> > speed, and the number of ranges can grow large.
> > Goals
> > <https://github.com/apache/pulsar/blob/124255a82d145160d6d729a6aebd6aad47fa051e/pip/pip-381-large-positioninfo.md#goals>
> >
> > Store the PositionInfo state in a BookKeeper ledger as multiple entries if
> > the state grows too large to be stored as a single entry.
> > In Scope
> > <https://github.com/apache/pulsar/blob/124255a82d145160d6d729a6aebd6aad47fa051e/pip/pip-381-large-positioninfo.md#in-scope>
> >
> > Transparent backwards compatibility if the PositionInfo state is small
> > enough.
> > Out of Scope
> > <https://github.com/apache/pulsar/blob/124255a82d145160d6d729a6aebd6aad47fa051e/pip/pip-381-large-positioninfo.md#out-of-scope>
> >
> > Backwards compatibility in case of the PositionInfo state is too large to
> > be stored as a single entry.
> > High Level Design
> > <https://github.com/apache/pulsar/blob/124255a82d145160d6d729a6aebd6aad47fa051e/pip/pip-381-large-positioninfo.md#high-level-design>
> >
> > Cursor state writes and reads are happening at the same cases as currently,
> > without changes.
> >
> > Write path:
> >
> >    1. serialize the PositionInfo state to a byte array.
> >    2. if the byte array is smaller than the threshold, store it as a single
> >    entry, as now. Done.
> >    3. if the byte array is larger than the threshold, split it to smaller
> >    chunks and store the chunks in a BookKeeper ledger.
> >    4. write the "footer" into the metadata store as a last entry.
> >
> > See persistPositionToLedger() in ManagedCursorImpl for the implementation.
> >
> > The footer is a JSON representation of
> >
> > public static final class ChunkSequenceFooter {
> >     private int numParts;
> >     private int length;
> > }
> >
> >
> > Read path:
> >
> >    1. read the last entry from the metadata store.
> >    2. if the entry does not appear to be a JSON, treat it as serialized
> >    PositionInfo state and use it as is. Done.
> >    3. if the footer is a JSON, parse number of chunks and length from the
> >    json.
> >    4. read the chunks from the BookKeeper ledger (entries from startPos =
> >    footerPosition - chunkSequenceFooter.numParts to footerPosition - 1) and
> >    merge them.
> >    5. parse the merged byte array as a PositionInfo state.
> >
> > See recoverFromLedgerByEntryId() in ManagedCursorImpl for the
> > implementation.
> >
> > ---
> > Andrey
> >
>

-- 
Andrey Yegorov
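
For readers following the thread, the write and read paths quoted above can be sketched roughly as below. This is only an illustration under stated assumptions, not the code from the PR: an in-memory list stands in for the BookKeeper ledger, the footer is written as the last entry of that same list, the footer is recognized with a simple regex instead of a real JSON parser, and a state exactly at the threshold is stored as a single entry (the PIP text does not say which way that case goes). The names ChunkedStateSketch, persist, recover and maxEntrySize are hypothetical; only the numParts and length fields come from ChunkSequenceFooter in the PIP.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch; a List<byte[]> stands in for the BookKeeper ledger.
class ChunkedStateSketch {
    // Assumed footer shape, mirroring the numParts/length fields of ChunkSequenceFooter.
    private static final Pattern FOOTER =
            Pattern.compile("\\{\"numParts\":(\\d+),\"length\":(\\d+)\\}");

    // Write path: one entry if the state fits, otherwise chunks followed by a footer.
    static void persist(List<byte[]> ledger, byte[] state, int maxEntrySize) {
        if (state.length <= maxEntrySize) {
            ledger.add(state); // small state: single entry, as before
            return;
        }
        int numParts = 0;
        for (int off = 0; off < state.length; off += maxEntrySize) {
            int end = Math.min(off + maxEntrySize, state.length);
            ledger.add(Arrays.copyOfRange(state, off, end));
            numParts++;
        }
        String footer = "{\"numParts\":" + numParts + ",\"length\":" + state.length + "}";
        ledger.add(footer.getBytes(StandardCharsets.UTF_8));
    }

    // Read path: inspect the last entry; if it is a footer, merge the preceding chunks.
    static byte[] recover(List<byte[]> ledger) {
        int footerPos = ledger.size() - 1;
        byte[] last = ledger.get(footerPos);
        Matcher m = FOOTER.matcher(new String(last, StandardCharsets.UTF_8));
        if (!m.matches()) {
            return last; // not a footer: treat as the serialized state itself
        }
        int numParts = Integer.parseInt(m.group(1));
        int length = Integer.parseInt(m.group(2));
        byte[] merged = new byte[length];
        int off = 0;
        // Chunks occupy entries footerPos - numParts .. footerPos - 1.
        for (int i = footerPos - numParts; i < footerPos; i++) {
            byte[] chunk = ledger.get(i);
            System.arraycopy(chunk, 0, merged, off, chunk.length);
            off += chunk.length;
        }
        return merged;
    }

    public static void main(String[] args) {
        List<byte[]> ledger = new ArrayList<>();
        byte[] state = new byte[2500];
        persist(ledger, state, 1024);
        System.out.println("entries written: " + ledger.size());
        System.out.println("recovered bytes: " + recover(ledger).length);
    }
}
```

A state that fits under the threshold is written and read back exactly as before, which matches the "transparent backwards compatibility" item in the In Scope section. An older reader that does not know about the footer would not be able to recover a chunked state, which corresponds to the Out of Scope item.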