Thanks for your suggestion.
`onMessagePublish` is more precise for understanding.

PengHui Li <peng...@apache.org> 于2022年8月26日周五 10:30写道:

> Looks good to me.
> I just have one suggestion.
>
> Maybe using `onMessagePublish` is more clear than
> `beforeMessagePersistent`.
> `beforeMessagePersistent` it is not very intuitive to understand the
> starting point
> of the interception position, maybe from the managed-ledger or before call
> bookkeeper
> client.
>
> onMessagePublish is clear that the broker received a new publish message
> from the producer.
>
> Thanks,
> Penghui
>
> On Thu, Aug 25, 2022 at 10:38 PM Aloys Zhang <aloyszh...@apache.org>
> wrote:
>
> > Hi Pulsar Community,
> >
> > This is a PIP discussion on extending the BrokerInterceptor.
> > Details can be found in the issue:
> > https://github.com/apache/pulsar/issues/17267
> >
> > Copying the content here for convenience, any suggestions are welcome and
> > appreciated.
> >
> > Motivation
> > Currently, we have a reconciliation system that compares the message(or
> > entry) accounts produced and consumed at the minute level to indicate
> > whether all the data has been consumed completely by the consumers.
> > We want to track the message(or entry) properties (including the
> timestamp
> > and msgSize) for messages has been persistent to bookie and messages have
> > been consumed & acked. Also, we want to track the event of the producer
> and
> > consumer closed.
> > A good way to achieve this is using the BrokerInterceptor to interceptor
> > the message at certain points. So we want to expand some interfaces for
> > BrokerInterceptor like what #12858 did.
> > Goal
> > - Get timestamp and size(or other properties) for the entry which has
> been
> > persistent to bookie
> > - trace the producer or consumer closed
> > - Get the timestamp and size for entry that has been consumed and
> > acked(which is already supported by BrokerInterceptor)
> > API Changes
> >
> > interceptor message before persistent to bookie to get the timestamp and
> > size
> >
> > /**
> >     * Intercept after a message before persistent to bookie.
> >     *
> >     * @param headersAndPayload entry's header and payload
> >     * @param publishContext Publish Context
> >     */
> >    default void beforeMessagePersistent(Producer producer,
> >                                         ByteBuf headersAndPayload,
> >                                         Topic.PublishContext
> > publishContext) {
> >
> >   }
> >
> > Add interfaces for a producer or consumer closed
> >
> > /**
> >     * Called by the broker when a producer is closed.
> >     *
> >     * @param cnx     client Connection
> >     * @param producer Consumer object
> >     * @param metadata A map of metadata
> >     */
> >    default void producerClosed(ServerCnx cnx,
> >                                Producer producer,
> >                                Map<String, String> metadata) {
> >   }
> >
> > /**
> >     * Called by the broker when a consumer is closed.
> >     *
> >     * @param cnx client Connection
> >     * @param consumer Consumer object
> >     * @param metadata A map of metadata
> >     */
> >    default void consumerClosed(ServerCnx cnx,
> >                                Consumer consumer,
> >                                Map<String, String> metadata) {
> >   }
> >
> > expand the beforeSendMessage to support consumer
> >
> > /**
> >     * Intercept messages before sending them to the consumers.
> >     *
> >     * @param subscription pulsar subscription
> >     * @param entry entry
> >     * @param ackSet entry ack bitset. it is either <tt>null</tt> or an
> > array of long-based bitsets.
> >     * @param msgMetadata message metadata. The message metadata will
> > be recycled after this call.
> >     * @param consumer consumer. Consumer which entry are sent to.
> >     */
> >    default void beforeSendMessage(Subscription subscription,
> >                                   Entry entry,
> >                                   long[] ackSet,
> >                                   MessageMetadata msgMetadata,
> >                                   Consumer consumer) {
> >   }
> >
> > Implementation
> >
> > First, change the APIs as described above.
> >
> > Then implements all the new added interfaces and BrokerInterceptors.java
> > and BrokerInterceptorWithClassLoader .
> >
> > Set all interested properties(like timestamp and msgSize) to
> > MessagePublishContext in beforeMessagePersistent interface before
> > persistent entry to bookie.
> >
> > When after entry persistent to bookie, invoke
> > BrokerInterceptor.messageProduced() and get the properties from the
> > MessagePublishContext for reconciliation.
> >
> > For consumption, record the interested properties before sending to
> > consumer by beforeSendMessage, and then at the point f message acked,
> > getting all the properties for reconciliation.
> >
>

Reply via email to