Hi all,

From the previous discussion [1], we reached a consensus that MessageId should only be a comparable and serializable opaque object (at least in the Java client). However, the APIs that return or accept a MessageId have some hidden requirements.
Let me introduce the 5 implementations first, because they are very messy.

- ChunkedMessageIdImpl: a special MessageId for chunked messages; see PIP-36 [2] and PIP-107 [3]. I won't discuss it further here.
- MessageIdImpl: the (ledger, entry, partition) triple that represents the BookKeeper (BK) position of a message.
- BatchedMessageIdImpl: adds the batch fields to MessageIdImpl to distinguish different messages in the same entry.

Let's talk a little more about the other two implementations.

```java
public class TopicMessageIdImpl implements MessageId {
    private final String topicPartitionName;
    private final String topicName;
    private final MessageId messageId;
    // ... (remaining fields and methods omitted)
}
```

It's a proxy of an existing MessageId implementation such as MessageIdImpl. Its `equals`/`compareTo`/`hashCode`/`toByteArray` methods just call the methods of the same name on the underlying `messageId`, so it acts like yet another MessageId implementation.

```java
public class MultiMessageIdImpl implements MessageId {
    private Map<String, MessageId> map;
    // ... (remaining fields and methods omitted)
}
```

It's an incomplete implementation [4] that represents multiple MessageId instances. Its `equals`/`compareTo` methods only accept another `MultiMessageIdImpl` instance, and its `toByteArray` method just throws an exception.
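To make the delegation concrete, here is a minimal sketch with hypothetical, simplified stand-ins: `SketchMessageId`, `PositionId`, and `TopicAwareId` are illustrative names, not Pulsar classes. It only assumes what is described above, namely that the topic-aware wrapper forwards `equals`/`compareTo`/`hashCode`/`toByteArray` to the wrapped ID. The byte layout used by `toByteArray` is made up for the example and is not the client's real serialization format.

```java
import java.nio.ByteBuffer;
import java.util.Objects;

public class DelegatingMessageIdSketch {

    // Hypothetical stand-in for the MessageId interface: comparable and serializable.
    interface SketchMessageId extends Comparable<SketchMessageId> {
        byte[] toByteArray();
    }

    // Hypothetical stand-in for MessageIdImpl: a (ledger, entry, partition) triple.
    static final class PositionId implements SketchMessageId {
        final long ledgerId;
        final long entryId;
        final int partitionIndex;

        PositionId(long ledgerId, long entryId, int partitionIndex) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
            this.partitionIndex = partitionIndex;
        }

        // Strips a TopicAwareId wrapper so wrapped and plain IDs compare as positions.
        private static Object unwrap(Object id) {
            return id instanceof TopicAwareId ? ((TopicAwareId) id).inner : id;
        }

        @Override
        public int compareTo(SketchMessageId other) {
            PositionId o = (PositionId) unwrap(other);
            int c = Long.compare(ledgerId, o.ledgerId);
            if (c == 0) c = Long.compare(entryId, o.entryId);
            if (c == 0) c = Integer.compare(partitionIndex, o.partitionIndex);
            return c;
        }

        @Override
        public boolean equals(Object obj) {
            Object o = unwrap(obj);
            if (!(o instanceof PositionId)) return false;
            PositionId p = (PositionId) o;
            return ledgerId == p.ledgerId && entryId == p.entryId && partitionIndex == p.partitionIndex;
        }

        @Override
        public int hashCode() {
            return Objects.hash(ledgerId, entryId, partitionIndex);
        }

        // Simplified fixed-size encoding (8 + 8 + 4 bytes); the real client serializes differently.
        @Override
        public byte[] toByteArray() {
            return ByteBuffer.allocate(20).putLong(ledgerId).putLong(entryId).putInt(partitionIndex).array();
        }
    }

    // Hypothetical stand-in for TopicMessageIdImpl: carries a topic name and delegates everything else.
    static final class TopicAwareId implements SketchMessageId {
        final String topicName;
        final SketchMessageId inner;

        TopicAwareId(String topicName, SketchMessageId inner) {
            this.topicName = topicName;
            this.inner = inner;
        }

        @Override public int compareTo(SketchMessageId other) { return inner.compareTo(other); }
        @Override public boolean equals(Object obj) { return inner.equals(obj); }
        @Override public int hashCode() { return inner.hashCode(); }
        // Delegates as well, so the topic name is not part of the serialized form.
        @Override public byte[] toByteArray() { return inner.toByteArray(); }
    }

    public static void main(String[] args) {
        PositionId plain = new PositionId(10L, 5L, 0);
        TopicAwareId wrapped = new TopicAwareId("my-topic-partition-0", plain);
        System.out.println(wrapped.equals(plain));                                      // true
        System.out.println(wrapped.compareTo(plain));                                   // 0
        System.out.println(wrapped.toByteArray().length == plain.toByteArray().length); // true
    }
}
```

In this sketch, pure delegation means the wrapper's serialized bytes are identical to those of the wrapped ID, so the topic name does not survive a round trip through `toByteArray`.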
Now, let's look at the public APIs that return or accept a MessageId instance. For simplicity, I only consider the synchronous versions.

Returns a MessageId:

- `Producer#send`
- `Message#getMessageId`: the `Message` object can be retrieved by the `Consumer#receive` API or inside a `MessageListener`
- `Consumer#getLastMessageId`

Accepts a MessageId:

- `Consumer#seek`
- `Consumer#acknowledge` (or `acknowledgeCumulative`)

So I made two tables:

| Instance | Description | Method | MessageId |
| :- | :- | :- | :- |
| Producer | Batched | send | BatchedMessageIdImpl |
| Producer | Non-Batched | send | MessageIdImpl |
| Consumer | | receive | TopicMessageIdImpl |
| Consumer | Single Topic | getLastMessageId | MessageIdImpl |
| Consumer | Multi Topics | getLastMessageId | MultiMessageIdImpl |

The underlying MessageId of a TopicMessageIdImpl is the same as what `send` returns. And since BatchedMessageIdImpl inherits from MessageIdImpl, `getLastMessageId` could also return a BatchedMessageIdImpl.

| Description | Method | Required MessageId |
| :- | :- | :- |
| Single Topic | seek | MessageIdImpl |
| Multi Topics | seek | MessageIdImpl, TopicMessageIdImpl |
| Single Topic | acknowledge | MessageIdImpl |
| Multi Topics | acknowledge | TopicMessageIdImpl |

From the two tables, we can draw the following conclusions:

1. MultiMessageIdImpl is totally meaningless at the moment. [6]
2. A MessageId returned by `Producer#send` cannot be passed to `MultiTopicsConsumerImpl#acknowledge`.

The second point is what I want to discuss. (Sorry for spending so much time introducing these MessageId implementations, but as you can see, what a mess!)

There was a proposal to add an overload of the acknowledge API that accepts a map [7]. I saw an objection:

> There is no way to confirm that the MessageId really belongs to the partition and you can pass whatever you want

Actually, for a `TopicMessageIdImpl` or `MultiMessageIdImpl`, there is also no way to confirm that the topic is one subscribed by the consumer.
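The second conclusion follows from how a multi-topics consumer has to route an acknowledgment to the right per-topic consumer. The sketch below models that routing with hypothetical classes (`PlainId`, `TopicId`, `MultiTopicsConsumer`); it is a simplified model of the behavior described in this email, not the actual code of `MultiTopicsConsumerImpl`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MultiTopicsAckSketch {

    // Hypothetical position without a topic name, like the ID returned by Producer#send.
    static class PlainId {
        final long ledgerId;
        final long entryId;

        PlainId(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }
    }

    // Hypothetical topic-aware ID, like the one a multi-topics consumer hands out on receive.
    static class TopicId extends PlainId {
        final String topicName;

        TopicId(String topicName, long ledgerId, long entryId) {
            super(ledgerId, entryId);
            this.topicName = topicName;
        }
    }

    // Hypothetical multi-topics consumer that routes each ack to a per-topic consumer by name.
    static class MultiTopicsConsumer {
        // Acknowledged IDs per subscribed topic, standing in for the per-topic consumers.
        final Map<String, List<PlainId>> acked = new HashMap<>();

        MultiTopicsConsumer(List<String> topics) {
            for (String topic : topics) {
                acked.put(topic, new ArrayList<>());
            }
        }

        void acknowledge(PlainId id) {
            if (!(id instanceof TopicId)) {
                // Without a topic name there is no way to pick the per-topic consumer.
                throw new IllegalArgumentException("MessageId does not carry a topic name");
            }
            List<PlainId> target = acked.get(((TopicId) id).topicName);
            if (target != null) {
                target.add(id);
            }
            // Otherwise the topic is not subscribed, but nothing reports that to the caller.
        }
    }

    public static void main(String[] args) {
        MultiTopicsConsumer consumer = new MultiTopicsConsumer(List.of("topic-a", "topic-b"));
        consumer.acknowledge(new TopicId("topic-a", 1L, 0L));       // routed to topic-a
        consumer.acknowledge(new TopicId("no-such-topic", 1L, 1L)); // silently dropped
        try {
            consumer.acknowledge(new PlainId(1L, 2L));              // e.g. an ID returned by send
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        System.out.println(consumer.acked.get("topic-a").size());   // 1
    }
}
```

The topic name is the only routing key in this model, so an ID without one cannot be acknowledged, and an ID with a wrong one is not validated against the subscription.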
What's worse:

- If the MessageId doesn't carry a topic name, `acknowledgeAsync` throws an `IllegalArgumentException`.
- If the topic name in the `TopicMessageIdImpl` is invalid, no error is returned.

IMO, TopicMessageIdImpl and MultiMessageIdImpl were designed without enough careful consideration. They should be eliminated. Instead, passing a map of (topic name, MessageId) would be a better way, because its semantics are clear. So I'd like to open a discussion to determine whether to start a PIP to add `seek` and `acknowledge` APIs like:

```java
Map<String, CompletableFuture<Void>> seekAsync(Map<String, MessageId> messageIdMap);
Map<String, CompletableFuture<Void>> acknowledgeAsync(Map<String, MessageId> messageIdMap);
Map<String, CompletableFuture<Void>> acknowledgeCumulativeAsync(Map<String, MessageId> messageIdMap);
```

If a key (topic or partition name) is not owned by the consumer, the associated future will be completed exceptionally with a specific exception.

Another benefit is that the MessageId implementations would be simplified: we would have only 3 implementations (remember there is also the chunked message ID). Actually, I think we could also merge MessageIdImpl and BatchedMessageIdImpl into one implementation, but that's not a point I want to discuss in this email.

[1] https://lists.apache.org/thread/rdkqnkohbmkjjs61hvoqplhhngr0b0sd
[2] https://github.com/apache/pulsar/wiki/PIP-36%3A-Max-Message-Size
[3] https://github.com/apache/pulsar/issues/12402
[4] https://github.com/apache/pulsar/issues/4940
[5] https://github.com/apache/pulsar/issues/16619
[6] https://github.com/apache/pulsar/issues/18409
[7] https://lists.apache.org/thread/3vd136jfhh7g0mhgwxrnqk7mx69qy17m

Thanks,
Yunze
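For illustration, here is a minimal sketch of the per-key semantics of the proposed map-based `acknowledgeAsync`, assuming a consumer that knows the set of topics (or partitions) it owns. `SketchConsumer`, `TopicNotOwnedException`, and the use of `Long` in place of `MessageId` are hypothetical placeholders; the proposal itself only says that an unowned key gets a future completed exceptionally with "a specific exception".

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

public class MapBasedAckSketch {

    // Hypothetical exception for a key that the consumer does not own.
    static class TopicNotOwnedException extends RuntimeException {
        TopicNotOwnedException(String topic) {
            super("Topic is not owned by this consumer: " + topic);
        }
    }

    // Hypothetical consumer owning a fixed set of topics; M stands in for MessageId.
    static final class SketchConsumer<M> {
        private final Set<String> ownedTopics;
        // Last acknowledged ID per topic, standing in for the real per-topic acknowledgment.
        final Map<String, M> lastAcked = new HashMap<>();

        SketchConsumer(Set<String> ownedTopics) {
            this.ownedTopics = ownedTopics;
        }

        Map<String, CompletableFuture<Void>> acknowledgeAsync(Map<String, M> messageIdMap) {
            Map<String, CompletableFuture<Void>> results = new HashMap<>();
            for (Map.Entry<String, M> e : messageIdMap.entrySet()) {
                String topic = e.getKey();
                if (!ownedTopics.contains(topic)) {
                    // Each key is validated, so a wrong topic is reported instead of ignored.
                    results.put(topic, CompletableFuture.failedFuture(new TopicNotOwnedException(topic)));
                } else {
                    lastAcked.put(topic, e.getValue());
                    results.put(topic, CompletableFuture.completedFuture(null));
                }
            }
            return results;
        }
    }

    public static void main(String[] args) {
        SketchConsumer<Long> consumer =
                new SketchConsumer<>(Set.of("topic-a-partition-0", "topic-a-partition-1"));
        Map<String, Long> ids = new HashMap<>();
        ids.put("topic-a-partition-0", 42L);
        ids.put("topic-b", 7L); // not owned by this consumer
        consumer.acknowledgeAsync(ids).forEach((topic, future) ->
                System.out.println(topic + " -> " + (future.isCompletedExceptionally() ? "failed" : "acked")));
    }
}
```

Returning one future per key lets the caller see exactly which topics failed, rather than having a single future fail for the whole batch.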