Makes sense
+1

Enrico

On Fri, Aug 26, 2022 at 05:17, Aloys Zhang
<aloyszh...@apache.org> wrote:
>
> Thanks for your suggestion.
> `onMessagePublish` is more precise and easier to understand.
>
> PengHui Li <peng...@apache.org> wrote on Fri, Aug 26, 2022 at 10:30:
>
> > Looks good to me.
> > I just have one suggestion.
> >
> > Maybe `onMessagePublish` is clearer than `beforeMessagePersistent`.
> > With `beforeMessagePersistent`, it is not intuitive where the
> > interception point starts: it could be in the managed ledger or just
> > before the call to the BookKeeper client.
> >
> > `onMessagePublish` makes it clear that the broker has received a new
> > published message from the producer.
> >
> > Thanks,
> > Penghui
> >
> > On Thu, Aug 25, 2022 at 10:38 PM Aloys Zhang <aloyszh...@apache.org>
> > wrote:
> >
> > > Hi Pulsar Community,
> > >
> > > This is a PIP discussion on extending the BrokerInterceptor.
> > > Details can be found in the issue:
> > > https://github.com/apache/pulsar/issues/17267
> > >
> > > I am copying the content here for convenience. Any suggestions are
> > > welcome and appreciated.
> > >
> > > Motivation
> > > Currently, we have a reconciliation system that compares the counts of
> > > messages (or entries) produced and consumed at minute granularity to
> > > indicate whether all the data has been completely consumed by the
> > > consumers.
> > > We want to track message (or entry) properties (including the timestamp
> > > and msgSize) for messages that have been persisted to the bookie and for
> > > messages that have been consumed and acked. We also want to track
> > > producer and consumer close events.
> > > A good way to achieve this is to use the BrokerInterceptor to intercept
> > > messages at certain points, so we want to extend the BrokerInterceptor
> > > with some new interfaces, as #12858 did.
> > > Goal
> > > - Get the timestamp and size (or other properties) of each entry that
> > > has been persisted to the bookie
> > > - Trace producer and consumer close events
> > > - Get the timestamp and size of each entry that has been consumed and
> > > acked (which is already supported by BrokerInterceptor)
> > > API Changes
> > >
> > > Intercept a message before it is persisted to the bookie to get the
> > > timestamp and size
> > >
> > > /**
> > >  * Intercept a message before it is persisted to the bookie.
> > >  *
> > >  * @param producer the producer that published the message
> > >  * @param headersAndPayload the entry's headers and payload
> > >  * @param publishContext the publish context
> > >  */
> > > default void beforeMessagePersistent(Producer producer,
> > >                                      ByteBuf headersAndPayload,
> > >                                      Topic.PublishContext publishContext) {
> > > }
> > >
> > > Add interfaces for when a producer or consumer is closed
> > >
> > > /**
> > >     * Called by the broker when a producer is closed.
> > >     *
> > >     * @param cnx     client Connection
> > >     * @param producer Producer object
> > >     * @param metadata A map of metadata
> > >     */
> > >    default void producerClosed(ServerCnx cnx,
> > >                                Producer producer,
> > >                                Map<String, String> metadata) {
> > >   }
> > >
> > > /**
> > >     * Called by the broker when a consumer is closed.
> > >     *
> > >     * @param cnx client Connection
> > >     * @param consumer Consumer object
> > >     * @param metadata A map of metadata
> > >     */
> > >    default void consumerClosed(ServerCnx cnx,
> > >                                Consumer consumer,
> > >                                Map<String, String> metadata) {
> > >   }
> > >
> > > Extend beforeSendMessage to accept the consumer
> > >
> > > /**
> > >     * Intercept messages before sending them to the consumers.
> > >     *
> > >     * @param subscription pulsar subscription
> > >     * @param entry entry
> > >     * @param ackSet entry ack bitset. It is either <tt>null</tt> or an
> > >     *               array of long-based bitsets.
> > >     * @param msgMetadata message metadata. The message metadata will
> > >     *                    be recycled after this call.
> > >     * @param consumer the consumer that the entry is sent to.
> > >     */
> > >    default void beforeSendMessage(Subscription subscription,
> > >                                   Entry entry,
> > >                                   long[] ackSet,
> > >                                   MessageMetadata msgMetadata,
> > >                                   Consumer consumer) {
> > >   }
> > >
> > > Implementation
> > >
> > > First, change the APIs as described above.
> > >
> > > Then implement all the newly added interfaces in BrokerInterceptors.java
> > > and BrokerInterceptorWithClassLoader.
> > >
> > > Set all properties of interest (like timestamp and msgSize) on the
> > > MessagePublishContext in the beforeMessagePersistent interface, before
> > > the entry is persisted to the bookie.
> > >
> > > After the entry has been persisted to the bookie, invoke
> > > BrokerInterceptor.messageProduced() and get the properties from the
> > > MessagePublishContext for reconciliation.
> > >
> > > For consumption, record the properties of interest in beforeSendMessage
> > > before sending to the consumer, and then, when the message is acked,
> > > get all the properties for reconciliation.
> > >
> >
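For readers following the thread, the produce-side flow described in the Implementation section can be sketched roughly as below. This is only an illustration under stated assumptions, not the PIP's actual code: SketchPublishContext and SketchBrokerInterceptor are hypothetical, simplified stand-ins for Pulsar's Topic.PublishContext and BrokerInterceptor, the hook signatures are trimmed down (a producer name and a byte array instead of Producer and ByteBuf), and the property keys "timestamp" and "msgSize" are chosen for the example. The hook uses the `onMessagePublish` name suggested in the replies.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for Topic.PublishContext: it only carries
// properties from the publish hook to the post-persist hook.
class SketchPublishContext {
    private final Map<String, Object> properties = new HashMap<>();

    void setProperty(String key, Object value) {
        properties.put(key, value);
    }

    Object getProperty(String key) {
        return properties.get(key);
    }
}

// Hypothetical, trimmed-down interceptor interface with the two
// produce-side hooks discussed in this thread (signatures simplified).
interface SketchBrokerInterceptor {
    // Called when the broker receives a publish request, before persistence.
    default void onMessagePublish(String producerName,
                                  byte[] headersAndPayload,
                                  SketchPublishContext ctx) {
    }

    // Called after the entry has been persisted.
    default void messageProduced(String producerName,
                                 long ledgerId,
                                 long entryId,
                                 SketchPublishContext ctx) {
    }
}

// Records size and timestamp at publish time, then counts the entry
// for reconciliation only once it has actually been persisted.
class ReconciliationInterceptor implements SketchBrokerInterceptor {
    private final AtomicLong producedEntries = new AtomicLong();
    private final AtomicLong producedBytes = new AtomicLong();

    @Override
    public void onMessagePublish(String producerName,
                                 byte[] headersAndPayload,
                                 SketchPublishContext ctx) {
        ctx.setProperty("timestamp", System.currentTimeMillis());
        ctx.setProperty("msgSize", headersAndPayload.length);
    }

    @Override
    public void messageProduced(String producerName,
                                long ledgerId,
                                long entryId,
                                SketchPublishContext ctx) {
        Object size = ctx.getProperty("msgSize");
        if (size instanceof Integer) {
            producedEntries.incrementAndGet();
            producedBytes.addAndGet((Integer) size);
        }
    }

    long producedEntries() {
        return producedEntries.get();
    }

    long producedBytes() {
        return producedBytes.get();
    }
}
```

Counting in the post-persist hook rather than at publish time means that a publish which never reaches the bookie is not counted, which is what the reconciliation needs. The consume side would follow the same pattern, with beforeSendMessage recording the properties and the ack path reading them.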
