Thanks for driving this, Andrey. This proposal is needed and very useful.

One detail that should be addressed is the fact that there's an earlier PIP 
which wasn't fully implemented. It's "PIP 81: Split the individual 
acknowledgments into multiple entries."
https://github.com/apache/pulsar/wiki/PIP-81%3A-Split-the-individual-acknowledgments-into-multiple-entries

300 PIPs later, and here we are. :)

There was a larger PR for PIP-81 that was closed: 
https://github.com/apache/pulsar/pull/10729
Some parts of it were split and merged, such as #15425 and #15607.
https://github.com/apache/pulsar/pull/15425
https://github.com/apache/pulsar/pull/15607

After this, the implementation stalled. The explanation is in the comment 
https://github.com/apache/pulsar/pull/22799#issuecomment-2365128854:
"Since a PR implemented the compression of PositionInfo, the size of 
PositionInfo can be greatly reduced, and the problem of Entry size exceeding 
the threshold will no longer occur, so this PIP was not further promoted."
It seems that the comment refers to "ManagedLedgerInfo compression," 
https://github.com/apache/pulsar/pull/11490 or "PIP-146: ManagedCursorInfo 
compression," https://github.com/apache/pulsar/issues/14529.
The compression seems to have resolved it for many use cases, but the problem 
remains unaddressed when the compressed size still exceeds the threshold.

PR 15607 (one part of the PIP-81 implementation) added a configuration option 
"persistentUnackedRangesWithMultipleEntriesEnabled" to broker.conf:
https://github.com/apache/pulsar/blob/9012422bcbaac7b38820ce545cd5a3b4f8b586d0/conf/broker.conf#L1944-L1946

I'd suggest that PIP-381 also resolves this situation where we have pending 
changes from PIP-81 in the codebase. One possibility would be to take over the 
PIP-81 changes and also use "persistentUnackedRangesWithMultipleEntriesEnabled" 
as the feature toggle.

-Lari
 
On 2024/09/20 23:40:11 Andrey Yegorov wrote:
> Hello,
> 
> I created a PIP for handling large PositionInfo state (a large number of
> unacked ranges in the cursor).
> 
> PIP PR: https://github.com/apache/pulsar/pull/23328
> Proposed implementation: https://github.com/apache/pulsar/pull/22799
> 
> Relevant excerpts from PIP:
> -----------
> Background knowledge
> <https://github.com/apache/pulsar/blob/124255a82d145160d6d729a6aebd6aad47fa051e/pip/pip-381-large-positioninfo.md#background-knowledge>
> 
> With a KEY_SHARED subscription and out-of-order acknowledgments, the
> PositionInfo state can be persisted, up to a configurable maximum number
> of ranges:
> 
>     # Max number of "acknowledgment holes" that are going to be persistently stored.
>     # When acknowledging out of order, a consumer will leave holes that are supposed
>     # to be quickly filled by acking all the messages. The information of which
>     # messages are acknowledged is persisted by compressing in "ranges" of messages
>     # that were acknowledged. After the max number of ranges is reached, the information
>     # will only be tracked in memory and messages will be redelivered in case of
>     # crashes.
>     managedLedgerMaxUnackedRangesToPersist=10000
> 
> The PositionInfo state is stored in BookKeeper as a single entry, and
> it can grow large if the number of ranges is large. Currently, this means
> that BookKeeper can fail to persist a PositionInfo state that is too large
> (e.g., over 1MB by default), and ManagedCursor recovery on topic reload
> might not succeed.
>
> Motivation
> 
> While keeping the number of ranges low to prevent such problems is a
> common-sense solution, there are cases where a higher number of ranges is
> required. For example, with the JMS protocol handler, JMS consumers with
> filters may end up processing data out of order and/or at different
> speeds, and the number of ranges can grow large.
>
> Goals
> 
> Store the PositionInfo state in a BookKeeper ledger as multiple entries if
> the state grows too large to be stored as a single entry.
>
> In Scope
> 
> Transparent backwards compatibility if the PositionInfo state is small
> enough.
>
> Out of Scope
> 
> Backwards compatibility in case the PositionInfo state is too large to
> be stored as a single entry.
>
> High Level Design
> 
> Cursor state writes and reads happen in the same situations as they do
> currently, without changes.
> 
> Write path:
> 
>    1. serialize the PositionInfo state to a byte array.
>    2. if the byte array is smaller than the threshold, store it as a single
>    entry, as now. Done.
>    3. if the byte array is larger than the threshold, split it into smaller
>    chunks and store the chunks in a BookKeeper ledger.
>    4. write the "footer" into the same BookKeeper ledger as the last entry.
> 
> See persistPositionToLedger() in ManagedCursorImpl for the implementation.
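The write path above can be sketched roughly as follows. This is a minimal, hypothetical illustration and not the code in persistPositionToLedger(): the class and method names are invented, the cursor ledger is modelled as an in-memory List<byte[]> whose index stands in for the BookKeeper entry id, the threshold is a plain parameter (the real configuration name is not assumed), and the footer JSON is hand-built to avoid a library dependency, so the exact bytes written by the PR may differ.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the PIP-381 write path, not ManagedCursorImpl code.
// The cursor ledger is modelled as an in-memory List<byte[]>; a list index
// stands in for a BookKeeper entry id.
final class ChunkedPositionWriter {

    // Appends `state` to `ledger` and returns the entry id of the last entry
    // written (the single entry, or the footer when the state was chunked).
    static long persist(byte[] state, int threshold, List<byte[]> ledger) {
        if (threshold <= 0) {
            throw new IllegalArgumentException("threshold must be positive");
        }
        if (state.length <= threshold) {
            // Small state: one entry, exactly as before the PIP.
            ledger.add(state);
            return ledger.size() - 1;
        }
        // Large state: consecutive chunks of at most `threshold` bytes each.
        int numParts = 0;
        for (int offset = 0; offset < state.length; offset += threshold) {
            int end = Math.min(offset + threshold, state.length);
            ledger.add(Arrays.copyOfRange(state, offset, end));
            numParts++;
        }
        // Footer written last; the JSON is hand-built here to avoid a JSON library.
        String footer = "{\"numParts\":" + numParts + ",\"length\":" + state.length + "}";
        ledger.add(footer.getBytes(StandardCharsets.UTF_8));
        return ledger.size() - 1;
    }

    public static void main(String[] args) {
        List<byte[]> ledger = new ArrayList<>();
        long footerId = persist(new byte[10], 4, ledger);
        // 10 bytes with a 4-byte threshold: chunks of 4, 4 and 2 bytes, then the footer.
        System.out.println(footerId + " " + new String(ledger.get((int) footerId), StandardCharsets.UTF_8));
    }
}
```

Running main prints `3 {"numParts":3,"length":10}`: entries 0 to 2 hold the chunks and entry 3 holds the footer. Writing the footer last is what lets a reader locate the chunks relative to the footer's own position.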
> 
> The footer is a JSON representation of
> 
>     public static final class ChunkSequenceFooter {
>         private int numParts;
>         private int length;
>     }
> 
> 
> Read path:
> 
>    1. read the last entry from the cursor's BookKeeper ledger.
>    2. if the entry does not appear to be JSON, treat it as a serialized
>    PositionInfo state and use it as is. Done.
>    3. if the entry is a JSON footer, parse the number of chunks and the
>    length from it.
>    4. read the chunks from the BookKeeper ledger (entries from startPos =
>    footerPosition - chunkSequenceFooter.numParts to footerPosition - 1) and
>    merge them.
>    5. parse the merged byte array as a PositionInfo state.
> 
> See recoverFromLedgerByEntryId() in ManagedCursorImpl for the
> implementation.
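The read path above, in particular the entry-range arithmetic in step 4, can be sketched like this. It is a hypothetical illustration, not the code in recoverFromLedgerByEntryId(): the ledger is again an in-memory List<byte[]> indexed by entry id, the footer shape {"numParts":N,"length":L} is an assumption, and "appears to be JSON" is approximated by a leading '{' plus a full regex match (a binary PositionInfo could in principle start with byte 0x7B, so the first byte alone is not enough). Parsing the merged bytes into a PositionInfo (step 5) is left out.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch of the PIP-381 read path, not ManagedCursorImpl code.
// The ledger is an in-memory List<byte[]>; the footer format is assumed.
final class ChunkedPositionReader {

    private static final Pattern FOOTER = Pattern.compile(
            "\\{\\s*\"numParts\"\\s*:\\s*(\\d+)\\s*,\\s*\"length\"\\s*:\\s*(\\d+)\\s*\\}");

    // Returns the serialized PositionInfo bytes found at or before footerPosition.
    static byte[] recover(List<byte[]> ledger, int footerPosition) {
        byte[] last = ledger.get(footerPosition);
        // Cheap "does it look like JSON?" check before decoding as text.
        if (last.length > 0 && last[0] == '{') {
            Matcher m = FOOTER.matcher(new String(last, StandardCharsets.UTF_8));
            if (m.matches()) {
                int numParts = Integer.parseInt(m.group(1));
                int length = Integer.parseInt(m.group(2));
                // Chunks are entries footerPosition - numParts .. footerPosition - 1.
                int startPos = footerPosition - numParts;
                if (startPos < 0) {
                    throw new IllegalStateException("footer refers to entries before the ledger start");
                }
                ByteArrayOutputStream merged = new ByteArrayOutputStream(length);
                for (int entryId = startPos; entryId < footerPosition; entryId++) {
                    byte[] chunk = ledger.get(entryId);
                    merged.write(chunk, 0, chunk.length);
                }
                byte[] result = merged.toByteArray();
                if (result.length != length) {
                    throw new IllegalStateException("expected " + length + " bytes, got " + result.length);
                }
                return result;
            }
        }
        // Not a footer: a single-entry PositionInfo, used as is.
        return last;
    }

    public static void main(String[] args) {
        List<byte[]> ledger = new ArrayList<>();
        ledger.add("abcd".getBytes(StandardCharsets.UTF_8));
        ledger.add("ef".getBytes(StandardCharsets.UTF_8));
        ledger.add("{\"numParts\":2,\"length\":6}".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(recover(ledger, 2), StandardCharsets.UTF_8));
    }
}
```

Running main prints `abcdef`. Because the chunk range is computed backwards from the footer, older entries earlier in the same ledger are simply ignored, and the length check catches a footer that does not match the chunks actually read.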
> 
> ---
> Andrey
> 
