Hi Andrey,

Thanks for submitting the PR as we have been facing this issue for a long
time now and we also have PR which solves this issue in a simple and a
fundamental way with proven perf results as well.

PR: https://github.com/apache/pulsar/pull/9292

But again I am not sure some folks blocked this PR without saying the
reason even after asking multiple times and PR progress has been blocked
since then , and it could be a good time to revive it and try to find the
right solution to address this issue.

Thanks,
Rajan

On Fri, Sep 20, 2024 at 4:40 PM Andrey Yegorov <ayego...@apache.org> wrote:

> Hello,
>
> I created a PIP for handling large PositionInfo state (large number of
> unacked ranges in cursor.)
>
> PIP PR: https://github.com/apache/pulsar/pull/23328
> Proposed implementation: https://github.com/apache/pulsar/pull/22799
>
> Relevant excerpts from PIP:
> -----------
> Background knowledge
> <
> https://github.com/apache/pulsar/blob/124255a82d145160d6d729a6aebd6aad47fa051e/pip/pip-381-large-positioninfo.md#background-knowledge
> >
>
> In case of KEY_SHARED subscription and out-of-order acknowledgments, the
> PositionInfo state can be persisted to preserve the state, with
> configurable maximum number of ranges to persist:
>
> # Max number of "acknowledgment holes" that are going to be persistently
> stored.
> # When acknowledging out of order, a consumer will leave holes that are
> supposed
> # to be quickly filled by acking all the messages. The information of which
> # messages are acknowledged is persisted by compressing in "ranges" of
> messages
> # that were acknowledged. After the max number of ranges is reached,
> the information
> # will only be tracked in memory and messages will be redelivered in case
> of
> # crashes.
> managedLedgerMaxUnackedRangesToPersist=10000
>
> The PositionInfo state is stored to the BookKeeper as a single entry, and
> it can grow large if the number of ranges is large. Currently, this means
> that BookKeeper can fail persisting too large PositionInfo state, e.g. over
> 1MB by default and the ManagedCursor recovery on topic reload might not
> succeed.
> Motivation
>
> While keeping the number of ranges low to prevent such problems is a common
> sense solution, there are cases where the higher number of ranges is
> required. For example, in case of the JMS protocol handler, JMS consumers
> with filters may end up processing data out of order and/or at different
> speed, and the number of ranges can grow large.
> Goals
> <
> https://github.com/apache/pulsar/blob/124255a82d145160d6d729a6aebd6aad47fa051e/pip/pip-381-large-positioninfo.md#goals
> >
>
> Store the PositionInfo state in a BookKeeper ledger as multiple entries if
> the state grows too large to be stored as a single entry.
> In Scope
> <
> https://github.com/apache/pulsar/blob/124255a82d145160d6d729a6aebd6aad47fa051e/pip/pip-381-large-positioninfo.md#in-scope
> >
>
> Transparent backwards compatibility if the PositionInfo state is small
> enough.
> Out of Scope
> <
> https://github.com/apache/pulsar/blob/124255a82d145160d6d729a6aebd6aad47fa051e/pip/pip-381-large-positioninfo.md#out-of-scope
> >
>
> Backwards compatibility in case of the PositionInfo state is too large to
> be stored as a single entry.
> High Level Design
> <
> https://github.com/apache/pulsar/blob/124255a82d145160d6d729a6aebd6aad47fa051e/pip/pip-381-large-positioninfo.md#high-level-design
> >
>
> Cursor state writes and reads are happening at the same cases as currently,
> without changes.
>
> Write path:
>
>    1. serialize the PositionInfo state to a byte array.
>    2. if the byte array is smaller than the threshold, store it as a single
>    entry, as now. Done.
>    3. if the byte array is larger than the threshold, split it to smaller
>    chunks and store the chunks in a BookKeeper ledger.
>    4. write the "footer" into the metadata store as a last entry.
>
> See persistPositionToLedger() in ManagedCursorImpl for the implementation.
>
> The footer is a JSON representation of
>
>     public static final class ChunkSequenceFooter {
>         private int numParts;
>         private int length;
>     }
>
>
> Read path:
>
>    1. read the last entry from the metadata store.
>    2. if the entry does not appear to be a JSON, treat it as serialized
>    PositionInfo state and use it as is. Done.
>    3. if the footer is a JSON, parse number of chunks and length from the
>    json.
>    4. read the chunks from the BookKeeper ledger (entries from startPos =
>    footerPosition - chunkSequenceFooter.numParts to footerPosition - 1) and
>    merge them.
>    5. parse the merged byte array as a PositionInfo state.
>
> See recoverFromLedgerByEntryId() in ManagedCursorImpl for the
> implementation.
>
> ---
> Andrey
>

Reply via email to