Looks good to me.
I just have one suggestion.

Maybe using `onMessagePublish` is more clear than `beforeMessagePersistent`.
`beforeMessagePersistent` it is not very intuitive to understand the
starting point
of the interception position, maybe from the managed-ledger or before call
bookkeeper
client.

onMessagePublish is clear that the broker received a new publish message
from the producer.

Thanks,
Penghui

On Thu, Aug 25, 2022 at 10:38 PM Aloys Zhang <aloyszh...@apache.org> wrote:

> Hi Pulsar Community,
>
> This is a PIP discussion on extending the BrokerInterceptor.
> Details can be found in the issue:
> https://github.com/apache/pulsar/issues/17267
>
> Copying the content here for convenience, any suggestions are welcome and
> appreciated.
>
> Motivation
> Currently, we have a reconciliation system that compares the message(or
> entry) accounts produced and consumed at the minute level to indicate
> whether all the data has been consumed completely by the consumers.
> We want to track the message(or entry) properties (including the timestamp
> and msgSize) for messages has been persistent to bookie and messages have
> been consumed & acked. Also, we want to track the event of the producer and
> consumer closed.
> A good way to achieve this is using the BrokerInterceptor to interceptor
> the message at certain points. So we want to expand some interfaces for
> BrokerInterceptor like what #12858 did.
> Goal
> - Get timestamp and size(or other properties) for the entry which has been
> persistent to bookie
> - trace the producer or consumer closed
> - Get the timestamp and size for entry that has been consumed and
> acked(which is already supported by BrokerInterceptor)
> API Changes
>
> interceptor message before persistent to bookie to get the timestamp and
> size
>
> /**
>     * Intercept after a message before persistent to bookie.
>     *
>     * @param headersAndPayload entry's header and payload
>     * @param publishContext Publish Context
>     */
>    default void beforeMessagePersistent(Producer producer,
>                                         ByteBuf headersAndPayload,
>                                         Topic.PublishContext
> publishContext) {
>
>   }
>
> Add interfaces for a producer or consumer closed
>
> /**
>     * Called by the broker when a producer is closed.
>     *
>     * @param cnx     client Connection
>     * @param producer Consumer object
>     * @param metadata A map of metadata
>     */
>    default void producerClosed(ServerCnx cnx,
>                                Producer producer,
>                                Map<String, String> metadata) {
>   }
>
> /**
>     * Called by the broker when a consumer is closed.
>     *
>     * @param cnx client Connection
>     * @param consumer Consumer object
>     * @param metadata A map of metadata
>     */
>    default void consumerClosed(ServerCnx cnx,
>                                Consumer consumer,
>                                Map<String, String> metadata) {
>   }
>
> expand the beforeSendMessage to support consumer
>
> /**
>     * Intercept messages before sending them to the consumers.
>     *
>     * @param subscription pulsar subscription
>     * @param entry entry
>     * @param ackSet entry ack bitset. it is either <tt>null</tt> or an
> array of long-based bitsets.
>     * @param msgMetadata message metadata. The message metadata will
> be recycled after this call.
>     * @param consumer consumer. Consumer which entry are sent to.
>     */
>    default void beforeSendMessage(Subscription subscription,
>                                   Entry entry,
>                                   long[] ackSet,
>                                   MessageMetadata msgMetadata,
>                                   Consumer consumer) {
>   }
>
> Implementation
>
> First, change the APIs as described above.
>
> Then implements all the new added interfaces and BrokerInterceptors.java
> and BrokerInterceptorWithClassLoader .
>
> Set all interested properties(like timestamp and msgSize) to
> MessagePublishContext in beforeMessagePersistent interface before
> persistent entry to bookie.
>
> When after entry persistent to bookie, invoke
> BrokerInterceptor.messageProduced() and get the properties from the
> MessagePublishContext for reconciliation.
>
> For consumption, record the interested properties before sending to
> consumer by beforeSendMessage, and then at the point f message acked,
> getting all the properties for reconciliation.
>

Reply via email to