I have some questions about the EventProcessingResult: 1. What's the difference between FAILURE and NOT_ALLOWED? 2. If we need to return the `message`, which indicates the error IIUC, would it be better to replace the returned value with a checked exception?
Thanks, Yunze On Fri, Jan 13, 2023 at 12:36 PM Andrey Yegorov <ayego...@apache.org> wrote: > > Hi, > > I am starting discussion for PIP-241: TopicEventListener / topic events for > the BrokerService. > > PIP issue: https://github.com/apache/pulsar/issues/19224 > > ### Motivation > > Some Protocol Handlers may need to know about the topic-specific events to > update internal caches and/or state. > > These mechanisms will be useful also for core Pulsar components (like the > Transactions subsystem) and probably we would be able to simplify the > interaction between the internal components in the broker by using an > unified mechanism to handle the lifecycle of topics. > > Specific use cases: > > KOP keeps some state for the topic and needs to handle such cases as: > > - Topic Unloaded: release resources dedicated to the topic > - Topic Loaded: trigger loading of components tied to the partition > (GroupCoordinator, TransactionManager) > - Topic Deleted: remove any persistent state associated to the topic that > is stored in additional side system topics > - Topic Created: the same as “deleted” (ensure that there is no state on > system topics related to the new topic) > > > ### Goal > > This PIP defines a set of events needed for the protocol handlers (and for > internal broker components) to get notifications about topic-specific > events as seen by BrokerService. PIP outlines changes needed for protocol > handlers to keep/cache state consistent with BrokerService’s. > > The changes should not affect Pulsar running without protocol handlers or > with protocol handlers that do not rely on the new events. > > > ### API Changes > > ```java > /** > * Listener for the Topic events. > */ > @InterfaceAudience.LimitedPrivate > @InterfaceStability.Evolving > public interface TopicEventsListener { > > /** > * Types of events currently supported. > * create/load/unload/delete > */ > enum TopicEvent { > // create events included into load events > CREATE, > LOAD, > UNLOAD, > DELETE, > } > > /** > * Stages of events currently supported. > * before starting the event/successful completion/failed completion > */ > enum EventStage { > BEFORE, > SUCCESS, > FAILURE > } > > /** > * Outcome of the listener. > * Ignored for events at final stages (SUCCESS/FAILURE), > * > */ > enum EventProcessingOutcome { > OK, > FAILURE, > NOT_ALLOWED > } > > /** > * POJO for event processing result (outcome, message) > */ > @ToString(includeFieldNames=true) > @Data(staticConstructor="of") > class EventProcessingResult { > private final EventProcessingOutcome outcome; > private final String message; > } > > /** > * Handle topic event. > * Choice of the thread / maintenance of the thread pool is up to the > event handlers. > * @param topicName - name of the topic > * @param event - TopicEvent > * @param stage - EventStage > * @param t - exception in case of FAILURE, if present/known > * @return - EventProcessingResult. > * EventProcessingResult.EventProcessingOutcome != OK indicates > request to cancel > * event at BEFORE stage. > */ > EventProcessingResult handleEvent(String topicName, TopicEvent event, > EventStage stage, Throwable t); > } > ``` > > BrokerService: > ```java > public void addTopicEventListener(TopicEventsListener... listeners) > > public void removeTopicEventListener(TopicEventsListener... listeners) > ``` > > ### Implementation > > See PR for the proposed implementation. > https://github.com/apache/pulsar/pull/19153 > > > ### Alternatives > > Add new methods to the BrokerInterceptor API > > -- > Andrey