Thanks for your suggestion. `onMessagePublish` is more precise for understanding.
PengHui Li <peng...@apache.org> 于2022年8月26日周五 10:30写道: > Looks good to me. > I just have one suggestion. > > Maybe using `onMessagePublish` is more clear than > `beforeMessagePersistent`. > `beforeMessagePersistent` it is not very intuitive to understand the > starting point > of the interception position, maybe from the managed-ledger or before call > bookkeeper > client. > > onMessagePublish is clear that the broker received a new publish message > from the producer. > > Thanks, > Penghui > > On Thu, Aug 25, 2022 at 10:38 PM Aloys Zhang <aloyszh...@apache.org> > wrote: > > > Hi Pulsar Community, > > > > This is a PIP discussion on extending the BrokerInterceptor. > > Details can be found in the issue: > > https://github.com/apache/pulsar/issues/17267 > > > > Copying the content here for convenience, any suggestions are welcome and > > appreciated. > > > > Motivation > > Currently, we have a reconciliation system that compares the message(or > > entry) accounts produced and consumed at the minute level to indicate > > whether all the data has been consumed completely by the consumers. > > We want to track the message(or entry) properties (including the > timestamp > > and msgSize) for messages has been persistent to bookie and messages have > > been consumed & acked. Also, we want to track the event of the producer > and > > consumer closed. > > A good way to achieve this is using the BrokerInterceptor to interceptor > > the message at certain points. So we want to expand some interfaces for > > BrokerInterceptor like what #12858 did. > > Goal > > - Get timestamp and size(or other properties) for the entry which has > been > > persistent to bookie > > - trace the producer or consumer closed > > - Get the timestamp and size for entry that has been consumed and > > acked(which is already supported by BrokerInterceptor) > > API Changes > > > > interceptor message before persistent to bookie to get the timestamp and > > size > > > > /** > > * Intercept after a message before persistent to bookie. > > * > > * @param headersAndPayload entry's header and payload > > * @param publishContext Publish Context > > */ > > default void beforeMessagePersistent(Producer producer, > > ByteBuf headersAndPayload, > > Topic.PublishContext > > publishContext) { > > > > } > > > > Add interfaces for a producer or consumer closed > > > > /** > > * Called by the broker when a producer is closed. > > * > > * @param cnx client Connection > > * @param producer Consumer object > > * @param metadata A map of metadata > > */ > > default void producerClosed(ServerCnx cnx, > > Producer producer, > > Map<String, String> metadata) { > > } > > > > /** > > * Called by the broker when a consumer is closed. > > * > > * @param cnx client Connection > > * @param consumer Consumer object > > * @param metadata A map of metadata > > */ > > default void consumerClosed(ServerCnx cnx, > > Consumer consumer, > > Map<String, String> metadata) { > > } > > > > expand the beforeSendMessage to support consumer > > > > /** > > * Intercept messages before sending them to the consumers. > > * > > * @param subscription pulsar subscription > > * @param entry entry > > * @param ackSet entry ack bitset. it is either <tt>null</tt> or an > > array of long-based bitsets. > > * @param msgMetadata message metadata. The message metadata will > > be recycled after this call. > > * @param consumer consumer. Consumer which entry are sent to. > > */ > > default void beforeSendMessage(Subscription subscription, > > Entry entry, > > long[] ackSet, > > MessageMetadata msgMetadata, > > Consumer consumer) { > > } > > > > Implementation > > > > First, change the APIs as described above. > > > > Then implements all the new added interfaces and BrokerInterceptors.java > > and BrokerInterceptorWithClassLoader . > > > > Set all interested properties(like timestamp and msgSize) to > > MessagePublishContext in beforeMessagePersistent interface before > > persistent entry to bookie. > > > > When after entry persistent to bookie, invoke > > BrokerInterceptor.messageProduced() and get the properties from the > > MessagePublishContext for reconciliation. > > > > For consumption, record the interested properties before sending to > > consumer by beforeSendMessage, and then at the point f message acked, > > getting all the properties for reconciliation. > > >