Hi all,

We have drafted a proposal for supporting lightweight raw Message metadata
which can be found at
https://github.com/apache/pulsar/wiki/PIP-70%3A-Introduce-lightweight-raw-Message-metadata
 and
https://docs.google.com/document/d/1IgnF9AJzL6JG6G4EL_xcoQxvOpd7bUXcgxFApBiPOFY

I have also copied it into this email thread for easier viewing.

Any suggestions or ideas are welcome.



## PIP-70:  Introduce lightweight raw Message metadata

### 1. Motivation

In Pulsar, adding a new property to a message currently means changing
`MessageMetadata` in the protocol (PulsarApi.proto). Such a property can be
understood by both the broker and the client by deserializing
`MessageMetadata`. In some cases, however, the property needs to be added
on the broker side, or needs to be read by the broker at low cost. When the
broker receives a message produced by a client, it could add the property
in a new area that is separate from `MessageMetadata` and that can be read
without deserializing the original `MessageMetadata`. When the broker sends
the message to a client, it could filter out this part (or keep it, if the
client needs it). We call this kind of property “raw Message metadata”.
This way, consuming the “raw Message metadata” is independent of the
original `MessageMetadata`.

The benefit of “raw Message metadata” is that the broker does not need to
serialize/deserialize the protobuf-encoded `MessageMetadata`, which should
give better performance. It could also enable features that are not
supported yet.

Here are some use cases for raw Message metadata:
1) Order messages by broker-side time to make seeking by time more
accurate.
Currently, each message has a `publish_time` that uses client-side time.
Clocks on different producers' clients may not be aligned, so the message
order and the `publish_time` order may differ. But each topic-partition has
only one owner broker, so if we append the broker-side time in the “raw
Message metadata”, we can make sure the message order is aligned with
broker-side time. With this feature, we could handle seeking by time more
accurately.

2) Provide a continuous message sequence-Id for messages in one
topic-partition.
MessageId is a combination of ledgerId+entryId+batchIndex; for a partition
that contains more than one ledger, the Ids are not continuous. With this
solution, we could append a sequence-Id to each message. This will make
message sequence management easier.

This proposal focuses on the first feature mentioned above, “provide
ordered messages by time (broker side) sequence”, which makes the proposal
easier to walk through.

### 2. Message and “raw Message metadata” structure changes.

As mentioned above, there are 2 main benefits in this proposal:

1. Most of the changes happen on the broker side.
2. It avoids serializing/deserializing the protobuf-encoded
`MessageMetadata`.

#### 2.1 Raw Message metadata structure in Protobuf

Protobuf is used a lot in Pulsar, so we could use Protobuf to
serialize/deserialize the raw Message metadata.
In this example, we save the broker-side timestamp when each message is
sent from the broker to BookKeeper, so the definition is very simple.

```protobuf
message RawMessageMetadata {
    optional uint64 broker_timestamp = 1;
}
```

#### 2.2 Message and “raw Message metadata” structure details

Each message is sent from the producer client to the broker in this frame
format:

```
[TOTAL_SIZE] [CMD_SIZE][CMD] [MAGIC_NUMBER] [CHECKSUM] [METADATA_SIZE]
[METADATA] [PAYLOAD]
```

The first 3 fields “[TOTAL_SIZE] [CMD_SIZE] [CMD]” are read in
`LengthFieldBasedFrameDecoder` and `PulsarDecoder`, and the rest is handled
in the method `org.apache.pulsar.broker.service.Producer.publishMessage`.
The remaining part “[MAGIC_NUMBER] [CHECKSUM] [METADATA_SIZE] [METADATA]
[PAYLOAD]” is usually treated as “headersAndPayload” in the code. As
described above, we do not want this part to change at all, so we can treat
it as a single package.

```
[MAGIC_NUMBER] [CHECKSUM] [METADATA_SIZE] [METADATA] [PAYLOAD] ==>
[HEADERS_AND_PAYLOAD]
```

We need to add some fields to make “raw Message metadata” work.

```
[RAW_METADATA_MAGIC_NUMBER] [RAW_METADATA_SIZE] [RAW_METADATA]
[HEADERS_AND_PAYLOAD]
```

RAW_METADATA_MAGIC_NUMBER identifies whether this feature is on and whether
the message was handled by it.

RAW_METADATA_SIZE is added for this feature, and records the size of the
serialised raw Message metadata content.

RAW_METADATA is the serialised raw Message metadata content.

“HEADERS_AND_PAYLOAD” is the original ByteBuf data that contains metadata
and payload.

We put the newly added fields before "HEADERS_AND_PAYLOAD" instead of after
it because:

- Firstly, the “raw Message metadata” is an addition to the original
protocol, and it supports this new feature without much modification to
the original wire protocol; in particular, the broker does not need to
serialize/deserialize the protobuf-encoded `MessageMetadata` built by the
producer.
- Secondly, if we put the new fields after "HEADERS_AND_PAYLOAD", we cannot
know the offset of the newly added fields, since we do not know the length
of "PAYLOAD". Also, fields after "PAYLOAD" would change the CRC32 checksum,
which means we would need to recalculate the checksum one more time.
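
The layout above can be sketched as follows. This is only a minimal
illustration, not the proposed implementation: it uses
`java.nio.ByteBuffer` as a stand-in for Netty's `ByteBuf`, and the 2-byte
magic value `0x0e02`, the 4-byte size field, and the class and method names
are all assumptions, since the proposal does not fix them.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

final class RawMetadataFrame {
    // Hypothetical value: the proposal does not specify the magic number.
    static final short RAW_METADATA_MAGIC = 0x0e02;

    /**
     * Builds [RAW_METADATA_MAGIC_NUMBER][RAW_METADATA_SIZE][RAW_METADATA][HEADERS_AND_PAYLOAD].
     * The headersAndPayload bytes are copied unchanged after the new header.
     */
    static byte[] wrap(byte[] rawMetadata, byte[] headersAndPayload) {
        ByteBuffer buf = ByteBuffer.allocate(
                2 + 4 + rawMetadata.length + headersAndPayload.length);
        buf.putShort(RAW_METADATA_MAGIC);   // assumed 2-byte magic
        buf.putInt(rawMetadata.length);     // assumed 4-byte size field
        buf.put(rawMetadata);               // serialized RawMessageMetadata
        buf.put(headersAndPayload);         // original frame, untouched
        return buf.array();
    }

    public static void main(String[] args) {
        byte[] rawMetadata = {0x08, 0x2a};           // protobuf: field 1 (varint) = 42
        byte[] headersAndPayload = {1, 2, 3, 4, 5};  // stand-in for the real frame
        byte[] entry = wrap(rawMetadata, headersAndPayload);
        byte[] tail = Arrays.copyOfRange(
                entry, entry.length - headersAndPayload.length, entry.length);
        // The original bytes sit unchanged at the end of the entry.
        System.out.println(Arrays.equals(tail, headersAndPayload));
    }
}
```

Because the original bytes follow the new header unchanged, the checksum
inside “[HEADERS_AND_PAYLOAD]” stays valid, and the new fields sit at a
fixed offset from the start of the entry.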

#### 2.3 Message and “raw Message metadata” lifecycle

The message processing logic would be like this:

1. The producer client sends the original message to the broker, where it
is parsed by “PulsarDecoder”; only “[HEADERS_AND_PAYLOAD]” is left.
2. Originally, “[HEADERS_AND_PAYLOAD]” is written into BookKeeper by the
method `asyncAddEntry` in `ManagedLedger`. Now we first add
“[RAW_METADATA]” and its related fields in front of
“[HEADERS_AND_PAYLOAD]”, then write the whole ByteBuf into BookKeeper by
`asyncAddEntry`, so the "raw Message metadata" is kept together with the
original message in BookKeeper.
3. When a feature needs this “raw Message metadata”, it reads the new
BookKeeper “Entry” and gets the raw Message metadata information from it.
4. For the “provide ordered messages by time (broker side) sequence”
feature, when a seek-by-time operation reaches the broker, we read
“broker_timestamp” from “[RAW_METADATA]”.
5. After the broker reads a BookKeeper Entry, and before sending it to the
consumer client, we need to filter out the raw Message metadata part and
return only the “[HEADERS_AND_PAYLOAD]” part.

This way, we do not need to serialize/deserialize the protobuf-encoded
`MessageMetadata`.

In more detail, the steps are:

##### a) Handling and storing a produced message

A message sent from the producer client to the broker is handled by the
method

```
publishMessageToTopic(ByteBuf headersAndPayload, long sequenceId, long
batchSize, boolean isChunked)
```

in class `org.apache.pulsar.broker.service.Producer`. It finally calls the
method `asyncAddEntry` of `ManagedLedger`, which writes the passed-in
serialized message as an Entry stored in BookKeeper.

```
void asyncAddEntry(ByteBuf headersAndPayload, AddEntryCallback callback,
Object ctx)
```

For the raw Message metadata content, we need to put the current broker
timestamp in `message RawMessageMetadata` and use protobuf to serialize it
into a `ByteBuf`. Then we put the size and content of the serialized
`ByteBuf` in front of the original `ByteBuf headersAndPayload`.

“[HEADERS_AND_PAYLOAD]” is thus turned into “[RAW_METADATA_MAGIC_NUMBER]
[RAW_METADATA_SIZE] [RAW_METADATA] [HEADERS_AND_PAYLOAD]” and stored in
BookKeeper.

##### b) When seek by time happens

Originally, when a seek_by_time comes in, it finally calls into
`org.apache.pulsar.broker.service.persistent.PersistentMessageFinder.findMessages`,
and the original logic is like this:

```java
public void findMessages(final long timestamp,
        AsyncCallbacks.FindEntryCallback callback) {
    // …
    cursor.asyncFindNewestMatching(
            ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries,
            entry -> {
                // Deserializes the whole entry just to read the publish time.
                MessageImpl msg = MessageImpl.deserialize(entry.getDataBuffer());
                return msg.getPublishTime() < timestamp;
            }, this, callback);
    // …
}
```

It searches through the topic-partition, reads the target Entry from
BookKeeper, deserializes the Entry into a MessageImpl, and reads the
message publish time to check whether it matches the requirement; if it
does not match, it iterates and reads another message.

With the new design, after reading the target Entry from BookKeeper, we do
not need to deserialize it; we only need to read the “broker_timestamp”
from the “raw Message metadata”.
As mentioned above, the frame stored in BookKeeper is
“[RAW_METADATA_MAGIC_NUMBER] [RAW_METADATA_SIZE] [RAW_METADATA]
[HEADERS_AND_PAYLOAD]”, so the steps are:

1. Still use `entry.getDataBuffer()` to get the ByteBuf out.
2. Read and check “[RAW_METADATA_MAGIC_NUMBER]” to see whether the “raw
Message metadata” feature is enabled; if not, fall back to the old
behaviour.
3. If enabled, read “[RAW_METADATA_SIZE]”, then read “[RAW_METADATA]”.
4. Use protobuf to deserialize “[RAW_METADATA]” and read
“broker_timestamp” out.
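
The four steps above might look roughly like the sketch below. It is
written under several assumptions: `java.nio.ByteBuffer` stands in for
Netty's `ByteBuf`, the magic value `0x0e02` and the 4-byte size field are
hypothetical, and a hand-written varint reader stands in for the generated
`RawMessageMetadata` protobuf parser so that the example needs only the
JDK. The class and method names are illustrative.

```java
import java.nio.ByteBuffer;
import java.util.OptionalLong;

final class BrokerTimestampReader {
    // Hypothetical value: the proposal does not specify the magic number.
    static final short RAW_METADATA_MAGIC = 0x0e02;

    /** Returns broker_timestamp if the entry carries raw metadata, empty otherwise. */
    static OptionalLong readBrokerTimestamp(ByteBuffer entry) {
        ByteBuffer buf = entry.duplicate(); // leave the caller's position untouched
        // Step 2: check the magic number; old-format entries fall back.
        if (buf.remaining() < 6 || buf.getShort() != RAW_METADATA_MAGIC) {
            return OptionalLong.empty();
        }
        // Step 3: read the size, which bounds the raw metadata region.
        int size = buf.getInt();
        if (size < 0 || size > buf.remaining()) {
            return OptionalLong.empty();
        }
        int end = buf.position() + size;
        // Step 4: minimal protobuf parse. 0x08 is the tag for field 1, wire type varint.
        if (buf.position() < end && buf.get() == 0x08) {
            long value = 0;
            for (int shift = 0; shift < 64 && buf.position() < end; shift += 7) {
                byte b = buf.get();
                value |= (long) (b & 0x7f) << shift;
                if ((b & 0x80) == 0) {
                    return OptionalLong.of(value);
                }
            }
        }
        return OptionalLong.empty();
    }
}
```

A caller in the seek-by-time path could use the returned value when present
and otherwise fall back to deserializing the message and comparing
`publish_time`, as the old logic does.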

##### c) Reading an entry from BookKeeper and sending it to the consumer client

The main logic is in the method `sendMessages` in class
`org.apache.pulsar.broker.service.Consumer`. The change is similar to the
“seek by time” case above: after reading the BookKeeper Entry, we need to
filter out the "RAW_METADATA" related part and return only
“[HEADERS_AND_PAYLOAD]” to the consumer client.
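
Filtering out the raw metadata before dispatch could be sketched as below.
As before, this is an illustration under assumptions rather than the actual
broker code: `java.nio.ByteBuffer` stands in for Netty's `ByteBuf`, the
magic value `0x0e02` and the 4-byte size field are hypothetical, and the
class and method names are made up for the example.

```java
import java.nio.ByteBuffer;

final class RawMetadataStripper {
    // Hypothetical value: the proposal does not specify the magic number.
    static final short RAW_METADATA_MAGIC = 0x0e02;

    /**
     * Returns a view of only [HEADERS_AND_PAYLOAD]. Entries written without
     * raw metadata are returned whole, so old data keeps working.
     */
    static ByteBuffer headersAndPayload(ByteBuffer entry) {
        ByteBuffer buf = entry.duplicate(); // do not disturb the caller's position
        int start = buf.position();
        if (buf.remaining() >= 6 && buf.getShort(start) == RAW_METADATA_MAGIC) {
            int size = buf.getInt(start + 2);
            if (size >= 0 && size <= buf.remaining() - 6) {
                // Skip magic (2 bytes), size (4 bytes) and the raw metadata itself.
                buf.position(start + 6 + size);
            }
        }
        return buf.slice(); // shares the underlying bytes, no copy
    }
}
```

Since the result is a slice over the same bytes, the consumer receives
exactly what the producer sent, and nothing in `MessageMetadata` has to be
deserialized or re-serialized on the way out.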

### 3. Summary of required changes

Here is a summary of the changes described above:

1. Add the raw Message metadata protobuf:

```protobuf
message RawMessageMetadata {
    optional uint64 broker_timestamp = 1;
}
```

2. Change how a produced message is saved in BookKeeper:
   `org.apache.pulsar.broker.service.Producer`

3. Change how seek-by-time finds a message:
   `org.apache.pulsar.broker.service.persistent.PersistentMessageFinder.findMessages`

4. Change how a message is sent back to the Consumer:
   `org.apache.pulsar.broker.service.Consumer`

Other changes:

1. A broker config to enable this raw Message metadata feature, e.g.
“useBrokerTimestampsForMessageSeek”.
2. Besides changing
`org.apache.pulsar.broker.service.persistent.PersistentMessageFinder.findMessages`,
search for other places that call
`MessageImpl.deserialize(entry.getDataBuffer())` and
`msg.getPublishTime()`, such as `PersistentTopic.isOldestMessageExpired()`,
and, if this feature is enabled, make them read the message publish time
from the new “raw Message metadata” instead of deserializing the whole
BookKeeper Entry into a Message and calling `Message.getPublishTime()`.


### 4. Compatibility, Deprecation and Migration Plan

For the first feature, most of the changes are internal to the broker. It
is a new feature that changes neither Pulsar's wire protocol (between
broker and client) nor the public API, so there is no backward
compatibility issue and nothing to deprecate or migrate.

In the future, we will consider compatibility when implementing the second
feature, “continuous message sequence-Id”, since we would need to send
messages with “sequence_id” to consumers. In that case, we will take
different actions depending on the data format, whether the feature is
enabled, and the consumer client version. Data is in the new format if it
contains "[RAW_METADATA_MAGIC_NUMBER][RAW_METADATA_SIZE][RAW_METADATA]",
and a consumer is a new version if it can process messages that contain
"[RAW_METADATA]".

### 5. Test Plan

* Unit tests for each individual change: broker and client work well for
produce/consume.
* Integration tests or unit tests for the end-to-end pipeline: multiple
producers set message times in random order; seek-by-time should seek to
the right message.
* Load testing to ensure performance: seek should be quicker.


-- 
Best,
Aloys.
