Hi Andrey, This PIP makes sense to me. A small question: Is it better to define `TopicEvent` and `EventStage` outside of `TopicEventsListener`?
Thanks, Haiting On Tue, Jan 17, 2023 at 3:51 PM Enrico Olivelli <eolive...@gmail.com> wrote: > > I support this PIP. > This will allow Protocol Handlers to have better knowledge of what's > happening on the broker. > > I hope that someday we will be able to refactor Broker internals using > this feature and decouple the components. > > Enrico > > Il giorno lun 16 gen 2023 alle ore 10:27 Yunze Xu > <y...@streamnative.io.invalid> ha scritto: > > > > +1 to me now. > > > > Thanks, > > Yunze > > > > On Sat, Jan 14, 2023 at 1:29 AM Andrey Yegorov > > <andrey.yego...@datastax.com> wrote: > > > > > > My bad, I pasted the interface code from a branch with experiment to > > > cancel > > > events. This is no longer needed. > > > EventProcessingResult result is irrelevant, I updated the PIP. > > > > > > It is not in the PR. > > > > > > On Fri, Jan 13, 2023 at 4:08 AM Yunze Xu <y...@streamnative.io.invalid> > > > wrote: > > > > > > > I have some questions about the EventProcessingResult: > > > > 1. What's the difference between FAILURE and NOT_ALLOWED? > > > > 2. If we need to return the `message`, which indicates the error IIUC, > > > > would it be better to replace the returned value with a checked > > > > exception? > > > > > > > > Thanks, > > > > Yunze > > > > > > > > On Fri, Jan 13, 2023 at 12:36 PM Andrey Yegorov <ayego...@apache.org> > > > > wrote: > > > > > > > > > > Hi, > > > > > > > > > > I am starting discussion for PIP-241: TopicEventListener / topic > > > > > events > > > > for > > > > > the BrokerService. > > > > > > > > > > PIP issue: https://github.com/apache/pulsar/issues/19224 > > > > > > > > > > ### Motivation > > > > > > > > > > Some Protocol Handlers may need to know about the topic-specific > > > > > events > > > > to > > > > > update internal caches and/or state. > > > > > > > > > > These mechanisms will be useful also for core Pulsar components (like > > > > > the > > > > > Transactions subsystem) and probably we would be able to simplify the > > > > > interaction between the internal components in the broker by using an > > > > > unified mechanism to handle the lifecycle of topics. > > > > > > > > > > Specific use cases: > > > > > > > > > > KOP keeps some state for the topic and needs to handle such cases as: > > > > > > > > > > - Topic Unloaded: release resources dedicated to the topic > > > > > - Topic Loaded: trigger loading of components tied to the partition > > > > > (GroupCoordinator, TransactionManager) > > > > > - Topic Deleted: remove any persistent state associated to the topic > > > > > that > > > > > is stored in additional side system topics > > > > > - Topic Created: the same as “deleted” (ensure that there is no state > > > > > on > > > > > system topics related to the new topic) > > > > > > > > > > > > > > > ### Goal > > > > > > > > > > This PIP defines a set of events needed for the protocol handlers (and > > > > for > > > > > internal broker components) to get notifications about topic-specific > > > > > events as seen by BrokerService. PIP outlines changes needed for > > > > > protocol > > > > > handlers to keep/cache state consistent with BrokerService’s. > > > > > > > > > > The changes should not affect Pulsar running without protocol > > > > > handlers or > > > > > with protocol handlers that do not rely on the new events. > > > > > > > > > > > > > > > ### API Changes > > > > > > > > > > ```java > > > > > /** > > > > > * Listener for the Topic events. > > > > > */ > > > > > @InterfaceAudience.LimitedPrivate > > > > > @InterfaceStability.Evolving > > > > > public interface TopicEventsListener { > > > > > > > > > > /** > > > > > * Types of events currently supported. > > > > > * create/load/unload/delete > > > > > */ > > > > > enum TopicEvent { > > > > > // create events included into load events > > > > > CREATE, > > > > > LOAD, > > > > > UNLOAD, > > > > > DELETE, > > > > > } > > > > > > > > > > /** > > > > > * Stages of events currently supported. > > > > > * before starting the event/successful completion/failed > > > > > completion > > > > > */ > > > > > enum EventStage { > > > > > BEFORE, > > > > > SUCCESS, > > > > > FAILURE > > > > > } > > > > > > > > > > /** > > > > > * Outcome of the listener. > > > > > * Ignored for events at final stages (SUCCESS/FAILURE), > > > > > * > > > > > */ > > > > > enum EventProcessingOutcome { > > > > > OK, > > > > > FAILURE, > > > > > NOT_ALLOWED > > > > > } > > > > > > > > > > /** > > > > > * POJO for event processing result (outcome, message) > > > > > */ > > > > > @ToString(includeFieldNames=true) > > > > > @Data(staticConstructor="of") > > > > > class EventProcessingResult { > > > > > private final EventProcessingOutcome outcome; > > > > > private final String message; > > > > > } > > > > > > > > > > /** > > > > > * Handle topic event. > > > > > * Choice of the thread / maintenance of the thread pool is up to > > > > > the > > > > > event handlers. > > > > > * @param topicName - name of the topic > > > > > * @param event - TopicEvent > > > > > * @param stage - EventStage > > > > > * @param t - exception in case of FAILURE, if present/known > > > > > * @return - EventProcessingResult. > > > > > * EventProcessingResult.EventProcessingOutcome != OK > > > > > indicates > > > > > request to cancel > > > > > * event at BEFORE stage. > > > > > */ > > > > > EventProcessingResult handleEvent(String topicName, TopicEvent > > > > > event, > > > > > EventStage stage, Throwable t); > > > > > } > > > > > ``` > > > > > > > > > > BrokerService: > > > > > ```java > > > > > public void addTopicEventListener(TopicEventsListener... > > > > > listeners) > > > > > > > > > > public void removeTopicEventListener(TopicEventsListener... > > > > listeners) > > > > > ``` > > > > > > > > > > ### Implementation > > > > > > > > > > See PR for the proposed implementation. > > > > > https://github.com/apache/pulsar/pull/19153 > > > > > > > > > > > > > > > ### Alternatives > > > > > > > > > > Add new methods to the BrokerInterceptor API > > > > > > > > > > -- > > > > > Andrey > > > > > > > > > > > > > -- > > > Andrey Yegorov