Yunze,

Thanks for your suggestion. Protobuf does have the ability to check whether
a field exists, but to use it we must first have a parsed Protobuf object.
In this proposal, RAW_METADATA_MAGIC_NUMBER indicates what type of object
we can get from the bytebuf of an Entry. If RAW_METADATA_MAGIC_NUMBER
exists, the head of the bytebuf is parsed as RAW_METADATA first; otherwise,
the bytebuf is parsed in the original data format, without RAW_METADATA.
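
Here is a minimal Java sketch of that dispatch. The broker peeks at the head of the entry before doing any protobuf parsing. The sketch assumes a 2-byte RAW_METADATA_MAGIC_NUMBER with the hypothetical value 0x0e02; the proposal fixes neither the width nor the value. It uses `java.nio.ByteBuffer` in place of Netty's `ByteBuf` so it runs on its own:

```java
import java.nio.ByteBuffer;

// Sketch only: java.nio.ByteBuffer stands in for Netty's ByteBuf.
final class EntryFormatDetector {
    // Hypothetical value and width (2 bytes); the proposal does not fix either.
    // It only has to differ from the [MAGIC_NUMBER] that starts [HEADERS_AND_PAYLOAD].
    static final short RAW_METADATA_MAGIC_NUMBER = (short) 0x0e02;

    enum EntryFormat { WITH_RAW_METADATA, ORIGINAL }

    // Peeks at the first two bytes with an absolute read, so the buffer's
    // position is unchanged and either parser can start from the beginning.
    static EntryFormat detect(ByteBuffer entryData) {
        if (entryData.remaining() >= Short.BYTES
                && entryData.getShort(entryData.position()) == RAW_METADATA_MAGIC_NUMBER) {
            return EntryFormat.WITH_RAW_METADATA;
        }
        return EntryFormat.ORIGINAL;
    }
}
```

Only a WITH_RAW_METADATA result would lead to parsing RAW_METADATA as a protobuf message. An ORIGINAL entry goes to the existing headersAndPayload parsing unchanged.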

On Mon, Nov 16, 2020 at 10:08 AM Yunze Xu <y...@streamnative.io> wrote:

> I think protobuf has the ability to check whether a field is set, i.e. if
> RAW_METADATA_MAGIC_NUMBER and RAW_METADATA_SIZE were included in the
> protobuf-ed struct. In Kafka, a magic number represents the version of the
> protocol, not whether a feature is enabled. If we need a *real* magic
> number, we must make that clear.
>
> On 2020/11/09 06:24:18, Aloys Zhang <l...@gmail.com> wrote:
> > Hi all,
> >
> > We have drafted a proposal for supporting lightweight raw Message metadata,
> > which can be found at
> >
> > https://github.com/apache/pulsar/wiki/PIP-70%3A-Introduce-lightweight-raw-Message-metadata
> >
> > and
> >
> > https://docs.google.com/document/d/1IgnF9AJzL6JG6G4EL_xcoQxvOpd7bUXcgxFApBiPOFY
> >
> > I have also copied it into this email thread for easier viewing.
> >
> > Any suggestions or ideas are welcome; please join the discussion.
> >
> > ## PIP-70: Introduce lightweight raw Message metadata
> >
> > ### 1. Motivation
> >
> > For messages in Pulsar, whenever we want to add a new property, we change
> > the `MessageMetadata` in the protocol (PulsarApi.proto). Both the broker and
> > the client can then understand this kind of property by deserializing the
> > `MessageMetadata`. In some cases, however, the property needs to be added
> > on the broker side, or the broker needs to read it at low cost. When the
> > broker receives a message from the client, we could add the property in a
> > new area that is separate from `MessageMetadata`. Reading it back would
> > then not require deserializing the original `MessageMetadata`. When the
> > broker sends the message to the client, it could filter this part out, or
> > keep it if the client needs it. We call this kind of property “raw Message
> > metadata”. This way, the “raw Message metadata” is consumed independently
> > of the original `MessageMetadata`.
> >
> > The benefit of this kind of “raw Message metadata” is that the broker does
> > not need to serialize/deserialize the protobuf-ed `MessageMetadata`, which
> > gives better performance. It could also enable a number of features that
> > are not supported yet.
> >
> > Here are some of the use cases for raw Message metadata:
> > 1) Provide messages ordered by (broker-side) time, to make message seek by
> > time more accurate.
> > Currently, each message has a `publish_time` taken from the client-side
> > clock. Clocks on different producer clients may not be aligned, so the
> > message order and the order of message times (`publish_time`) may differ.
> > However, each topic-partition has only one owner broker. If we append the
> > broker-side time in the “raw Message metadata”, we can make sure the
> > message order is aligned with broker-side time. With this feature, we
> > could handle message seek by time more accurately.
> >
> > 2) Provide continuous message sequence-Ids for messages in one
> > topic-partition.
> > A MessageId is a combination of ledgerId+entryId+batchIndex. For a
> > partition that contains more than one ledger, these Ids are not
> > continuous. With this solution, we could attach a sequence-Id to each
> > Message, which would make message sequence management easier.
> >
> > In this proposal, we use the first feature mentioned above, “provide
> > messages ordered by (broker-side) time”, as the example, which makes the
> > proposal easier to go through.
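
The accuracy argument rests on a property of searching by time: a search over the log is only reliable if timestamps never decrease in log order. The Java sketch below models seek-by-time as a binary search for the newest entry older than the target. This is a simplification and does not reproduce how `ManagedCursor.asyncFindNewestMatching` actually searches:

```java
// Sketch: seek-by-time modeled as a binary search over entry timestamps in log order.
// A simplification, not the ManagedCursor.asyncFindNewestMatching implementation.
final class SeekByTime {
    // Index of the newest entry whose timestamp is < target, or -1 if none.
    // Only correct when timestamps are non-decreasing in log order.
    static int findNewestBefore(long[] timestamps, long target) {
        int lo = 0;
        int hi = timestamps.length - 1;
        int found = -1;
        while (lo <= hi) {
            int mid = (lo + hi) >>> 1;
            if (timestamps[mid] < target) {
                found = mid;      // candidate; look for a newer one to the right
                lo = mid + 1;
            } else {
                hi = mid - 1;
            }
        }
        return found;
    }

    // Reference answer by linear scan: first entry whose timestamp is >= target.
    static int firstAtOrAfter(long[] timestamps, long target) {
        for (int i = 0; i < timestamps.length; i++) {
            if (timestamps[i] >= target) {
                return i;
            }
        }
        return timestamps.length;
    }
}
```

Take broker timestamps {100, 200, 300, 400, 500} and a target of 350. The search returns index 2, so the seek starts at index 3, which is exactly the first entry at or after 350. Now take client publish times in the same log order, e.g. {100, 400, 200, 300, 500} from producers with skewed clocks. The search returns index 3, so the seek starts at index 4 and skips the message at index 1 stamped 400.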
> >
> > ### 2. Message and “raw Message metadata” structure changes
> >
> > As mentioned above, this proposal has 2 main benefits:
> >
> > 1. Almost all of the changes happen on the broker side.
> > 2. It avoids serializing/deserializing the protobuf-ed `MessageMetadata`.
> >
> > #### 2.1 Raw Message metadata structure in Protobuf
> >
> > Protobuf is used widely in Pulsar, so we could also use it to
> > serialize/deserialize the raw Message metadata.
> > In this example, we save the broker-side timestamp when each message is
> > sent from the broker to BookKeeper, so the definition is very simple.
> >
> > ```protobuf
> > message RawMessageMetadata {
> >     optional uint64 broker_timestamp = 1;
> > }
> > ```
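
For reference, this is what the message looks like on the wire when only `broker_timestamp` is set. The broker would use the class generated by protoc. This Java sketch hand-rolls the same encoding (a 0x08 tag byte, then a base-128 varint) only so that it runs without protobuf-java:

```java
import java.io.ByteArrayOutputStream;

// Hand-rolled stand-in for the protobuf-generated RawMessageMetadata class.
// Handles only field 1 (broker_timestamp, wire type 0 = varint).
final class RawMetadataWire {
    // Serializes RawMessageMetadata { broker_timestamp = value }.
    static byte[] encode(long brokerTimestamp) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(0x08);                          // tag: (field 1 << 3) | wire type 0
        long v = brokerTimestamp;
        while ((v & ~0x7FL) != 0) {               // more than 7 significant bits left
            out.write((int) ((v & 0x7F) | 0x80)); // low 7 bits plus continuation bit
            v >>>= 7;                             // unsigned shift: uint64 semantics
        }
        out.write((int) v);                       // last group, continuation bit clear
        return out.toByteArray();
    }

    // Parses bytes produced by encode(); rejects anything else.
    static long decode(byte[] bytes) {
        if (bytes.length < 2 || bytes[0] != 0x08) {
            throw new IllegalArgumentException("expected field 1 as a varint");
        }
        long result = 0;
        for (int i = 1, shift = 0; i < bytes.length && shift < 64; i++, shift += 7) {
            result |= (long) (bytes[i] & 0x7F) << shift;
            if ((bytes[i] & 0x80) == 0) {
                return result;
            }
        }
        throw new IllegalArgumentException("truncated or malformed varint");
    }
}
```

A current millisecond timestamp (between 2^40 and 2^41) needs six varint bytes plus the tag. That is 7 bytes in total, before the size and magic-number fields are added.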
> >
> > #### 2.2 Message and “raw Message metadata” structure details
> >
> > Each message is sent from the producer client to the broker in this frame
> > format:
> >
> > ```
> > [TOTAL_SIZE] [CMD_SIZE] [CMD] [MAGIC_NUMBER] [CHECKSUM] [METADATA_SIZE] [METADATA] [PAYLOAD]
> > ```
> >
> > The first 3 fields, “[TOTAL_SIZE] [CMD_SIZE] [CMD]”, are read in
> > `LengthFieldBasedFrameDecoder` and `PulsarDecoder`. The rest is handled in
> > the method `org.apache.pulsar.broker.service.Producer.publishMessage`. That
> > remaining part, “[MAGIC_NUMBER] [CHECKSUM] [METADATA_SIZE] [METADATA]
> > [PAYLOAD]”, is usually called “headersAndPayload” in the code. As
> > described above, we do not want this part to change at all, so we can
> > treat it as a single package:
> >
> > ```
> > [MAGIC_NUMBER] [CHECKSUM] [METADATA_SIZE] [METADATA] [PAYLOAD] ==> [HEADERS_AND_PAYLOAD]
> > ```
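
Here is a minimal sketch of that split. It assumes the 4-byte big-endian [TOTAL_SIZE] and [CMD_SIZE] fields of Pulsar's binary protocol, where [TOTAL_SIZE] counts every byte that follows it. In the broker, `LengthFieldBasedFrameDecoder` and `PulsarDecoder` do this work; this Java version on `java.nio.ByteBuffer` is only an illustration:

```java
import java.nio.ByteBuffer;

// Illustration of what the frame decoders strip off; not broker code.
final class FrameSplitter {
    // Returns a view of [HEADERS_AND_PAYLOAD], i.e. everything after
    // [TOTAL_SIZE] [CMD_SIZE] [CMD]. The input buffer's position is not changed.
    static ByteBuffer headersAndPayload(ByteBuffer frame) {
        ByteBuffer buf = frame.duplicate();
        int totalSize = buf.getInt();              // bytes that follow this field
        int cmdSize = buf.getInt();                // size of the serialized command
        buf.position(buf.position() + cmdSize);    // skip [CMD]
        int remaining = totalSize - Integer.BYTES - cmdSize;
        ByteBuffer rest = buf.slice();             // shares bytes, no copy
        rest.limit(remaining);
        return rest;
    }
}
```

The returned buffer is a view, not a copy. This matches what the proposal relies on: the bytes of [HEADERS_AND_PAYLOAD] are passed along untouched.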
> >
> > We need to add some fields to make “raw Message metadata” work:
> >
> > ```
> > [RAW_METADATA_MAGIC_NUMBER] [RAW_METADATA_SIZE] [RAW_METADATA] [HEADERS_AND_PAYLOAD]
> > ```
> >
> > RAW_METADATA_MAGIC_NUMBER identifies whether this feature is on and
> > whether the message is handled by this feature.
> >
> > RAW_METADATA_SIZE is added for this feature and records the size of the
> > serialized raw Message metadata content.
> >
> > RAW_METADATA is the serialized raw Message metadata content.
> >
> > “HEADERS_AND_PAYLOAD” is the original ByteBuf data that contains the
> > metadata and payload.
> >
> > We put the newly added fields before "HEADERS_AND_PAYLOAD" instead of
> > after it, because:
> >
> > - Firstly, the “raw Message metadata” is an addition to the original
> > protocol and can support this new feature without much modification to
> > the original wire protocol. In particular, it does not need to
> > serialize/deserialize the protobuf-ed `MessageMetadata` built by the
> > producer.
> > - Secondly, if we put new fields after "HEADERS_AND_PAYLOAD", we can't
> > know the offset of the newly added fields, since we don't know the length
> > of "PAYLOAD". Also, fields after "PAYLOAD" would change the CRC32
> > checksum, which means we would need to recalculate the checksum.
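
A small Java sketch makes the checksum argument concrete. It assumes, as a simplification of Pulsar's verification, that [CHECKSUM] covers every byte after it up to the end of the buffer. It uses `java.util.zip.CRC32` in place of the CRC32-C that Pulsar uses; the argument is the same for any checksum:

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

// Simplified [CHECKSUM][CHECKED_BYTES...] layout; CRC32 stands in for CRC32-C.
final class ChecksumPlacement {
    // Builds a buffer whose first 4 bytes are the checksum of the bytes after them.
    static ByteBuffer withChecksum(byte[] checked) {
        CRC32 crc = new CRC32();
        crc.update(checked, 0, checked.length);
        ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES + checked.length);
        buf.putInt((int) crc.getValue());
        buf.put(checked);
        buf.flip();
        return buf;
    }

    // Recomputes the checksum over everything after the checksum field
    // (to the buffer's limit) and compares it with the stored value.
    static boolean verifies(ByteBuffer buf) {
        long stored = Integer.toUnsignedLong(buf.getInt(buf.position()));
        ByteBuffer rest = buf.duplicate();
        rest.position(rest.position() + Integer.BYTES);
        CRC32 crc = new CRC32();
        crc.update(rest);
        return stored == crc.getValue();
    }

    // Copies first then second into a new buffer; inputs are not modified.
    static ByteBuffer concat(ByteBuffer first, ByteBuffer second) {
        ByteBuffer out = ByteBuffer.allocate(first.remaining() + second.remaining());
        out.put(first.duplicate());
        out.put(second.duplicate());
        out.flip();
        return out;
    }
}
```

Bytes appended after [PAYLOAD] end up inside the checked range, so the untouched header no longer verifies. A prefix is simply sliced off before the original bytes are checked.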
> >
> > #### 2.3 Message and “raw Message metadata” lifecycle
> >
> > The message processing logic would be as follows:
> >
> > 1. The producer client sends the original message to the broker, where
> > “PulsarDecoder” parses it, leaving only “[HEADERS_AND_PAYLOAD]”.
> > 2. Originally, the method `asyncAddEntry` in `ManagedLedger` writes
> > “[HEADERS_AND_PAYLOAD]” into BookKeeper. Now we first add the
> > “[RAW_METADATA]” and related parts described above in front of
> > “[HEADERS_AND_PAYLOAD]”. Then we write the whole ByteBuf into BookKeeper
> > with `asyncAddEntry`, so the "raw Message metadata" is kept together with
> > the original message in BookKeeper.
> > 3. When a feature needs this “raw Message metadata”, the broker reads the
> > new BookKeeper “Entry” and gets the raw Message metadata information to
> > serve that feature.
> > 4. In the “provide messages ordered by (broker-side) time” feature, when
> > a seek-by-time operation reaches the broker, we read “broker_timestamp”
> > out of “[RAW_METADATA]”.
> > 5. After the broker reads a BookKeeper Entry, and before sending it to the
> > consumer client, we need to filter out the raw Message metadata part and
> > return only the “[HEADERS_AND_PAYLOAD]” part.
> >
> > This way, we don't need to serialize/deserialize the protobuf-ed
> > `MessageMetadata`.
> >
> > In more detail, these steps are:
> >
> > ##### a) Produced message handling and storage
> >
> > A message sent from the producer client to the broker is handled by the
> > method
> >
> > ```
> > publishMessageToTopic(ByteBuf headersAndPayload, long sequenceId, long batchSize, boolean isChunked)
> > ```
> >
> > in the class `org.apache.pulsar.broker.service.Producer`. It eventually
> > calls the method `asyncAddEntry` of `ManagedLedger`, which writes the
> > passed-in serialized message as an Entry stored in BookKeeper.
> >
> > ```
> > void asyncAddEntry(ByteBuf headersAndPayload, AddEntryCallback callback, Object ctx)
> > ```
> >
> > For the raw Message metadata content, we put the current broker timestamp
> > in `message RawMessageMetadata` and use protobuf to serialize it into a
> > `ByteBuf`. Then we put the size and content of the serialized `ByteBuf` in
> > front of the original `ByteBuf headersAndPayload`.
> >
> > “[HEADERS_AND_PAYLOAD]” thus becomes “[RAW_METADATA_MAGIC_NUMBER]
> > [RAW_METADATA_SIZE] [RAW_METADATA] [HEADERS_AND_PAYLOAD]”, which is stored
> > in BookKeeper.
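
Here is a minimal Java sketch of this write path. It makes four assumptions:

- RAW_METADATA_MAGIC_NUMBER is 2 bytes, with the hypothetical value 0x0e02.
- RAW_METADATA_SIZE is 4 bytes.
- The protobuf encoding of `broker_timestamp` is hand-rolled instead of using the generated class.
- `java.nio.ByteBuffer` stands in for Netty's `ByteBuf`.

The method name `addRawMetadata` is illustrative:

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

// Sketch of step a): build the entry stored in BookKeeper. The magic value and
// the 2-byte/4-byte field widths are assumptions, not fixed by the proposal.
final class RawMetadataWriter {
    static final short RAW_METADATA_MAGIC_NUMBER = (short) 0x0e02; // hypothetical

    // Hand-rolled protobuf encoding of RawMessageMetadata { broker_timestamp = 1 }.
    static byte[] serializeRawMetadata(long brokerTimestamp) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(0x08);                          // field 1, wire type 0 (varint)
        long v = brokerTimestamp;
        while ((v & ~0x7FL) != 0) {
            out.write((int) ((v & 0x7F) | 0x80)); // 7 bits plus continuation bit
            v >>>= 7;
        }
        out.write((int) v);
        return out.toByteArray();
    }

    // Returns [RAW_METADATA_MAGIC_NUMBER][RAW_METADATA_SIZE][RAW_METADATA][HEADERS_AND_PAYLOAD].
    // headersAndPayload is copied byte for byte; its position is not modified.
    static ByteBuffer addRawMetadata(long brokerTimestamp, ByteBuffer headersAndPayload) {
        byte[] raw = serializeRawMetadata(brokerTimestamp);
        ByteBuffer entry = ByteBuffer.allocate(
                Short.BYTES + Integer.BYTES + raw.length + headersAndPayload.remaining());
        entry.putShort(RAW_METADATA_MAGIC_NUMBER);
        entry.putInt(raw.length);
        entry.put(raw);
        entry.put(headersAndPayload.duplicate());
        entry.flip();
        return entry;
    }
}
```

In the broker, this would run just before `asyncAddEntry`, e.g. with `System.currentTimeMillis()` as the timestamp. The copy keeps the example short. A real implementation would more likely avoid copying the payload, for instance by composing buffers with Netty's `CompositeByteBuf`.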
> >
> > ##### b) Seek by time
> >
> > Originally, when a seek-by-time request comes in, it eventually calls into
> > `org.apache.pulsar.broker.service.persistent.PersistentMessageFinder.findMessages`,
> > and the original logic looks like this:
> >
> > ```java
> > public void findMessages(final long timestamp, AsyncCallbacks.FindEntryCallback callback) {
> >     …
> >     cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries,
> >             entry -> {
> >                 MessageImpl msg = null;
> >                 msg = MessageImpl.deserialize(entry.getDataBuffer()); // <=== deserialize
> >                 return msg.getPublishTime() < timestamp;
> >             }, this, callback);
> >     …
> > ```
> >
> > This searches for a message in the topic-partition. It reads the target
> > Entry from BookKeeper, deserializes the Entry into a MessageImpl, and
> > checks whether the message publish time matches the requirement. If it
> > does not, it goes on to read another message.
> >
> > With the new design, after reading the target Entry from BookKeeper, the
> > broker does not need to deserialize it. It only needs to read the
> > “broker_timestamp” from the “raw Message metadata”.
> > As mentioned above, the frame stored in BookKeeper is
> > “[RAW_METADATA_MAGIC_NUMBER] [RAW_METADATA_SIZE] [RAW_METADATA]
> > [HEADERS_AND_PAYLOAD]”. The steps are:
> >
> > 1. Still use `entry.getDataBuffer()` to get the ByteBuf.
> > 2. Read and check “[RAW_METADATA_MAGIC_NUMBER]” to see whether the “raw
> > Message metadata” feature is enabled; if not, fall back to the old
> > behaviour.
> > 3. If it is enabled, read “[RAW_METADATA_SIZE]”, then read
> > “[RAW_METADATA]”.
> > 4. Use protobuf to deserialize “[RAW_METADATA]”, and read
> > “broker_timestamp” from it.
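
Steps 1 to 4 as a Java sketch. It makes four assumptions:

- The magic value is a hypothetical 2-byte 0x0e02.
- RAW_METADATA_SIZE is 4 bytes.
- `java.nio.ByteBuffer` stands in for the `ByteBuf` returned by `entry.getDataBuffer()`.
- A hand-written protobuf reader replaces the generated class. It handles only one-byte tags and varint fields.

```java
import java.nio.ByteBuffer;
import java.util.OptionalLong;

// Sketch of seek-by-time steps 1-4; magic value and field widths are assumed.
final class BrokerTimestampReader {
    static final short RAW_METADATA_MAGIC_NUMBER = (short) 0x0e02; // hypothetical

    // Returns broker_timestamp, or empty for an old-format entry (step 2 fallback).
    static OptionalLong readBrokerTimestamp(ByteBuffer entryData) {
        ByteBuffer buf = entryData.duplicate();  // leave the caller's position alone
        if (buf.remaining() < Short.BYTES + Integer.BYTES
                || buf.getShort(buf.position()) != RAW_METADATA_MAGIC_NUMBER) {
            return OptionalLong.empty();
        }
        buf.getShort();                   // step 2: consume [RAW_METADATA_MAGIC_NUMBER]
        int rawSize = buf.getInt();       // step 3: [RAW_METADATA_SIZE]
        ByteBuffer raw = buf.slice();
        raw.limit(rawSize);               // step 3: view of [RAW_METADATA] only
        return parseBrokerTimestamp(raw); // step 4
    }

    // Minimal protobuf reader: assumes one-byte tags (field numbers < 16) and
    // varint fields only; skips other varint fields and returns field 1.
    static OptionalLong parseBrokerTimestamp(ByteBuffer raw) {
        while (raw.hasRemaining()) {
            int tag = raw.get() & 0xFF;
            if ((tag & 0x07) != 0) {
                throw new IllegalArgumentException("sketch handles varint fields only");
            }
            long value = readVarint(raw);
            if ((tag >>> 3) == 1) {
                return OptionalLong.of(value);
            }
        }
        return OptionalLong.empty();      // broker_timestamp not set
    }

    static long readVarint(ByteBuffer buf) {
        long result = 0;
        for (int shift = 0; shift < 64; shift += 7) {
            byte b = buf.get();
            result |= (long) (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                return result;
            }
        }
        throw new IllegalArgumentException("malformed varint");
    }
}
```

When this returns empty, the `findMessages` predicate would fall back to `MessageImpl.deserialize(...)` and `getPublishTime()`, as it does today.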
> >
> > ##### c) Reading an entry from BookKeeper and sending it to the consumer client
> >
> > The main logic is in the method `sendMessages` of the class
> > `org.apache.pulsar.broker.service.Consumer`. The change is similar to the
> > “seek by time” change above: after reading the BookKeeper Entry, we need to
> > filter out the "RAW_METADATA"-related part and return only
> > “[HEADERS_AND_PAYLOAD]” to the consumer client.
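
Here is a minimal Java sketch of the filtering step. It assumes a hypothetical 2-byte magic value 0x0e02 and a 4-byte RAW_METADATA_SIZE, and uses `java.nio.ByteBuffer` in place of Netty's `ByteBuf`. Old-format entries pass through unchanged:

```java
import java.nio.ByteBuffer;

// Sketch of step c): strip the raw metadata before sending to the consumer.
final class RawMetadataStripper {
    static final short RAW_METADATA_MAGIC_NUMBER = (short) 0x0e02; // hypothetical

    // Returns a view of [HEADERS_AND_PAYLOAD]; old-format entries are returned as is.
    static ByteBuffer stripRawMetadata(ByteBuffer entryData) {
        ByteBuffer buf = entryData.duplicate();
        if (buf.remaining() < Short.BYTES + Integer.BYTES
                || buf.getShort(buf.position()) != RAW_METADATA_MAGIC_NUMBER) {
            return buf;  // already [HEADERS_AND_PAYLOAD]
        }
        int rawSize = buf.getInt(buf.position() + Short.BYTES);
        buf.position(buf.position() + Short.BYTES + Integer.BYTES + rawSize);
        return buf.slice();  // shares the entry's bytes, no copy
    }
}
```

Because `slice()` shares the entry's bytes, nothing is copied. With Netty, the equivalent would be `skipBytes` followed by `slice()` on the `ByteBuf`.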
> >
> > ### 3. Summary of required changes
> >
> > Here is a summary of the changes described above:
> >
> > 1. Add the raw Message metadata protobuf:
> >
> > ```protobuf
> > message RawMessageMetadata {
> >     optional uint64 broker_timestamp = 1;
> > }
> > ```
> >
> > 2. Change how produced messages are saved in BookKeeper:
> >    `org.apache.pulsar.broker.service.Producer`
> >
> > 3. Change how seek-by-time finds messages:
> >    `org.apache.pulsar.broker.service.persistent.PersistentMessageFinder.findMessages`
> >
> > 4. Change how messages are sent back to the Consumer:
> >    `org.apache.pulsar.broker.service.Consumer`
> >
> > Other changes:
> >
> > 1. A broker config to enable this raw Message metadata feature, e.g.
> > “useBrokerTimestampsForMessageSeek”.
> > 2. Besides changing
> > `org.apache.pulsar.broker.service.persistent.PersistentMessageFinder.findMessages`,
> > search for other places that call
> > `MessageImpl.deserialize(entry.getDataBuffer())` and
> > `msg.getPublishTime()`, such as `PersistentTopic.isOldestMessageExpired()`.
> > When this feature is enabled, those places should read the message
> > publish time from the new “raw Message metadata” instead of deserializing
> > the whole BookKeeper Entry into a Message and calling
> > `Message.getPublishTime()`.
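
Here is a Java sketch of how a call site such as `isOldestMessageExpired()` might choose its time source. The property name is the one proposed above. Reading it through `java.util.Properties` and defaulting it to false are assumptions for illustration, not how the broker's configuration class is wired:

```java
import java.util.OptionalLong;
import java.util.Properties;
import java.util.function.LongSupplier;

// Illustrative helper; names other than the proposed property are hypothetical.
final class PublishTimeSource {
    // Reads the proposed flag; the default of false is an assumption.
    static boolean brokerTimestampsEnabled(Properties brokerConf) {
        return Boolean.parseBoolean(
                brokerConf.getProperty("useBrokerTimestampsForMessageSeek", "false"));
    }

    // Uses the broker timestamp when the feature is on and the entry carries one;
    // otherwise falls back to the old path, which is only invoked in that case.
    static long effectiveTime(boolean enabled, OptionalLong brokerTimestamp,
                              LongSupplier deserializeAndGetPublishTime) {
        if (enabled && brokerTimestamp.isPresent()) {
            return brokerTimestamp.getAsLong();
        }
        return deserializeAndGetPublishTime.getAsLong();
    }
}
```

The old deserialization path is passed as a `LongSupplier`. As a result, `MessageImpl.deserialize` only runs when no broker timestamp can be used.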
> >
> >
> > ### 4. Compatibility, Deprecation and Migration Plan
> >
> > For the first feature, most of the changes are internal to the broker. It
> > is a new feature that changes neither Pulsar's wire protocol (between
> > broker and client) nor the public API. There is no backward compatibility
> > issue, and since it is a newly added feature, there is nothing to
> > deprecate or migrate.
> >
> > In the future, we will consider compatibility when implementing the
> > second feature, “continuous message sequence-Id”, since we will need to
> > send messages with a “sequence_id” to consumers. In that case, we will
> > take different actions depending on the data format, whether the feature
> > is enabled, and the consumer client version. Data is in the new format if
> > it contains "[RAW_METADATA_MAGIC_NUMBER][RAW_METADATA_SIZE][RAW_METADATA]",
> > and a consumer is a new version if it can process messages that contain
> > "[RAW_METADATA]".
> >
> > ### 5. Test Plan
> >
> > * Unit tests for each individual change: broker and client work correctly
> > for produce/consume.
> > * Integration tests or UTs for the end-to-end pipeline: multiple producers
> > set message times in random order, and seek-by-time seeks to the right
> > message.
> > * Load testing to ensure performance: seek should be faster.
> >
> > --
> > Best,
> > Aloys.
> >



-- 
Best,
Aloys.
