Hi Andrey,

This PIP makes sense to me.
A small question: Is it better to define `TopicEvent` and `EventStage`
outside of  `TopicEventsListener`?

Thanks,
Haiting


On Tue, Jan 17, 2023 at 3:51 PM Enrico Olivelli <eolive...@gmail.com> wrote:
>
> I support this PIP.
> This will allow Protocol Handlers to have better knowledge of what's
> happening on the broker.
>
> I hope that someday we will be able to refactor Broker internals using
> this feature and decouple the components.
>
> Enrico
>
> Il giorno lun 16 gen 2023 alle ore 10:27 Yunze Xu
> <y...@streamnative.io.invalid> ha scritto:
> >
> > +1 to me now.
> >
> > Thanks,
> > Yunze
> >
> > On Sat, Jan 14, 2023 at 1:29 AM Andrey Yegorov
> > <andrey.yego...@datastax.com> wrote:
> > >
> > > My bad, I pasted the interface code from a branch with experiment to 
> > > cancel
> > > events. This is no longer needed.
> > > EventProcessingResult result is irrelevant, I updated the PIP.
> > >
> > > It is not in the PR.
> > >
> > > On Fri, Jan 13, 2023 at 4:08 AM Yunze Xu <y...@streamnative.io.invalid>
> > > wrote:
> > >
> > > > I have some questions about the EventProcessingResult:
> > > > 1. What's the difference between FAILURE and NOT_ALLOWED?
> > > > 2. If we need to return the `message`, which indicates the error IIUC,
> > > > would it be better to replace the returned value with a checked
> > > > exception?
> > > >
> > > > Thanks,
> > > > Yunze
> > > >
> > > > On Fri, Jan 13, 2023 at 12:36 PM Andrey Yegorov <ayego...@apache.org>
> > > > wrote:
> > > > >
> > > > > Hi,
> > > > >
> > > > > I am starting discussion for PIP-241: TopicEventListener / topic 
> > > > > events
> > > > for
> > > > > the BrokerService.
> > > > >
> > > > > PIP issue: https://github.com/apache/pulsar/issues/19224
> > > > >
> > > > > ### Motivation
> > > > >
> > > > > Some Protocol Handlers may need to know about the topic-specific 
> > > > > events
> > > > to
> > > > > update internal caches and/or state.
> > > > >
> > > > > These mechanisms will be useful also for core Pulsar components (like 
> > > > > the
> > > > > Transactions subsystem) and probably we would be able to simplify the
> > > > > interaction between the internal components in the broker by using an
> > > > > unified mechanism to handle the lifecycle of topics.
> > > > >
> > > > > Specific use cases:
> > > > >
> > > > > KOP keeps some state for the topic and needs to handle such cases as:
> > > > >
> > > > > - Topic Unloaded: release resources dedicated to the topic
> > > > > - Topic Loaded: trigger loading of components tied to the partition
> > > > > (GroupCoordinator, TransactionManager)
> > > > > - Topic Deleted: remove any persistent state associated to the topic 
> > > > > that
> > > > > is stored in additional side system topics
> > > > > - Topic Created: the same as “deleted” (ensure that there is no state 
> > > > > on
> > > > > system topics related to the new topic)
> > > > >
> > > > >
> > > > > ### Goal
> > > > >
> > > > > This PIP defines a set of events needed for the protocol handlers (and
> > > > for
> > > > > internal broker components) to get notifications about topic-specific
> > > > > events as seen by BrokerService. PIP outlines changes needed for 
> > > > > protocol
> > > > > handlers to keep/cache state consistent with BrokerService’s.
> > > > >
> > > > > The changes should not affect Pulsar running without protocol 
> > > > > handlers or
> > > > > with protocol handlers that do not rely on the new events.
> > > > >
> > > > >
> > > > > ### API Changes
> > > > >
> > > > > ```java
> > > > > /**
> > > > >  * Listener for the Topic events.
> > > > >  */
> > > > > @InterfaceAudience.LimitedPrivate
> > > > > @InterfaceStability.Evolving
> > > > > public interface TopicEventsListener {
> > > > >
> > > > >     /**
> > > > >      * Types of events currently supported.
> > > > >      *  create/load/unload/delete
> > > > >      */
> > > > >     enum TopicEvent {
> > > > >         // create events included into load events
> > > > >         CREATE,
> > > > >         LOAD,
> > > > >         UNLOAD,
> > > > >         DELETE,
> > > > >     }
> > > > >
> > > > >     /**
> > > > >      * Stages of events currently supported.
> > > > >      *  before starting the event/successful completion/failed 
> > > > > completion
> > > > >      */
> > > > >     enum EventStage {
> > > > >         BEFORE,
> > > > >         SUCCESS,
> > > > >         FAILURE
> > > > >     }
> > > > >
> > > > >     /**
> > > > >      * Outcome of the listener.
> > > > >      * Ignored for events at final stages (SUCCESS/FAILURE),
> > > > >      *
> > > > >      */
> > > > >     enum EventProcessingOutcome {
> > > > >         OK,
> > > > >         FAILURE,
> > > > >         NOT_ALLOWED
> > > > >     }
> > > > >
> > > > >     /**
> > > > >      * POJO for event processing result (outcome, message)
> > > > >      */
> > > > >     @ToString(includeFieldNames=true)
> > > > >     @Data(staticConstructor="of")
> > > > >     class EventProcessingResult {
> > > > >         private final EventProcessingOutcome outcome;
> > > > >         private final String message;
> > > > >     }
> > > > >
> > > > >     /**
> > > > >      * Handle topic event.
> > > > >      * Choice of the thread / maintenance of the thread pool is up to 
> > > > > the
> > > > > event handlers.
> > > > >      * @param topicName - name of the topic
> > > > >      * @param event - TopicEvent
> > > > >      * @param stage - EventStage
> > > > >      * @param t - exception in case of FAILURE, if present/known
> > > > >      * @return - EventProcessingResult.
> > > > >      *      EventProcessingResult.EventProcessingOutcome != OK 
> > > > > indicates
> > > > > request to cancel
> > > > >      *           event at BEFORE stage.
> > > > >      */
> > > > >     EventProcessingResult handleEvent(String topicName, TopicEvent 
> > > > > event,
> > > > > EventStage stage, Throwable t);
> > > > > }
> > > > > ```
> > > > >
> > > > > BrokerService:
> > > > > ```java
> > > > >     public void addTopicEventListener(TopicEventsListener... 
> > > > > listeners)
> > > > >
> > > > >     public void removeTopicEventListener(TopicEventsListener...
> > > > listeners)
> > > > > ```
> > > > >
> > > > > ### Implementation
> > > > >
> > > > > See PR for the proposed implementation.
> > > > > https://github.com/apache/pulsar/pull/19153
> > > > >
> > > > >
> > > > > ### Alternatives
> > > > >
> > > > > Add new methods to the BrokerInterceptor API
> > > > >
> > > > > --
> > > > > Andrey
> > > >
> > >
> > >
> > > --
> > > Andrey Yegorov

Reply via email to