Looks good to me. I just have one suggestion: `onMessagePublish` may be clearer than `beforeMessagePersistent`. With `beforeMessagePersistent`, it is not intuitive where the interception actually starts. It could be inside the managed ledger, or just before the call to the BookKeeper client.
`onMessagePublish` makes it clear that the broker has received a new publish message from the producer.

Thanks,
Penghui

On Thu, Aug 25, 2022 at 10:38 PM Aloys Zhang <aloyszh...@apache.org> wrote:

> Hi Pulsar Community,
>
> This is a PIP discussion on extending the BrokerInterceptor.
> Details can be found in the issue:
> https://github.com/apache/pulsar/issues/17267
>
> Copying the content here for convenience, any suggestions are welcome and
> appreciated.
>
> Motivation
>
> Currently, we have a reconciliation system that compares the message (or
> entry) accounts produced and consumed at the minute level to indicate
> whether all the data has been consumed completely by the consumers.
> We want to track the message (or entry) properties (including the timestamp
> and msgSize) for messages that have been persisted to the bookie and for
> messages that have been consumed and acked. We also want to track the
> events of a producer or consumer being closed.
> A good way to achieve this is to use the BrokerInterceptor to intercept
> the message at certain points. So we want to expand some interfaces of
> BrokerInterceptor, like what #12858 did.
>
> Goal
>
> - Get the timestamp and size (or other properties) for an entry that has
>   been persisted to the bookie
> - Trace producer and consumer close events
> - Get the timestamp and size for an entry that has been consumed and
>   acked (which is already supported by BrokerInterceptor)
>
> API Changes
>
> Intercept the message before it is persisted to the bookie, to get the
> timestamp and size
>
>     /**
>      * Intercept a message before it is persisted to the bookie.
>      *
>      * @param headersAndPayload entry's header and payload
>      * @param publishContext Publish Context
>      */
>     default void beforeMessagePersistent(Producer producer,
>                                          ByteBuf headersAndPayload,
>                                          Topic.PublishContext publishContext) {
>
>     }
>
> Add interfaces for a producer or consumer being closed
>
>     /**
>      * Called by the broker when a producer is closed.
>      *
>      * @param cnx client Connection
>      * @param producer Producer object
>      * @param metadata A map of metadata
>      */
>     default void producerClosed(ServerCnx cnx,
>                                 Producer producer,
>                                 Map<String, String> metadata) {
>     }
>
>     /**
>      * Called by the broker when a consumer is closed.
>      *
>      * @param cnx client Connection
>      * @param consumer Consumer object
>      * @param metadata A map of metadata
>      */
>     default void consumerClosed(ServerCnx cnx,
>                                 Consumer consumer,
>                                 Map<String, String> metadata) {
>     }
>
> Expand beforeSendMessage to support the consumer
>
>     /**
>      * Intercept messages before sending them to the consumers.
>      *
>      * @param subscription pulsar subscription
>      * @param entry entry
>      * @param ackSet entry ack bitset. It is either <tt>null</tt> or an
>      *               array of long-based bitsets.
>      * @param msgMetadata message metadata. The message metadata will
>      *                    be recycled after this call.
>      * @param consumer the consumer the entry is sent to.
>      */
>     default void beforeSendMessage(Subscription subscription,
>                                    Entry entry,
>                                    long[] ackSet,
>                                    MessageMetadata msgMetadata,
>                                    Consumer consumer) {
>     }
>
> Implementation
>
> First, change the APIs as described above.
>
> Then implement all the newly added interfaces in BrokerInterceptors.java
> and BrokerInterceptorWithClassLoader.
>
> Set all properties of interest (like timestamp and msgSize) on
> MessagePublishContext in the beforeMessagePersistent interface, before
> the entry is persisted to the bookie.
>
> After the entry has been persisted to the bookie, invoke
> BrokerInterceptor.messageProduced() and get the properties from the
> MessagePublishContext for reconciliation.
>
> For consumption, record the properties of interest before sending to the
> consumer in beforeSendMessage, and then, at the point the message is
> acked, get all the properties for reconciliation.
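
To make the reconciliation flow in the proposal a bit more concrete, here is a minimal sketch of the bookkeeping an interceptor implementation might do. It is only an illustration under stated assumptions: the class `ReconciliationSketch` and its methods `onPersisted`, `onAcked` and `isReconciled` are hypothetical names that are not part of Pulsar, and they take plain values (publish timestamp and size) instead of the real `Producer`, `Entry` or `PublishContext` types, so the snippet compiles on its own. It also assumes both sides are bucketed by the message publish time, so that the produced and acked totals for one minute can be compared.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

/**
 * Hypothetical sketch, not Pulsar code: per-minute counters that a
 * BrokerInterceptor implementation could feed from its publish-side
 * and ack-side hooks.
 */
class ReconciliationSketch {

    /** Number of entries and total bytes seen for one minute bucket. */
    static final class Bucket {
        final LongAdder entries = new LongAdder();
        final LongAdder bytes = new LongAdder();
    }

    private final Map<Long, Bucket> produced = new ConcurrentHashMap<>();
    private final Map<Long, Bucket> acked = new ConcurrentHashMap<>();

    /** Maps an epoch-millisecond timestamp to its minute bucket. */
    static long minuteOf(long timestampMillis) {
        return timestampMillis / 60_000L;
    }

    private static void record(Map<Long, Bucket> side, long publishTimeMillis, int sizeBytes) {
        Bucket bucket = side.computeIfAbsent(minuteOf(publishTimeMillis), k -> new Bucket());
        bucket.entries.increment();
        bucket.bytes.add(sizeBytes);
    }

    /** Assumed to be called once an entry has been persisted. */
    void onPersisted(long publishTimeMillis, int sizeBytes) {
        record(produced, publishTimeMillis, sizeBytes);
    }

    /** Assumed to be called once an entry has been consumed and acked. */
    void onAcked(long publishTimeMillis, int sizeBytes) {
        record(acked, publishTimeMillis, sizeBytes);
    }

    /** True when entry count and byte count match on both sides for that minute. */
    boolean isReconciled(long minute) {
        Bucket p = produced.get(minute);
        Bucket a = acked.get(minute);
        long producedEntries = p == null ? 0 : p.entries.sum();
        long producedBytes = p == null ? 0 : p.bytes.sum();
        long ackedEntries = a == null ? 0 : a.entries.sum();
        long ackedBytes = a == null ? 0 : a.bytes.sum();
        return producedEntries == ackedEntries && producedBytes == ackedBytes;
    }

    public static void main(String[] args) {
        ReconciliationSketch r = new ReconciliationSketch();
        r.onPersisted(120_000L, 100); // two entries published in minute 2
        r.onPersisted(125_000L, 50);
        r.onAcked(120_000L, 100);     // only one of them acked so far
        System.out.println("minute 2 reconciled: " + r.isReconciled(2L));
        r.onAcked(125_000L, 50);
        System.out.println("minute 2 reconciled: " + r.isReconciled(2L));
    }
}
```

In a real interceptor, the timestamp and size would come from whatever the publish-side hook stores on the publish context and from the entry metadata on the ack side; how exactly those values are obtained is left open here.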