This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new cf3d7d17d92 [pip] PIP-430: Pulsar Broker cache improvements: 
refactoring eviction and adding a new cache strategy based on expected read 
count (#24444)
cf3d7d17d92 is described below

commit cf3d7d17d92d780fae8b2723a4a3e21c31c34960
Author: Lari Hotari <[email protected]>
AuthorDate: Mon Jul 28 15:11:00 2025 +0300

    [pip] PIP-430: Pulsar Broker cache improvements: refactoring eviction and 
adding a new cache strategy based on expected read count (#24444)
---
 pip/pip-430.md | 335 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 335 insertions(+)

diff --git a/pip/pip-430.md b/pip/pip-430.md
new file mode 100644
index 00000000000..74d8065bfd8
--- /dev/null
+++ b/pip/pip-430.md
@@ -0,0 +1,335 @@
+# PIP-430: Pulsar Broker cache improvements: refactoring eviction and adding a 
new cache strategy based on expected read count
+
+# Background knowledge
+
+Apache Pulsar brokers maintain an in-memory entry cache to reduce latency and 
load on BookKeeper and tiered storage (S3) by serving frequently accessed 
message data directly from memory.
+
+Key concepts:
+- **`ManagedLedgerImpl`**: Manages the storage for a single topic partition. 
Each `ManagedLedgerImpl` instance has its own `EntryCache` (typically 
`RangeEntryCacheImpl`) instance.
+- **`EntryCache`**: Stores message entries (payloads) in memory. The default 
implementation is `RangeEntryCacheImpl`, which uses a `RangeCache` internally.
+- **`RangeCache`**: A specialized cache storing entries mapped by their 
`Position` (ledgerId, entryId). It supports range-based operations for 
retrieval and expiration.
+- **`EntryCacheManager` (`RangeEntryCacheManagerImpl`)**: A global component 
that limits the total size of all entry caches in a broker. When the total size 
exceeds a threshold, it triggers eviction.
+- **Cache Eviction Policies**: Mechanisms to remove entries from the cache to 
make space or remove old data.
+    - **Timestamp-based eviction**: Removes entries older than a configured 
threshold (e.g., `managedLedgerCacheEvictionTimeThresholdMillis`). This is 
currently handled periodically by `ManagedLedgerFactoryImpl` iterating over all 
managed ledgers.
+    - **Size-based eviction**: Removes entries when the total cache size 
exceeds a configured limit (`managedLedgerCacheSizeMB`). The current 
implementation resides in `EntryCacheDefaultEvictionPolicy`, which selects a 
subset of larger caches and proportionally evicts entries from each entry cache 
to keep the total cache size under the limit.
+    - **Cursor-based eviction**: Invalidates cache entries up to the slowest 
consumer's read position or mark-delete position, depending on 
`cacheEvictionByMarkDeletedPosition`.
+
+The broker cache serves various read patterns:
+- **Tailing reads**: Consumers reading the latest published messages.
+- **Catch-up reads (backlogged cursors)**: Consumers reading older messages to 
catch up.
+- **Key_Shared subscription reads**: Messages with the same key are routed to 
the same consumer. If a consumer is slow, messages might be replayed while the 
cursor reads more messages for available consumers.
+
+# Motivation
+
+The current Pulsar broker entry cache implementation and its eviction 
mechanisms face several challenges that impact performance, efficiency, and 
predictability:
+
+1.  **Inefficient and Flawed Size-Based Eviction**:
+    The `EntryCacheDefaultEvictionPolicy` (the current default for size-based 
eviction) does not guarantee the removal of the oldest entries globally. It 
sorts individual `EntryCache` instances by their size, selects a percentage of 
the largest caches, and then asks each of them to evict a proportional amount 
of data. This can lead to newer entries being evicted from large, active caches 
while older, less relevant entries remain in smaller or less active caches, 
resulting in suboptimal c [...]
+
+2.  **Inefficient and Incorrect Timestamp-Based Eviction**:
+    The existing timestamp-based eviction mechanism, triggered by 
`ManagedLedgerFactoryImpl`, has significant performance and correctness issues:
+    *   **Performance**: It iterates through *all* `ManagedLedgerImpl` 
instances and their respective `EntryCache` instances periodically (default 
every 10ms). In brokers with a large number of topics, this frequent and 
exhaustive iteration leads to high CPU utilization and memory pressure.
+    *   **Correctness**: The per-cache eviction (e.g., 
`RangeCache.evictLEntriesBeforeTimestamp`) often assumes entries within a 
single `RangeCache` are primarily ordered by timestamp due to typical 
append-only workloads. This assumption breaks down with mixed read patterns 
like catch-up reads or when entries are inserted out of their natural position 
order (Key_shared subscription replay queue scenario), potentially leading to 
incorrect eviction decisions or inefficient scanning.
+
+3.  **Limited Cache Scope and Effectiveness for Diverse Read Patterns**:
+    The original `RangeCache` was primarily designed with tailing reads in 
mind. While support for caching for backlogged cursors and replay queue reads 
was added later, the eviction algorithms were not holistically updated to 
effectively manage mixed read patterns (tailing, catch-up, replays in 
Key_Shared). This can lead to:
+    *   Unnecessary BookKeeper and tiered storage (S3) reads during catch-up 
scenarios, even if data was recently read for another consumer.
+    *   Poor cache hit rates for Key_Shared subscriptions with slow consumers, 
as entries might be evicted before a replayed message (due to consumer 
unacknowledgment or redelivery request) is read again.
+
+4.  **Foundation for Advanced Caching Strategies Needed**:
+    The current cache architecture makes it difficult to implement more 
intelligent caching strategies that could further optimize for common Pulsar 
use cases, such as efficiently handling fan-out to multiple shared consumers or 
retaining entries expected to be read by several cursors.
+
+Addressing these issues is crucial for improving broker performance, reducing 
operational costs (lower BookKeeper load), and providing a more robust caching 
layer that can adapt to diverse workloads.
+
+# Goals
+
+The refactoring aims to make the cache eviction more robust, performant, and 
predictable. The "expected read count" strategy is an attempt to make the cache 
more aware of Pulsar's specific consumption patterns.
+
+## In Scope
+
+-   **Refactor Cache Eviction Mechanism**:
+    -   Replace the existing per-cache iteration for timestamp eviction and 
the `EntryCacheDefaultEvictionPolicy` for size-based eviction with a 
centralized, insertion-order aware mechanism.
+    -   Implement a `RangeCacheRemovalQueue` that tracks all cached entries 
globally in approximate insertion order.
+    -   Ensure timestamp-based eviction reliably removes entries older than 
the threshold by processing this queue.
+    -   Ensure size-based eviction globally removes the oldest entries first 
from this queue until the target size is met.
+-   **Introduce "Expected Read Count" Cache Strategy**:
+    -   Implement a new caching strategy where entries track an "expected read 
count," representing how many active cursors are anticipated to read that entry.
+    -   Prioritize retaining entries with a positive expected read count 
during size-based eviction.
+    -   Provide a new configuration option 
(`cacheEvictionByExpectedReadCount`) to enable this strategy.
+-   **Improve Performance and Efficiency**:
+    -   Reduce CPU overhead associated with cache eviction, particularly 
timestamp-based eviction.
+    -   Improve overall cache hit rates by making better eviction decisions.
+-   **Enhance Correctness**: Ensure eviction policies work correctly across 
various read patterns.
+-   **Provide a Foundation for Future Cache Optimizations**: The refactored 
design should make it easier to implement further caching improvements, such as 
more sophisticated strategies for catch-up reads or Key_Shared subscriptions.
+-   **Simplify RangeCache Implementation**: Remove unnecessary generic type 
parameters from the RangeCache implementation. Since RangeCache is used 
exclusively for a single purpose within the Pulsar codebase (caching managed 
ledger entries), the generic key and value parameters add unnecessary 
complexity without providing any practical benefit. This simplification will 
improve code readability and reduce cognitive overhead.
+
+# High Level Design
+
+The proposed solution involves two main components: a refactored eviction 
mechanism using a centralized removal queue and a new cache strategy based on 
expected read count.
+
+## 1. Centralized Cache Eviction with `RangeCacheRemovalQueue`
+
+A new component, `RangeCacheRemovalQueue`, will be introduced at the 
`EntryCacheManager` level.
+-   **Entry Tracking**: When an entry is inserted into any 
`RangeEntryCacheImpl` (the per-ledger cache), a lightweight wrapper for this 
entry (`RangeCacheEntryWrapper`) is also added to this global 
`RangeCacheRemovalQueue`. This queue maintains entries in their global 
insertion order (FIFO). This wrapper is already necessary in 
`RangeEntryCacheImpl` to prevent consistency issues. The current internal 
wrapper class is refactored to a top-level class so that it can be used with 
the removal queue.
+-   **Timestamp-Based Eviction**: A single, periodic task (e.g., managed by 
`ManagedLedgerFactoryImpl`'s `cacheEvictionExecutor`) will process the 
`RangeCacheRemovalQueue`. It will iterate from the head of the queue, removing 
entries whose `timestampNanos` are older than the 
`cacheEvictionTimeThresholdNanos`. Since the queue is insertion-ordered, this 
process can often stop early once it encounters an entry that is not expired.
+-   **Size-Based Eviction**: When the `EntryCacheManager` detects that the 
total cache size exceeds `evictionTriggerThresholdPercent * maxSize`, it will 
trigger an eviction cycle. This cycle will also process the 
`RangeCacheRemovalQueue` from the head, removing the oldest entries (regardless 
of which specific ledger they belong to) until the cache size is brought down 
to `cacheEvictionWatermark * maxSize`.
+-   **Stashing Mechanism**: During size-based eviction, if an entry is 
encountered that should not be evicted immediately (e.g., due to a positive 
"expected read count" as per the new strategy, and it hasn't met timestamp 
expiration), it will be temporarily "stashed" (moved to a secondary list within 
`RangeCacheRemovalQueue`). Stashed entries are reconsidered in subsequent 
eviction passes or if they eventually expire by timestamp. This prevents 
premature eviction of entries that are like [...]
+
+This centralized approach replaces the distributed, per-cache iteration for 
timestamp eviction and the less precise `EntryCacheDefaultEvictionPolicy` for 
size eviction, leading to more globally optimal and efficient eviction 
decisions.
+
+## 2. "Expected Read Count" Cache Strategy
+
+This new strategy aims to improve cache hit rates by retaining entries that 
are likely to be read by multiple active consumers/cursors.
+-   **`EntryReadCountHandler`**: Each cached entry (`CachedEntryImpl`) will be 
associated with an `EntryReadCountHandlerImpl`. This handler maintains an 
`expectedReadCount` (an atomic integer).
+-   **Initialization**:
+    -   When a new entry is added to the ledger (`OpAddEntry`), its 
`expectedReadCount` is initialized to the number of *active* cursors.
+    -   When entries are read from BookKeeper or tiered storage and inserted 
into the cache (`RangeEntryCacheImpl.readFromStorage`), the `expectedReadCount` 
is initialized based on the current state of active cursors currently 
positioned before or at the entry being added. This information is sourced from 
`ManagedCursorContainer.getNumberOfCursorsAtSamePositionOrBefore(ManagedCursor)`.
+-   **Dynamic Updates**:
+    -   When an entry is actually delivered to a consumer from the cache and 
subsequently released by the delivery mechanism, its `expectedReadCount` is 
decremented (via `EntryReadCountHandler.markRead()`).
+    -   The `expectedReadCount` on a cached entry is incremented when the 
entry is added to the replay queue. This will de-prioritize the removal of the 
entry from the cache when size based eviction is performed so that when the 
Key_Shared consumer is available to read the entry, it would more likely be 
available in the cache.
+-   **Eviction Consideration**:
+    -   The `RangeCacheRemovalQueue`'s size-based eviction logic will consult 
`CachedEntry.canEvict()`. This method returns `true` if `expectedReadCount <= 
0`.
+    -   If `canEvict()` is false and the entry hasn't met timestamp 
expiration, it's stashed instead of being immediately evicted during a 
size-based eviction pass.
+    -   If an entry is forcefully removed from the cache (e.g., ledger 
deletion, or direct invalidation call that bypasses normal eviction), 
`EntryReadCountHandler.markEvicted()` is called to set a special state, 
preventing further increments and ensuring it's recognized as fully processed.
+-   **Configuration**: A new broker configuration setting, 
`cacheEvictionByExpectedReadCount` (boolean, default: `true`), will enable this 
strategy. If set to `false`, the cache will fall back to simpler eviction based 
on timestamp and potentially `cacheEvictionByMarkDeletedPosition` (if enabled, 
though the latter's effectiveness is reduced without the new read count logic).
+
+This strategy allows the cache to be more intelligent about retaining entries 
that have higher utility, especially in fan-out scenarios or with slightly 
lagging consumers in Key_Shared subscriptions. The replay queue is also used in 
Shared subscriptions when more entries are read than can be consumed by the 
available consumers. This strategy will also avoid cache misses in those 
scenarios.
+
+When using the expected read count caching strategy, there would be separate 
configuration values for specifying the time-to-live values for entries that 
have already reached their "expected read count" and for entries that still 
have a positive "expected read count". The current 
`managedLedgerCacheEvictionTimeThresholdMillis` setting would be used for 
entries that have reached their "expected read count". For entries that still 
have a positive "expected read count", a separate setting ` [...]
+
+# Detailed Design
+
+## Design & Implementation Details
+
+### 1. `RangeCacheRemovalQueue`
+-   **Structure**:
+    -   `removalQueue`: 
`org.jctools.queues.MpscUnboundedArrayQueue<RangeCacheEntryWrapper>` to hold 
entries in insertion order.
+    -   `stash`: `RangeCacheRemovalQueueStash` (internal class), a list-based 
mechanism to temporarily hold entries that are not immediately evictable during 
size-based eviction (e.g., due to positive expected read count).
+        -   `RangeCacheRemovalQueueStash.add(RangeCacheEntryWrapper)`: Adds an 
entry to the stash.
+        -   `RangeCacheRemovalQueueStash.evictEntries(...)`: Iterates stashed 
entries, applying eviction predicate.
+        -   `RangeCacheRemovalQueueStash.maybeTrim()`: Compacts the stash list 
if many entries are removed to reclaim space.
+-   **Eviction Logic (`evictEntries` method with `EvictionPredicate`)**:
+    -   Synchronized method to ensure single-threaded eviction.
+    -   First processes the `stash`, then the `removalQueue`.
+    -   `EvictionPredicate` determines the `EvictionResult` for each entry:
+        -   `REMOVE`: Entry is evicted.
+        -   `STASH`: Entry is moved from `removalQueue` to `stash` (only 
during size-based eviction if entry is not evictable by read count but not yet 
timestamp-expired).
+        -   `STOP`: Stops processing further entries in the current pass 
(e.g., for timestamp eviction when a non-expired entry is found, or for size 
eviction when enough space is freed).
+        -   `MISSING`: Entry was already removed from cache by other means.
+    -   `evictLEntriesBeforeTimestamp(long timestampNanos)`: Uses a predicate: 
`e.timestampNanos < timestampNanos ? REMOVE : STOP`. (Note: The 
`STASH_AND_STOP` mentioned in a discussion snippet was refined; for timestamp 
eviction, it's just `STOP` if not expired, as stashing by itself is mainly for 
size-based eviction policy related to `expectedReadCount`).
+    -   `evictLeastAccessedEntries(long sizeToFree, long 
expirationTimestampNanos)`: Uses a predicate:
+        ```java
+        (e, c) -> {
+            if (c.removedSize >= sizeToFree) return EvictionResult.STOP;
+            boolean expired = e.timestampNanos < expirationTimestampNanos;
+            if (!(e.value.canEvict() || expired)) return EvictionResult.STASH; 
// Not evictable by read count and not expired
+            return EvictionResult.REMOVE;
+        }
+        ```
+
+### 2. `CachedEntry`
+
+Then value entries in the range cache are `CachedEntry` instances. The key is 
a `Position`.
+
+```java
+/**
+ * Interface for cached entries in the {@link RangeCache}.
+ */
+public interface CachedEntry extends Entry, ReferenceCounted {
+    boolean matchesKey(Position key);
+    boolean canEvict();
+    boolean increaseReadCount(int expectedReadCount);
+}
+```
+
+These are the relevant details related to the "expected read count" 
implementation:
+-   **`CachedEntry` interface**:
+    -   `boolean canEvict()`: Returns `true` if `expectedReadCount <= 0` (or 
if no read count handler is present).
+    -   `boolean increaseReadCount(int expectedReadCount)`: Increments the 
internal expected read count.
+-   **`CachedEntryImpl` class**: Implements `CachedEntry`. Holds a reference 
to `EntryReadCountHandlerImpl`.
+
+### 3. `EntryReadCountHandler`
+
+A `CachedEntry` implementatation holds a reference to a 
`EntryReadCountHandler` which handles the state related to read count.
+`EntryReadCountHandler` is used as an abstraction and integration between 
`RangeCacheRemovalQueue`, `CachedEntry` and the Managed Ledger layer. The 
`EntryImpl` in Managed Ledger layer has also been modified to hold an 
`EntryReadCountHandler` instance so that the information can be passed on. The 
implementation of `EntryReadCountHandler` holds the state in a single `volatile 
int` field.
+
+```java
+public interface EntryReadCountHandler {
+    int getExpectedReadCount();
+    boolean incrementExpectedReadCount();
+    boolean incrementExpectedReadCount(int increment);
+    void markRead();
+    void markEvicted();
+}
+```
+
+-   **`EntryReadCountHandler` interface**:
+    -   `int getExpectedReadCount()`
+    -   `boolean incrementExpectedReadCount(int increment)`
+    -   `boolean incrementExpectedReadCount()`
+    -   `void markRead()`: Decrements the count.
+    -   `void markEvicted()`: Sets count to a special "evicted" value (e.g., 
`Integer.MIN_VALUE / 2`) to stop further modifications and identify it as fully 
processed.
+-   **`EntryReadCountHandlerImpl` class**:
+    -   Uses `AtomicIntegerFieldUpdater` for `expectedReadCount`.
+    -   Handles the special `EVICTED_VALUE`.
+
+### 4. Integration with `ManagedLedgerImpl` and `ManagedCursorContainer`
+
+-   **`OpAddEntry`**:
+    -   When an entry is successfully added to a ledger:
+        ```java
+        if (!(ml instanceof ShadowManagedLedgerImpl)) {
+            int activeCursorCount = ml.getActiveCursors().size();
+            if (activeCursorCount > 0) {
+                int expectedReadCount = 0;
+                if (ml.getConfig().isCacheEvictionByExpectedReadCount()) {
+                    // For newly added entries, all active cursors are 
effectively "before" the added entry for future reads.
+                    expectedReadCount = activeCursorCount; 
+                }
+                EntryImpl entry = EntryImpl.create(ledgerId, entryId, data, 
expectedReadCount);
+                entry.setDecreaseReadCountOnRelease(false); // Cache owns the 
primary read count handling now
+                ml.entryCache.insert(entry);
+                entry.release();
+            }
+        }
+        ```
+-   **`ManagedLedgerImpl.asyncReadEntry`**:
+    -   The `expectedReadCount` parameter (an `IntSupplier`) is passed down to 
`RangeEntryCacheImpl.asyncReadEntry0`.
+    -   The `IntSupplier` would resolve to 
`opReadEntry.cursor.getNumberOfCursorsAtSamePositionOrBefore()` when 
`cacheEvictionByExpectedReadCount` is true.
+-   **`RangeEntryCacheImpl.readFromStorage`**:
+    -   Accepts an `IntSupplier expectedReadCount`.
+    -   When creating `EntryImpl` from `LedgerEntry`, it passes 
`expectedReadCount.getAsInt()` to `EntryImpl.create`.
+-   **`RangeEntryCacheImpl.insert(Entry entry)`**:
+    -   If an entry with the same position already exists in the cache:
+        ```java
+        CachedEntry previousEntry = entries.get(position);
+        if (previousEntry != null) {
+            try {
+                if (entry.getReadCountHandler() != null) {
+                    // If the entry is already in the cache, increase the 
expected read count on the existing entry
+                    // by the count from the new entry (which represents new 
potential readers for this insert op)
+                    if 
(previousEntry.increaseReadCount(entry.getReadCountHandler().getExpectedReadCount()))
 {
+                        return false; // Successfully updated existing entry's 
read count
+                    }
+                }
+            } finally {
+                previousEntry.release();
+            }
+        }
+        // ... proceed to create new CachedEntryImpl and put it ...
+        CachedEntryImpl cacheEntry = CachedEntryImpl.create(position, 
cachedData, (EntryReadCountHandlerImpl) entry.getReadCountHandler());
+        ```
+-   **`ManagedCursorContainer`**:
+    -   Needs an efficient way to count active cursors at or before a given 
position so that "expected read count" can be calculated for backlog reads. 
`ManagedCursorContainer` has been modified to use `TreeMap` so that cursors can 
be sorted by position.
+    ```java
+    // ManagedCursorContainer
+    private final NavigableMap<Position, List<Item>> sortedByPosition = new 
TreeMap<>();
+    // ... methods to add/remove/update items in sortedByPosition ...
+    public int getNumberOfCursorsAtSamePositionOrBefore(ManagedCursor cursor) {
+        long stamp = rwLock.readLock();
+        try {
+            Item item = cursors.get(cursor.getName()); // cursors is the 
existing ConcurrentMap
+            if (item == null || item.position == null) { // item.position can 
be null if cursor is not reading
+                return 0;
+            } else {
+                int sum = 0;
+                // Iterate through positions up to and including the cursor's 
current read position
+                for (Map.Entry<Position, List<Item>> entry : 
sortedByPosition.headMap(item.position, true).entrySet()) {
+                    sum += entry.getValue().size();
+                }
+                return sum;
+            }
+        } finally {
+            rwLock.unlockRead(stamp);
+        }
+    }
+    ```
+-   **`EntryImpl.release()`**:
+    -   A new flag `decreaseReadCountOnRelease` (default `false`) is added to 
`EntryImpl`.
+    -   When `EntryImpl.create(Entry other)` (copy constructor for dispatch) 
is called, this flag is set to `true` on the copy.
+    -   `EntryImpl.beforeDeallocate()`: if `decreaseReadCountOnRelease` is 
true and `readCountHandler` is not null, call `readCountHandler.markRead()`.
+    This ensures that only when an entry copy created for dispatch is 
released, the read count on the original cached entry's handler is decremented.
+
+### 4. `RangeEntryCacheManagerImpl`
+-   `doCacheEviction`: Called by the global eviction scheduler.
+    -   Invokes `evictionHandler.invalidateEntriesBeforeTimestampNanos`. Would 
also have to take separate `managedLedgerCacheEvictionTimeThresholdMillis` and 
`managedLedgerCacheEvictionTimeThresholdMillisMax` settings into account.
+    -   Calls `doEvictToWatermarkWhenOverThreshold()` to ensure cache size is 
within limits.
+-   `triggerEvictionWhenNeeded()`: Called after an entry is added.
+    -   If size > threshold, schedules `doEvictToWatermarkWhenOverThreshold()` 
on `cacheEvictionExecutor`.
+-   `doEvictToWatermarkWhenOverThreshold()`:
+    -   Calculates `sizeToEvict`.
+    -   Calls `evictionHandler.evictEntries(sizeToEvict, 
expirationTimestampNanosForNonEvictableEntries)`.
+
+## Note about entry cache size and how it's related to broker cache total size
+
+The calculation of the entry cache size is not accurate unless the 
`managedLedgerCacheCopyEntries` setting is set to `true`. This occurs because 
when the entry payload is read from BookKeeper or received from a Pulsar client 
publisher, it is "sliced" from an underlying buffer. The actual memory 
consumption can be higher when the other entries of the shared underlying 
buffer have been released and there is a single entry that retains the complete 
underlying buffer.
+
+For entries received from a Pulsar client publisher, the buffer size is 
between 16kB to 1MB. This is currently not configurable and it is set in code 
in [BrokerService.defaultServerBootstrap 
method](https://github.com/apache/pulsar/blob/19b4a05f888da35a584cf7ff6d594dd9df5e03d1/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java#L564-L565).
+
+```java
+        bootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR,
+            new AdaptiveRecvByteBufAllocator(1024, 16 * 1024, 1 * 1024 * 
1024));
+```
+
+The actual underlying buffer size will also depend on Netty's 
LengthFieldBasedFrameDecoder (extends ByteToMessageDecoder) logic which will 
cumulate buffers by copying until the complete entry has been read. In that 
case, the underlying buffer size can be larger than 1MB. The cumulated buffer 
will shared be shared across multiple entries since entry buffers are sliced 
from the cumulated buffer.
+
+For entries received from BookKeeper, it's similar. However, BookKeeper client 
defaults to Netty's [default channel config 
settings](https://github.com/netty/netty/blob/c3416d8ad2260f3c9bc98a9061d25bba92020f93/transport/src/main/java/io/netty/channel/DefaultChannelConfig.java#L74-L76)
 for RCVBUF_ALLOCATOR, which has a maximum buffer size of 64kB.
+
+It would be useful to make the Pulsar broker side `RCVBUF_ALLOCATOR` 
parameters configurable so that the parameters could be tuned to a smaller 
size. It is very unlikely that the 1MB maximum size improves performance 
significantly. There is a performance tradeoff in the case where messages are 
large and buffers would have to be merged by copying in the 
LengthFieldBasedFrameDecoder. To avoid this overhead, it would be possible to 
make LengthFieldBasedFrameDecoder to use composite buffers  [...]
+
+Since in many cases, the underlying buffer where the slice is taken for a 
single entry is shared with entries that have been published or retrieved 
together, it is also common that these entries will get evicted together. 
Making the RCVBUF_ALLOCATOR settings configurable in the Pulsar broker is a 
sufficient mitigation for the problem.
+In cases where shared buffers add a lot of overhead of consumed memory, it 
will be possible to reduce it by setting the maximum size for 
`AdaptiveRecvByteBufAllocator` to 64kB, with the tradeoff of possible 
unnecessary buffer copies for messages exceeding 64kB. In BookKeeper server, 
there are settings `byteBufAllocatorSizeInitial`, `byteBufAllocatorSizeMin` and 
`byteBufAllocatorSizeMax` to configure the `AdaptiveRecvByteBufAllocator` 
parameters. The naming of these parameters in BookKeep [...]
+
+The proposal would be to add these configuration parameters to `broker.conf` 
for controlling Broker's `AdaptiveRecvByteBufAllocator` parameters:
+```properties
+# Netty adaptive receive buffer allocator's minimum size
+brokerAdaptiveRecvByteBufAllocatorMinimumSize=1024
+# Netty adaptive receive buffer allocator's initial size
+brokerAdaptiveRecvByteBufAllocatorInitialSize=16384
+# Netty receive adaptive buffer allocator's maximum size
+# Tune this value to a lower value to reduce overhead of the entries cached in 
the Broker cache due to shared underlying buffers
+brokerAdaptiveRecvByteBufAllocatorMaximumSize=1048576
+```
+
+## Public-facing Changes
+
+### Configuration
+-   **New Configuration**: `broker.conf`
+    -   `cacheEvictionByExpectedReadCount` (boolean): Enables the new eviction 
strategy based on expected read count. When true, entries with 
`expectedReadCount > 0` are less likely to be evicted by size-based eviction 
unless they also meet timestamp expiration.
+        Default: `true`.
+    -   `managedLedgerCacheEvictionTimeThresholdMillisMax` (Integer): cache 
entry time-to-live for entries that have pending expected reads (applies only 
when `cacheEvictionByExpectedReadCount=true`)
+        Default: `5000`
+    -   `brokerAdaptiveRecvByteBufAllocatorMinimumSize` (int): Netty adaptive 
receive buffer allocator's minimum size in bytes. Default: `1024`.
+    -   `brokerAdaptiveRecvByteBufAllocatorInitialSize` (int): Netty adaptive 
receive buffer allocator's initial size in bytes. Default: `16384`.
+    -   `brokerAdaptiveRecvByteBufAllocatorMaximumSize` (int): Netty adaptive 
receive buffer allocator's maximum size in bytes. Tune this value to a lower 
value to reduce overhead of the entries cached in the Broker cache due to 
shared underlying buffers. Default: `1048576`.
+-   **Modified Behavior of Existing Configurations**:
+    -   `managedLedgerCacheEvictionTimeThresholdMillis`: Applies only to 
entries which have reached the expected read count when 
`cacheEvictionByExpectedReadCount=true`.
+    -   `managedLedgerCacheSizeMB`: Still applies. The new size-based eviction 
will use the `RangeCacheRemovalQueue`.
+    -   `cacheEvictionByMarkDeletedPosition`: If 
`cacheEvictionByExpectedReadCount` is `true`, this setting's direct influence 
on preserving entries is diminished, as `expectedReadCount` provides a more 
granular control. However, mark-delete position updates still occur and will 
lead to `expectedReadCount` decrementing as cursors move past entries. If 
`cacheEvictionByExpectedReadCount` is `false`, this setting functions as before 
(though the underlying eviction for "up to slowest reader" [...]
+    -   `managedLedgerMinimumBacklogCursorsForCaching`, 
`managedLedgerMinimumBacklogEntriesForCaching`, 
`managedLedgerMaxBacklogBetweenCursorsForCaching`: These primarily affect 
whether a cursor is considered "active" for caching purposes (i.e., 
`ManagedCursorImpl.isCacheReadEntry()`). The `expectedReadCount` logic 
naturally incorporates this by only counting active cursors. If 
`cacheEvictionByExpectedReadCount` is true, these settings become less critical 
for direct cache retention deci [...]
+
+### Metrics & Monitoring
+
+Existing broker cache metrics will continue to function, reflecting the 
behavior of the new eviction system and broker cache strategy.
+
+### Backward & Forward Compatibility
+
+There are no compatibility concerns since the broker cache is handled at 
runtime, in a single broker.
+
+## Pulsar Geo-Replication Upgrade & Downgrade/Rollback Considerations
+-   This PIP does not directly interact with or change geo-replication 
mechanisms. Cache behavior is local to each broker in each cluster. 
Compatibility considerations are the same as for a standalone broker.
+
+# Alternatives
+
+1.  **Simple LRU (Least Recently Used) for Global Cache**:
+    -   A global LRU cache could be considered.
+    -   **Reason for rejection**: Standard LRU doesn't naturally account for 
Pulsar's specific read patterns where an entry might be predictably read 
multiple times (e.g., by different consumers in a shared subscription after a 
backlog is cleared, or by Key_Shared consumers). The "expected read count" 
provides a more domain-specific heuristic. Moreover, managing a global LRU 
across all topics with high concurrency and varying entry sizes can be complex 
and may require significant locking [...]
+
+2.  **Priority Queue for `RangeCacheRemovalQueue`**:
+    -   Instead of a simple FIFO queue, a priority queue ordered by 
`expectedReadCount` (ascending) and then timestamp (ascending) could be used.
+    -   **Reason for rejection (for this PIP)**: While potentially more 
precise, managing priorities in a queue where `expectedReadCount` can change 
dynamically for entries already in the queue adds significant complexity. 
Updates would require O(log N) or O(N) operations depending on the queue 
implementation and how updates are handled. The current MPSC queue with a 
"stash" for non-evictable items offers a simpler, lower-overhead approach for 
the initial refactoring. Performance of MPSC [...]
+
+# Links
+
+* Mailing List discussion thread: 
https://lists.apache.org/thread/o1ozbg468kxfd38pxk2ppzsstdnxnok2
+* Mailing List voting thread: 
https://lists.apache.org/thread/p6jroxhtjpk7hbgqt3hzjtjd2233zmxy

Reply via email to