Hello,

I created a PIP for handling large PositionInfo state (a large number of
unacked ranges in a cursor).

PIP PR: https://github.com/apache/pulsar/pull/23328
Proposed implementation: https://github.com/apache/pulsar/pull/22799

Relevant excerpts from PIP:
-----------
Background knowledge
<https://github.com/apache/pulsar/blob/124255a82d145160d6d729a6aebd6aad47fa051e/pip/pip-381-large-positioninfo.md#background-knowledge>

With a KEY_SHARED subscription and out-of-order acknowledgments, the
PositionInfo state can be persisted, with a configurable maximum number of
ranges to persist:

# Max number of "acknowledgment holes" that are going to be persistently stored.
# When acknowledging out of order, a consumer will leave holes that are supposed
# to be quickly filled by acking all the messages. The information of which
# messages are acknowledged is persisted by compressing in "ranges" of messages
# that were acknowledged. After the max number of ranges is reached, the information
# will only be tracked in memory and messages will be redelivered in case of
# crashes.
managedLedgerMaxUnackedRangesToPersist=10000

The PositionInfo state is stored in BookKeeper as a single entry, and it
can grow large if the number of ranges is large. Currently, this means that
BookKeeper can fail to persist a PositionInfo state that is too large (e.g.
over 1MB by default), and the ManagedCursor recovery on topic reload might
not succeed.

Motivation

While keeping the number of ranges low is a common-sense way to prevent
such problems, there are cases where a higher number of ranges is
required. For example, with the JMS protocol handler, JMS consumers with
filters may end up processing data out of order and/or at different
speeds, and the number of ranges can grow large.

Goals
<https://github.com/apache/pulsar/blob/124255a82d145160d6d729a6aebd6aad47fa051e/pip/pip-381-large-positioninfo.md#goals>

Store the PositionInfo state in a BookKeeper ledger as multiple entries if
the state grows too large to be stored as a single entry.

In Scope
<https://github.com/apache/pulsar/blob/124255a82d145160d6d729a6aebd6aad47fa051e/pip/pip-381-large-positioninfo.md#in-scope>

Transparent backwards compatibility if the PositionInfo state is small
enough.

Out of Scope
<https://github.com/apache/pulsar/blob/124255a82d145160d6d729a6aebd6aad47fa051e/pip/pip-381-large-positioninfo.md#out-of-scope>

Backwards compatibility in case the PositionInfo state is too large to
be stored as a single entry.

High Level Design
<https://github.com/apache/pulsar/blob/124255a82d145160d6d729a6aebd6aad47fa051e/pip/pip-381-large-positioninfo.md#high-level-design>

Cursor state writes and reads happen in the same cases as they do
currently, without changes.

Write path:

   1. serialize the PositionInfo state to a byte array.
   2. if the byte array is smaller than the threshold, store it as a single
   entry, as now. Done.
   3. if the byte array is larger than the threshold, split it into smaller
   chunks and store the chunks in a BookKeeper ledger.
   4. write the "footer" into the metadata store as a last entry.

See persistPositionToLedger() in ManagedCursorImpl for the implementation.
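
As a rough illustration of the write path steps, here is a minimal Java
sketch. It is not the code from ManagedCursorImpl: the class
ChunkedWriteSketch, the method toEntries, the maxChunkSize parameter and the
hand-built footer JSON layout are assumptions made for this example. The
returned list only stands in for what would be written; the footer is
returned as the last element, and where it is actually stored is outside the
sketch.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the write path, not the ManagedCursorImpl code.
final class ChunkedWriteSketch {

    /**
     * Returns the byte arrays to write, in order. A small state yields a
     * single element (the serialized state as is). A large state yields its
     * chunks followed by a JSON footer carrying numParts and length.
     */
    static List<byte[]> toEntries(byte[] serializedState, int maxChunkSize) {
        if (maxChunkSize <= 0) {
            throw new IllegalArgumentException("maxChunkSize must be positive");
        }
        List<byte[]> entries = new ArrayList<>();

        // Step 2: small enough, store as a single entry, as now.
        // Treating "equal to the threshold" as small is an assumption.
        if (serializedState.length <= maxChunkSize) {
            entries.add(serializedState);
            return entries;
        }

        // Step 3: split into chunks of at most maxChunkSize bytes.
        for (int offset = 0; offset < serializedState.length; offset += maxChunkSize) {
            int end = Math.min(offset + maxChunkSize, serializedState.length);
            entries.add(Arrays.copyOfRange(serializedState, offset, end));
        }

        // Step 4: the footer goes last. The field names come from
        // ChunkSequenceFooter; the exact JSON layout is assumed.
        int numParts = entries.size();
        String footer = "{\"numParts\":" + numParts
                + ",\"length\":" + serializedState.length + "}";
        entries.add(footer.getBytes(StandardCharsets.UTF_8));
        return entries;
    }
}
```

For example, a 2500-byte state with maxChunkSize = 1000 gives three chunks
(1000, 1000 and 500 bytes) plus the footer, so four elements in total.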

The footer is a JSON representation of

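    public static final class ChunkSequenceFooter {
        // number of chunks the serialized state was split into
        private int numParts;
        // length of the serialized PositionInfo state, in bytes
        private int length;
    }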


Read path:

   1. read the last entry from the metadata store.
   2. if the entry does not appear to be JSON, treat it as serialized
   PositionInfo state and use it as is. Done.
   3. if the entry is JSON, treat it as the footer and parse the number of
   chunks and the length from it.
   4. read the chunks from the BookKeeper ledger (entries from startPos =
   footerPosition - chunkSequenceFooter.numParts to footerPosition - 1) and
   merge them.
   5. parse the merged byte array as a PositionInfo state.

See recoverFromLedgerByEntryId() in ManagedCursorImpl for the
implementation.
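
The read path steps can be sketched in Java as follows. Again this is only
an illustration under stated assumptions, not the code from
ManagedCursorImpl: the class ChunkedReadSketch, the method recover, the
in-memory list that models the stored entries (footer last), and the
regex-based footer detection are all hypothetical. A real implementation
would more likely use a JSON library.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch of the read path, not the ManagedCursorImpl code.
final class ChunkedReadSketch {

    // Assumed footer layout: {"numParts":N,"length":M}
    private static final Pattern FOOTER = Pattern.compile(
            "\\s*\\{\\s*\"numParts\"\\s*:\\s*(\\d{1,9})\\s*,"
            + "\\s*\"length\"\\s*:\\s*(\\d{1,9})\\s*\\}\\s*");

    /** Returns the serialized PositionInfo bytes recovered from the entries. */
    static byte[] recover(List<byte[]> entries) {
        if (entries.isEmpty()) {
            throw new IllegalArgumentException("no entries to recover from");
        }
        // Step 1: read the last entry.
        int footerPosition = entries.size() - 1;
        byte[] last = entries.get(footerPosition);

        // Step 2: not a footer, so it is the serialized state itself.
        // ISO_8859_1 maps every byte to a char, so binary input is safe here.
        Matcher m = FOOTER.matcher(new String(last, StandardCharsets.ISO_8859_1));
        if (!m.matches()) {
            return last;
        }

        // Step 3: parse the number of chunks and the total length.
        int numParts = Integer.parseInt(m.group(1));
        int length = Integer.parseInt(m.group(2));

        // Step 4: read entries startPos .. footerPosition - 1 and merge them.
        int startPos = footerPosition - numParts;
        if (startPos < 0) {
            throw new IllegalStateException("footer refers to missing chunks");
        }
        byte[] merged = new byte[length];
        int offset = 0;
        for (int i = startPos; i < footerPosition; i++) {
            byte[] chunk = entries.get(i);
            if (offset + chunk.length > length) {
                throw new IllegalStateException("chunks exceed footer length");
            }
            System.arraycopy(chunk, 0, merged, offset, chunk.length);
            offset += chunk.length;
        }
        if (offset != length) {
            throw new IllegalStateException("chunks shorter than footer length");
        }
        // Step 5 (parsing the bytes as PositionInfo) is left to the caller.
        return merged;
    }
}
```

Checking the merged size against the footer's length is a choice made for
this sketch, so that a truncated or mismatched chunk sequence is detected
before the bytes are parsed.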

---
Andrey
