https://github.com/apache/pulsar/issues/12356
--- Pasted here for quoting convenience ---

## Motivation

In many use cases, applications use Pulsar consumers or readers to fetch all the updates from a topic and build a map holding the latest value of each key among the messages received. This pattern is very common when building a local cache of the data.

We want to support this access pattern directly in the Pulsar client API, encapsulating the complexity of setting it up.

## Goal

Provide a view of the topic data in the form of a read-only map that is constantly updated with the latest version of each key.

Additionally, let the application specify a listener so that it can scan the map and then receive notifications as new messages are received and applied.

## API Changes

This proposal only adds new API on the client side: a new type of consumer, the `TableView`.

Example:

```java
TableView<Integer> tableView = pulsarClient.newTableView(Schema.INT32)
        .topic(topic)
        .create();

tableView.get("my-key");       // --> 5
tableView.get("my-other-key"); // --> 7
```

When a `TableView` instance is created, it is guaranteed to already contain the latest value of each key as of creation time.

### API additions

```java
interface PulsarClient {
    // ....

    <T> TableViewBuilder<T> newTableView(Schema<T> schema);
}

interface TableViewBuilder<T> {
    TableViewBuilder<T> loadConf(Map<String, Object> config);

    TableView<T> create() throws PulsarClientException;

    CompletableFuture<TableView<T>> createAsync();

    TableViewBuilder<T> topic(String topic);

    TableViewBuilder<T> autoUpdatePartitionsInterval(int interval, TimeUnit unit);
}

interface TableView<T> extends Closeable {

    // Similar methods as java.util.Map
    int size();

    boolean isEmpty();

    boolean containsKey(String key);

    T get(String key);

    Set<Map.Entry<String, T>> entrySet();

    Set<String> keySet();

    Collection<T> values();

    void forEach(BiConsumer<String, T> action);

    /**
     * Performs the given action for each entry in this map until all entries
     * have been processed or the action throws an exception.
     *
     * When all the entries have been processed, the action will be invoked
     * for every new update that is received from the topic.
     *
     * @param action The action to be performed for each entry
     */
    void forEachAndListen(BiConsumer<String, T> action);

    /**
     * Closes the table view and releases the allocated resources.
     *
     * @return a future that can be used to track when the table view has been closed
     */
    CompletableFuture<Void> closeAsync();
}
```

## Implementation

The `TableView` will be implemented using multiple `Reader` instances, one per partition, each of which always starts reading from the compacted view of the topic.

The time it takes to create a table view can be controlled by configuring the compaction policies for the given topic or namespace. More frequent compaction can lead to very short startup times, because less data needs to be replayed to reconstruct the `TableView` of the topic.

-- Matteo Merli <matteo.me...@gmail.com>
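To make the "latest value per key" and `forEachAndListen` semantics concrete, here is a minimal, Pulsar-free sketch. It is not the proposed implementation: the class `InMemoryTableView` and its `applyMessage` method are hypothetical names, and the per-partition `Reader` loop that would feed messages into it is assumed rather than shown. Each applied message overwrites the previous value for its key, and a registered action first scans the current entries and then receives every later update.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BiConsumer;

/**
 * Hypothetical in-memory model of the TableView semantics (no Pulsar involved).
 * In a real client, a reader per partition would call applyMessage for every
 * message it receives, starting from the compacted view of the topic.
 */
class InMemoryTableView<T> {
    // Latest value for each key (ConcurrentHashMap: values must be non-null).
    private final Map<String, T> data = new ConcurrentHashMap<>();
    // Actions registered through forEachAndListen.
    private final List<BiConsumer<String, T>> listeners = new CopyOnWriteArrayList<>();

    /** Hypothetical hook invoked by the reader loop for each received message. */
    synchronized void applyMessage(String key, T value) {
        data.put(key, value); // the newest message for a key replaces the old value
        for (BiConsumer<String, T> listener : listeners) {
            listener.accept(key, value);
        }
    }

    T get(String key) {
        return data.get(key);
    }

    int size() {
        return data.size();
    }

    boolean containsKey(String key) {
        return data.containsKey(key);
    }

    /** Scans the existing entries, then keeps notifying the action of new updates. */
    synchronized void forEachAndListen(BiConsumer<String, T> action) {
        data.forEach(action);
        listeners.add(action);
    }
}
```

Both `applyMessage` and `forEachAndListen` are `synchronized` in this sketch so that no update can slip in between the initial scan and the listener registration: each entry is delivered to the action either by the scan or by a later notification, never missed. Reads through `get` stay lock-free thanks to the concurrent map.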