Aloys,

Thank you for your confirmation and updates! I have updated the PIP to
https://github.com/apache/pulsar/wiki/PIP-70%3A-Introduce-lightweight-broker-entry-metadata

I will review your PR closely.

- Sijie

On Thu, Dec 10, 2020 at 1:10 AM Aloys Zhang <lofterzh...@gmail.com> wrote:

> Sijie,
>
> "Broker Metadata" is a more precise name and easier to understand.
>
> I have modified PIP-70 in the Google Doc:
>
> https://docs.google.com/document/d/1IgnF9AJzL6JG6G4EL_xcoQxvOpd7bUXcgxFApBiPOFY/edit
>
> Please correct me if anything is inappropriate.
>
>
>
> On Thu, Dec 10, 2020 at 1:45 PM Sijie Guo <guosi...@gmail.com> wrote:
>
> > Matteo,
> >
> > Improving Protobuf serialization and deserialization can be a separate PIP,
> > because it doesn't solve the problems that this PIP tries to solve.
> >
> > This PIP probably doesn't state the problem clearly. "Raw Metadata" is
> > probably an inaccurate name. It should be called "Broker Metadata", which
> > distinguishes it from the existing metadata that is generated on the
> > client side.
> >
> > The "Broker Metadata" is used for storing all the information generated by
> > brokers, such as the broker-side publish (append) timestamp, a
> > monotonically increasing log index, etc.
> >
> > The main reason we don't re-use the existing "metadata" for this purpose is
> > not serialization & deserialization concerns; it is more about the
> > checksum. The current metadata section is generated, and the corresponding
> > checksum is set, by the clients. If we wanted to mutate the metadata, we
> > would have to re-generate the checksum, which would hugely impact
> > performance.
> >
> > Introducing a "Broker Metadata" section avoids re-generating the checksum
> > for a message batch. It also separates client-generated metadata from
> > broker-generated metadata, avoiding any mistakes in touching
> > client-generated metadata.
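A small sketch of the checksum argument, using the JDK's `java.util.zip.CRC32C` (Java 9+; Pulsar's message checksum is CRC32C). It assumes, for illustration only, that the client checksum covers the client-generated bytes as one contiguous section; the class and method names are made up and this is not Pulsar code.

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32C;

// Sketch: prepending broker-generated bytes leaves the client's checksummed bytes untouched,
// while editing client metadata in place would force a new checksum.
// Assumption: the client checksum is CRC32C over one contiguous client-generated section.
final class ChecksumSketch {

    // CRC32C over the remaining bytes, without moving the caller's position.
    static long crc32c(ByteBuffer buf) {
        CRC32C crc = new CRC32C();
        crc.update(buf.duplicate());
        return crc.getValue();
    }

    // Builds [brokerMetadata][clientSection] without modifying clientSection.
    static ByteBuffer prependBrokerMetadata(byte[] brokerMetadata, ByteBuffer clientSection) {
        ByteBuffer out = ByteBuffer.allocate(brokerMetadata.length + clientSection.remaining());
        out.put(brokerMetadata).put(clientSection.duplicate());
        out.flip();
        return out;
    }
}
```

Checksumming the client section of the prepended buffer (skipping the broker bytes) gives the same value as before, while flipping a single bit of the client bytes gives a different value. That difference is why mutating client metadata in place would force the broker to recompute the checksum.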
> >
> > Hope this clarifies the problems that this PIP tries to solve.
> >
> >
> > Aloys,
> >
> > If you agree with my comments above, I think we should rename "Raw
> > Metadata" to "Broker Metadata" to make this clear and avoid any confusion.
> >
> > Thanks,
> > Sijie
> >
> > On Wed, Nov 18, 2020 at 10:47 AM Matteo Merli <matteo.me...@gmail.com> wrote:
> >
> > > The goal of this proposal can be achieved automatically by using a
> > > better ser/de generator that doesn't have overhead for ignored fields.
> > >
> > > I'm preparing a revamp of the current Protobuf serialization and
> > > I'll send a proposal soon.
> > >
> > > Matteo
> > >
> > >
> > > --
> > > Matteo Merli
> > > <matteo.me...@gmail.com>
> > >
> > > On Sun, Nov 8, 2020 at 10:25 PM Aloys Zhang <lofterzh...@gmail.com> wrote:
> > > >
> > > > Hi all,
> > > >
> > > > We have drafted a proposal for supporting lightweight raw Message
> > > > metadata, which can be found at
> > > >
> > > > https://github.com/apache/pulsar/wiki/PIP-70%3A-Introduce-lightweight-raw-Message-metadata
> > > >
> > > > and
> > > >
> > > > https://docs.google.com/document/d/1IgnF9AJzL6JG6G4EL_xcoQxvOpd7bUXcgxFApBiPOFY
> > > >
> > > > I have also copied it into this email thread for easier viewing.
> > > >
> > > > Suggestions and ideas are welcome; please join the discussion.
> > > >
> > > >
> > > >
> > > > ## PIP-70:  Introduce lightweight raw Message metadata
> > > >
> > > > ### 1. Motivation
> > > >
> > > > For messages in Pulsar, if we want to add a new property, we always
> > > > change `MessageMetadata` in the protocol (PulsarApi.proto); such a
> > > > property can be understood by both the broker side and the client side by
> > > > deserializing `MessageMetadata`. In some cases, however, the property
> > > > needs to be added on the broker side, or needs to be understood by the
> > > > broker side at low cost. When the broker receives a message from the
> > > > client, we could add the property in a new area that is not combined with
> > > > `MessageMetadata`, so there is no need to deserialize the original
> > > > `MessageMetadata` to get it out; and when the broker sends the message to
> > > > a client, we could choose to filter this property out (or not, as the
> > > > client needs). We call this kind of property “raw Message metadata”. This
> > > > way, the “raw Message metadata” is consumed independently and is not
> > > > related to the original `MessageMetadata`.
> > > >
> > > > The benefit of this kind of “raw Message metadata” is that the broker does
> > > > not need to serialize/deserialize the protobuf-encoded `MessageMetadata`,
> > > > which provides better performance. It could also enable many features
> > > > that are not supported yet.
> > > >
> > > > Here are some of the use cases for raw Message metadata:
> > > > 1) Provide messages ordered by (broker-side) time, to make seek-by-time
> > > > more accurate.
> > > > Currently, each message has a `publish_time`, which uses the client-side
> > > > time. For different producers on different clients, the clocks may not be
> > > > aligned, so the message order and the message time (`publish_time`) order
> > > > may differ. But each topic-partition has only one owner broker, so if we
> > > > append the broker-side time to the “raw Message metadata”, we can make
> > > > sure the message order is aligned with the broker-side time. With this
> > > > feature, we can handle seek-by-time more accurately.
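Why broker-side ordering helps can be shown with a lower-bound search: when the timestamps of a partition's entries are non-decreasing, the first entry at or after a target time can be found by binary search. In this sketch (not Pulsar code), a `long[]` of broker timestamps stands in for the entries stored in BookKeeper. With client-side `publish_time` values from several producers the array may not be sorted, and the same search can then return the wrong position.

```java
// Sketch: seek-by-time over entries whose broker-side timestamps are non-decreasing.
// Assumption: timestamps[i] is the broker timestamp of the i-th entry of one partition,
// all written by the partition's single owner broker.
final class SeekByBrokerTime {

    // Returns the index of the first entry with timestamp >= target,
    // or timestamps.length if every entry is older than target.
    static int firstAtOrAfter(long[] timestamps, long target) {
        int lo = 0;
        int hi = timestamps.length; // search window is [lo, hi)
        while (lo < hi) {
            int mid = (lo + hi) >>> 1; // unsigned shift avoids int overflow
            if (timestamps[mid] < target) {
                lo = mid + 1;
            } else {
                hi = mid;
            }
        }
        return lo;
    }
}
```

For timestamps `{10, 20, 20, 30}`, a seek to 20 lands on index 1 (the first entry at that time) and a seek to 25 lands on index 3.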
> > > >
> > > > 2) Provide a continuous message sequence-Id for messages in one
> > > > topic-partition.
> > > > A MessageId is a combination of ledgerId+entryId+batchIndex; for a
> > > > partition that contains more than one ledger, the Ids are not continuous.
> > > > With this solution, we could attach a sequence-Id to each message, which
> > > > makes message sequence management easier.
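A minimal sketch of the idea, with hypothetical names: a counter owned by the partition's broker that keeps increasing across ledger rollovers, whereas `entryId` restarts in each new ledger. Advancing the counter by the batch size (so each message in a batch gets its own number, and the entry records the index of its first message) is an assumption of this sketch, not something the PIP specifies.

```java
// Sketch with hypothetical names, not Pulsar code.
final class PartitionIndexSketch {

    // Position of one stored entry plus the broker-assigned index of its first message.
    static final class Assigned {
        final long ledgerId;
        final long entryId;
        final long index;

        Assigned(long ledgerId, long entryId, long index) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
            this.index = index;
        }
    }

    private long currentLedgerId;
    private long nextEntryId = 0;
    private long nextIndex = 0;

    PartitionIndexSketch(long firstLedgerId) {
        this.currentLedgerId = firstLedgerId;
    }

    // Simulates the managed ledger rolling over: entryId restarts, the index does not.
    void rollOver(long newLedgerId) {
        currentLedgerId = newLedgerId;
        nextEntryId = 0;
    }

    // Appends one entry holding batchSize messages and returns where it landed.
    Assigned append(int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        Assigned assigned = new Assigned(currentLedgerId, nextEntryId++, nextIndex);
        nextIndex += batchSize;
        return assigned;
    }
}
```

For example, appending a single-message entry, then a 3-message batch, then rolling over to a new ledger and appending one more message assigns indexes 0, 1 and 4, while the last entry's `entryId` is back to 0.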
> > > >
> > > > In this proposal, we focus on the first feature mentioned above, “provide
> > > > messages ordered by (broker-side) time”, which makes the proposal easier
> > > > to go through.
> > > >
> > > > ### 2. Message and “raw Message metadata” structure changes.
> > > >
> > > > As mentioned above, there are 2 main benefits in this proposal:
> > > >
> > > > 1. Almost all of the changes happen on the broker side.
> > > > 2. It avoids serializing/deserializing the protobuf-encoded
> > > > `MessageMetadata`.
> > > >
> > > > #### 2.1 Raw Message metadata structure in Protobuf
> > > >
> > > > Protobuf is used widely in Pulsar, so we could use Protobuf to
> > > > serialize/deserialize the raw Message metadata.
> > > > In this example, we save the broker-side timestamp when each message is
> > > > sent from the broker to BookKeeper, so the definition is very simple.
> > > >
> > > > ```protobuf
> > > > message RawMessageMetadata {
> > > >     optional uint64 broker_timestamp = 1;
> > > > }
> > > > ```
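To give a sense of how small the serialized `[RAW_METADATA]` is, the sketch below hand-encodes this message in the standard protobuf wire format (field 1, varint wire type). In practice the classes generated from the `.proto` file would do this; the hand-rolled codec and its names are illustrative only.

```java
import java.io.ByteArrayOutputStream;

// Sketch: protobuf wire encoding of
//   message RawMessageMetadata { optional uint64 broker_timestamp = 1; }
// Generated protobuf code would normally do this; it is spelled out here for illustration.
final class RawMetadataCodec {

    // Field number 1, wire type 0 (varint): (1 << 3) | 0 == 0x08.
    private static final int BROKER_TIMESTAMP_TAG = (1 << 3) | 0;

    static byte[] encode(long brokerTimestamp) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(BROKER_TIMESTAMP_TAG);
        long v = brokerTimestamp;
        // Emit 7 bits at a time, least significant group first; the high bit means "more bytes follow".
        while ((v & ~0x7FL) != 0) {
            out.write((int) ((v & 0x7F) | 0x80));
            v >>>= 7;
        }
        out.write((int) v);
        return out.toByteArray();
    }

    // Returns broker_timestamp, or -1 if the bytes do not start with field 1.
    // Other fields are not handled in this sketch.
    static long decodeBrokerTimestamp(byte[] bytes) {
        if (bytes.length == 0 || bytes[0] != BROKER_TIMESTAMP_TAG) {
            return -1;
        }
        long result = 0;
        int shift = 0;
        for (int i = 1; i < bytes.length && shift < 64; i++) {
            result |= (long) (bytes[i] & 0x7F) << shift;
            if ((bytes[i] & 0x80) == 0) {
                return result;
            }
            shift += 7;
        }
        throw new IllegalArgumentException("truncated or malformed varint");
    }
}
```

`encode(300)` yields the bytes `08 AC 02`, and a millisecond timestamp from 2020 takes 7 bytes (1 tag byte plus 6 varint bytes).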
> > > >
> > > > #### 2.2 Message and “raw Message metadata” structure details
> > > >
> > > > Each message is sent from the producer client to the broker in this frame
> > > > format:
> > > >
> > > > ```
> > > > [TOTAL_SIZE] [CMD_SIZE] [CMD] [MAGIC_NUMBER] [CHECKSUM] [METADATA_SIZE] [METADATA] [PAYLOAD]
> > > > ```
> > > >
> > > > The first 3 fields, “[TOTAL_SIZE] [CMD_SIZE] [CMD]”, are read by
> > > > `LengthFieldBasedFrameDecoder` and `PulsarDecoder`, and the rest is
> > > > handled in the method
> > > > `org.apache.pulsar.broker.service.Producer.publishMessage`. The remaining
> > > > part, “[MAGIC_NUMBER] [CHECKSUM] [METADATA_SIZE] [METADATA] [PAYLOAD]”, is
> > > > usually treated as “headersAndPayload” in the code. As described above,
> > > > we do not want this part to be changed at all, so we can treat it as a
> > > > whole package.
> > > >
> > > > ```
> > > > [MAGIC_NUMBER] [CHECKSUM] [METADATA_SIZE] [METADATA] [PAYLOAD] ==>
> > > > [HEADERS_AND_PAYLOAD]
> > > > ```
> > > >
> > > > We need to add some fields to make “raw Message metadata” work.
> > > >
> > > > ```
> > > > [RAW_METADATA_MAGIC_NUMBER] [RAW_METADATA_SIZE] [RAW_METADATA] [HEADERS_AND_PAYLOAD]
> > > > ```
> > > >
> > > > RAW_METADATA_MAGIC_NUMBER is used to identify whether this feature is on
> > > > and whether the message has been handled by this feature.
> > > >
> > > > RAW_METADATA_SIZE is added for this feature and records the size of the
> > > > serialized raw Message metadata content.
> > > >
> > > > RAW_METADATA is the serialized raw Message metadata content.
> > > >
> > > > “HEADERS_AND_PAYLOAD” is the original ByteBuf data that contains the
> > > > metadata and payload.
> > > >
> > > > We put the newly added fields before, instead of after,
> > > > "HEADERS_AND_PAYLOAD" here, because:
> > > >
> > > > - Firstly, the “raw Message metadata” is an addition to the original
> > > > protocol, and it can support this new feature without much modification
> > > > of the original wire protocol; in particular, it does not need to
> > > > serialize/deserialize the protobuf-encoded `MessageMetadata` built by the
> > > > producer.
> > > > - Secondly, if we put the new fields after "HEADERS_AND_PAYLOAD", we can't
> > > > know the offset of the newly added fields, since we don't know the length
> > > > of "PAYLOAD". Also, fields after "PAYLOAD" would change the CRC32C
> > > > checksum, which means we would need to recalculate the checksum one more
> > > > time.
> > > >
> > > > #### 2.3 Message and “raw Message metadata” lifecycle
> > > >
> > > > The message processing logic would be like this:
> > > >
> > > > 1. The producer client sends the original message to the broker, where it
> > > > is parsed by “PulsarDecoder”, leaving only “[HEADERS_AND_PAYLOAD]”;
> > > > 2. Originally, “[HEADERS_AND_PAYLOAD]” is written into BookKeeper by the
> > > > method `asyncAddEntry` in `ManagedLedger`; now we first add the above
> > > > “[RAW_METADATA]” and related parts in front of “[HEADERS_AND_PAYLOAD]”,
> > > > then write the whole ByteBuf into BookKeeper by `asyncAddEntry`, so the
> > > > “raw Message metadata” is kept together with the original message in
> > > > BookKeeper.
> > > > 3. When a feature needs this “raw Message metadata”, we read the new
> > > > BookKeeper “Entry” and get the raw Message metadata information to serve
> > > > the feature.
> > > > 4. In the “provide messages ordered by (broker-side) time” feature, when
> > > > a seek-by-time operation reaches the broker, we read “broker_timestamp”
> > > > out of “[RAW_METADATA]”.
> > > > 5. After the broker reads a BookKeeper Entry, and before sending it to the
> > > > consumer client, we filter out the raw Message metadata part and return
> > > > only the “[HEADERS_AND_PAYLOAD]” part.
> > > >
> > > > This way, we don't need to serialize/deserialize the protobuf-encoded
> > > > `MessageMetadata`.
> > > >
> > > > These steps are described in more detail below:
> > > >
> > > > ##### a) Handling and storing a produced message
> > > >
> > > > A message sent from the producer client to the broker is handled by the
> > > > method
> > > >
> > > > ```
> > > > publishMessageToTopic(ByteBuf headersAndPayload, long sequenceId, long batchSize, boolean isChunked)
> > > > ```
> > > >
> > > > in class `org.apache.pulsar.broker.service.Producer`. It finally calls the
> > > > method `asyncAddEntry` of `ManagedLedger`, which writes the passed-in
> > > > serialized message as an Entry stored in BookKeeper.
> > > >
> > > > ```
> > > > void asyncAddEntry(ByteBuf headersAndPayload, AddEntryCallback callback, Object ctx)
> > > > ```
> > > >
> > > > For the raw Message metadata content, we put the current broker timestamp
> > > > into `message RawMessageMetadata` and use protobuf to serialize it into a
> > > > `ByteBuf`. Then we put the magic number and the size and content of the
> > > > serialized `ByteBuf` in front of the original `ByteBuf headersAndPayload`.
> > > >
> > > > So “[HEADERS_AND_PAYLOAD]” turns into “[RAW_METADATA_MAGIC_NUMBER]
> > > > [RAW_METADATA_SIZE] [RAW_METADATA] [HEADERS_AND_PAYLOAD]”, which is stored
> > > > in BookKeeper.
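The produce-side step can be sketched as below. This is an illustration, not Pulsar code: it uses `java.nio.ByteBuffer` instead of Netty's `ByteBuf` to stay dependency-free, and the 2-byte magic number (hypothetical value `0x0e02`) and 4-byte size field are assumptions, since the PIP does not fix their widths or values. `rawMetadata` is expected to be the serialized `RawMessageMetadata`.

```java
import java.nio.ByteBuffer;

// Sketch of the produce path: prepend [RAW_METADATA_MAGIC_NUMBER][RAW_METADATA_SIZE][RAW_METADATA]
// to the untouched headersAndPayload before it is handed to ManagedLedger.asyncAddEntry.
// Assumptions: a 2-byte magic number with the hypothetical value 0x0e02, then a 4-byte size.
final class RawMetadataWriter {

    static final short RAW_METADATA_MAGIC_NUMBER = 0x0e02; // hypothetical value

    static ByteBuffer addRawMetadata(byte[] rawMetadata, ByteBuffer headersAndPayload) {
        ByteBuffer entry = ByteBuffer.allocate(
                Short.BYTES + Integer.BYTES + rawMetadata.length + headersAndPayload.remaining());
        entry.putShort(RAW_METADATA_MAGIC_NUMBER)   // [RAW_METADATA_MAGIC_NUMBER]
             .putInt(rawMetadata.length)            // [RAW_METADATA_SIZE]
             .put(rawMetadata)                      // [RAW_METADATA]
             .put(headersAndPayload.duplicate());   // [HEADERS_AND_PAYLOAD], copied verbatim
        entry.flip();
        return entry;
    }
}
```

Copying keeps the sketch short; a broker would more likely avoid the copy by combining a small header buffer with the existing `headersAndPayload`, for example with Netty's `CompositeByteBuf`. Either way the client's bytes, and therefore its checksum, are untouched.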
> > > >
> > > > ##### b) Seek by time
> > > >
> > > > Originally, when a seek-by-time request comes in, it finally calls into
> > > > `org.apache.pulsar.broker.service.persistent.PersistentMessageFinder.findMessages`,
> > > > and the original logic is like this:
> > > >
> > > > ```java
> > > > public void findMessages(final long timestamp, AsyncCallbacks.FindEntryCallback callback) {
> > > >     …
> > > >     cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries,
> > > >         entry -> {
> > > >             // <== deserializes the whole entry just to read its publish time
> > > >             MessageImpl msg = MessageImpl.deserialize(entry.getDataBuffer());
> > > >             return msg.getPublishTime() < timestamp;
> > > >         }, this, callback);
> > > >     …
> > > > }
> > > > ```
> > > >
> > > > It searches the topic-partition: it reads the target Entry from
> > > > BookKeeper, deserializes the Entry into a MessageImpl, and checks whether
> > > > the message publish time matches the requirement; if not, it iterates and
> > > > reads another message.
> > > >
> > > > With the new design, after reading the target Entry from BookKeeper, we do
> > > > not need to deserialize it; we only need to read the “broker_timestamp”
> > > > from the “raw Message metadata”.
> > > > As mentioned above, the frame stored in BookKeeper is:
> > > > “[RAW_METADATA_MAGIC_NUMBER] [RAW_METADATA_SIZE] [RAW_METADATA]
> > > > [HEADERS_AND_PAYLOAD]”.
> > > >
> > > > 1. Still use `entry.getDataBuffer()` to get the ByteBuf;
> > > > 2. Read and check “[RAW_METADATA_MAGIC_NUMBER]” to see whether the “raw
> > > > Message metadata” feature is enabled; if not, fall back to the old
> > > > behaviour;
> > > > 3. If enabled, read “[RAW_METADATA_SIZE]”, then read “[RAW_METADATA]”;
> > > > 4. Use protobuf to deserialize “[RAW_METADATA]” and read
> > > > “broker_timestamp” out.
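Steps 1 to 4 can be sketched as below, with `java.nio.ByteBuffer` in place of Netty's `ByteBuf` and an assumed layout of a 2-byte magic number (hypothetical value `0x0e02`) followed by a 4-byte size. The two function parameters are placeholders: `parseBrokerTimestamp` stands in for the generated protobuf parser, and `legacyPublishTime` stands in for the old `MessageImpl.deserialize(...).getPublishTime()` path. This is a sketch, not Pulsar code.

```java
import java.nio.ByteBuffer;
import java.util.function.ToLongFunction;

// Sketch: read broker_timestamp from an entry's data buffer, falling back to the old
// behaviour when the entry carries no raw Message metadata.
// Assumptions: 2-byte magic number (hypothetical value 0x0e02), then a 4-byte size.
final class BrokerTimestampReader {

    static final short RAW_METADATA_MAGIC_NUMBER = 0x0e02; // hypothetical value

    static long readTimestamp(ByteBuffer entryData,
                              ToLongFunction<byte[]> parseBrokerTimestamp,
                              ToLongFunction<ByteBuffer> legacyPublishTime) {
        ByteBuffer buf = entryData.duplicate(); // step 1; leave the caller's position alone
        // Step 2: no magic number means an old-format entry, so use the old path.
        if (buf.remaining() < Short.BYTES + Integer.BYTES
                || buf.getShort(buf.position()) != RAW_METADATA_MAGIC_NUMBER) {
            return legacyPublishTime.applyAsLong(buf);
        }
        buf.getShort();              // skip [RAW_METADATA_MAGIC_NUMBER]
        int size = buf.getInt();     // step 3: [RAW_METADATA_SIZE]
        if (size < 0 || size > buf.remaining()) {
            throw new IllegalArgumentException("corrupt RAW_METADATA_SIZE: " + size);
        }
        byte[] rawMetadata = new byte[size];
        buf.get(rawMetadata);        // step 3: [RAW_METADATA]
        return parseBrokerTimestamp.applyAsLong(rawMetadata); // step 4
    }
}
```

In `findMessages`, the predicate would then compare the value returned by `readTimestamp(...)` with the target timestamp instead of deserializing every entry.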
> > > >
> > > > ##### c) Reading an entry from BookKeeper and sending it to the consumer client
> > > >
> > > > The main logic is in the method `sendMessages` in class
> > > > `org.apache.pulsar.broker.service.Consumer`. The change is similar to
> > > > “seek by time” above: after reading the BookKeeper Entry, we filter out
> > > > the "RAW_METADATA"-related part and return only “[HEADERS_AND_PAYLOAD]” to
> > > > the consumer client.
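The filtering step can be sketched as a zero-copy slice. As an illustration only, it uses `java.nio.ByteBuffer` rather than Netty's `ByteBuf` and assumes a 2-byte magic number (hypothetical value `0x0e02`) followed by a 4-byte size; the class name is made up.

```java
import java.nio.ByteBuffer;

// Sketch: before an entry is sent to the consumer, drop
// [RAW_METADATA_MAGIC_NUMBER][RAW_METADATA_SIZE][RAW_METADATA] and keep only [HEADERS_AND_PAYLOAD].
// Assumptions: 2-byte magic number (hypothetical value 0x0e02), then a 4-byte size.
final class RawMetadataStripper {

    static final short RAW_METADATA_MAGIC_NUMBER = 0x0e02; // hypothetical value

    // Returns a view of headersAndPayload without copying; entries written without
    // raw Message metadata are returned unchanged (as a view of the whole entry).
    static ByteBuffer headersAndPayload(ByteBuffer entryData) {
        ByteBuffer buf = entryData.duplicate();
        if (buf.remaining() < Short.BYTES + Integer.BYTES
                || buf.getShort(buf.position()) != RAW_METADATA_MAGIC_NUMBER) {
            return buf;
        }
        int size = buf.getInt(buf.position() + Short.BYTES);
        int headerLength = Short.BYTES + Integer.BYTES;
        if (size < 0 || size > buf.remaining() - headerLength) {
            throw new IllegalArgumentException("corrupt RAW_METADATA_SIZE: " + size);
        }
        buf.position(buf.position() + headerLength + size);
        return buf.slice(); // shares the underlying bytes
    }
}
```

Because `slice()` shares the underlying bytes, nothing is copied, and entries written before the feature was enabled pass through unchanged.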
> > > >
> > > > ### 3. Summary of required changes
> > > >
> > > > Here is a summary of the changes above:
> > > >
> > > > 1. Add raw Message metadata protobuf
> > > >
> > > > ```protobuf
> > > > message RawMessageMetadata {
> > > >     optional uint64 broker_timestamp = 1;
> > > > }
> > > > ```
> > > >
> > > > 2. Change how a produced message is saved in BookKeeper:
> > > >    `org.apache.pulsar.broker.service.Producer`
> > > >
> > > > 3. Change how seek-by-time works:
> > > >    `org.apache.pulsar.broker.service.persistent.PersistentMessageFinder.findMessages`
> > > >
> > > > 4. Change how messages are sent back to the consumer:
> > > >    `org.apache.pulsar.broker.service.Consumer`
> > > >
> > > > Other changes:
> > > >
> > > > 1. A broker config to enable this raw Message metadata feature, e.g.
> > > > “useBrokerTimestampsForMessageSeek”.
> > > > 2. Besides changing
> > > > `org.apache.pulsar.broker.service.persistent.PersistentMessageFinder.findMessages`,
> > > > find other places that call `MessageImpl.deserialize(entry.getDataBuffer())`
> > > > and `msg.getPublishTime()`, such as `PersistentTopic.isOldestMessageExpired()`,
> > > > and, if this feature is enabled, read the message publish time from the
> > > > new “raw Message metadata” instead of deserializing the whole BookKeeper
> > > > Entry into a Message and calling `Message.getPublishTime()`.
> > > >
> > > >
> > > > ### 4. Compatibility, Deprecation and Migration Plan
> > > >
> > > > For the first feature, most of the changes are internal to brokers, and it
> > > > is a new feature that changes neither Pulsar's wire protocol (between
> > > > broker and client) nor the public API. There is no backward compatibility
> > > > issue, and since it is a newly added feature, there is nothing to
> > > > deprecate or migrate.
> > > >
> > > > In the future, we will consider compatibility when implementing the
> > > > second feature, “continuous message sequence-Id”, since we will need to
> > > > send messages with a “sequence_id” to consumers. In that case, we will
> > > > take different actions depending on the data format, whether the feature
> > > > is enabled, and the consumer client version. Data is in the new format if
> > > > it contains "[RAW_METADATA_MAGIC_NUMBER][RAW_METADATA_SIZE][RAW_METADATA]",
> > > > and a consumer is a new version if it can process messages that contain
> > > > "[RAW_METADATA]".
> > > >
> > > > ### 5. Test Plan
> > > >
> > > > * Unit tests for each individual change: the broker/client works well for
> > > > produce/consume.
> > > > * Integration tests or UTs for the end-to-end pipeline: multiple producers
> > > > set message times in random order; seek-by-time seeks to the right
> > > > message.
> > > > * Load testing to ensure performance: seek should be faster.
> > > >
> > > >
> > > > --
> > > > Best,
> > > > Aloys.
> > >
> >
>
>
> --
> Best,
> Aloys.
>
