I think protobuf can already check whether a field is set, i.e.
RAW_METADATA_MAGIC_NUMBER and RAW_METADATA_SIZE could be included in the
protobuf-ed struct. In Kafka, a magic number represents the protocol version,
not whether a feature is enabled. If we need a *real* magic number, we must
make its meaning clear.

On 2020/11/09 06:24:18, Aloys Zhang <l...@gmail.com> wrote: 
> Hi all,
>
> We have drafted a proposal for supporting lightweight raw Message metadata,
> which can be found at
> https://github.com/apache/pulsar/wiki/PIP-70%3A-Introduce-lightweight-raw-Message-metadata
> and
> https://docs.google.com/document/d/1IgnF9AJzL6JG6G4EL_xcoQxvOpd7bUXcgxFApBiPOFY
>
> I have also copied it into this email thread for easier viewing.
>
> Any suggestions or ideas are welcome; please join the discussion.
> 
> 
> 
> ## PIP-70: Introduce lightweight raw Message metadata
>
> ### 1. Motivation
>
> For messages in Pulsar, whenever we want to add a new property, we change
> the `MessageMetadata` in the protocol (PulsarApi.proto). Such a property
> can be understood by both the broker and the client by deserializing the
> `MessageMetadata`. In some cases, however, the property needs to be added
> on the broker side, or needs to be understood by the broker at low cost.
> When the broker receives a message produced by the client, we could add
> the property in a new area that is separate from `MessageMetadata`, so
> that reading it back does not require deserializing the original
> `MessageMetadata`. When the broker sends the message to the client, we
> could filter out this part (or keep it, if the client needs it). We call
> this kind of property “raw Message metadata”. This way, consuming the
> “raw Message metadata” is independent of the original `MessageMetadata`.
>
> The benefit of “raw Message metadata” is that the broker does not need to
> serialize/deserialize the protobuf-ed `MessageMetadata`, which gives
> better performance. It could also enable a number of features that are
> not supported yet.
> 
> Here are some use cases for raw Message metadata:
>
> 1) Provide messages ordered by broker-side time, to make seeking messages
> by time more accurate.
> Currently, each message has a `publish_time` that uses client-side time.
> For different producers on different clients, the clocks may not be
> aligned, so the message order and the `publish_time` order may differ.
> But each topic-partition has only one owner broker, so if we append the
> broker-side time in the “raw Message metadata”, we can make sure the
> message order is aligned with broker-side time. With this feature, we
> could handle seeking messages by time more accurately.
>
> 2) Provide a continuous message sequence-Id for messages in one
> topic-partition.
> MessageId is a combination of ledgerId+entryId+batchIndex; for a partition
> that contains more than one ledger, the Ids are not continuous. With
> this solution, we could append a sequence-Id at the end of each Message.
> This will make message sequence management easier.
>
> In this proposal, we focus on the first feature mentioned above, “provide
> ordered messages by time (broker side) sequence”, which makes the proposal
> easier to walk through.
> 
> ### 2. Message and “raw Message metadata” structure changes
>
> As mentioned above, there are 2 main benefits in this proposal:
>
> 1. Most of the changes happen on the broker side.
> 2. It avoids serializing/deserializing the protobuf-ed `MessageMetadata`.
>
> #### 2.1 Raw Message metadata structure in Protobuf
>
> Protobuf is used a lot in Pulsar, so we could use Protobuf to
> serialize/deserialize the raw Message metadata.
> In this example, we save the broker-side timestamp when each message
> is sent from the broker to BookKeeper, so the definition is very simple.
> 
> ```protobuf
> message RawMessageMetadata {
>     optional uint64 broker_timestamp = 1;
> }
> ```
> 
> #### 2.2 Message and “raw Message metadata” structure details
>
> Each message is sent from the producer client to the broker in this frame
> format:
>
> ```
> [TOTAL_SIZE] [CMD_SIZE][CMD] [MAGIC_NUMBER] [CHECKSUM] [METADATA_SIZE]
> [METADATA] [PAYLOAD]
> ```
>
> The first 3 fields, “[TOTAL_SIZE] [CMD_SIZE] [CMD]”, are read in
> `LengthFieldBasedFrameDecoder` and `PulsarDecoder`, and the rest is left
> to be handled in the method
> `org.apache.pulsar.broker.service.Producer.publishMessage`. The remaining
> part, “[MAGIC_NUMBER] [CHECKSUM] [METADATA_SIZE] [METADATA] [PAYLOAD]”, is
> usually treated as “headersAndPayload” in the code. As described above, we
> do not want this part to change at all, so we can treat it as a single
> package.
> 
> ```
> [MAGIC_NUMBER] [CHECKSUM] [METADATA_SIZE] [METADATA] [PAYLOAD] ==>
> [HEADERS_AND_PAYLOAD]
> ```
>
> We need to add some fields to make “raw Message metadata” work:
>
> ```
> [RAW_METADATA_MAGIC_NUMBER] [RAW_METADATA_SIZE] [RAW_METADATA]
> [HEADERS_AND_PAYLOAD]
> ```
> 
> RAW_METADATA_MAGIC_NUMBER is used to identify whether this feature is on
> and the message has been handled by this feature.
>
> RAW_METADATA_SIZE is added for this feature and records the size of the
> serialized raw Message metadata content.
>
> RAW_METADATA is the serialized raw Message metadata content.
>
> “HEADERS_AND_PAYLOAD” is the original ByteBuf data that contains metadata
> and payload.
>
> We put the newly added fields before "HEADERS_AND_PAYLOAD" instead of
> after it, because:
>
> - Firstly, the “raw Message metadata” is an addition to the original
> protocol, and it can support this new feature without much modification
> to the original wire protocol. In particular, it does not need to
> serialize/deserialize the protobuf-ed `MessageMetadata` built by the
> producer.
> - Secondly, if we put the new fields after "HEADERS_AND_PAYLOAD", we can't
> know the offset of the newly added fields, since we don't know the
> length of "PAYLOAD". Also, fields after "PAYLOAD" would change the CRC32
> checksum, which means we would need to recalculate the checksum one more
> time.
> 
> #### 2.3 Message and “raw Message metadata” lifecycle
>
> The message processing logic would be like this:
>
> 1. The producer client sends the original message to the broker, where it
> is parsed by “PulsarDecoder”; only “[HEADERS_AND_PAYLOAD]” is left.
> 2. Originally, “[HEADERS_AND_PAYLOAD]” is written into BookKeeper by the
> method `asyncAddEntry` in `ManagedLedger`. Now we first add the
> “[RAW_METADATA]” and related parts described above in front of
> “[HEADERS_AND_PAYLOAD]”, then write the whole ByteBuf into BookKeeper by
> `asyncAddEntry`, so the "raw Message metadata" is kept together with the
> original message in BookKeeper.
> 3. When a feature needs to use this “raw Message metadata”, it reads the
> new BookKeeper “Entry” and gets the raw Message metadata information to
> serve the new feature.
> 4. For the “provide ordered messages by time (broker side) sequence”
> feature, when a seek-by-time operation reaches the broker, we read
> “broker_timestamp” out of “[RAW_METADATA]”.
> 5. After the broker reads a BookKeeper Entry, and before sending it to the
> consumer client, we need to filter out the raw Message metadata related
> part and return only the “[HEADERS_AND_PAYLOAD]” part.
>
> This way, we don't need to serialize/deserialize the protobuf-ed
> `MessageMetadata`.
>
> In more detail, these steps are:
> 
> ##### a) Handling and storing a produced message
>
> A message sent from the producer client to the broker is handled by the
> method
>
> ```
> publishMessageToTopic(ByteBuf headersAndPayload, long sequenceId, long
> batchSize, boolean isChunked)
> ```
>
> in class `org.apache.pulsar.broker.service.Producer`. It finally calls the
> method `asyncAddEntry` of `ManagedLedger`, which writes the passed-in
> serialized message as an Entry stored in BookKeeper.
>
> ```
> void asyncAddEntry(ByteBuf headersAndPayload, AddEntryCallback callback,
> Object ctx)
> ```
>
> For the raw Message metadata content, we need to put the current broker
> timestamp in `message RawMessageMetadata` and use protobuf to serialize it
> into a `ByteBuf`. Then we put the size and content of the serialized
> `ByteBuf` in front of the original `ByteBuf headersAndPayload`.
>
> “[HEADERS_AND_PAYLOAD]” then becomes “[RAW_METADATA_MAGIC_NUMBER]
> [RAW_METADATA_SIZE] [RAW_METADATA] [HEADERS_AND_PAYLOAD]” and is stored in
> BookKeeper.
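
To make the write path above concrete, here is a minimal sketch using plain `java.nio.ByteBuffer` rather than Netty's `ByteBuf`, so it stays self-contained. Everything named here is an assumption for illustration: the class `RawMetadataWriter`, the 2-byte magic value `0x0e02`, and the 4-byte size field are not specified by the proposal. The raw metadata is hand-encoded in protobuf wire format (tag `0x08` for field 1 as a varint) instead of using a generated `RawMessageMetadata` class.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Sketch only: prepends a raw-metadata header to an unchanged headersAndPayload buffer. */
final class RawMetadataWriter {
    // Hypothetical magic value and field widths; the proposal does not specify them.
    static final short RAW_METADATA_MAGIC_NUMBER = 0x0e02;

    /** Hand-encodes `optional uint64 broker_timestamp = 1` in protobuf wire format. */
    static byte[] encodeRawMetadata(long brokerTimestamp) {
        byte[] buf = new byte[11];              // 1 tag byte + at most 10 varint bytes
        int n = 0;
        buf[n++] = 0x08;                        // tag: field number 1, wire type 0 (varint)
        long v = brokerTimestamp;
        while ((v & ~0x7FL) != 0) {             // emit 7 bits at a time, low bits first
            buf[n++] = (byte) ((v & 0x7F) | 0x80);
            v >>>= 7;
        }
        buf[n++] = (byte) v;
        return Arrays.copyOf(buf, n);
    }

    /** Builds [RAW_METADATA_MAGIC_NUMBER] [RAW_METADATA_SIZE] [RAW_METADATA] [HEADERS_AND_PAYLOAD]. */
    static ByteBuffer addRawMetadata(ByteBuffer headersAndPayload, long brokerTimestamp) {
        byte[] rawMetadata = encodeRawMetadata(brokerTimestamp);
        ByteBuffer out = ByteBuffer.allocate(
                2 + 4 + rawMetadata.length + headersAndPayload.remaining());
        out.putShort(RAW_METADATA_MAGIC_NUMBER);
        out.putInt(rawMetadata.length);
        out.put(rawMetadata);
        out.put(headersAndPayload.duplicate()); // duplicate() leaves the caller's position untouched
        out.flip();
        return out;
    }
}
```

In the broker, the timestamp would presumably come from something like `System.currentTimeMillis()` just before `asyncAddEntry` is called. Note that the original bytes are copied as-is, so the existing checksum inside "[HEADERS_AND_PAYLOAD]" does not need to be recalculated.
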
> 
> ##### b) Seek by time
>
> Originally, when a seek_by_time comes in, it finally calls
> `org.apache.pulsar.broker.service.persistent.PersistentMessageFinder.findMessages`,
> and the original logic is like this:
> 
> ```java
> public void findMessages(final long timestamp, AsyncCallbacks.FindEntryCallback callback) {
>     …
>     cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries,
>         entry -> {
>             MessageImpl msg = null;
>             msg = MessageImpl.deserialize(entry.getDataBuffer()); // <== deserialize
>             return msg.getPublishTime() < timestamp;
>         }, this, callback);
>     …
> ```
> 
> It searches through the topic-partition, reads the target Entry from
> BookKeeper, deserializes the Entry into a MessageImpl, and checks whether
> the message publish time matches the requirement; if not, it iterates to
> read another message.
>
> With the new design, after reading the target Entry from BookKeeper, it
> does not need to deserialize the message, but only needs to read the
> “broker_timestamp” from the “raw Message metadata”.
> As mentioned above, the frame stored in BookKeeper is
> “[RAW_METADATA_MAGIC_NUMBER] [RAW_METADATA_SIZE] [RAW_METADATA]
> [HEADERS_AND_PAYLOAD]”, so the steps are:
>
> 1. Still use `entry.getDataBuffer()` to get the ByteBuf.
> 2. Read and check “[RAW_METADATA_MAGIC_NUMBER]” to see if the “raw Message
> metadata” feature is enabled; if not, fall back to the old behaviour.
> 3. If enabled, read “[RAW_METADATA_SIZE]”, then read “[RAW_METADATA]”.
> 4. Use protobuf to deserialize “[RAW_METADATA]” and read
> “broker_timestamp” out.
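
The four steps above could be sketched roughly as follows. This is an illustration under assumptions, not the proposed implementation: `BrokerTimestampReader`, the 2-byte magic value `0x0e02`, and the 4-byte size field are hypothetical, `java.nio.ByteBuffer` stands in for Netty's `ByteBuf`, and the tiny hand-written parser only understands varint fields (which is all `RawMessageMetadata` contains in this proposal). It also assumes the magic value differs from the first bytes of an old-format entry.

```java
import java.nio.ByteBuffer;
import java.util.OptionalLong;

/** Sketch only: reads broker_timestamp without touching MessageMetadata. */
final class BrokerTimestampReader {
    // Hypothetical magic value; the proposal does not specify one.
    static final short RAW_METADATA_MAGIC_NUMBER = 0x0e02;

    /** Returns the broker timestamp, or empty for old-format entries (caller falls back). */
    static OptionalLong readBrokerTimestamp(ByteBuffer entryData) {
        ByteBuffer buf = entryData.duplicate();          // step 1: do not move the caller's position
        if (buf.remaining() < 6 || buf.getShort() != RAW_METADATA_MAGIC_NUMBER) {
            return OptionalLong.empty();                 // step 2: feature not used for this entry
        }
        int size = buf.getInt();                         // step 3: [RAW_METADATA_SIZE]
        if (size < 0 || size > buf.remaining()) {
            throw new IllegalArgumentException("corrupt raw metadata size: " + size);
        }
        int end = buf.position() + size;
        while (buf.position() < end) {                   // step 4: minimal protobuf parse
            long tag = readVarint(buf);
            if ((tag & 0x7) != 0) {
                break;                                   // non-varint field: not handled in this sketch
            }
            long value = readVarint(buf);
            if (tag == 0x08) {                           // field number 1 = broker_timestamp
                return OptionalLong.of(value);
            }
        }
        return OptionalLong.empty();
    }

    static long readVarint(ByteBuffer buf) {
        long result = 0;
        for (int shift = 0; shift < 64; shift += 7) {
            byte b = buf.get();
            result |= (long) (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                return result;
            }
        }
        throw new IllegalArgumentException("malformed varint");
    }
}
```

With something like this, the predicate in `findMessages` could compare the returned broker timestamp against `timestamp` and only fall back to `MessageImpl.deserialize(...)` when the result is empty.
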
> 
> ##### c) Reading an entry from BookKeeper and sending it to the consumer client
>
> The main logic is in the method `sendMessages` in class
> `org.apache.pulsar.broker.service.Consumer`. The change is similar to the
> “seek by time” case above: after reading a BookKeeper Entry, we need to
> filter out the "RAW_METADATA" related part and return only
> “[HEADERS_AND_PAYLOAD]” to the consumer client.
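
A rough sketch of that filtering step, again with `java.nio.ByteBuffer` standing in for Netty's `ByteBuf`. The class name `RawMetadataStripper`, the 2-byte magic value `0x0e02`, and the 4-byte size field are assumptions made for illustration only. The point it tries to show is that skipping the raw metadata is just an offset calculation: nothing is deserialized and no bytes are copied.

```java
import java.nio.ByteBuffer;

/** Sketch only: returns a view of [HEADERS_AND_PAYLOAD] with raw metadata skipped. */
final class RawMetadataStripper {
    // Hypothetical magic value and header layout: 2-byte magic + 4-byte size.
    static final short RAW_METADATA_MAGIC_NUMBER = 0x0e02;
    static final int HEADER_BYTES = 2 + 4;

    static ByteBuffer headersAndPayload(ByteBuffer entryData) {
        ByteBuffer buf = entryData.duplicate();          // leave the caller's position untouched
        int start = buf.position();
        if (buf.remaining() < HEADER_BYTES
                || buf.getShort(start) != RAW_METADATA_MAGIC_NUMBER) {
            return buf.slice();                          // old-format entry: nothing to strip
        }
        int size = buf.getInt(start + 2);                // [RAW_METADATA_SIZE]
        if (size < 0 || size > buf.remaining() - HEADER_BYTES) {
            throw new IllegalArgumentException("corrupt raw metadata size: " + size);
        }
        buf.position(start + HEADER_BYTES + size);       // skip magic, size and raw metadata
        return buf.slice();                              // shares the underlying bytes, no copy
    }
}
```

Because the returned buffer is a slice of the stored entry, the consumer would receive exactly the bytes the producer sent, including the original checksum.
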
> 
> ### 3. Summary of required changes
>
> Here is a summary of the changes described above:
>
> 1. Add the raw Message metadata protobuf:
>
> ```protobuf
> message RawMessageMetadata {
>     optional uint64 broker_timestamp = 1;
> }
> ```
>
> 2. Change how a produced message is saved in BookKeeper:
>    `org.apache.pulsar.broker.service.Producer`
>
> 3. Change how seek-by-time finds a message:
>    `org.apache.pulsar.broker.service.persistent.PersistentMessageFinder.findMessages`
>
> 4. Change how a message is sent back to the Consumer:
>    `org.apache.pulsar.broker.service.Consumer`
>
> Other changes:
>
> 1. A broker config to enable this raw Message metadata feature, e.g.
> “useBrokerTimestampsForMessageSeek”.
> 2. Besides changing
> `org.apache.pulsar.broker.service.persistent.PersistentMessageFinder.findMessages`,
> search for other places that call
> `MessageImpl.deserialize(entry.getDataBuffer())` and
> `msg.getPublishTime()`, such as `PersistentTopic.isOldestMessageExpired()`,
> and, if this feature is enabled, read the message publish time from the
> new “raw Message metadata” instead of deserializing the whole BookKeeper
> Entry into a Message and calling `Message.getPublishTime()`.
> 
> 
> ### 4. Compatibility, Deprecation and Migration Plan
>
> For the first feature, most of the changes are internal to the brokers.
> It is a new feature that changes neither Pulsar's wire protocol (between
> broker and client) nor the public API, so there is no backward
> compatibility issue and nothing to deprecate or migrate.
>
> In the future, we will consider compatibility when implementing the
> second feature, “continuous message sequence-Id”, since we would need to
> send messages with “sequence_id” to consumers. In that case, we will take
> different actions depending on the data format, whether the feature is
> enabled, and the consumer client version. Data is in the new format if it
> contains "[RAW_METADATA_MAGIC_NUMBER][RAW_METADATA_SIZE][RAW_METADATA]",
> and a consumer is a new version if it can process messages that contain
> "[RAW_METADATA]".
> 
> ### 5. Test Plan
>
> * Unit tests for each individual change: broker/client work well for
> produce/consume.
> * Integration tests or UT for the end-to-end pipeline: multiple producers
> set message times in random order; seek-by-time should seek to the right
> message.
> * Load testing to verify performance: seek should be faster.
> 
> 
> --
> Best,
> Aloys.
> 
