Yunze, thanks for your suggestion. Protobuf does have the ability to check whether a field exists, but to use it we must first have a parsed Protobuf object. In this proposal, RAW_METADATA_MAGIC_NUMBER tells us which type of object we can get from the Entry's ByteBuf. If RAW_METADATA_MAGIC_NUMBER is present, the head of the ByteBuf is parsed as RAW_METADATA first. Otherwise, the ByteBuf is parsed in the original data format, without RAW_METADATA.
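To make the detection concrete, here is a minimal sketch of how a broker might read an Entry under this proposal. It makes several assumptions. RAW_METADATA_MAGIC_NUMBER is taken to be 2 bytes, followed by a 4-byte RAW_METADATA_SIZE; the proposal fixes neither width. The magic value 0x0e02 is hypothetical and must not collide with the bytes that HEADERS_AND_PAYLOAD can start with. The sketch uses java.nio.ByteBuffer instead of Netty's ByteBuf so it runs standalone, and it decodes `optional uint64 broker_timestamp = 1` by hand rather than through protobuf-generated classes.

```java
import java.nio.ByteBuffer;
import java.util.OptionalLong;

/** Sketch: read the proposed [RAW_METADATA_MAGIC_NUMBER][RAW_METADATA_SIZE][RAW_METADATA] prefix. */
final class RawMetadataReader {
    // Hypothetical value; the proposal does not specify the magic number.
    static final short RAW_METADATA_MAGIC_NUMBER = 0x0e02;
    // Assumed widths: 2-byte magic number + 4-byte size.
    static final int PREFIX_LENGTH = 2 + 4;

    static final class Parsed {
        final OptionalLong brokerTimestamp;  // empty for old-format entries
        final int headersAndPayloadOffset;   // where [HEADERS_AND_PAYLOAD] starts

        Parsed(OptionalLong brokerTimestamp, int headersAndPayloadOffset) {
            this.brokerTimestamp = brokerTimestamp;
            this.headersAndPayloadOffset = headersAndPayloadOffset;
        }
    }

    static Parsed parse(ByteBuffer entry) {
        int start = entry.position();
        // Absolute reads leave the caller's position untouched.
        if (entry.remaining() < PREFIX_LENGTH || entry.getShort(start) != RAW_METADATA_MAGIC_NUMBER) {
            // No magic number: old format, the whole buffer is [HEADERS_AND_PAYLOAD].
            return new Parsed(OptionalLong.empty(), start);
        }
        int size = entry.getInt(start + 2);
        int metaStart = start + PREFIX_LENGTH;
        if (size < 0 || size > entry.limit() - metaStart) {
            throw new IllegalArgumentException("corrupt RAW_METADATA_SIZE: " + size);
        }
        ByteBuffer meta = entry.duplicate();  // view restricted to [RAW_METADATA]
        meta.limit(metaStart + size);
        meta.position(metaStart);
        return new Parsed(decodeBrokerTimestamp(meta), metaStart + size);
    }

    /** Hand-written protobuf decoding of `optional uint64 broker_timestamp = 1;`. */
    static OptionalLong decodeBrokerTimestamp(ByteBuffer meta) {
        OptionalLong result = OptionalLong.empty();
        while (meta.hasRemaining()) {
            long tag = readVarint(meta);
            int fieldNumber = (int) (tag >>> 3);
            int wireType = (int) (tag & 0x7);
            if (fieldNumber == 1 && wireType == 0) {
                result = OptionalLong.of(readVarint(meta));  // last value wins, as in protobuf
            } else if (wireType == 0) {
                readVarint(meta);                          // skip unknown varint field
            } else if (wireType == 1) {
                skip(meta, 8);                             // skip fixed64
            } else if (wireType == 2) {
                skip(meta, (int) readVarint(meta));        // skip length-delimited field
            } else if (wireType == 5) {
                skip(meta, 4);                             // skip fixed32
            } else {
                throw new IllegalArgumentException("unsupported wire type " + wireType);
            }
        }
        return result;
    }

    private static void skip(ByteBuffer buf, int n) {
        if (n < 0 || n > buf.remaining()) {
            throw new IllegalArgumentException("truncated field");
        }
        buf.position(buf.position() + n);
    }

    private static long readVarint(ByteBuffer buf) {
        long value = 0;
        for (int shift = 0; shift < 64; shift += 7) {
            byte b = buf.get();  // BufferUnderflowException if truncated
            value |= (long) (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                return value;
            }
        }
        throw new IllegalArgumentException("malformed varint");
    }
}
```

The magic number is checked before any protobuf parsing happens. That ordering lets old-format entries fall back to the original path. Unknown protobuf fields are skipped, so a field added later (for example, a sequence-Id) would not break this reader.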
Yunze Xu <y...@streamnative.io> wrote on Monday, November 16, 2020, 10:08 AM:

> I think protobuf has the ability to check whether a field is set, i.e. whether RAW_METADATA_MAGIC_NUMBER and RAW_METADATA_SIZE are included in the protobuf-ed struct. In Kafka, a magic number represents the version of the protocol, not whether a feature is enabled. If we need a *real* magic number, we must make that clear.
>
> On 2020/11/09 06:24:18, Aloys Zhang <l...@gmail.com> wrote:

Hi all,

We have drafted a proposal for supporting lightweight raw Message metadata, which can be found at https://github.com/apache/pulsar/wiki/PIP-70%3A-Introduce-lightweight-raw-Message-metadata and https://docs.google.com/document/d/1IgnF9AJzL6JG6G4EL_xcoQxvOpd7bUXcgxFApBiPOFY

I have also copied it into this email thread for easier viewing.

Any suggestions or ideas are welcome. Please join the discussion.

## PIP-70: Introduce lightweight raw Message metadata

### 1. Motivation

For messages in Pulsar, adding a new property always means changing the `MessageMetadata` in the protocol (PulsarApi.proto). Both the broker side and the client side can then understand the property by deserializing the `MessageMetadata`. In some cases, however, the property needs to be added by the broker side, or needs to be understood by the broker side at low cost. When the broker receives a message from a producer client, it could add the property in a new area that is separate from `MessageMetadata`. Reading the property back would then not require deserializing the original `MessageMetadata`. When the broker sends the message to a client, it could choose whether to filter out this part, depending on what the client needs. We call this kind of property “raw Message metadata”.
This way, consuming the “raw Message metadata” is independent of the original `MessageMetadata`.

The benefit of this kind of “raw Message metadata” is that the broker does not need to serialize/deserialize the protobuf-ed `MessageMetadata`, which gives better performance. It could also enable many features that are not supported yet.

Here are some use cases for raw Message metadata:

1) Provide messages ordered by (broker-side) time, to make seeking messages by time more accurate.
Currently, each message has a `publish_time` that uses the client-side time. Clocks on different producer clients may not be aligned, so the message order and the `publish_time` order may differ. Each topic-partition, however, has only one owner broker. If we append the broker-side time to the “raw Message metadata”, we can make sure the message order is aligned with the broker-side time. With this feature, we can handle seeking messages by time more accurately.

2) Provide continuous message sequence-Ids for messages in one topic-partition.
MessageId is a combination of ledgerId+entryId+batchIndex. For a partition that contains more than one ledger, the Ids inside it are not continuous. With this solution, we could attach a sequence-Id to each Message, which would make message sequence management easier.

In this proposal, we focus on the first feature mentioned above, “provide messages ordered by (broker-side) time”, which makes the proposal easier to go through.

### 2. Message and “raw Message metadata” structure changes

As mentioned above, this proposal has 2 main benefits:

1. Almost all of the changes happen on the broker side.
2. It avoids serializing/deserializing the protobuf-ed `MessageMetadata`.

#### 2.1 Raw Message metadata structure in Protobuf

Protobuf is used a lot in Pulsar, so we can use Protobuf to serialize/deserialize the raw Message metadata. In this example, we save the broker-side timestamp when each message is sent from the broker to BookKeeper, so the definition is very simple.

```protobuf
message RawMessageMetadata {
    optional uint64 broker_timestamp = 1;
}
```

#### 2.2 Message and “raw Message metadata” structure details

Each message is sent from the producer client to the broker in this frame format:

```
[TOTAL_SIZE] [CMD_SIZE] [CMD] [MAGIC_NUMBER] [CHECKSUM] [METADATA_SIZE] [METADATA] [PAYLOAD]
```

The first 3 fields, “[TOTAL_SIZE] [CMD_SIZE] [CMD]”, are read in `LengthFieldBasedFrameDecoder` and `PulsarDecoder`. The rest is handled in the method `org.apache.pulsar.broker.service.Producer.publishMessage`. This remaining part, “[MAGIC_NUMBER] [CHECKSUM] [METADATA_SIZE] [METADATA] [PAYLOAD]”, is usually treated as “headersAndPayload” in the code.
As described above, we do not want this part to change at all, so we can treat it as a whole package.

```
[MAGIC_NUMBER] [CHECKSUM] [METADATA_SIZE] [METADATA] [PAYLOAD] ==> [HEADERS_AND_PAYLOAD]
```

We need to add some fields to make “raw Message metadata” work well.

```
[RAW_METADATA_MAGIC_NUMBER] [RAW_METADATA_SIZE] [RAW_METADATA] [HEADERS_AND_PAYLOAD]
```

RAW_METADATA_MAGIC_NUMBER is used to identify whether this feature is on, i.e. whether the message has been handled by this feature.

RAW_METADATA_SIZE is added for this feature and records the size of the serialized raw Message metadata content.

RAW_METADATA is the serialized raw Message metadata content.

“HEADERS_AND_PAYLOAD” is the original ByteBuf data that contains the metadata and payload.

We put the newly added fields before "HEADERS_AND_PAYLOAD" instead of after it, because:

- Firstly, the “raw Message metadata” is an addition to the original protocol. It can support this new feature without much modification to the original wire protocol; in particular, it does not need to serialize/deserialize the protobuf-ed `MessageMetadata` built by the producer.
- Secondly, if we put the new fields after "HEADERS_AND_PAYLOAD", we can't know the offset of the new fields, since we don't know the length of "PAYLOAD". Also, fields after "PAYLOAD" would change the CRC32C checksum, which means we would need to recalculate the checksum one more time.

#### 2.3 Message and “raw Message metadata” lifecycle

The message processing logic would be like this:

1. The producer client sends the original message to the broker, where it is parsed by “PulsarDecoder”, leaving only “[HEADERS_AND_PAYLOAD]”.
2. Originally, “[HEADERS_AND_PAYLOAD]” is written into BookKeeper by the method `asyncAddEntry` in `ManagedLedger`. Now we first add the “[RAW_METADATA]” and related fields described above in front of “[HEADERS_AND_PAYLOAD]”, then write the whole ByteBuf into BookKeeper with `asyncAddEntry`. This way, the "raw Message metadata" is kept together with the original message in BookKeeper.
3. When a feature needs this “raw Message metadata”, the broker reads the new BookKeeper “Entry” and gets the raw Message metadata information to serve that feature.
4. In the “provide messages ordered by (broker-side) time” feature, when a seek-by-time operation comes into the broker, we read “broker_timestamp” out of “[RAW_METADATA]”.
5. After the broker reads a BookKeeper Entry, and before sending it to the consumer client, we need to filter out the raw Message metadata related part and return only the “[HEADERS_AND_PAYLOAD]” part.

This way, we don't need to serialize/deserialize the protobuf-ed `MessageMetadata`.

In more detail, these steps are:

##### a) Handling and storing a produced message

A message sent by the producer client to the broker is handled by the method

```
publishMessageToTopic(ByteBuf headersAndPayload, long sequenceId, long batchSize, boolean isChunked)
```

in class `org.apache.pulsar.broker.service.Producer`. It finally calls into the method `asyncAddEntry` of `ManagedLedger`, which writes the passed-in serialized message as an Entry stored in BookKeeper.

```
void asyncAddEntry(ByteBuf headersAndPayload, AddEntryCallback callback, Object ctx)
```

For the raw Message metadata content, we need to put the current broker timestamp in `message RawMessageMetadata` and use protobuf to serialize it into a `ByteBuf`.
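Here is a minimal sketch of this serialization step, together with the prepending described next. It assumes a 2-byte RAW_METADATA_MAGIC_NUMBER with the hypothetical value 0x0e02 and a 4-byte RAW_METADATA_SIZE; the proposal fixes neither. It uses java.nio.ByteBuffer in place of Netty's ByteBuf. It also encodes `optional uint64 broker_timestamp = 1` by hand, the way protobuf does (tag byte 0x08, then the value as a varint), so it runs without generated classes.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Sketch: build [RAW_METADATA_MAGIC_NUMBER][RAW_METADATA_SIZE][RAW_METADATA][HEADERS_AND_PAYLOAD]. */
final class RawMetadataWriter {
    // Hypothetical value and widths: 2-byte magic number, 4-byte size.
    static final short RAW_METADATA_MAGIC_NUMBER = 0x0e02;

    /** Protobuf wire encoding of RawMessageMetadata { broker_timestamp = timestamp }. */
    static byte[] encodeBrokerTimestamp(long timestamp) {
        byte[] tmp = new byte[1 + 10];  // tag byte + at most 10 varint bytes for a uint64
        int n = 0;
        tmp[n++] = (1 << 3) | 0;        // field number 1, wire type 0 (varint)
        long v = timestamp;
        while ((v & ~0x7FL) != 0) {
            tmp[n++] = (byte) ((v & 0x7F) | 0x80);  // low 7 bits, continuation bit set
            v >>>= 7;                               // unsigned shift: uint64 semantics
        }
        tmp[n++] = (byte) v;            // final byte, continuation bit clear
        return Arrays.copyOf(tmp, n);
    }

    /** Prepends the raw metadata fields; headersAndPayload is copied unchanged. */
    static ByteBuffer prependRawMetadata(byte[] rawMetadata, ByteBuffer headersAndPayload) {
        ByteBuffer hp = headersAndPayload.duplicate();  // leave the caller's position alone
        ByteBuffer out = ByteBuffer.allocate(2 + 4 + rawMetadata.length + hp.remaining());
        out.putShort(RAW_METADATA_MAGIC_NUMBER)
           .putInt(rawMetadata.length)
           .put(rawMetadata)
           .put(hp);
        out.flip();
        return out;
    }
}
```

For simplicity, this copies headersAndPayload into a new buffer. A broker implementation would more likely avoid the copy, for example with Netty's CompositeByteBuf.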
Then we put the size and content of the serialized `ByteBuf` in front of the original `ByteBuf headersAndPayload`.

“[HEADERS_AND_PAYLOAD]” thus becomes “[RAW_METADATA_MAGIC_NUMBER] [RAW_METADATA_SIZE] [RAW_METADATA] [HEADERS_AND_PAYLOAD]” and is stored in BookKeeper.

##### b) Seek by time

Originally, a seek-by-time request finally calls into `org.apache.pulsar.broker.service.persistent.PersistentMessageFinder.findMessages`, and the original logic is like this:

```java
public void findMessages(final long timestamp, AsyncCallbacks.FindEntryCallback callback) {
    …
    cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries,
            entry -> {
                MessageImpl msg = null;
                msg = MessageImpl.deserialize(entry.getDataBuffer()); // <=== deserialize
                return msg.getPublishTime() < timestamp;
            }, this, callback);
    …
```

It searches through the topic-partition. For each candidate, it reads the target Entry from BookKeeper, deserializes the Entry into a MessageImpl, and checks whether the message's publish time matches the requirement. If it does not match, it iterates another round and reads another message.

With the new design, after reading the target Entry from BookKeeper, the broker does not need to deserialize it; it only needs to read “broker_timestamp” from the “raw Message metadata”. As mentioned above, the frame stored in BookKeeper is “[RAW_METADATA_MAGIC_NUMBER] [RAW_METADATA_SIZE] [RAW_METADATA] [HEADERS_AND_PAYLOAD]”, so:

1. Still use `entry.getDataBuffer()` to get the ByteBuf.
2. Read and check “[RAW_METADATA_MAGIC_NUMBER]” to see whether the “raw Message metadata” feature is enabled. If it is not, fall back to the old behaviour.
3. If it is enabled, read “[RAW_METADATA_SIZE]”, then read “[RAW_METADATA]”.
4. Use protobuf to deserialize “[RAW_METADATA]” and read “broker_timestamp” out.

##### c) Reading an entry from BookKeeper and sending it to the consumer client

The main logic happens in the method `sendMessages` in class `org.apache.pulsar.broker.service.Consumer`. The change is similar to “seek by time” above. After reading the BookKeeper Entry, we need to filter out the "RAW_METADATA" related part and return only “[HEADERS_AND_PAYLOAD]” to the consumer client.

### 3. Summary of required changes

Here is a summary of the related changes above:

1. Add the raw Message metadata protobuf:

```protobuf
message RawMessageMetadata {
    optional uint64 broker_timestamp = 1;
}
```

2. Change how a produced message is saved in BookKeeper: `org.apache.pulsar.broker.service.Producer`
3. Change how seek-by-time finds messages: `org.apache.pulsar.broker.service.persistent.PersistentMessageFinder.findMessages`
4. Change how messages are sent back to the consumer: `org.apache.pulsar.broker.service.Consumer`

Other changes:

1. A broker config to enable this raw Message metadata feature, e.g. “useBrokerTimestampsForMessageSeek”.
2. Besides `org.apache.pulsar.broker.service.persistent.PersistentMessageFinder.findMessages`, change other places that call `MessageImpl.deserialize(entry.getDataBuffer())` and `msg.getPublishTime()`, such as `PersistentTopic.isOldestMessageExpired()`. When this feature is enabled, they should read the message publish time from the new “raw Message metadata” instead of deserializing the whole BookKeeper Entry into a Message and calling `Message.getPublishTime()`.

### 4. Compatibility, Deprecation and Migration Plan

For the first feature, most of the changes are internal to brokers. It is a new feature that changes neither Pulsar's wire protocol (between broker and client) nor the public API, so there is no backward compatibility issue. Since it is a newly added feature, there is nothing to deprecate or migrate.

In the future, we will consider compatibility when implementing the second feature, “continuous message sequence-Id”, since we would need to send messages with “sequence_id” to consumers. In that case, we will take different actions depending on the data format, whether the feature is enabled, and the consumer client version. Data is in the new format if it contains "[RAW_METADATA_MAGIC_NUMBER] [RAW_METADATA_SIZE] [RAW_METADATA]", and a consumer is a new version if it can process messages containing "[RAW_METADATA]".

### 5. Test Plan

* Unit tests for each individual change: broker/client work well for produce/consume.
* Integration tests or UTs for the end-to-end pipeline: multiple producers set message times in random order, and seek-by-time still seeks to the right message.
* Load testing to ensure performance: seek should be faster.

--
Best,
Aloys.

--
Best,
Aloys.