Hi,

I am starting the discussion for PIP-241: TopicEventListener / topic events
for the BrokerService.

PIP issue: https://github.com/apache/pulsar/issues/19224

### Motivation

Some Protocol Handlers may need to know about topic-specific events to
update internal caches and/or state.

These mechanisms would also be useful for core Pulsar components (like the
Transactions subsystem), and we would probably be able to simplify the
interaction between internal broker components by using a unified
mechanism to handle the lifecycle of topics.

Specific use cases:

KOP (Kafka-on-Pulsar) keeps some state per topic and needs to handle cases
such as:

- Topic Unloaded: release resources dedicated to the topic
- Topic Loaded: trigger loading of components tied to the partition
(GroupCoordinator, TransactionManager)
- Topic Deleted: remove any persistent state associated with the topic that
is stored in additional side system topics
- Topic Created: the same as “deleted” (ensure that there is no state on
system topics related to the new topic)


### Goal

This PIP defines a set of events that protocol handlers (and internal
broker components) need in order to get notifications about topic-specific
events as seen by BrokerService. The PIP outlines the changes needed for
protocol handlers to keep their cached state consistent with
BrokerService’s.

The changes should not affect Pulsar running without protocol handlers or
with protocol handlers that do not rely on the new events.


### API Changes

```java
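import lombok.Data;
import lombok.ToString;
import org.apache.pulsar.common.classification.InterfaceAudience;
import org.apache.pulsar.common.classification.InterfaceStability;

/**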
 * Listener for the Topic events.
 */
@InterfaceAudience.LimitedPrivate
@InterfaceStability.Evolving
public interface TopicEventsListener {

    /**
     * Types of events currently supported.
     *  create/load/unload/delete
     */
    enum TopicEvent {
        // create events are included in load events
        CREATE,
        LOAD,
        UNLOAD,
        DELETE,
    }

    /**
     * Stages of events currently supported.
     *  before starting the event/successful completion/failed completion
     */
    enum EventStage {
        BEFORE,
        SUCCESS,
        FAILURE
    }

    /**
     * Outcome of the listener.
     * Ignored for events at final stages (SUCCESS/FAILURE).
     */
    enum EventProcessingOutcome {
        OK,
        FAILURE,
        NOT_ALLOWED
    }

    /**
     * POJO for event processing result (outcome, message)
     */
    @ToString(includeFieldNames=true)
    @Data(staticConstructor="of")
    class EventProcessingResult {
        private final EventProcessingOutcome outcome;
        private final String message;
    }

    /**
     * Handle topic event.
     * Choice of the thread / maintenance of the thread pool is up to the
     * event handlers.
     * @param topicName - name of the topic
     * @param event - TopicEvent
     * @param stage - EventStage
     * @param t - exception in case of FAILURE, if present/known
     * @return - EventProcessingResult.
     *      EventProcessingResult.EventProcessingOutcome != OK indicates
     *      request to cancel event at BEFORE stage.
     */
    EventProcessingResult handleEvent(String topicName, TopicEvent event,
                                      EventStage stage, Throwable t);
}
```
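
To make the intended usage more concrete, here is a minimal sketch of how a protocol handler might implement the listener. It is only an illustration: `TopicStateListener`, `protect` and `isTracked` are hypothetical names, and the nested `TopicEventsListener` is a hand-written stand-in for the interface above (without Lombok or the Pulsar annotations) so that the snippet compiles on its own.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Stand-in mirroring the proposed interface, so the sketch compiles without Pulsar/Lombok.
interface TopicEventsListener {
    enum TopicEvent { CREATE, LOAD, UNLOAD, DELETE }
    enum EventStage { BEFORE, SUCCESS, FAILURE }
    enum EventProcessingOutcome { OK, FAILURE, NOT_ALLOWED }

    final class EventProcessingResult {
        private final EventProcessingOutcome outcome;
        private final String message;

        private EventProcessingResult(EventProcessingOutcome outcome, String message) {
            this.outcome = outcome;
            this.message = message;
        }

        public static EventProcessingResult of(EventProcessingOutcome outcome, String message) {
            return new EventProcessingResult(outcome, message);
        }

        public EventProcessingOutcome getOutcome() { return outcome; }
        public String getMessage() { return message; }
    }

    EventProcessingResult handleEvent(String topicName, TopicEvent event,
                                      EventStage stage, Throwable t);
}

// Hypothetical listener: tracks loaded topics and vetoes deletion of protected ones.
class TopicStateListener implements TopicEventsListener {
    private final Set<String> loadedTopics = ConcurrentHashMap.newKeySet();
    private final Set<String> protectedTopics = ConcurrentHashMap.newKeySet();

    void protect(String topicName) { protectedTopics.add(topicName); }
    boolean isTracked(String topicName) { return loadedTopics.contains(topicName); }

    @Override
    public EventProcessingResult handleEvent(String topicName, TopicEvent event,
                                             EventStage stage, Throwable t) {
        if (stage == EventStage.BEFORE) {
            // Only the BEFORE stage can request cancellation of the event.
            if (event == TopicEvent.DELETE && protectedTopics.contains(topicName)) {
                return EventProcessingResult.of(EventProcessingOutcome.NOT_ALLOWED,
                        "topic still in use: " + topicName);
            }
        } else if (stage == EventStage.SUCCESS) {
            switch (event) {
                case LOAD:
                    loadedTopics.add(topicName);    // start tracking per-topic state
                    break;
                case UNLOAD:
                case DELETE:
                    loadedTopics.remove(topicName); // release per-topic state
                    break;
                default:
                    break;
            }
        }
        // The return value is ignored by the caller at final stages (SUCCESS/FAILURE).
        return EventProcessingResult.of(EventProcessingOutcome.OK, null);
    }
}
```

The handler here does its work inline because it only touches in-memory sets; a listener doing I/O would presumably hand the work off to its own executor, since the choice of thread is left to the event handlers.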

BrokerService:
```java
    public void addTopicEventListener(TopicEventsListener... listeners)

    public void removeTopicEventListener(TopicEventsListener... listeners)
```
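
For illustration only, the stage semantics described in the Javadoc (a non-OK outcome at BEFORE cancels the event, results at SUCCESS/FAILURE are ignored) could be sketched roughly as below. This is not the BrokerService code from the PR: `TopicEventDispatcher` and `runWithEvents` are hypothetical names, the interface is a compact stand-in without Lombok, and the sketch assumes that a vetoed event produces no further notifications.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Compact stand-in for the proposed interface, only so the sketch compiles on its own.
interface TopicEventsListener {
    enum TopicEvent { CREATE, LOAD, UNLOAD, DELETE }
    enum EventStage { BEFORE, SUCCESS, FAILURE }
    enum EventProcessingOutcome { OK, FAILURE, NOT_ALLOWED }

    final class EventProcessingResult {
        public final EventProcessingOutcome outcome;
        public final String message;

        public EventProcessingResult(EventProcessingOutcome outcome, String message) {
            this.outcome = outcome;
            this.message = message;
        }
    }

    EventProcessingResult handleEvent(String topicName, TopicEvent event,
                                      EventStage stage, Throwable t);
}

// Hypothetical dispatcher showing one possible way to drive the listeners.
class TopicEventDispatcher {
    private final List<TopicEventsListener> listeners = new CopyOnWriteArrayList<>();

    public void addTopicEventListener(TopicEventsListener... toAdd) {
        listeners.addAll(Arrays.asList(toAdd));
    }

    public void removeTopicEventListener(TopicEventsListener... toRemove) {
        listeners.removeAll(Arrays.asList(toRemove));
    }

    /** Returns true if the operation ran and completed; false if vetoed or failed. */
    public boolean runWithEvents(String topic, TopicEventsListener.TopicEvent event,
                                 Runnable operation) {
        for (TopicEventsListener l : listeners) {
            TopicEventsListener.EventProcessingResult r =
                    l.handleEvent(topic, event, TopicEventsListener.EventStage.BEFORE, null);
            if (r != null && r.outcome != TopicEventsListener.EventProcessingOutcome.OK) {
                return false; // a listener asked to cancel the event (assumed: no further events)
            }
        }
        try {
            operation.run();
        } catch (RuntimeException e) {
            notifyFinal(topic, event, TopicEventsListener.EventStage.FAILURE, e);
            return false;
        }
        notifyFinal(topic, event, TopicEventsListener.EventStage.SUCCESS, null);
        return true;
    }

    private void notifyFinal(String topic, TopicEventsListener.TopicEvent event,
                             TopicEventsListener.EventStage stage, Throwable t) {
        for (TopicEventsListener l : listeners) {
            l.handleEvent(topic, event, stage, t); // result ignored at final stages
        }
    }
}
```

A copy-on-write list is used here on the assumption that listeners are registered rarely (typically at protocol handler startup) while events are dispatched frequently.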

### Implementation

See the PR for the proposed implementation:
https://github.com/apache/pulsar/pull/19153


### Alternatives

Add new methods to the BrokerInterceptor API

--
Andrey
