Aloys,

Thank you for your confirmation and updates! I have updated the PIP to https://github.com/apache/pulsar/wiki/PIP-70%3A-Introduce-lightweight-broker-entry-metadata

I will review your PR closely.

- Sijie

On Thu, Dec 10, 2020 at 1:10 AM Aloys Zhang <lofterzh...@gmail.com> wrote:

> Sijie,
>
> "Broker Metadata" is more precise and easier to understand.
>
> I have modified PIP-70 in Google Docs:
> https://docs.google.com/document/d/1IgnF9AJzL6JG6G4EL_xcoQxvOpd7bUXcgxFApBiPOFY/edit
> Please correct me if anything is inappropriate.
>
> Sijie Guo <guosi...@gmail.com> wrote on Thu, Dec 10, 2020 at 1:45 PM:
>
> > Matteo,
> >
> > Improving Protobuf serialization and deserialization can be a separate PIP, because it doesn't solve the problems that this PIP tries to solve.
> >
> > This PIP probably doesn't state the problem clearly. "Raw Metadata" is probably an inaccurate name. It should probably be called "Broker Metadata", which distinguishes it from the existing metadata that is generated on the client side.
> >
> > The "Broker Metadata" is used for storing all the information generated by brokers, such as the broker-side publish (append) timestamp, a monotonically increasing log index, etc.
> >
> > The main reason we don't reuse the existing "metadata" for this purpose is not serialization and deserialization cost; it is mostly about the checksum. The current metadata section is generated by the clients, and the clients also set the corresponding checksum. If we want to mutate that metadata, we have to regenerate the checksum, which would hugely impact performance.
> >
> > Introducing a "Broker Metadata" section avoids regenerating the checksum for a message batch. It also separates client-generated metadata from broker-generated metadata, avoiding accidental modification of client-generated metadata.
> >
> > Hope this clarifies the problems that this PIP tries to solve.
> >
> > Aloys,
> >
> > If you agree with my comments above, I think we should rename "Raw Metadata" to "Broker Metadata" to make this clear and avoid any confusion.
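Sijie's checksum argument can be illustrated with a small, self-contained Java sketch. It is not Pulsar code: it models a simplified, hypothetical client frame `[CHECKSUM][METADATA_SIZE][METADATA][PAYLOAD]` in which the client computes a CRC32C (using `java.util.zip.CRC32C`, available since Java 9) over everything after the checksum field; the class and method names are made up for illustration. It shows that prepending a separate broker section leaves the checksummed bytes, and therefore the client's checksum, intact, while changing any byte of the client metadata (a single bit flip stands in for adding a broker field) invalidates the checksum unless the broker recomputes it.

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32C;

/**
 * Hypothetical model (not Pulsar code) of a client frame:
 * [CHECKSUM (4 bytes)] [METADATA_SIZE (4 bytes)] [METADATA] [PAYLOAD],
 * where the client computes a CRC32C over everything after the checksum field.
 */
final class ChecksumSketch {

    private ChecksumSketch() {}

    /** Client side: build the frame and compute the checksum once. */
    static byte[] clientFrame(byte[] metadata, byte[] payload) {
        byte[] covered = ByteBuffer.allocate(4 + metadata.length + payload.length)
                .putInt(metadata.length)
                .put(metadata)
                .put(payload)
                .array();
        CRC32C crc = new CRC32C();
        crc.update(covered, 0, covered.length);
        return ByteBuffer.allocate(4 + covered.length)
                .putInt((int) crc.getValue()) // low 32 bits hold the unsigned CRC value
                .put(covered)
                .array();
    }

    /** Verify the client checksum of a frame that starts at {@code offset} and runs to the end of {@code buf}. */
    static boolean checksumValid(byte[] buf, int offset) {
        int stored = ByteBuffer.wrap(buf, offset, 4).getInt();
        CRC32C crc = new CRC32C();
        crc.update(buf, offset + 4, buf.length - offset - 4);
        return stored == (int) crc.getValue();
    }

    /** Broker side: prepend a separate broker section; the client bytes are copied verbatim. */
    static byte[] prependBrokerSection(byte[] brokerSection, byte[] clientFrame) {
        return ByteBuffer.allocate(brokerSection.length + clientFrame.length)
                .put(brokerSection)
                .put(clientFrame)
                .array();
    }
}
```

Recomputing the checksum is possible, but it means a full CRC pass over metadata and payload for every entry, which is the cost the separate broker section avoids.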
> >
> > Thanks,
> > Sijie
> >
> > On Wed, Nov 18, 2020 at 10:47 AM Matteo Merli <matteo.me...@gmail.com> wrote:
> >
> > > The goal of this proposal can be achieved automatically by using a better ser/de generator that doesn't have overhead for ignored fields.
> > >
> > > I'm preparing a revamp of the current Protobuf serialization and I'll send a proposal soon.
> > >
> > > Matteo
> > >
> > > --
> > > Matteo Merli
> > > <matteo.me...@gmail.com>
> > >
> > > On Sun, Nov 8, 2020 at 10:25 PM Aloys Zhang <lofterzh...@gmail.com> wrote:
> > >
> > > > Hi all,
> > > >
> > > > We have drafted a proposal for supporting lightweight raw Message metadata, which can be found at
> > > > https://github.com/apache/pulsar/wiki/PIP-70%3A-Introduce-lightweight-raw-Message-metadata
> > > > and
> > > > https://docs.google.com/document/d/1IgnF9AJzL6JG6G4EL_xcoQxvOpd7bUXcgxFApBiPOFY
> > > >
> > > > I have also copied it into this email thread for easier viewing.
> > > >
> > > > Any suggestions or ideas are welcome; please join the discussion.
> > > >
> > > > ## PIP-70: Introduce lightweight raw Message metadata
> > > >
> > > > ### 1. Motivation
> > > >
> > > > For messages in Pulsar, if we want to add a new property, we always change `MessageMetadata` in the protocol (PulsarApi.proto). Such a property can be understood by both the broker and the client by deserializing `MessageMetadata`. But in some cases, the property needs to be added by the broker, or needs to be understood by the broker in a low-cost way.
> > > > When the broker receives a message produced by the client, it could add the property in a new area that is separate from `MessageMetadata`, so the original `MessageMetadata` does not need to be deserialized to read it; and when the broker sends the message to the client, it could choose to filter this property out (or not, depending on what the client needs). We call this kind of property “raw Message metadata”. This way, consuming the “raw Message metadata” is independent of the original `MessageMetadata`.
> > > >
> > > > The benefit of this kind of “raw Message metadata” is that the broker does not need to serialize/deserialize the protobuf-encoded `MessageMetadata`, which provides better performance. It could also enable several features that are not supported yet.
> > > >
> > > > Here are some of the use cases for raw Message metadata:
> > > >
> > > > 1) Provide messages ordered by (broker-side) time, to make seeking messages by time more accurate.
> > > > Currently, each message has a `publish_time` taken from the client-side clock. Clocks on different clients may not be aligned, so the order of the messages and the order of their times (`publish_time`) may differ. But each topic-partition has only one owner broker; if we append the broker-side time in the “raw Message metadata”, we can make sure the message order is aligned with broker-side time. With this feature, seeking messages by time can be handled more accurately.
> > > >
> > > > 2) Provide a continuous message sequence-Id for messages in one topic-partition.
> > > > A MessageId is a combination of ledgerId+entryId+batchIndex; for a partition that contains more than one ledger, these Ids are not continuous. With this solution, we could attach a sequence-Id to each message, which would make message sequence management easier.
> > > >
> > > > In this proposal, we focus on the first feature, “provide messages ordered by (broker-side) time”, to keep the proposal easier to follow.
> > > >
> > > > ### 2. Message and “raw Message metadata” structure changes
> > > >
> > > > As mentioned above, this proposal has 2 main benefits:
> > > >
> > > > 1. Almost all of the changes happen on the broker side.
> > > > 2. It avoids serializing/deserializing the protobuf-encoded `MessageMetadata`.
> > > >
> > > > #### 2.1 Raw Message metadata structure in Protobuf
> > > >
> > > > Protobuf is used widely in Pulsar, so we could use Protobuf to serialize/deserialize the raw Message metadata. In this example, we save the broker-side timestamp when each message is sent from the broker to BookKeeper, so the definition is very simple:
> > > >
> > > > ```protobuf
> > > > message RawMessageMetadata {
> > > >     optional uint64 broker_timestamp = 1;
> > > > }
> > > > ```
> > > >
> > > > #### 2.2 Message and “raw Message metadata” structure details
> > > >
> > > > Each message is sent from the producer client to the broker in this frame format:
> > > >
> > > > ```
> > > > [TOTAL_SIZE] [CMD_SIZE] [CMD] [MAGIC_NUMBER] [CHECKSUM] [METADATA_SIZE] [METADATA] [PAYLOAD]
> > > > ```
> > > >
> > > > The first 3 fields, “[TOTAL_SIZE] [CMD_SIZE] [CMD]”, are read by `LengthFieldBasedFrameDecoder` and `PulsarDecoder`, and the rest is handled in the method `org.apache.pulsar.broker.service.Producer.publishMessage`.
> > > > The remaining part, “[MAGIC_NUMBER] [CHECKSUM] [METADATA_SIZE] [METADATA] [PAYLOAD]”, is usually treated as “headersAndPayload” in the code. As described above, we do not want this part to change at all, so we can treat it as a single package:
> > > >
> > > > ```
> > > > [MAGIC_NUMBER] [CHECKSUM] [METADATA_SIZE] [METADATA] [PAYLOAD] ==> [HEADERS_AND_PAYLOAD]
> > > > ```
> > > >
> > > > We need to add some fields to make “raw Message metadata” work:
> > > >
> > > > ```
> > > > [RAW_METADATA_MAGIC_NUMBER] [RAW_METADATA_SIZE] [RAW_METADATA] [HEADERS_AND_PAYLOAD]
> > > > ```
> > > >
> > > > RAW_METADATA_MAGIC_NUMBER identifies whether this feature is on and whether the message was handled by this feature.
> > > >
> > > > RAW_METADATA_SIZE is added for this feature and records the size of the serialized raw Message metadata content.
> > > >
> > > > RAW_METADATA is the serialized raw Message metadata content.
> > > >
> > > > “HEADERS_AND_PAYLOAD” is the original ByteBuf data that contains the metadata and payload.
> > > >
> > > > We put the newly added fields before, instead of after, “HEADERS_AND_PAYLOAD” because:
> > > >
> > > > - Firstly, the “raw Message metadata” is an addition to the original protocol, and it can support this new feature without much modification to the original wire protocol; in particular, it does not need to serialize/deserialize the protobuf-encoded `MessageMetadata` built by the producer.
> > > > - Secondly, if we put the new fields after “HEADERS_AND_PAYLOAD”, we can't know the offset of the new fields because we don't know the length of “PAYLOAD”. Also, fields after “PAYLOAD” would change the CRC32 checksum, which means we would need to recalculate the checksum one more time.
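The layout above, together with the write, seek, and dispatch steps the proposal describes, can be sketched in Java. This is an illustrative sketch under stated assumptions, not Pulsar's implementation: it uses `java.nio.ByteBuffer` as a stand-in for Netty's `ByteBuf`, a 2-byte magic number whose value (`0x0e02`) is chosen only for illustration, a 4-byte size field, and a hand-written protobuf wire encoding of `optional uint64 broker_timestamp = 1` (tag byte `0x08`, then a varint) so that no generated classes are needed. The class and method names are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.OptionalLong;

/**
 * Hypothetical sketch of the proposed entry layout
 * [RAW_METADATA_MAGIC_NUMBER (2 bytes)] [RAW_METADATA_SIZE (4 bytes)] [RAW_METADATA] [HEADERS_AND_PAYLOAD].
 * Uses java.nio.ByteBuffer as a stand-in for Netty's ByteBuf.
 */
final class RawMetadataFrame {

    /** Illustrative value only; the proposal does not fix the magic number. */
    static final short RAW_METADATA_MAGIC = (short) 0x0e02;

    private RawMetadataFrame() {}

    /** Protobuf wire format of RawMessageMetadata{broker_timestamp}: tag 0x08 (field 1, varint), then the varint. */
    static byte[] encodeRawMetadata(long brokerTimestamp) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(0x08);
        long v = brokerTimestamp;
        while ((v & ~0x7FL) != 0) {
            out.write((int) ((v & 0x7F) | 0x80)); // 7 data bits plus the continuation bit
            v >>>= 7;
        }
        out.write((int) v);
        return out.toByteArray();
    }

    /** Write path (step a): prepend the new fields; headersAndPayload is copied unchanged. */
    static ByteBuffer addRawMetadata(long brokerTimestamp, ByteBuffer headersAndPayload) {
        byte[] raw = encodeRawMetadata(brokerTimestamp);
        ByteBuffer out = ByteBuffer.allocate(2 + 4 + raw.length + headersAndPayload.remaining());
        out.putShort(RAW_METADATA_MAGIC).putInt(raw.length).put(raw).put(headersAndPayload.duplicate());
        out.flip();
        return out;
    }

    /** Checks the magic number without moving the entry's position. */
    static boolean hasRawMetadata(ByteBuffer entry) {
        return entry.remaining() >= 6 && entry.getShort(entry.position()) == RAW_METADATA_MAGIC;
    }

    /** Seek path (step b): empty if the entry is old-format (fall back) or has no broker_timestamp. */
    static OptionalLong brokerTimestamp(ByteBuffer entry) {
        if (!hasRawMetadata(entry)) {
            return OptionalLong.empty();
        }
        ByteBuffer view = entry.duplicate();
        view.position(view.position() + 2);
        int size = view.getInt();
        ByteBuffer raw = view.slice();
        raw.limit(size);
        // Sketch only handles field 1; generated protobuf code would also skip unknown fields.
        if (!raw.hasRemaining() || (raw.get() & 0xFF) != 0x08) {
            return OptionalLong.empty();
        }
        long value = 0;
        for (int shift = 0; shift < 64 && raw.hasRemaining(); shift += 7) {
            byte b = raw.get();
            value |= (long) (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                return OptionalLong.of(value);
            }
        }
        throw new IllegalArgumentException("malformed varint in RAW_METADATA");
    }

    /** Dispatch path (step c): drop the broker-only fields and return [HEADERS_AND_PAYLOAD]. */
    static ByteBuffer headersAndPayload(ByteBuffer entry) {
        ByteBuffer view = entry.duplicate();
        if (hasRawMetadata(entry)) {
            view.position(view.position() + 2);
            int size = view.getInt();
            view.position(view.position() + size);
        }
        return view.slice();
    }
}
```

Because the magic number is the only thing that tells new-format entries from old ones, its value must not collide with the first bytes of an old-format `[HEADERS_AND_PAYLOAD]`. Note also that `MessageMetadata` is never parsed on either path; only the few bytes of the broker section are read.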
> > > >
> > > > #### 2.3 Message and “raw Message metadata” lifecycle
> > > >
> > > > The message processing logic would be:
> > > >
> > > > 1. The producer client sends the original message to the broker, where it is parsed by “PulsarDecoder”, leaving only “[HEADERS_AND_PAYLOAD]”.
> > > > 2. Originally, “[HEADERS_AND_PAYLOAD]” is written into BookKeeper by the method `asyncAddEntry` in `ManagedLedger`. Now we first add the “[RAW_METADATA]” and related parts in front of “[HEADERS_AND_PAYLOAD]”, then write the whole ByteBuf into BookKeeper with `asyncAddEntry`, so the “raw Message metadata” is kept together with the original message in BookKeeper.
> > > > 3. When a feature needs this “raw Message metadata”, the broker reads the new BookKeeper “Entry” and gets the raw Message metadata information to serve that feature.
> > > > 4. In the “provide messages ordered by (broker-side) time” feature, when a seek-by-time operation reaches the broker, we read “broker_timestamp” out of “[RAW_METADATA]”.
> > > > 5. After the broker reads a BookKeeper Entry, and before sending it to the consumer client, we filter out the raw Message metadata part and return only the “[HEADERS_AND_PAYLOAD]” part.
> > > >
> > > > This way, we don't need to serialize/deserialize the protobuf-encoded `MessageMetadata`.
> > > >
> > > > In more detail, these steps are:
> > > >
> > > > ##### a) Handling and storing produced messages
> > > >
> > > > A message sent from the producer client to the broker is handled by the method
> > > >
> > > > ```
> > > > publishMessageToTopic(ByteBuf headersAndPayload, long sequenceId, long batchSize, boolean isChunked)
> > > > ```
> > > >
> > > > in the class `org.apache.pulsar.broker.service.Producer`.
> > > > It eventually calls the method `asyncAddEntry` of `ManagedLedger`, which writes the passed-in serialized message as an Entry stored in BookKeeper:
> > > >
> > > > ```
> > > > void asyncAddEntry(ByteBuf headersAndPayload, AddEntryCallback callback, Object ctx)
> > > > ```
> > > >
> > > > For the raw Message metadata content, we put the current broker timestamp into `message RawMessageMetadata` and use protobuf to serialize it into a `ByteBuf`. Then we put the size and content of the serialized `ByteBuf` in front of the original `ByteBuf headersAndPayload`.
> > > >
> > > > “[HEADERS_AND_PAYLOAD]” thus turns into “[RAW_METADATA_MAGIC_NUMBER] [RAW_METADATA_SIZE] [RAW_METADATA] [HEADERS_AND_PAYLOAD]”, which is stored in BookKeeper.
> > > >
> > > > ##### b) When seek-by-time happens
> > > >
> > > > Originally, a seek-by-time request eventually calls into `org.apache.pulsar.broker.service.persistent.PersistentMessageFinder.findMessages`, whose original logic looks like this:
> > > >
> > > > ```java
> > > > public void findMessages(final long timestamp, AsyncCallbacks.FindEntryCallback callback) {
> > > >     …
> > > >     cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries, entry -> {
> > > >         // Every candidate entry is fully deserialized just to read its publish time
> > > >         MessageImpl msg = MessageImpl.deserialize(entry.getDataBuffer());
> > > >         return msg.getPublishTime() < timestamp;
> > > >     }, this, callback);
> > > >     …
> > > > ```
> > > >
> > > > The broker searches through the topic-partition, reads a candidate Entry from BookKeeper, deserializes it into a MessageImpl, and checks whether the message's publish time matches the requirement; if not, it reads another entry and repeats.
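The predicate in `findMessages` above is only reliable if the timestamps it compares grow with the entry order. The sketch below assumes (not verified here against the `ManagedCursor` code) that `asyncFindNewestMatching` narrows its search like a binary search, i.e. it expects the predicate to be true for a prefix of the entries and false afterwards. It runs such a search over two hypothetical timestamp sequences for the same five entries: client-side `publish_time` values where one producer's clock is 200 ms ahead, and broker-side append timestamps. The class and method names are hypothetical.

```java
/**
 * Hypothetical sketch: binary search for the newest entry whose timestamp is below a target,
 * in the spirit of a "find newest matching" cursor search. It is only correct when
 * "timestamp < target" holds for a prefix of the entries and fails for the rest.
 */
final class SeekSketch {

    private SeekSketch() {}

    /** Index of the newest entry with timestamps[i] < target, or -1 if there is none. */
    static int findNewestBefore(long[] timestamps, long target) {
        int lo = 0;
        int hi = timestamps.length - 1;
        int found = -1;
        while (lo <= hi) {
            int mid = (lo + hi) >>> 1;
            if (timestamps[mid] < target) {
                found = mid;   // predicate holds: the answer is here or later
                lo = mid + 1;
            } else {
                hi = mid - 1;  // predicate fails: assumed to fail for all later entries too
            }
        }
        return found;
    }

    /** Ground truth by linear scan from the newest entry backwards. */
    static int linearNewestBefore(long[] timestamps, long target) {
        for (int i = timestamps.length - 1; i >= 0; i--) {
            if (timestamps[i] < target) {
                return i;
            }
        }
        return -1;
    }
}
```

With broker timestamps `{101, 111, 121, 131, 141}` and a target of 135, the binary search agrees with the linear scan (index 3). With skewed client timestamps `{100, 110, 320, 130, 140}` it returns index 1, while the newest entry actually published before 135 is index 3. This is the inaccuracy that use case 1) describes.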
> > > >
> > > > With the new design, after reading the target Entry from BookKeeper, the broker does not need to deserialize it; it only needs to read “broker_timestamp” from the “raw Message metadata”. As mentioned above, the frame stored in BookKeeper is “[RAW_METADATA_MAGIC_NUMBER] [RAW_METADATA_SIZE] [RAW_METADATA] [HEADERS_AND_PAYLOAD]”, so:
> > > >
> > > > 1. Still use `entry.getDataBuffer()` to get the ByteBuf.
> > > > 2. Read and check “[RAW_METADATA_MAGIC_NUMBER]” to see whether the “raw Message metadata” feature is enabled; if not, fall back to the old behaviour.
> > > > 3. If it is enabled, read “[RAW_METADATA_SIZE]”, then read “[RAW_METADATA]”.
> > > > 4. Use protobuf to deserialize “[RAW_METADATA]” and read “broker_timestamp”.
> > > >
> > > > ##### c) Reading an entry from BookKeeper and sending it to the consumer client
> > > >
> > > > The main logic is in the method `sendMessages` of the class `org.apache.pulsar.broker.service.Consumer`. The change is similar to “seek by time” above: after reading the BookKeeper Entry, we filter out the “RAW_METADATA”-related part and return only “[HEADERS_AND_PAYLOAD]” to the consumer client.
> > > >
> > > > ### 3. Summary of required changes
> > > >
> > > > Here is a summary of the changes described above:
> > > >
> > > > 1. Add the raw Message metadata protobuf:
> > > >
> > > > ```protobuf
> > > > message RawMessageMetadata {
> > > >     optional uint64 broker_timestamp = 1;
> > > > }
> > > > ```
> > > >
> > > > 2. Change how produced messages are saved in BookKeeper: `org.apache.pulsar.broker.service.Producer`
> > > >
> > > > 3. Change how seek-by-time works: `org.apache.pulsar.broker.service.persistent.PersistentMessageFinder.findMessages`
> > > >
> > > > 4. Change how messages are sent back to consumers: `org.apache.pulsar.broker.service.Consumer`
> > > >
> > > > Other changes:
> > > >
> > > > 1. A broker config to enable this raw Message metadata feature, e.g. “useBrokerTimestampsForMessageSeek”.
> > > > 2. Besides changing `org.apache.pulsar.broker.service.persistent.PersistentMessageFinder.findMessages`, find the other places that call `MessageImpl.deserialize(entry.getDataBuffer())` and `msg.getPublishTime()`, such as `PersistentTopic.isOldestMessageExpired()`, and, when this feature is enabled, read the message publish time from the new “raw Message metadata” instead of deserializing the whole BookKeeper Entry into a Message and calling `Message.getPublishTime()`.
> > > >
> > > > ### 4. Compatibility, Deprecation and Migration Plan
> > > >
> > > > For the first feature, most of the changes are internal to brokers; it is a new feature that changes neither Pulsar's wire protocol (between broker and client) nor the public API. There is no backward compatibility issue, and since it is a newly added feature, there is nothing to deprecate or migrate.
> > > >
> > > > In the future, we will consider compatibility when implementing the second feature, “continuous message sequence-Id”, since we will need to send messages with a “sequence_id” to consumers. In that case, we will take different actions depending on the data format, whether the feature is enabled, and the consumer client version. Data is in the new format if it contains “[RAW_METADATA_MAGIC_NUMBER] [RAW_METADATA_SIZE] [RAW_METADATA]”, and a consumer is a new version if it can process messages that contain “[RAW_METADATA]”.
> > > >
> > > > ### 5. Test Plan
> > > >
> > > > * Unit tests for each individual change: broker and client work correctly for produce/consume.
> > > > * Integration tests or UTs for the end-to-end pipeline: multiple producers set message times in random order; seek-by-time still seeks to the right message.
> > > > * Load testing to ensure performance: seek should be faster.
> > > >
> > > > --
> > > > Best,
> > > > Aloys.
> > > >
>
> --
> Best,
> Aloys.
>