This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 516ce04d346 [improve][pip] PIP-429: Optimize Handling of Compacted
Last Entry by Skipping Payload Buffer Parsing (#24439)
516ce04d346 is described below
commit 516ce04d3468a08b846483e839d7a28955212740
Author: Yunze Xu <[email protected]>
AuthorDate: Wed Jul 16 16:28:56 2025 +0800
[improve][pip] PIP-429: Optimize Handling of Compacted Last Entry by
Skipping Payload Buffer Parsing (#24439)
---
pip/pip-429.md | 201 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 201 insertions(+)
diff --git a/pip/pip-429.md b/pip/pip-429.md
new file mode 100644
index 00000000000..8d56455ced0
--- /dev/null
+++ b/pip/pip-429.md
@@ -0,0 +1,201 @@
+# PIP-429: Optimize Handling of Compacted Last Entry by Skipping Payload
Buffer Parsing
+
+# Background knowledge
+
+A typical reader workflow looks like this:
+
+```java
+while (reader.hasMessageAvailable()) {
+ final var msg = reader.readNext();
+ handleMessage(msg);
+}
+```
+
+`hasMessageAvailable` may perform the `GetLastMessageId` RPC to get the last message ID from the broker. However, when the reader is a compacted reader, i.e. `readCompacted(true)` is configured when creating the reader, the broker may compute the message ID from the last entry provided by the compaction service.
+
+Generally, with the built-in compaction service, when the entry represents a
batch of messages, the compacted entry buffer consists of:
+
+1. Serialized `MessageMetadata`
+2. Serialized payload buffer, which can be compressed or encrypted. The
uncompressed payload buffer consists of a list of `SingleMessageMetadata` and
value buffers.
+
+As a typical example, suppose a producer configured with `LZ4` as the compression type sends the following messages in a batch:
+
+```java
+producer.newMessage().key("k0").value("v0").sendAsync();
+producer.newMessage().key("k0").value("v1").sendAsync();
+producer.newMessage().key("k1").value("v0").sendAsync();
+producer.newMessage().key("k1").value(null).sendAsync();
+```
+
+After the compaction, the compacted entry buffer could be represented as
follows:
+
+```yaml
+metadata: # MessageMetadata
+ num_messages_in_batch: 4
+ compression: LZ4
+payload:
+ - singleMetadata: # SingleMessageMetadata
+ key: k0
+ compactedOut: true
+ value: ""
+ - singleMetadata:
+ key: k0
+ compactedOut: false
+ value: v1
+ - singleMetadata:
+ key: k1
+ compactedOut: true
+ value: ""
+ - singleMetadata:
+ key: k1
+ compactedOut: true
+ nullValue: true
+ value: ""
+```
+
+- For a given key, only the latest value will be retained, so `k0 => v0` will
be compacted out.
+- A null value means the key will be removed, so `k1 => v0` and `k1 => null`
will be compacted out.
+
+Prior to [#18877](https://github.com/apache/pulsar/pull/18877), the
`hasMessageAvailable` and `readNext` loop might encounter issues because the
`GetLastMessageId` RPC returns `{ledger, entry, batchIndex=3}` as the last
message ID, which represents `k1 => null`.
+
+The issue occurs because the batch index of the last message ID is calculated
as `num_messages_in_batch - 1` without considering certain edge cases.
[#18877](https://github.com/apache/pulsar/pull/18877) resolves this problem by
uncompressing the compacted entry buffer on the broker side and filtering out
messages where the individual metadata has `compactedOut` set to `true`. This
ensures that only valid messages are considered when determining the last
message ID.
+
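The difference between the two computations can be sketched as follows. This is a minimal illustration, not the broker's actual code: the class `LegacyLastBatchIndex` and its methods are hypothetical, and the `compactedOut` flags are passed in as a plain list, whereas in a real entry they only become available after the payload buffer has been decompressed.

```java
import java.util.List;

// Hypothetical helper for illustration only; not part of Pulsar.
class LegacyLastBatchIndex {

    /** Naive computation: assumes every message in the batch is still readable. */
    static int naive(int numMessagesInBatch) {
        return numMessagesInBatch - 1;
    }

    /**
     * Filtered computation: scans the per-message compactedOut flags from the end.
     *
     * @return the batch index of the last retained message, or -1 if none is retained
     */
    static int filtered(List<Boolean> compactedOut) {
        for (int i = compactedOut.size() - 1; i >= 0; i--) {
            if (!compactedOut.get(i)) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Flags of the example batch: k0 => v0, k0 => v1, k1 => v0, k1 => null
        List<Boolean> flags = List.of(true, false, true, true);
        System.out.println(naive(flags.size())); // prints 3, which points at k1 => null
        System.out.println(filtered(flags));     // prints 1, which points at k0 => v1
    }
}
```

The filtered variant gives the correct index for the example batch, but it needs one flag per message, which is why the payload has to be decompressed first.
+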
+The `compacted_out` field was first introduced in the early stages of
development through [#1361](https://github.com/apache/pulsar/pull/1361).
However, as part of the overall payload buffer, parsing a
`SingleMessageMetadata` currently requires decompressing the compacted entry
buffer. This process can be resource-intensive, particularly when handling
large topics or encrypted messages, leading to potential performance
bottlenecks.
+
+# Motivation
+
+Decompressing the payload buffer solely to check whether individual messages
have the `compacted_out` field set is both inefficient and restrictive, as it
imposes constraints on the payload buffer format. Furthermore, when using a
custom topic compaction service, the entry buffer in the compacted ledger may
not include a `SingleMessageMetadata` for every single message, adding further
complexity to the process.
+
+This challenge is exacerbated when messages are encrypted, because the broker cannot decrypt the payload without the corresponding private key. This limitation also impacts the current compaction service, as encrypted messages cannot be compacted. Consequently, operations such as the `GetLastMessageId` RPC will fail, resulting in an error similar to the following:
+
+```
+org.apache.pulsar.client.api.PulsarClientException$BrokerMetadataException:
The subscription sub of the topic <topic> gets the last message id was failed
+{"errorMsg":"Failed to read last entry of the compacted Ledger Invalid unknown
tag type: 3","reqId":3317275583068061944, "remote":"localhost/127.0.0.1:50818",
"local":"/127.0.0.1:50823"}
+```
+
+Instead, the expected behavior is to return the last message ID (e.g., `k1 =>
null` in the previous example).
+
+Another issue arises from the assumption made by the `GetLastMessageId` RPC
that the compacted entry's payload buffer must always contain a
`SingleMessageMetadata` list. However, this is not always the case. For
instance, a custom topic compaction service might write a payload buffer that
omits the `SingleMessageMetadata`. In such cases, the `compactedOut`
information could instead be stored in the properties of the `MessageMetadata`,
but the `GetLastMessageId` RPC will always fail.
+
+The custom topic compaction service has the flexibility to serialize and
deserialize the payload buffer in a different format. However, it still depends
on the `GetLastMessageId` RPC in the `hasMessageAvailableAsync` and
`getLastMessageIdAsync` methods of the `RawReader` to compute the last message
ID. This reliance creates a compatibility issue, as the `GetLastMessageId` RPC
will fail when working with a payload buffer in a non-standard format, breaking
the functionality of these methods.
+
+# Goals
+
+## In Scope
+
+Instead of relying on the `compacted_out` field in `SingleMessageMetadata`,
this PIP proposes to use `MessageMetadata` to determine the last message ID in
the compacted last entry. Since `compacted_out` is no longer used, the payload
buffer's format could be improved as well.
+
+## Out of Scope
+
+# High Level Design
+
+To enhance efficiency and simplify the handling of compacted entries, a new
field will be added to `MessageMetadata` to record the batch indexes of all
retained single messages. This change allows the server to determine the last
message ID directly from the new field in `MessageMetadata` when processing a
`GetLastMessageId` request, rather than relying on the `compacted_out` field in
`SingleMessageMetadata`.
+
+With this update, the `compacted_out` field will no longer be used, and only the retained messages will be included in the payload buffer. For example, the metadata of the previous example becomes:
+
+```yaml
+metadata: # MessageMetadata
+ num_messages_in_batch: 4
+ compression: LZ4
+ compacted_batch_indexes: [1] # Retained messages' batch indexes
+```
+
+The improvements are:
+1. **Reduced Payload Size**:
+ - In the new format, only the retained messages are included in the payload
buffer.
+ - For the example above, the new format contains just 1 pair of
`SingleMessageMetadata` and its corresponding value buffer, compared to 4 pairs
in the original format.
+
+2. **Efficient Batch Index Retrieval**:
+ - The actual batch index of retained messages can now be retrieved from the
`compacted_batch_indexes` field in `MessageMetadata`.
+ - In this example, the retained message's batch index is
`compacted_batch_indexes[0] = 1`.
+
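A minimal sketch of how the new field could be consumed is shown below. Only the field name `compacted_batch_indexes` comes from this proposal; the class `CompactedBatchIndexes`, its methods, and the assumption that the indexes are stored in ascending order are hypothetical and used for illustration only.

```java
import java.util.List;

// Hypothetical helper for illustration only; not part of Pulsar.
class CompactedBatchIndexes {

    /**
     * Returns the batch index of the last retained message, assuming the list is
     * in ascending order, or -1 when the list is empty.
     */
    static int lastBatchIndex(List<Integer> compactedBatchIndexes) {
        if (compactedBatchIndexes.isEmpty()) {
            return -1;
        }
        return compactedBatchIndexes.get(compactedBatchIndexes.size() - 1);
    }

    /** Maps the i-th message stored in the compacted payload to its index in the original batch. */
    static int originalBatchIndex(List<Integer> compactedBatchIndexes, int payloadSlot) {
        return compactedBatchIndexes.get(payloadSlot);
    }

    public static void main(String[] args) {
        // The example above: only k0 => v1 (batch index 1) is retained.
        System.out.println(lastBatchIndex(List.of(1)));           // prints 1
        // A batch that retains the messages at batch indexes 0 and 2.
        System.out.println(lastBatchIndex(List.of(0, 2)));        // prints 2
        System.out.println(originalBatchIndex(List.of(0, 2), 1)); // prints 2
    }
}
```

Both lookups read only `MessageMetadata`, so the payload buffer does not have to be decompressed or decrypted.
+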
+## Public-facing Changes
+
+### Public API
+
+Remove the `readLastCompactedEntry` and `findEntryByEntryIndex` methods from the `TopicCompactionService` interface.
+- `findEntryByEntryIndex`: it's never used other than in tests
+- `readLastCompactedEntry`: it exposes the `Entry` to the caller, while the
caller should only care about the position of the last message.
+
+Change the return type of `findEntryByPublishTime` from `CompletableFuture<Entry>` to `CompletableFuture<Position>`. This change is made because the caller only needs the position of the last message, not the entire entry.
+
+Add the following new methods:
+
+```java
+/**
+ * Retrieve the position of the last message before compaction.
+ *
+ * @return A future that completes with the position of the last message
before compaction, or
+ * {@link MessagePosition#EARLIEST} if no such message exists.
+ */
+CompletableFuture<MessagePosition> getLastMessagePosition();
+
+/**
+ * Represents the position of a message.
+ * <p>
+ * The `ledgerId` and `entryId` together specify the exact entry to which the
message belongs. For batched messages,
+ * the `batchIndex` field indicates the index of the message within the batch.
If the message is not part of a
+ * batch, the `batchIndex` field is set to -1. The `publishTime` field
corresponds to the publishing time of the
+ * entry's metadata, providing a timestamp for when the entry was published.
+ * </p>
+ */
+record MessagePosition(long ledgerId, long entryId, int batchIndex, long
publishTime) {
+
+ public static final MessagePosition EARLIEST = new MessagePosition(-1L,
-1L, 0, 0);
+}
+```
+
+After this change, the processing of the entry buffer will be handled within
the `TopicCompactionService` instead of being managed on the caller side
(`ServerCnx`). This adjustment enhances the flexibility of the implementation,
enabling more advanced use cases. For example, it allows the position to be
stored in an external metadata service, providing greater scalability and
modularity.
+
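To illustrate how the position can be served without exposing an `Entry`, here is a minimal sketch of an implementation that keeps the last position in memory. The `MessagePosition` record mirrors the one proposed above; the classes `LastPositionSketch` and `InMemoryCompactionService` and the method `recordCompactedEntry` are hypothetical and do not exist in Pulsar.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch for illustration only; not part of Pulsar.
class LastPositionSketch {

    // Mirrors the record proposed above.
    record MessagePosition(long ledgerId, long entryId, int batchIndex, long publishTime) {
        static final MessagePosition EARLIEST = new MessagePosition(-1L, -1L, 0, 0);
    }

    // Stand-in for a compaction service that tracks the last position itself.
    static class InMemoryCompactionService {
        private volatile MessagePosition last = MessagePosition.EARLIEST;

        /** Called by the (hypothetical) compaction task whenever it writes a compacted entry. */
        void recordCompactedEntry(long ledgerId, long entryId, int lastRetainedBatchIndex,
                                  long publishTime) {
            last = new MessagePosition(ledgerId, entryId, lastRetainedBatchIndex, publishTime);
        }

        /** The caller receives only the position and never sees the entry buffer. */
        CompletableFuture<MessagePosition> getLastMessagePosition() {
            return CompletableFuture.completedFuture(last);
        }
    }

    public static void main(String[] args) {
        InMemoryCompactionService service = new InMemoryCompactionService();
        System.out.println(service.getLastMessagePosition().join()); // the EARLIEST position
        service.recordCompactedEntry(5L, 7L, 1, 1000L);
        System.out.println(service.getLastMessagePosition().join().batchIndex()); // prints 1
    }
}
```

An implementation backed by an external metadata service could expose the same method and complete the future asynchronously.
+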
+### Binary protocol
+
+A new field will be added to `MessageMetadata`:
+
+```protobuf
+ // Indicates the indexes of messages retained in the batch after
compaction. When a batch is compacted,
+ // some messages may be removed (compacted out). For example, if the
original batch contains:
+ // `k0 => v0, k1 => v1, k2 => v2, k1 => null`, the compacted batch will
retain only `k0 => v0` and `k2 => v2`.
+ // In this case, this field will be set to `[0, 2]`, and the payload
buffer will only include the retained messages.
+ //
+ // Note: Batches compacted by older versions of the compaction service do
not include this field. For such batches,
+ // the `compacted_out` field in `SingleMessageMetadata` must be checked to
identify and filter out compacted messages
+ // (e.g., `k1 => v1` and `k1 => null` in the example above).
+ repeated int32 compacted_batch_indexes = 31;
+```
+
+# Backward & Forward Compatibility
+
+For entry buffers written by old version brokers, there is no
`compacted_batch_indexes` field in the `MessageMetadata`. In this case, the
`GetLastMessageId` RPC will still work as before, relying on the
`compacted_out` field in `SingleMessageMetadata` to determine the last message
ID.
+
+Downgrading remains safe because the `compacted_batch_indexes` field, which is
unknown to older versions, will simply be ignored when parsing
`MessageMetadata`. Additionally, this proposal ensures backward compatibility
by retaining the `compacted_out` field in `SingleMessageMetadata`.
+
+# Alternatives
+
+### Using a Property in `MessageMetadata`
+
+[#24431](https://github.com/apache/pulsar/pull/24431) proposed a solution to add a property to the `MessageMetadata` to indicate the last available message's batch index in the batch. However, it's not flexible and might conflict with user-provided properties.
+
+### Client-Side Computation of Last Compacted Message ID
+
+The previous version of this proposal was to pass the compacted last entry's buffer along with the `GetLastMessageId` response, so that the client can compute the last message ID by itself. This approach can even handle encrypted messages because the client side should have the private key to decrypt the message.
+
+However, sending the whole entry buffer to the client for each `GetLastMessageId` RPC is unnecessarily complicated.
+
+### Payload Buffer Format Improvement
+
+Actually, we can reduce the payload buffer size by removing the
`SingleMessageMetadata` and empty value buffers for messages that are compacted
out.
+
+```yaml
+payload:
+ - singleMetadata:
+ key: k0
+ compactedOut: false
+ value: v1
+```
+
+However, this would introduce a compatibility issue: older clients would not be able to parse the new format unless a `MessagePayloadProcessor` is configured.
+
+# Links
+
+* Mailing List discussion thread:
https://lists.apache.org/thread/4jghjg7yd36cl8yonbsvk5njmm96vy0k
+* Mailing List voting thread:
https://lists.apache.org/thread/04wormrz3qpcl4w0vpcrm7401wmp4q3w