Makes sense
+1

Enrico

On Fri, 26 Aug 2022 at 05:17, Aloys Zhang <aloyszh...@apache.org> wrote:
>
> Thanks for your suggestion.
> `onMessagePublish` is more precise and easier to understand.
>
> On Fri, 26 Aug 2022 at 10:30, PengHui Li <peng...@apache.org> wrote:
>
> > Looks good to me.
> > I just have one suggestion.
> >
> > Maybe `onMessagePublish` is clearer than `beforeMessagePersistent`.
> > With `beforeMessagePersistent` it is not intuitive where the
> > interception starts: it could be in the managed-ledger, or just
> > before the call to the bookkeeper client.
> >
> > `onMessagePublish` makes it clear that the broker has received a new
> > publish message from the producer.
> >
> > Thanks,
> > Penghui
> >
> > On Thu, Aug 25, 2022 at 10:38 PM Aloys Zhang <aloyszh...@apache.org>
> > wrote:
> >
> > > Hi Pulsar Community,
> > >
> > > This is a PIP discussion on extending the BrokerInterceptor.
> > > Details can be found in the issue:
> > > https://github.com/apache/pulsar/issues/17267
> > >
> > > Copying the content here for convenience. Any suggestions are
> > > welcome and appreciated.
> > >
> > > Motivation
> > >
> > > Currently, we have a reconciliation system that compares the
> > > message (or entry) accounts produced and consumed at the minute
> > > level, to indicate whether all the data has been consumed
> > > completely by the consumers.
> > > We want to track the message (or entry) properties (including the
> > > timestamp and msgSize) for messages that have been persisted to
> > > the bookie and for messages that have been consumed and acked.
> > > We also want to track producer and consumer close events.
> > > A good way to achieve this is to use the BrokerInterceptor to
> > > intercept the message at certain points. So we want to add some
> > > interfaces to BrokerInterceptor, like what #12858 did.
> > >
> > > Goal
> > >
> > > - Get the timestamp and size (or other properties) of an entry
> > >   that has been persisted to the bookie
> > > - Trace producer and consumer close events
> > > - Get the timestamp and size of an entry that has been consumed
> > >   and acked (already supported by BrokerInterceptor)
> > >
> > > API Changes
> > >
> > > Intercept a message before it is persisted to the bookie, to get
> > > the timestamp and size:
> > >
> > >     /**
> > >      * Intercept a message before it is persisted to the bookie.
> > >      *
> > >      * @param producer the producer that published the message
> > >      * @param headersAndPayload entry's header and payload
> > >      * @param publishContext Publish Context
> > >      */
> > >     default void beforeMessagePersistent(Producer producer,
> > >                                          ByteBuf headersAndPayload,
> > >                                          Topic.PublishContext publishContext) {
> > >     }
> > >
> > > Add interfaces for when a producer or consumer is closed:
> > >
> > >     /**
> > >      * Called by the broker when a producer is closed.
> > >      *
> > >      * @param cnx client Connection
> > >      * @param producer Producer object
> > >      * @param metadata A map of metadata
> > >      */
> > >     default void producerClosed(ServerCnx cnx,
> > >                                 Producer producer,
> > >                                 Map<String, String> metadata) {
> > >     }
> > >
> > >     /**
> > >      * Called by the broker when a consumer is closed.
> > >      *
> > >      * @param cnx client Connection
> > >      * @param consumer Consumer object
> > >      * @param metadata A map of metadata
> > >      */
> > >     default void consumerClosed(ServerCnx cnx,
> > >                                 Consumer consumer,
> > >                                 Map<String, String> metadata) {
> > >     }
> > >
> > > Extend beforeSendMessage to receive the consumer:
> > >
> > >     /**
> > >      * Intercept messages before sending them to the consumers.
> > >      *
> > >      * @param subscription pulsar subscription
> > >      * @param entry entry
> > >      * @param ackSet entry ack bitset. It is either <tt>null</tt>
> > >      *               or an array of long-based bitsets.
> > >      * @param msgMetadata message metadata. The message metadata
> > >      *                    will be recycled after this call.
> > >      * @param consumer the consumer that the entry is sent to
> > >      */
> > >     default void beforeSendMessage(Subscription subscription,
> > >                                    Entry entry,
> > >                                    long[] ackSet,
> > >                                    MessageMetadata msgMetadata,
> > >                                    Consumer consumer) {
> > >     }
> > >
> > > Implementation
> > >
> > > First, change the APIs as described above.
> > >
> > > Then implement all the newly added interfaces in
> > > BrokerInterceptors.java and BrokerInterceptorWithClassLoader.
> > >
> > > In beforeMessagePersistent, set all the properties of interest
> > > (like timestamp and msgSize) on the MessagePublishContext before
> > > the entry is persisted to the bookie.
> > >
> > > After the entry has been persisted to the bookie, invoke
> > > BrokerInterceptor.messageProduced() and read the properties from
> > > the MessagePublishContext for reconciliation.
> > >
> > > For consumption, record the properties of interest in
> > > beforeSendMessage before sending to the consumer, and then, at
> > > the point where the message is acked, read all the properties
> > > for reconciliation.
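
For readers following the thread, the minute-level reconciliation described under Motivation and Implementation can be sketched as below. This is only an illustration under stated assumptions, not code from the PIP or from Pulsar: `ReconciliationSketch`, `MinuteLedger`, `onPersisted` and `onAcked` are hypothetical names, and the sketch deliberately uses no Pulsar classes. It assumes an interceptor implementation would call `onPersisted` once the entry has been persisted and `onAcked` when the message is acked, passing the publish timestamp and size recorded earlier.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

/** Hypothetical sketch; none of these types are Pulsar classes. */
final class ReconciliationSketch {

    /** Message count and byte total for one minute bucket. */
    static final class Totals {
        final LongAdder count = new LongAdder();
        final LongAdder bytes = new LongAdder();
    }

    /** Tracks produced and acked totals per minute of publish time. */
    static final class MinuteLedger {
        private final Map<Long, Totals> produced = new ConcurrentHashMap<>();
        private final Map<Long, Totals> acked = new ConcurrentHashMap<>();

        private static long minuteOf(long timestampMillis) {
            return timestampMillis / 60_000L;
        }

        private static void record(Map<Long, Totals> side, long timestampMillis, int sizeBytes) {
            Totals totals = side.computeIfAbsent(minuteOf(timestampMillis), k -> new Totals());
            totals.count.increment();
            totals.bytes.add(sizeBytes);
        }

        /** Assumed to be called after the entry has been persisted. */
        void onPersisted(long publishTimestampMillis, int sizeBytes) {
            record(produced, publishTimestampMillis, sizeBytes);
        }

        /** Assumed to be called on ack, with the acked message's publish timestamp. */
        void onAcked(long publishTimestampMillis, int sizeBytes) {
            record(acked, publishTimestampMillis, sizeBytes);
        }

        /** True when produced and acked counts and bytes match for the given minute. */
        boolean isReconciled(long minute) {
            Totals p = produced.get(minute);
            Totals a = acked.get(minute);
            long producedCount = p == null ? 0 : p.count.sum();
            long producedBytes = p == null ? 0 : p.bytes.sum();
            long ackedCount = a == null ? 0 : a.count.sum();
            long ackedBytes = a == null ? 0 : a.bytes.sum();
            return producedCount == ackedCount && producedBytes == ackedBytes;
        }
    }

    public static void main(String[] args) {
        MinuteLedger ledger = new MinuteLedger();
        ledger.onPersisted(60_000L, 100);
        ledger.onPersisted(61_000L, 50);
        ledger.onAcked(60_000L, 100);
        System.out.println(ledger.isReconciled(1L)); // false: one message not yet acked
        ledger.onAcked(61_000L, 50);
        System.out.println(ledger.isReconciled(1L)); // true
    }
}
```

Bucketing both sides by the publish timestamp, rather than by the time of the ack, is a design choice made here so that a message produced in one minute and acked in the next still lands in the same bucket. The thread itself does not say how the real system buckets its data.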