Hi, I am starting the discussion for PIP-241: TopicEventListener / topic events for the BrokerService.
PIP issue: https://github.com/apache/pulsar/issues/19224

### Motivation

Some Protocol Handlers may need to know about topic-specific events to update internal caches and/or state. These mechanisms will also be useful for core Pulsar components (like the Transactions subsystem), and we would probably be able to simplify the interaction between the internal components in the broker by using a unified mechanism to handle the lifecycle of topics.

Specific use cases:

KOP keeps some state for the topic and needs to handle such cases as:

- Topic Unloaded: release resources dedicated to the topic
- Topic Loaded: trigger loading of components tied to the partition (GroupCoordinator, TransactionManager)
- Topic Deleted: remove any persistent state associated with the topic that is stored in additional side system topics
- Topic Created: the same as “deleted” (ensure that there is no state on system topics related to the new topic)

### Goal

This PIP defines a set of events needed for the protocol handlers (and for internal broker components) to get notifications about topic-specific events as seen by BrokerService. The PIP outlines the changes needed for protocol handlers to keep their cached state consistent with BrokerService’s.

The changes should not affect Pulsar running without protocol handlers or with protocol handlers that do not rely on the new events.

### API Changes

```java
/**
 * Listener for the Topic events.
 */
@InterfaceAudience.LimitedPrivate
@InterfaceStability.Evolving
public interface TopicEventsListener {

    /**
     * Types of events currently supported.
     *  create/load/unload/delete
     */
    enum TopicEvent {
        // create events included into load events
        CREATE,
        LOAD,
        UNLOAD,
        DELETE,
    }

    /**
     * Stages of events currently supported.
     *  before starting the event/successful completion/failed completion
     */
    enum EventStage {
        BEFORE,
        SUCCESS,
        FAILURE
    }

    /**
     * Outcome of the listener.
     * Ignored for events at final stages (SUCCESS/FAILURE).
     */
    enum EventProcessingOutcome {
        OK,
        FAILURE,
        NOT_ALLOWED
    }

    /**
     * POJO for event processing result (outcome, message)
     */
    @ToString(includeFieldNames=true)
    @Data(staticConstructor="of")
    class EventProcessingResult {
        private final EventProcessingOutcome outcome;
        private final String message;
    }

    /**
     * Handle topic event.
     * Choice of the thread / maintenance of the thread pool is up to the event handlers.
     * @param topicName - name of the topic
     * @param event - TopicEvent
     * @param stage - EventStage
     * @param t - exception in case of FAILURE, if present/known
     * @return - EventProcessingResult.
     *      EventProcessingResult.EventProcessingOutcome != OK indicates request to cancel
     *      event at BEFORE stage.
     */
    EventProcessingResult handleEvent(String topicName, TopicEvent event, EventStage stage, Throwable t);
}
```

BrokerService:

```java
public void addTopicEventListener(TopicEventsListener... listeners)

public void removeTopicEventListener(TopicEventsListener... listeners)
```

### Implementation

See the PR for the proposed implementation: https://github.com/apache/pulsar/pull/19153

### Alternatives

Add new methods to the BrokerInterceptor API

--
Andrey
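
To illustrate how a protocol handler might use the proposed API, here is a minimal sketch. It is not taken from the PR. To stay self-contained it redeclares a simplified copy of `TopicEventsListener` without the Lombok and Pulsar annotations. The `TopicStateCacheListener` class, its `stateByTopic` map, the `"loaded"` marker value, and the `/__protected` topic suffix are all hypothetical names made up for this example. It also assumes the handler only updates its cache at the SUCCESS stage, which is one possible choice, not something the PIP prescribes.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified local copy of the proposed interface so the sketch compiles on its own.
interface TopicEventsListener {
    enum TopicEvent { CREATE, LOAD, UNLOAD, DELETE }
    enum EventStage { BEFORE, SUCCESS, FAILURE }
    enum EventProcessingOutcome { OK, FAILURE, NOT_ALLOWED }

    // Hand-written stand-in for the Lombok-generated POJO in the proposal.
    final class EventProcessingResult {
        final EventProcessingOutcome outcome;
        final String message;

        private EventProcessingResult(EventProcessingOutcome outcome, String message) {
            this.outcome = outcome;
            this.message = message;
        }

        static EventProcessingResult of(EventProcessingOutcome outcome, String message) {
            return new EventProcessingResult(outcome, message);
        }
    }

    EventProcessingResult handleEvent(String topicName, TopicEvent event, EventStage stage, Throwable t);
}

// Hypothetical listener: keeps a per-topic cache in sync with topic lifecycle events.
class TopicStateCacheListener implements TopicEventsListener {
    final Map<String, String> stateByTopic = new ConcurrentHashMap<>();

    @Override
    public EventProcessingResult handleEvent(String topicName, TopicEvent event,
                                             EventStage stage, Throwable t) {
        // A non-OK outcome at the BEFORE stage asks the broker to cancel the event.
        // The "/__protected" suffix is an invented convention for this example.
        if (stage == EventStage.BEFORE && event == TopicEvent.DELETE
                && topicName.endsWith("/__protected")) {
            return EventProcessingResult.of(EventProcessingOutcome.NOT_ALLOWED, "topic is protected");
        }
        // Assumption: only touch the cache once the broker reports success.
        if (stage == EventStage.SUCCESS) {
            switch (event) {
                case LOAD:
                    stateByTopic.put(topicName, "loaded");
                    break;
                case UNLOAD:
                case DELETE:
                    stateByTopic.remove(topicName);
                    break;
                default:
                    break;
            }
        }
        return EventProcessingResult.of(EventProcessingOutcome.OK, null);
    }
}

class TopicEventsSketch {
    public static void main(String[] args) {
        TopicStateCacheListener listener = new TopicStateCacheListener();
        String topic = "persistent://public/default/orders";

        listener.handleEvent(topic, TopicEventsListener.TopicEvent.LOAD,
                TopicEventsListener.EventStage.SUCCESS, null);
        System.out.println(listener.stateByTopic.get(topic)); // prints "loaded"

        listener.handleEvent(topic, TopicEventsListener.TopicEvent.UNLOAD,
                TopicEventsListener.EventStage.SUCCESS, null);
        System.out.println(listener.stateByTopic.containsKey(topic)); // prints "false"
    }
}
```

With the real API, such a listener would presumably be registered through `BrokerService.addTopicEventListener(...)` when the protocol handler starts and removed through `removeTopicEventListener(...)` when it shuts down.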