Yunze,
great explanation. I support your proposals 100%.

Enrico

On Thu, 10 Nov 2022 at 13:17, Yunze Xu <y...@streamnative.io.invalid> wrote:
>
> Hi all,
>
> From the previous discussion [1], we reached a consensus that
> MessageId should only be a comparable and serializable opaque object
> (only in the Java client, though). However, the APIs that return or
> accept a MessageId have some hidden requirements.
>
> Let's introduce the 5 implementations first, because they are very
> messy.
> - ChunkedMessageIdImpl: a special MessageId for chunked messages,
>   see PIP-36 [2] and PIP-107 [3]. I won't discuss it much here.
> - MessageIdImpl: the (ledger, entry, partition) triple that represents
>   the BookKeeper (BK) position of a message.
> - BatchedMessageIdImpl: adds the batch fields to MessageIdImpl to
>   differentiate messages stored in the same entry.
>
> Let's talk a little more about the other two implementations.
>
> ```java
> public class TopicMessageIdImpl implements MessageId {
>     private final String topicPartitionName;
>     private final String topicName;
>     private final MessageId messageId;
>     // ... remaining members omitted
> }
> ```
>
> It's a proxy for an existing MessageId implementation such as
> MessageIdImpl. Its `equals`/`compareTo`/`hashCode`/`toByteArray`
> methods just delegate to the same-named methods of the underlying
> `messageId`, so it behaves like yet another MessageId implementation.
>
> ```java
> public class MultiMessageIdImpl implements MessageId {
>     private Map<String, MessageId> map;
>     // ... remaining members omitted
> }
> ```
>
> It's an incomplete implementation [4] that represents multiple
> MessageId instances. Its `equals`/`compareTo` methods can only accept
> another `MultiMessageIdImpl` instance, and its `toByteArray` method
> just throws an exception.
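A minimal Java sketch of the delegation described above for TopicMessageIdImpl. `PositionId` and `TopicScopedId` are hypothetical stand-ins, not Pulsar classes: the first plays the role of a (ledger, entry, partition) id, the second wraps it with a topic name and forwards `equals`/`compareTo`/`hashCode` to the wrapped id.

```java
import java.util.Objects;

// Hypothetical stand-in for a (ledger, entry, partition) id; not the Pulsar class.
final class PositionId implements Comparable<PositionId> {
    final long ledgerId;
    final long entryId;
    final int partitionIndex;

    PositionId(long ledgerId, long entryId, int partitionIndex) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
        this.partitionIndex = partitionIndex;
    }

    // Orders by ledger, then entry, then partition.
    @Override
    public int compareTo(PositionId other) {
        int c = Long.compare(ledgerId, other.ledgerId);
        if (c != 0) return c;
        c = Long.compare(entryId, other.entryId);
        if (c != 0) return c;
        return Integer.compare(partitionIndex, other.partitionIndex);
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof PositionId)) return false;
        PositionId p = (PositionId) o;
        return ledgerId == p.ledgerId && entryId == p.entryId
                && partitionIndex == p.partitionIndex;
    }

    @Override
    public int hashCode() {
        return Objects.hash(ledgerId, entryId, partitionIndex);
    }
}

// Hypothetical proxy: carries a topic name, but every comparison method
// delegates to the wrapped id, so the topic name never participates.
final class TopicScopedId implements Comparable<TopicScopedId> {
    final String topicName;
    final PositionId inner;

    TopicScopedId(String topicName, PositionId inner) {
        this.topicName = topicName;
        this.inner = inner;
    }

    @Override
    public int compareTo(TopicScopedId other) {
        return inner.compareTo(other.inner);
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof TopicScopedId && inner.equals(((TopicScopedId) o).inner);
    }

    @Override
    public int hashCode() {
        return inner.hashCode();
    }
}
```

In this sketch, two wrappers from different topics that share the same position compare as equal, which shows how little the carried topic name contributes once everything is delegated.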
>
> Now, let's look at the public APIs that return or accept a MessageId
> instance. For simplicity, only the synchronous versions are considered.
>
> Returns a MessageId:
> - `Producer#send`
> - `Message#getMessageId`: the `Message` object can be retrieved by the
>   `Consumer#receive` API or inside a `MessageListener`
> - `Consumer#getLastMessageId`
>
> Accepts a MessageId:
> - `Consumer#seek`
> - `Consumer#acknowledge` (or `acknowledgeCumulative`)
>
> So I made two tables:
>
> | Instance | Description | Method | MessageId |
> | :- | :- | :- | :- |
> | Producer | Batched | send | BatchedMessageIdImpl |
> | Producer | Non-Batched | send | MessageIdImpl |
> | Consumer | | receive | TopicMessageIdImpl |
> | Consumer | Single Topic | getLastMessageId | MessageIdImpl |
> | Consumer | Multi Topics | getLastMessageId | MultiMessageIdImpl |
>
> The underlying MessageId of a TopicMessageIdImpl is the same as what
> `send` returns. And since BatchedMessageIdImpl extends MessageIdImpl,
> `getLastMessageId` could also return a BatchedMessageIdImpl.
>
> | Description | Method | Required MessageId |
> | :- | :- | :- |
> | Single Topic | seek | MessageIdImpl |
> | Multi Topics | seek | MessageIdImpl, TopicMessageIdImpl |
> | Single Topic | acknowledge | MessageIdImpl |
> | Multi Topics | acknowledge | TopicMessageIdImpl |
>
> From the two tables, we can draw the following conclusions:
> 1. MultiMessageIdImpl is totally meaningless at the moment. [6]
> 2. A MessageId returned by Producer#send cannot be passed to
>    MultiTopicsConsumerImpl#acknowledge.
>
> That's what I want to discuss. (Sorry for taking so much time to
> introduce these MessageId implementations, but as you can see, what a
> mess!) There was a proposal to add an overload of the acknowledge API
> that accepts a map [7].
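Conclusion 2 can be illustrated with a minimal Java sketch. All types here (`SketchMessageId`, `PlainId`, `TopicId`, `MultiTopicAckRouter`) are hypothetical simplifications, not Pulsar classes: a multi-topics consumer can only route an acknowledgment if the id names a topic, so a bare position like the one `send` returns is rejected. Silently ignoring an unknown topic is an assumption of this sketch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified id types for illustration; not Pulsar classes.
interface SketchMessageId {}

// Roughly what Producer#send hands back in this sketch: a position, no topic.
final class PlainId implements SketchMessageId {
    final long ledgerId;
    final long entryId;

    PlainId(long ledgerId, long entryId) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
    }
}

// Roughly what a multi-topics consumer's receive() hands back in this sketch:
// the same position wrapped together with the topic it came from.
final class TopicId implements SketchMessageId {
    final String topic;
    final PlainId inner;

    TopicId(String topic, PlainId inner) {
        this.topic = topic;
        this.inner = inner;
    }
}

// Hypothetical multi-topics consumer that routes each ack by the topic
// name carried inside the id.
final class MultiTopicAckRouter {
    private final Set<String> subscribedTopics;
    // Number of acks routed to each topic, standing in for per-topic consumers.
    final Map<String, Integer> ackCounts = new HashMap<>();

    MultiTopicAckRouter(Set<String> subscribedTopics) {
        this.subscribedTopics = subscribedTopics;
    }

    // Returns true if the ack was routed, false if the topic is unknown.
    boolean acknowledge(SketchMessageId id) {
        if (!(id instanceof TopicId)) {
            // A bare position says nothing about which topic it belongs to.
            throw new IllegalArgumentException("MessageId does not carry a topic name");
        }
        TopicId topicId = (TopicId) id;
        if (!subscribedTopics.contains(topicId.topic)) {
            // Assumption of this sketch: an unknown topic is silently ignored.
            return false;
        }
        ackCounts.merge(topicId.topic, 1, Integer::sum);
        return true;
    }
}
```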
>
> I saw an objection:
>
> > There is no way to confirm that the MessageId really belongs to the
> > partition and you can pass whatever you want
>
> Actually, for a `TopicMessageIdImpl` or `MultiMessageIdImpl`, there is
> also no way to confirm whether its topic is one the consumer is
> subscribed to. What's worse:
> - If the MessageId didn't carry a topic name, `acknowledgeAsync` would
>   throw an `IllegalArgumentException`.
> - If the topic name in the `TopicMessageIdImpl` was invalid, no error
>   would be returned.
>
> IMO, TopicMessageIdImpl and MultiMessageIdImpl were designed without
> enough careful consideration. They should be eliminated. Passing a map
> of (topic name, MessageId) instead would be a better way, because its
> semantics are clear. So I decided to open a discussion to determine
> whether to start a PIP to add `seek` and `acknowledge` APIs like:
>
> ```java
> Map<String, CompletableFuture<Void>> seekAsync(Map<String, MessageId> messageIdMap);
> Map<String, CompletableFuture<Void>> acknowledgeAsync(Map<String, MessageId> messageIdMap);
> Map<String, CompletableFuture<Void>> acknowledgeCumulativeAsync(Map<String, MessageId> messageIdMap);
> ```
>
> If a key (topic/partition name) is not owned by the consumer, the
> associated future will be completed exceptionally with a specific
> exception.
>
> Another benefit is that the MessageId implementations would be
> simplified. We would have only 3 implementations (remember there is
> also a chunked message ID). Actually, I think we can also merge
> MessageIdImpl and BatchedMessageIdImpl into one implementation, but
> that's not a point I want to discuss in this email.
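A minimal Java sketch of how the proposed map-based `acknowledgeAsync` could behave, assuming the semantics stated in the proposal: one future per key, completed exceptionally when the key is not owned. `MapAckConsumerSketch` and `TopicNotOwnedException` are hypothetical names (the proposal only says "a specific exception"), the type parameter `M` stands in for Pulsar's `MessageId`, and the futures complete immediately instead of waiting on a broker.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Hypothetical exception for keys the consumer does not own.
final class TopicNotOwnedException extends RuntimeException {
    TopicNotOwnedException(String topic) {
        super("Topic is not owned by this consumer: " + topic);
    }
}

// Hypothetical consumer sketch; M stands in for the MessageId type.
final class MapAckConsumerSketch<M> {
    private final Set<String> ownedTopics;
    // Last acknowledged id per topic, standing in for per-topic consumers.
    final Map<String, M> acked = new HashMap<>();

    MapAckConsumerSketch(Set<String> ownedTopics) {
        this.ownedTopics = ownedTopics;
    }

    // One future per key: completed normally if the topic is owned,
    // exceptionally with TopicNotOwnedException otherwise.
    Map<String, CompletableFuture<Void>> acknowledgeAsync(Map<String, M> messageIdMap) {
        Map<String, CompletableFuture<Void>> results = new HashMap<>();
        for (Map.Entry<String, M> entry : messageIdMap.entrySet()) {
            String topic = entry.getKey();
            if (ownedTopics.contains(topic)) {
                acked.put(topic, entry.getValue());
                results.put(topic, CompletableFuture.completedFuture(null));
            } else {
                results.put(topic,
                        CompletableFuture.failedFuture(new TopicNotOwnedException(topic)));
            }
        }
        return results;
    }
}
```

Because the topic is the map key rather than a field hidden inside the id, ownership can be checked per key up front, and a failure on one topic does not affect the futures of the others.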
>
> [1] https://lists.apache.org/thread/rdkqnkohbmkjjs61hvoqplhhngr0b0sd
> [2] https://github.com/apache/pulsar/wiki/PIP-36%3A-Max-Message-Size
> [3] https://github.com/apache/pulsar/issues/12402
> [4] https://github.com/apache/pulsar/issues/4940
> [5] https://github.com/apache/pulsar/issues/16619
> [6] https://github.com/apache/pulsar/issues/18409
> [7] https://lists.apache.org/thread/3vd136jfhh7g0mhgwxrnqk7mx69qy17m
>
> Thanks,
> Yunze