Matteo,

Improving Protobuf serialization and deserialization can be a separate PIP, because it doesn't solve the problems that this PIP tries to solve.

This PIP probably doesn't state the problem clearly. "Raw Metadata" is probably an inaccurate name. It should probably be called "Broker Metadata", which distinguishes it from the existing metadata that is generated on the client side. The "Broker Metadata" is used for storing all the information generated by brokers, such as the broker-side publish (append) timestamp, a monotonically increasing log index, etc.

The main reason we don't reuse the existing "metadata" for this purpose is not a serialization and deserialization concern. It is more of a checksum concern: the current metadata section is generated by the clients, and the corresponding checksum is set by the clients as well. If we want to mutate the metadata, we have to regenerate the checksum, which would hugely impact performance. Introducing a "Broker Metadata" section avoids regenerating the checksum for a message batch. It also separates the client-generated metadata from the broker-generated metadata, which avoids accidentally touching client-generated metadata.

Hope this clarifies the problems that this PIP tries to solve.

Aloys,

If you agree with my comments above, I think we should rename "Raw Metadata" to "Broker Metadata" to avoid any confusion.

Thanks,
Sijie

On Wed, Nov 18, 2020 at 10:47 AM Matteo Merli <matteo.me...@gmail.com> wrote:

> The goal of this proposal can be achieved automatically by using a
> better ser/de generator that doesn't have overhead for ignored fields.
>
> I'm preparing a revamping of the current Protobuf serialization and
> I'll send a proposal soon.
>
> Matteo
>
>
> --
> Matteo Merli
> <matteo.me...@gmail.com>
>
> On Sun, Nov 8, 2020 at 10:25 PM Aloys Zhang <lofterzh...@gmail.com> wrote:
> >
> > Hi all,
> >
> > We have drafted a proposal for supporting lightweight raw Message metadata,
> > which can be found at
> > https://github.com/apache/pulsar/wiki/PIP-70%3A-Introduce-lightweight-raw-Message-metadata
> > and
> > https://docs.google.com/document/d/1IgnF9AJzL6JG6G4EL_xcoQxvOpd7bUXcgxFApBiPOFY
> >
> > I have also copied it into this email thread for easier viewing.
> >
> > Any suggestions or ideas are welcome; please join the discussion.
> >
> >
> >
> > ## PIP-70: Introduce lightweight raw Message metadata
> >
> > ### 1. Motivation
> >
> > For messages in Pulsar, if we want to add a new property, we always change
> > `MessageMetadata` in the protocol (PulsarApi.proto). Such a property can be
> > understood by both the broker side and the client side by deserializing
> > `MessageMetadata`. But in some cases the property needs to be added on the
> > broker side, or needs to be understood by the broker side at low cost. When
> > the broker receives a message produced by a client, it could add the
> > property in a new area that is not combined with `MessageMetadata`, so that
> > reading it back does not require deserializing the original
> > `MessageMetadata`. When the broker sends the message to a client, it could
> > choose to filter out this part (or not, depending on what the client needs).
> > We call this kind of property “raw Message metadata”. This way, consuming
> > the “raw Message metadata” is independent of the original `MessageMetadata`.
> >
> > The benefit of “raw Message metadata” is that the broker does not need to
> > serialize/deserialize the protobuf-ed `MessageMetadata`, which provides
> > better performance. It could also enable a lot of features that are not
> > supported yet.
> >
> > Here are some of the use cases for raw Message metadata:
> >
> > 1) Provide ordered messages by time (broker side) sequence to make message
> > seek by time more accurate.
> > Currently, each message has a `publish_time` that uses client-side time. But
> > for different producers in different clients, the clocks may not be aligned,
> > so the message order and the message time (`publish_time`) order may differ.
> > However, each topic-partition has only one owner broker, so if we append the
> > broker-side time in the “raw Message metadata”, we can make sure the message
> > order is aligned with broker-side time. With this feature, we could handle
> > message seek by time more accurately.
> >
> > 2) Provide a continuous message sequence-Id for messages in one
> > topic-partition.
> > MessageId is a combination of ledgerId+entryId+batchIndex; for a partition
> > that contains more than one ledger, the Ids inside are not continuous. With
> > this solution, we could append a sequence-Id at the end of each Message.
> > This will make message sequence management easier.
> >
> > In this proposal, we will focus on the first feature mentioned above,
> > “provide ordered message by time (broker side) sequence”; this makes the
> > proposal easier to go through.
> >
> > ### 2. Message and “raw Message metadata” structure changes.
> >
> > As mentioned above, there are 2 main benefits in this proposal:
> >
> > 1. Almost all of the changes happen on the broker side.
> > 2. Avoid serializing/deserializing the protobuf-ed `MessageMetadata`.
> >
> > #### 2.1 Raw Message metadata structure in Protobuf
> >
> > Protobuf is used a lot in Pulsar, so we could use Protobuf to
> > serialize/deserialize the raw Message metadata.
> > In this example, we will save the broker-side timestamp when each message
> > is sent from the broker to BookKeeper. So the definition is very simple.
> >
> > ```protobuf
> > message RawMessageMetadata {
> >     optional uint64 broker_timestamp = 1;
> > }
> > ```
> >
> > #### 2.2 Message and “raw Message metadata” structure details
> >
> > Each message is sent from the producer client to the broker in this frame
> > format:
> >
> > ```
> > [TOTAL_SIZE] [CMD_SIZE][CMD] [MAGIC_NUMBER] [CHECKSUM] [METADATA_SIZE] [METADATA] [PAYLOAD]
> > ```
> >
> > The first 3 fields “[TOTAL_SIZE] [CMD_SIZE] [CMD]” are read in
> > `LengthFieldBasedFrameDecoder` and `PulsarDecoder`, and the rest is handled
> > in the method `org.apache.pulsar.broker.service.Producer.publishMessage`.
> > The remaining part “[MAGIC_NUMBER] [CHECKSUM] [METADATA_SIZE] [METADATA]
> > [PAYLOAD]” is usually treated as “headersAndPayload” in the code. As
> > described above, we do not want this part to be changed at all, so we can
> > treat it as a single unit.
> >
> > ```
> > [MAGIC_NUMBER] [CHECKSUM] [METADATA_SIZE] [METADATA] [PAYLOAD] ==> [HEADERS_AND_PAYLOAD]
> > ```
> >
> > We need to add some fields to make “raw Message metadata” work.
> >
> > ```
> > [RAW_METADATA_MAGIC_NUMBER] [RAW_METADATA_SIZE] [RAW_METADATA] [HEADERS_AND_PAYLOAD]
> > ```
> >
> > RAW_METADATA_MAGIC_NUMBER is used to identify whether this feature is on
> > and whether the message is handled by this feature.
> >
> > RAW_METADATA_SIZE records the size of the serialised raw Message metadata
> > content.
> >
> > RAW_METADATA is the serialised raw Message metadata content.
> >
> > “HEADERS_AND_PAYLOAD” is the original ByteBuf data that contains metadata
> > and payload.
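
A minimal sketch of the frame layout described above may help make the field order concrete. It is only an illustration under stated assumptions: the PIP does not give field widths or a magic value, so the 2-byte magic `0x0e02` and the 4-byte size are assumed here, the class and method names are hypothetical, a plain `byte[]` stands in for Netty's `ByteBuf`, and an 8-byte big-endian long stands in for the protobuf-serialized `RawMessageMetadata`.

```java
import java.nio.ByteBuffer;

/** Illustrative framing only; widths, magic value, and names are assumptions. */
final class RawMetadataFrame {
    // Assumed 2-byte marker; the PIP text does not specify a value.
    static final short RAW_METADATA_MAGIC = (short) 0x0e02;

    /** Stand-in for the serialized RawMessageMetadata: just the 8-byte timestamp. */
    static byte[] encodeTimestamp(long brokerTimestamp) {
        return ByteBuffer.allocate(Long.BYTES).putLong(brokerTimestamp).array();
    }

    /**
     * Builds [RAW_METADATA_MAGIC_NUMBER] [RAW_METADATA_SIZE] [RAW_METADATA]
     * [HEADERS_AND_PAYLOAD]. The headersAndPayload bytes are copied verbatim,
     * so the client-computed checksum inside them stays valid.
     */
    static byte[] wrap(byte[] rawMetadata, byte[] headersAndPayload) {
        ByteBuffer out = ByteBuffer.allocate(
                Short.BYTES + Integer.BYTES + rawMetadata.length + headersAndPayload.length);
        out.putShort(RAW_METADATA_MAGIC);
        out.putInt(rawMetadata.length);
        out.put(rawMetadata);
        out.put(headersAndPayload);
        return out.array();
    }
}
```

Because the new fields are length-prefixed and placed in front, a reader can locate `[HEADERS_AND_PAYLOAD]` without knowing the payload length.
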
> >
> > We put the newly added fields before rather than after
> > "HEADERS_AND_PAYLOAD" because:
> >
> > - Firstly, the “raw Message metadata” is an addition to the original
> > protocol, and it can support this new feature without much modification to
> > the original wire protocol; in particular, it does not need to
> > serialize/deserialize the protobuf-ed `MessageMetadata` built by the
> > producer.
> > - Secondly, if we put the new fields after "HEADERS_AND_PAYLOAD", we can't
> > know the offset of the newly added fields, since we don't know the length
> > of "PAYLOAD". Also, fields after "PAYLOAD" would change the CRC32 checksum,
> > which means we would need to recalculate the checksum one more time.
> >
> > #### 2.3 Message and “raw Message metadata” lifecycle
> >
> > The message processing logic would be like this:
> >
> > 1. The producer client sends the original message to the broker, where it
> > is parsed by “PulsarDecoder”; only “[HEADERS_AND_PAYLOAD]” is left.
> > 2. Originally, “[HEADERS_AND_PAYLOAD]” is written into BookKeeper by the
> > method `asyncAddEntry` in `ManagedLedger`. Now we first add the above
> > “[RAW_METADATA]” and related parts in front of “[HEADERS_AND_PAYLOAD]”, then
> > write the whole ByteBuf into BookKeeper by `asyncAddEntry`, so the "raw
> > Message metadata" is kept together with the original message in BookKeeper.
> > 3. When a feature needs to use this “raw Message metadata”, it reads the
> > new BookKeeper “Entry” and gets the raw Message metadata information to
> > serve the new feature.
> > 4. In this “provide ordered message by time (broker side) sequence”
> > feature, when a seek-by-time operation reaches the broker, we read out
> > “broker_timestamp” from “[RAW_METADATA]”.
> > 5. After the broker reads the BookKeeper Entry out, and before sending it
> > to the consumer client, we need to filter out the raw Message metadata
> > related part and only return the “[HEADERS_AND_PAYLOAD]” part.
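
Step 5 of the lifecycle above can be sketched as follows. This is a hedged illustration rather than the broker's code: the class name is hypothetical, the 2-byte magic `0x0e02` and 4-byte size are assumed widths that the PIP does not specify, and a `byte[]` copy is used for simplicity where a broker working on Netty's `ByteBuf` would presumably skip the prefix without copying.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Illustrative only; widths, magic value, and names are assumptions. */
final class RawMetadataStripper {
    static final short RAW_METADATA_MAGIC = (short) 0x0e02; // assumed value
    private static final int PREFIX = Short.BYTES + Integer.BYTES;

    /**
     * Returns only [HEADERS_AND_PAYLOAD]. Entries stored before the feature
     * was enabled do not start with the raw-metadata magic and pass through.
     */
    static byte[] headersAndPayload(byte[] entry) {
        ByteBuffer buf = ByteBuffer.wrap(entry);
        if (entry.length < PREFIX || buf.getShort(0) != RAW_METADATA_MAGIC) {
            return entry; // old format: nothing to strip
        }
        int size = buf.getInt(Short.BYTES);
        if (size < 0 || size > entry.length - PREFIX) {
            throw new IllegalArgumentException("corrupt RAW_METADATA_SIZE: " + size);
        }
        // Everything after the raw metadata is the untouched client frame.
        return Arrays.copyOfRange(entry, PREFIX + size, entry.length);
    }
}
```

The consumer therefore sees exactly the bytes the producer sent, including the original checksum.
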
> >
> > This way, we don't need to serialize/deserialize the protobuf-ed
> > `MessageMetadata`.
> >
> > In more detail, the steps are:
> >
> > ##### a) Produced message handling and storage
> >
> > A message sent from the producer client to the broker is handled by the
> > method
> >
> > ```
> > publishMessageToTopic(ByteBuf headersAndPayload, long sequenceId, long batchSize, boolean isChunked)
> > ```
> >
> > in class `org.apache.pulsar.broker.service.Producer`. It finally calls the
> > method `asyncAddEntry` of `ManagedLedger`, which writes the passed-in
> > serialized message as an Entry stored in BookKeeper.
> >
> > ```
> > void asyncAddEntry(ByteBuf headersAndPayload, AddEntryCallback callback, Object ctx)
> > ```
> >
> > For the raw Message metadata content, we need to put the current broker
> > timestamp in `message RawMessageMetadata` and use protobuf to serialize it
> > into a `ByteBuf`. Then we put the size and content of the serialized
> > `ByteBuf` in front of the original `ByteBuf headersAndPayload`.
> >
> > “[HEADERS_AND_PAYLOAD]” thus turns into “[RAW_METADATA_MAGIC_NUMBER]
> > [RAW_METADATA_SIZE] [RAW_METADATA] [HEADERS_AND_PAYLOAD]” and is stored in
> > BookKeeper.
> >
> > ##### b) Seek by time
> >
> > Originally, when a seek_by_time comes in, it finally calls into
> > `org.apache.pulsar.broker.service.persistent.PersistentMessageFinder.findMessages`,
> > and the original logic is like this:
> >
> > ```java
> > public void findMessages(final long timestamp, AsyncCallbacks.FindEntryCallback callback) {
> >     …
> >     cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries,
> >         entry -> {
> >             MessageImpl msg = null;
> >             msg = MessageImpl.deserialize(entry.getDataBuffer()); // <=== deserialize
> >             return msg.getPublishTime() < timestamp;
> >         }, this, callback);
> >     …
> > ```
> >
> > This searches through the topic-partition, reads the target Entry out of
> > BookKeeper, deserializes the Entry into a MessageImpl, and checks whether
> > the message publish time matches the requirement; if it does not match, it
> > iterates and reads another message.
> >
> > With the new design, after reading the target Entry out of BookKeeper,
> > there is no need to deserialize it; we only need to read the
> > “broker_timestamp” from the “raw Message metadata”.
> > As mentioned above, the frame stored in BookKeeper is:
> > “[RAW_METADATA_MAGIC_NUMBER] [RAW_METADATA_SIZE] [RAW_METADATA]
> > [HEADERS_AND_PAYLOAD]”.
> >
> > 1. Still use `entry.getDataBuffer()` to get the ByteBuf out;
> > 2. Read and check “[RAW_METADATA_MAGIC_NUMBER]” to see if the “raw Message
> > metadata” feature is enabled; if not enabled, fall back to the old
> > behaviour;
> > 3. If enabled, read “[RAW_METADATA_SIZE]”, then read “[RAW_METADATA]”;
> > 4. Use protobuf to deserialize “[RAW_METADATA]”, and read
> > “broker_timestamp” out.
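
The four steps above could look roughly like the sketch below. It is an assumption-laden illustration, not the proposed implementation: the class and method names are hypothetical, the 2-byte magic `0x0e02` and 4-byte size are assumed, a `byte[]` replaces the `ByteBuf` returned by `entry.getDataBuffer()`, and an 8-byte big-endian long replaces the protobuf-encoded `RawMessageMetadata` so the example needs no generated classes.

```java
import java.nio.ByteBuffer;
import java.util.OptionalLong;

/** Illustrative only; widths, magic value, encoding, and names are assumptions. */
final class BrokerTimestampReader {
    static final short RAW_METADATA_MAGIC = (short) 0x0e02; // assumed value

    /** Empty result means "raw metadata absent": the caller uses the old path. */
    static OptionalLong brokerTimestamp(byte[] entry) {
        ByteBuffer buf = ByteBuffer.wrap(entry);
        // Step 2: check the magic number; fall back if it is missing.
        if (buf.remaining() < Short.BYTES + Integer.BYTES
                || buf.getShort() != RAW_METADATA_MAGIC) {
            return OptionalLong.empty();
        }
        // Step 3: read the size, then the raw metadata itself.
        int size = buf.getInt();
        if (size < Long.BYTES || size > buf.remaining()) {
            throw new IllegalArgumentException("corrupt RAW_METADATA_SIZE: " + size);
        }
        // Step 4: decode the timestamp (stand-in for protobuf parsing).
        return OptionalLong.of(buf.getLong());
    }

    /** Analogue of `msg.getPublishTime() < timestamp` in the finder predicate. */
    static boolean isBefore(byte[] entry, long timestamp, long fallbackPublishTime) {
        return brokerTimestamp(entry).orElse(fallbackPublishTime) < timestamp;
    }
}
```

In this sketch `fallbackPublishTime` represents the value the old path would obtain by deserializing the full message; only the fixed-size prefix is touched when the raw metadata is present.
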
> >
> > ##### c) Read an entry from BookKeeper and send it to the consumer client
> >
> > The main logic happens in the method `sendMessages` in class
> > `org.apache.pulsar.broker.service.Consumer`. The change is similar to the
> > “seek by time” case above: after reading the BookKeeper Entry, we need to
> > filter out the "RAW_METADATA" related part and only return
> > “[HEADERS_AND_PAYLOAD]” to the consumer client.
> >
> > ### 3. Summary of required changes
> >
> > Here is a summary of the changes described above:
> >
> > 1. Add the raw Message metadata protobuf:
> >
> > ```protobuf
> > message RawMessageMetadata {
> >     optional uint64 broker_timestamp = 1;
> > }
> > ```
> >
> > 2. Change how a produced message is saved in BookKeeper:
> > `org.apache.pulsar.broker.service.Producer`
> >
> > 3. Change how seek-by-time finds messages:
> > `org.apache.pulsar.broker.service.persistent.PersistentMessageFinder.findMessages`
> >
> > 4. Change how messages are sent back to the Consumer:
> > `org.apache.pulsar.broker.service.Consumer`
> >
> > Other changes:
> >
> > 1. A broker config to enable this raw Message metadata feature, e.g.
> > “useBrokerTimestampsForMessageSeek”.
> > 2. Besides changing
> > `org.apache.pulsar.broker.service.persistent.PersistentMessageFinder.findMessages`,
> > search for other places that call
> > `MessageImpl.deserialize(entry.getDataBuffer())` and `msg.getPublishTime()`,
> > such as `PersistentTopic.isOldestMessageExpired()`, and, if this feature is
> > enabled, make them read the message publish time from the new “raw Message
> > metadata” instead of deserializing the whole BookKeeper Entry into a
> > Message and calling `Message.getPublishTime()`.
> >
> >
> > ### 4. Compatibility, Deprecation and Migration Plan
> >
> > For the first feature, most of the changes are internal to the brokers. It
> > is a new feature that changes neither Pulsar's wire protocol (between
> > broker and client) nor the public API, so there is no backward
> > compatibility issue and nothing to deprecate or migrate.
> >
> > In the future, we will consider compatibility when implementing the second
> > feature, “continuous message sequence-Id”, since we would need to send
> > messages with “sequence_id” to consumers. In that case, we will take
> > different actions depending on the data format, whether the feature is
> > enabled, and the consumer client version. Data is in the new format if it
> > contains "[RAW_METADATA_MAGIC_NUMBER][RAW_METADATA_SIZE][RAW_METADATA]", and
> > a consumer is a new version if it can process messages that contain
> > "[RAW_METADATA]".
> >
> > ### 5. Test Plan
> >
> > * Unit tests for each individual change: broker/client work well for
> > produce/consume.
> > * Integration tests or UT for the end-to-end pipeline: multiple producers
> > set message times in random order; seek-by-time should seek to the right
> > message.
> > * Load testing to ensure performance: seek should be quicker.
> >
> >
> > --
> > Best,
> > Aloys.
>