Hi all,

We have drafted a proposal for supporting lightweight raw Message metadata, which can be found at https://github.com/apache/pulsar/wiki/PIP-70%3A-Introduce-lightweight-raw-Message-metadata and https://docs.google.com/document/d/1IgnF9AJzL6JG6G4EL_xcoQxvOpd7bUXcgxFApBiPOFY
Also, I have copied it into this email thread for easier viewing. Any suggestions or ideas are welcome; please join the discussion.

## PIP-70: Introduce lightweight raw Message metadata

### 1. Motivation

For messages in Pulsar, whenever we want to add a new property, we have to change `MessageMetadata` in the protocol (PulsarApi.proto). Such a property can be understood by both the broker and the client by deserializing `MessageMetadata`. But in some cases, the property needs to be added on the broker side, or needs to be understood by the broker at low cost. When the broker receives a message produced by a client, it could add the property in a new area that is separate from `MessageMetadata`, so that reading the property back does not require deserializing the original `MessageMetadata`; and when the broker sends the message to a client, it could choose whether or not to filter out this part, depending on what the client needs. We call this kind of property “raw Message metadata”. In this way, consuming the “raw Message metadata” is independent of the original `MessageMetadata`.

The benefit of “raw Message metadata” is that the broker does not need to serialize/deserialize the protobuf-encoded `MessageMetadata`, which gives better performance. It also enables features that are not supported yet. Here are some use cases for raw Message metadata:

1) Provide messages ordered by time (broker side) sequence, to make seeking by time more accurate. Currently, each message has a `publish_time`, which uses client-side time. For different producers on different clients, the clocks may not be aligned, so the message order and the message time (`publish_time`) order may differ. But each topic-partition has only one owner broker; if we append the broker-side time in the “raw Message metadata”, we can make sure the message order is aligned with broker-side time.
With this feature, we can handle seeking by time more accurately.

2) Provide a continuous message sequence-Id for messages in one topic-partition. MessageId is a combination of ledgerId+entryId+batchIndex; for a partition that contains more than one ledger, the Ids are not continuous. With this solution, we could append a sequence-Id at the end of each Message. This will make message sequence management easier.

In this proposal, we focus on the first feature mentioned above, “provide ordered message by time (broker side) sequence”; this makes the proposal easier to walk through.

### 2. Message and “raw Message metadata” structure changes

As mentioned above, there are 2 main benefits in this proposal:

1. Most of the changes happen on the broker side.
2. It avoids serializing/deserializing the protobuf-encoded `MessageMetadata`.

#### 2.1 Raw Message metadata structure in Protobuf

Protobuf is used a lot in Pulsar, so we can use Protobuf to serialize/deserialize the raw Message metadata. In this example, we save the broker-side timestamp when each message is sent from the broker to BookKeeper, so the definition is very simple.

```protobuf
message RawMessageMetadata {
    optional uint64 broker_timestamp = 1;
}
```

#### 2.2 Message and “raw Message metadata” structure details

Each message is sent from the producer client to the broker in this frame format:

```
[TOTAL_SIZE] [CMD_SIZE][CMD] [MAGIC_NUMBER] [CHECKSUM] [METADATA_SIZE] [METADATA] [PAYLOAD]
```

The first 3 fields “[TOTAL_SIZE] [CMD_SIZE] [CMD]” are read in `LengthFieldBasedFrameDecoder` and `PulsarDecoder`, and the rest is handled in the method `org.apache.pulsar.broker.service.Producer.publishMessage`. The remaining part “[MAGIC_NUMBER] [CHECKSUM] [METADATA_SIZE] [METADATA] [PAYLOAD]” is usually referred to as “headersAndPayload” in the code. As described above, we do not want this part to change at all, so we can treat it as a whole package.
```
[MAGIC_NUMBER] [CHECKSUM] [METADATA_SIZE] [METADATA] [PAYLOAD] ==> [HEADERS_AND_PAYLOAD]
```

We need to add some fields to make “raw Message metadata” work:

```
[RAW_METADATA_MAGIC_NUMBER] [RAW_METADATA_SIZE] [RAW_METADATA] [HEADERS_AND_PAYLOAD]
```

- RAW_METADATA_MAGIC_NUMBER is used to identify whether this feature is on and the message is handled by this feature.
- RAW_METADATA_SIZE is added for this feature and records the size of the serialized raw Message metadata content.
- RAW_METADATA is the serialized raw Message metadata content.
- HEADERS_AND_PAYLOAD is the original ByteBuf data that contains metadata and payload.

We put the newly added fields before "HEADERS_AND_PAYLOAD" instead of after it, because:

- Firstly, the “raw Message metadata” is an addition to the original protocol, and it can support this new feature without much modification to the original wire protocol; in particular, it does not need to serialize/deserialize the protobuf-encoded `MessageMetadata` built by the producer.
- Secondly, if we put the new fields after "HEADERS_AND_PAYLOAD", we can't know the offset of the newly added fields, since we don't know the length of "PAYLOAD"; also, fields after "PAYLOAD" would change the CRC32 checksum, which means we would need to recalculate the checksum one more time.

#### 2.3 Message and “raw Message metadata” lifecycle

The message processing logic would be like this:

1. The producer client sends the original message to the broker, where it is parsed by “PulsarDecoder”; only “[HEADERS_AND_PAYLOAD]” is left.
2. Originally, “[HEADERS_AND_PAYLOAD]” would be written into BookKeeper by the method `asyncAddEntry` in `ManagedLedger`; now we first add the above “[RAW_METADATA]” and related parts in front of “[HEADERS_AND_PAYLOAD]”, then write the whole ByteBuf into BookKeeper with `asyncAddEntry`, so the "raw Message metadata" is kept together with the original message in BookKeeper.
3. When a feature needs to use this “raw Message metadata”, it reads the new BookKeeper “Entry” and gets the raw Message metadata information to serve the feature.
4. For the “provide ordered message by time (broker side) sequence” feature, when a seek-by-time operation reaches the broker, we read “broker_timestamp” out of “[RAW_METADATA]”.
5. After the broker reads a BookKeeper Entry, and before sending it to the consumer client, we need to filter out the raw Message metadata part and return only the “[HEADERS_AND_PAYLOAD]” part.

In this way, we don't need to serialize/deserialize the protobuf-encoded `MessageMetadata`.

In more detail, for these steps:

##### a) Handling and storing produced messages

Messages sent from the producer client to the broker are handled by the method

```
publishMessageToTopic(ByteBuf headersAndPayload, long sequenceId, long batchSize, boolean isChunked)
```

in class `org.apache.pulsar.broker.service.Producer`. It eventually calls the method `asyncAddEntry` of `ManagedLedger`, which writes the passed-in serialized message as an Entry stored in BookKeeper.

```
void asyncAddEntry(ByteBuf headersAndPayload, AddEntryCallback callback, Object ctx)
```

For the raw Message metadata content, we need to put the current broker timestamp in `message RawMessageMetadata` and use protobuf to serialize it into a `ByteBuf`. Then we put the size and content of the serialized `ByteBuf` in front of the original `ByteBuf headersAndPayload`. “[HEADERS_AND_PAYLOAD]” thus turns into “[RAW_METADATA_MAGIC_NUMBER] [RAW_METADATA_SIZE] [RAW_METADATA] [HEADERS_AND_PAYLOAD]” and is stored in BookKeeper.
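As a rough illustration of lifecycle steps 2, 4 and 5 above, here is a minimal, self-contained Java sketch that prepends the raw metadata on write, peeks at the broker timestamp on read, and strips the prefix before dispatch. It is not the proposed implementation: it uses `java.nio.ByteBuffer` instead of Netty's `ByteBuf`, it encodes `broker_timestamp` as a fixed 8-byte value as a stand-in for the protobuf-serialized `RawMessageMetadata`, and the class name `RawMetadataFrame`, its method names, the 2-byte magic value `0x0e02` and the 4-byte size field are all assumptions made for the example.

```java
import java.nio.ByteBuffer;
import java.util.OptionalLong;

final class RawMetadataFrame {
    // Hypothetical 2-byte marker; the proposal does not fix a value or width.
    static final short RAW_METADATA_MAGIC_NUMBER = 0x0e02;
    private static final int PREFIX_BYTES = Short.BYTES + Integer.BYTES;

    private RawMetadataFrame() {}

    /** Write path: prepend [MAGIC][SIZE][RAW_METADATA] to the untouched headersAndPayload. */
    static ByteBuffer wrap(ByteBuffer headersAndPayload, long brokerTimestamp) {
        // Stand-in for protobuf serialization of RawMessageMetadata (assumption).
        ByteBuffer rawMetadata = ByteBuffer.allocate(Long.BYTES).putLong(brokerTimestamp);
        rawMetadata.flip();

        ByteBuffer entry = ByteBuffer.allocate(
                PREFIX_BYTES + rawMetadata.remaining() + headersAndPayload.remaining());
        entry.putShort(RAW_METADATA_MAGIC_NUMBER);
        entry.putInt(rawMetadata.remaining());    // RAW_METADATA_SIZE
        entry.put(rawMetadata);                   // RAW_METADATA
        entry.put(headersAndPayload.duplicate()); // copied as-is; caller's position is unchanged
        entry.flip();
        return entry;
    }

    /** True if the entry starts with the raw metadata magic number. */
    static boolean hasRawMetadata(ByteBuffer entry) {
        return entry.remaining() >= PREFIX_BYTES
                && entry.getShort(entry.position()) == RAW_METADATA_MAGIC_NUMBER;
    }

    /** Seek-by-time path: read broker_timestamp without touching MessageMetadata. */
    static OptionalLong readBrokerTimestamp(ByteBuffer entry) {
        if (!hasRawMetadata(entry)) {
            return OptionalLong.empty();          // old-format entry: caller uses old behaviour
        }
        ByteBuffer view = entry.duplicate();
        int size = readSize(view);
        return size < Long.BYTES ? OptionalLong.empty() : OptionalLong.of(view.getLong());
    }

    /** Dispatch path: return only the [HEADERS_AND_PAYLOAD] part. */
    static ByteBuffer stripRawMetadata(ByteBuffer entry) {
        ByteBuffer view = entry.duplicate();
        if (hasRawMetadata(entry)) {
            int size = readSize(view);
            view.position(view.position() + size); // skip RAW_METADATA
        }
        return view.slice();
    }

    // Consumes the magic number and size field, leaving view at the start of RAW_METADATA.
    private static int readSize(ByteBuffer view) {
        view.getShort();
        int size = view.getInt();
        if (size < 0 || size > view.remaining()) {
            throw new IllegalArgumentException("corrupt RAW_METADATA_SIZE: " + size);
        }
        return size;
    }
}
```

Because `wrap` copies `headersAndPayload` byte for byte, the original checksum and `MessageMetadata` are never touched, which is the reason for placing the new fields in front. Entries that do not start with the magic number are returned unchanged by `stripRawMetadata`, matching the fallback to the old behaviour.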
##### b) Seek by time

Originally, when a seek_by_time comes in, it eventually calls `org.apache.pulsar.broker.service.persistent.PersistentMessageFinder.findMessages`, and the original logic is like this:

```java
public void findMessages(final long timestamp, AsyncCallbacks.FindEntryCallback callback) {
    // …
    cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries, entry -> {
        MessageImpl msg = null;
        msg = MessageImpl.deserialize(entry.getDataBuffer()); // <=== deserialize
        return msg.getPublishTime() < timestamp;
    }, this, callback);
    // …
}
```

This searches through the topic-partition, reads the target Entry from BookKeeper, deserializes the Entry into a MessageImpl, and checks whether the message publish time matches the requirement; if it does not match, it goes on to another round and reads another message.

With the new design, after reading the target Entry from BookKeeper, there is no need to deserialize it; we only need to read “broker_timestamp” from the “raw Message metadata”. As mentioned above, the frame stored in BookKeeper is “[RAW_METADATA_MAGIC_NUMBER] [RAW_METADATA_SIZE] [RAW_METADATA] [HEADERS_AND_PAYLOAD]”, so:

1. Still use `entry.getDataBuffer()` to get the ByteBuf out.
2. Read and check “[RAW_METADATA_MAGIC_NUMBER]” to see if the “raw Message metadata” feature is enabled; if not enabled, fall back to the old behaviour.
3. If enabled, read “[RAW_METADATA_SIZE]”, then read “[RAW_METADATA]”.
4. Use protobuf to deserialize “[RAW_METADATA]”, and read “broker_timestamp” out.

##### c) Read an entry from BookKeeper and send it to the consumer client

The main logic happens in the method `sendMessages` in class `org.apache.pulsar.broker.service.Consumer`. The change is similar to “seek by time” above: after reading a BookKeeper Entry, we need to filter out the "RAW_METADATA" related part and return only “[HEADERS_AND_PAYLOAD]” to the consumer client.

### 3. Summary of required changes

Here is a summary of the related changes described above:

1. Add the raw Message metadata protobuf:

   ```protobuf
   message RawMessageMetadata {
       optional uint64 broker_timestamp = 1;
   }
   ```

2. Change how produced messages are saved in BookKeeper: `org.apache.pulsar.broker.service.Producer`
3. Change how seek-by-time finds messages: `org.apache.pulsar.broker.service.persistent.PersistentMessageFinder.findMessages`
4. Change how messages are sent back to the consumer: `org.apache.pulsar.broker.service.Consumer`

Other changes:

1. A broker config to enable this raw Message metadata feature, e.g. “useBrokerTimestampsForMessageSeek”.
2. Besides changing `org.apache.pulsar.broker.service.persistent.PersistentMessageFinder.findMessages`, search for other places that call `MessageImpl.deserialize(entry.getDataBuffer())` and `msg.getPublishTime()`, such as `PersistentTopic.isOldestMessageExpired()`. If this feature is enabled, they should read the message publish time from the new “raw Message metadata” instead of deserializing the whole BookKeeper Entry into a Message and calling `Message.getPublishTime()`.

### 4. Compatibility, Deprecation and Migration Plan

For the first feature, most of the changes are internal to the broker. It is a new feature that changes neither Pulsar's wire protocol (between broker and client) nor the public API, so there is no backward compatibility issue. Since it is newly added, there is nothing to deprecate or migrate.

In the future, we will consider compatibility when implementing the second feature, “continuous message sequence-Id”, since we would need to send messages with “sequence_id” to consumers. In that case, we will take different actions depending on the data format, whether the feature is enabled, and the consumer client version. Data is in the new format if it contains "[RAW_METADATA_MAGIC_NUMBER][RAW_METADATA_SIZE][RAW_METADATA]", and a consumer is a new version if it can process messages that contain "[RAW_METADATA]".

### 5. Test Plan

* Unit tests for each individual change: broker/client work well for produce/consume.
* Integration tests or UT for the end-to-end pipeline: multiple producers set message times in random order; seek-by-time should seek to the right message.
* Load testing to ensure performance: seek should be quicker.

--
Best,
Aloys.