## Motivation

The current implementation of the read cache in the Pulsar broker has largely
remained unchanged for a long time, except for a few minor tweaks.

While the implementation is stable and reasonably efficient for typical
workloads, the overhead of managing cache evictions in a broker that is running
many topics can be high, both in extra CPU utilization and in pressure on the
JVM garbage collector, which has to track a larger number of medium-lived
objects.

The goal is to provide an alternative implementation that can adapt better to
a wider variety of operating conditions.

### Current implementation details

The broker cache is implemented as part of the `ManagedLedger` component,
which sits in the Pulsar broker and provides a higher level of abstraction
on top of BookKeeper.

Each topic (and managed-ledger) has its own private cache space. This cache is
implemented as a `ConcurrentSkipListMap` sorted map that maps
`(ledgerId, entryId) -> payload`. The payload is a `ByteBuf` reference that can
either be a slice of a `ByteBuf` that we got when reading from a socket, or a
copied buffer.

Each topic cache is allowed to use the full broker max cache size before an
eviction is triggered. The total cache size is effectively a resource shared
across all the topics, where a topic can use a larger portion of it if it
"asks for more".

When an eviction happens, we need to do an expensive ranking of all the caches
in the broker and evict from each of them in proportion to the space it
currently uses.

The bigger problem is the `ConcurrentSkipListMap` and the `ByteBuf` objects
that need to be tracked. The skip list is essentially a "tree"-like structure
and needs to maintain Java objects for each entry in the cache. We also
potentially need to hold a huge number of `ByteBuf` objects.

A cache workload is typically the worst-case scenario for any garbage
collector implementation because it involves creating objects, storing them for
some amount of time and then throwing them away. By that time, the GC will
have already tenured these objects and copied them into an "old generation"
space, and sometime later a costly compaction of that memory has to be
performed.

To mitigate the effect of the cache workload on the GC, we're being very
aggressive in purging the cache by triggering time-based eviction. By putting a
max TTL on the elements in the cache, we can avoid keeping the objects around
for too long to be a problem for the GC.

The flip side is that we're artificially limiting the cache to a very short
time frame, which reduces its usefulness.

The other problem is the CPU cost involved in doing these frequent evictions,
which can be very high when there are tens of thousands of topics in a broker.


## Proposed changes

Instead of dealing with individual caches for each topic, let's adopt a model
where there is a single cache space for the broker.

This cache is broken into N segments which act as a circular buffer. Whenever a
segment is full, we start writing into the next one, and when we reach the last
one, we wrap around and recycle the first segment.
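
The rotation described above can be sketched as follows. This is only an illustration under stated assumptions, not the proposed implementation: the class name `RotatingCacheSketch`, the `Position` key record, and the use of plain heap `byte[]` payloads are all hypothetical, and the code assumes Java 16+ for records.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: one broker-wide cache split into N segments used as a ring. */
final class RotatingCacheSketch {
    /** Hypothetical key type; the proposal only specifies (ledgerId, entryId). */
    record Position(long ledgerId, long entryId) {}

    private final List<Map<Position, byte[]>> segments = new ArrayList<>();
    private final int[] usedBytes;
    private final int segmentCapacity;
    private int current = 0; // index of the segment currently being written

    RotatingCacheSketch(int segmentCount, int segmentCapacity) {
        this.usedBytes = new int[segmentCount];
        this.segmentCapacity = segmentCapacity;
        for (int i = 0; i < segmentCount; i++) {
            segments.add(new HashMap<>());
        }
    }

    synchronized void put(long ledgerId, long entryId, byte[] payload) {
        if (payload.length > segmentCapacity) {
            return; // entry is larger than a whole segment: do not cache it
        }
        if (usedBytes[current] + payload.length > segmentCapacity) {
            // The current segment is full: advance to the next one (wrapping
            // around after the last) and recycle whatever it held before.
            current = (current + 1) % segments.size();
            segments.get(current).clear();
            usedBytes[current] = 0;
        }
        segments.get(current).put(new Position(ledgerId, entryId), payload);
        usedBytes[current] += payload.length;
    }

    /** Returns the cached payload, or null on a cache miss. */
    synchronized byte[] get(long ledgerId, long entryId) {
        Position key = new Position(ledgerId, entryId);
        for (Map<Position, byte[]> segment : segments) {
            byte[] payload = segment.get(key);
            if (payload != null) {
                return payload;
            }
        }
        return null;
    }
}
```

Note that eviction here is just the `clear()` of the segment being recycled. There is no per-topic ranking and no TTL scan.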

This model has been working very well for the BookKeeper `ReadCache`:
https://github.com/apache/bookkeeper/blob/master/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ReadCache.java

Eviction becomes a trivial operation: buffers are just rotated and
overwritten. We don't need to do any per-topic task or keep track of
utilization.

Today, there are two ways of configuring the cache: one that "copies" data into
the cache, and another that just uses reference-counting on the original
buffers to avoid payload copies.

### Memory copies into the cache

Each segment is composed of a buffer, an offset, and a hashmap which maps
`(ledgerId, entryId) -> offset`.


The advantage of this approach is that entries are copied into the cache buffer
(in direct memory), and we don't need to keep any long-lived Java objects around.
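
A single segment of this kind could look roughly like the sketch below. The names (`CopySegmentSketch`, `Position`) are hypothetical, and storing a 4-byte length prefix in front of each payload is an assumption made here so that the index only needs to hold an offset. A boxed `HashMap` is used for brevity; a real implementation would likely want a primitive-keyed map to actually avoid per-entry Java objects. The code assumes Java 16+ for records.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of one cache segment: a buffer, a write offset, and an index. */
final class CopySegmentSketch {
    /** Hypothetical key type; the proposal only specifies (ledgerId, entryId). */
    record Position(long ledgerId, long entryId) {}

    private final ByteBuffer buffer; // direct (off-heap) memory
    private final Map<Position, Integer> index = new HashMap<>(); // key -> offset
    private int writeOffset = 0;

    CopySegmentSketch(int capacity) {
        this.buffer = ByteBuffer.allocateDirect(capacity);
    }

    /** Copies the payload into the segment; returns false if it does not fit. */
    synchronized boolean put(long ledgerId, long entryId, byte[] payload) {
        int needed = Integer.BYTES + payload.length;
        if (writeOffset + needed > buffer.capacity()) {
            return false; // segment is full: the caller should rotate
        }
        ByteBuffer dst = buffer.duplicate();
        dst.position(writeOffset);
        dst.putInt(payload.length); // assumed length prefix
        dst.put(payload);
        index.put(new Position(ledgerId, entryId), writeOffset);
        writeOffset += needed;
        return true;
    }

    /** Returns a copy of the cached payload, or null on a cache miss. */
    synchronized byte[] get(long ledgerId, long entryId) {
        Integer offset = index.get(new Position(ledgerId, entryId));
        if (offset == null) {
            return null;
        }
        ByteBuffer src = buffer.duplicate();
        src.position(offset);
        byte[] out = new byte[src.getInt()];
        src.get(out);
        return out;
    }

    /** Recycles the segment: old bytes are simply overwritten by later puts. */
    synchronized void clear() {
        index.clear();
        writeOffset = 0;
    }
}
```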

### Keeping reference-counted buffers in the cache

Each segment in the cache will contain a map `(ledgerId, entryId) -> ByteBuf`.
Buffers will have an increased reference count that keeps the data alive as
long as the buffer is in the cache, and it will be released when the cache
segment is rotated.

The advantage is that we avoid any memory copy when inserting into or reading
from the cache. The disadvantage is that we will have references to all the
`ByteBuf` objects that are in the cache.
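
The retain-on-insert, release-on-rotate lifecycle can be sketched as below. To keep the example self-contained, `Buf` is a hypothetical minimal stand-in for a reference-counted buffer such as Netty's `ByteBuf` (only `retain()`, `release()`, and `refCnt()` are mimicked); `RefCountedSegmentSketch` and `Position` are likewise made-up names. The code assumes Java 16+ for records.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of a segment that holds reference-counted buffers. */
final class RefCountedSegmentSketch {
    /** Minimal stand-in for a reference-counted buffer (not Netty's class). */
    static final class Buf {
        private final AtomicInteger refCnt = new AtomicInteger(1);
        final byte[] data;

        Buf(byte[] data) { this.data = data; }
        Buf retain() { refCnt.incrementAndGet(); return this; }
        void release() { refCnt.decrementAndGet(); }
        int refCnt() { return refCnt.get(); }
    }

    /** Hypothetical key type; the proposal only specifies (ledgerId, entryId). */
    record Position(long ledgerId, long entryId) {}

    private final Map<Position, Buf> entries = new HashMap<>();

    /** Stores the buffer without copying; the cache takes its own reference. */
    synchronized void put(long ledgerId, long entryId, Buf buf) {
        Buf previous = entries.put(new Position(ledgerId, entryId), buf.retain());
        if (previous != null) {
            previous.release(); // drop the reference held for the replaced entry
        }
    }

    /** Returns a retained buffer (the caller must release it), or null on a miss. */
    synchronized Buf get(long ledgerId, long entryId) {
        Buf buf = entries.get(new Position(ledgerId, entryId));
        return buf == null ? null : buf.retain();
    }

    /** Called when the segment is recycled: release every cached reference. */
    synchronized void rotate() {
        entries.values().forEach(Buf::release);
        entries.clear();
    }
}
```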

### API changes

No user-facing API changes are required.

### New configuration options

The existing cache implementation will not be removed at this point. Users will
be able to configure the old implementation in `broker.conf`.

This option will be helpful in case performance regressions are seen for
some use cases with the new cache implementation.



--
Matteo Merli
<mme...@apache.org>