Hi all,
From the previous discussion [1], we reached a consensus that
MessageId should be only a comparable and serializable opaque object
(though only in the Java client). However, the APIs that return or
accept a MessageId have some hidden requirements.
Let me introduce the 5 implementations first, because they are quite
messy.
- ChunkedMessageIdImpl: a special MessageId for chunked messages (see
PIP-36 [2] and PIP-107 [3]). I won't discuss it much here.
- MessageIdImpl: the (ledger, entry, partition) triple that represents
the BookKeeper (BK) position of a message.
- BatchedMessageIdImpl: adds the batch fields to MessageIdImpl to
distinguish different messages in the same entry.
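To make the need for the batch fields concrete, here is a rough sketch. `PositionId` and `BatchedPositionId` are hypothetical stand-ins, not the real Pulsar classes, and the comparison order (ledger, entry, partition, batch index) is an assumption chosen for illustration. All messages packed into one entry share the same position, so only the batch index tells them apart.

```java
import java.util.Comparator;

// Hypothetical stand-in for MessageIdImpl: a BookKeeper position plus the
// partition index. Every message inside one batched entry shares these values.
record PositionId(long ledgerId, long entryId, int partitionIndex) {}

// Hypothetical stand-in for BatchedMessageIdImpl: adds the message's index
// inside the batch so messages of the same entry become distinguishable.
record BatchedPositionId(long ledgerId, long entryId, int partitionIndex, int batchIndex)
        implements Comparable<BatchedPositionId> {

    // Assumed order: ledger, then entry, then partition, then batch index.
    private static final Comparator<BatchedPositionId> ORDER =
            Comparator.comparingLong(BatchedPositionId::ledgerId)
                    .thenComparingLong(BatchedPositionId::entryId)
                    .thenComparingInt(BatchedPositionId::partitionIndex)
                    .thenComparingInt(BatchedPositionId::batchIndex);

    @Override
    public int compareTo(BatchedPositionId other) {
        return ORDER.compare(this, other);
    }
}
```

For example, the first two messages of entry 7 in ledger 3 map to the same `PositionId(3, 7, 0)`, while `BatchedPositionId(3, 7, 0, 0)` sorts before `BatchedPositionId(3, 7, 0, 1)`.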
Let's talk a little more about the other two implementations.
```java
public class TopicMessageIdImpl implements MessageId {
    private final String topicPartitionName;
    private final String topicName;
    private final MessageId messageId;
    // ...
}
```
It's a wrapper around an existing MessageId implementation such as
MessageIdImpl. Its `equals`/`compareTo`/`hashCode`/`toByteArray`
methods simply delegate to the same-named methods of the underlying
`messageId`, so it behaves like just another MessageId implementation.
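The delegation pattern described above can be sketched roughly as follows. `SimpleMessageId`, `LedgerEntryId` and `TopicWrappedId` are hypothetical, trimmed-down types rather than Pulsar's classes; the point is only that when every method forwards to the inner ID, the topic names never take part in equality, ordering, hashing or serialization.

```java
import java.nio.ByteBuffer;
import java.util.Comparator;

// Hypothetical, trimmed-down stand-in for Pulsar's MessageId interface.
interface SimpleMessageId extends Comparable<SimpleMessageId> {
    byte[] toByteArray();

    // Returns the inner ID of a wrapper, or the ID itself otherwise.
    static SimpleMessageId unwrap(SimpleMessageId id) {
        return id instanceof TopicWrappedId wrapped ? wrapped.inner() : id;
    }
}

// Hypothetical (ledger, entry) ID playing the role of MessageIdImpl.
record LedgerEntryId(long ledgerId, long entryId) implements SimpleMessageId {
    private static final Comparator<LedgerEntryId> ORDER =
            Comparator.comparingLong(LedgerEntryId::ledgerId)
                    .thenComparingLong(LedgerEntryId::entryId);

    // Assumes every ID in play is a LedgerEntryId, possibly wrapped.
    @Override
    public int compareTo(SimpleMessageId other) {
        return ORDER.compare(this, (LedgerEntryId) SimpleMessageId.unwrap(other));
    }

    // Unwrapping the argument keeps equals symmetric with the wrapper below.
    @Override
    public boolean equals(Object o) {
        return o instanceof SimpleMessageId id
                && SimpleMessageId.unwrap(id) instanceof LedgerEntryId other
                && ledgerId == other.ledgerId
                && entryId == other.entryId;
    }

    @Override
    public int hashCode() {
        return Long.hashCode(ledgerId) * 31 + Long.hashCode(entryId);
    }

    @Override
    public byte[] toByteArray() {
        return ByteBuffer.allocate(2 * Long.BYTES).putLong(ledgerId).putLong(entryId).array();
    }
}

// Plays the role of TopicMessageIdImpl: it carries topic names, but every
// method just forwards to the wrapped ID.
final class TopicWrappedId implements SimpleMessageId {
    private final String topicPartitionName;
    private final String topicName;
    private final SimpleMessageId messageId;

    TopicWrappedId(String topicPartitionName, String topicName, SimpleMessageId messageId) {
        this.topicPartitionName = topicPartitionName;
        this.topicName = topicName;
        this.messageId = messageId;
    }

    SimpleMessageId inner() { return messageId; }
    String topicPartitionName() { return topicPartitionName; }
    String topicName() { return topicName; }

    @Override public int compareTo(SimpleMessageId other) { return messageId.compareTo(other); }
    @Override public boolean equals(Object o) { return messageId.equals(o); }
    @Override public int hashCode() { return messageId.hashCode(); }
    @Override public byte[] toByteArray() { return messageId.toByteArray(); }
}
```

With this design, two wrappers that point to different topics but hold the same (ledger, entry) compare as equal, which is exactly why such a wrapper cannot reliably tell which topic an ID belongs to.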
```java
public class MultiMessageIdImpl implements MessageId {
    private Map<String, MessageId> map;
    // ...
}
```
It's an incomplete implementation [4] that represents multiple MessageId
instances. Its `equals`/`compareTo` methods only accept another
`MultiMessageIdImpl` instance, and its `toByteArray` method just throws
an exception.
Now, let's look at the public APIs that return or accept a MessageId
instance. For simplicity, I only consider the synchronous versions:
Returns a MessageId:
- `Producer#send`
- `Message#getMessageId`: the `Message` object can be retrieved by
`Consumer#receive` API or inside a `MessageListener`
- `Consumer#getLastMessageId`
Accepts a MessageId:
- `Consumer#seek`
- `Consumer#acknowledge` (or `acknowledgeCumulative`)
So I made two tables:
| Instance | Description | Method | MessageId |
| :- | :- | :- | :- |
| Producer | Batched | send | BatchedMessageIdImpl |
| Producer | Non-Batched | send | MessageIdImpl |
| Consumer | | receive | TopicMessageIdImpl |
| Consumer | Single Topic | getLastMessageId | MessageIdImpl |
| Consumer | Multi Topics | getLastMessageId | MultiMessageIdImpl |
The underlying MessageId of a TopicMessageIdImpl is the same as what
`send` returns. Since BatchedMessageIdImpl extends MessageIdImpl,
`getLastMessageId` could also return a BatchedMessageIdImpl.
| Description | Method | Required |
| :- | :- | :- |
| Single Topic | seek | MessageIdImpl |
| Multi Topics | seek | MessageIdImpl, TopicMessageIdImpl |
| Single Topic | acknowledge | MessageIdImpl |
| Multi Topics | acknowledge | TopicMessageIdImpl |
From the two tables, we can draw the following conclusions:
1. MultiMessageIdImpl is currently meaningless [6].
2. A MessageId returned by Producer#send cannot be passed to
MultiTopicsConsumerImpl#acknowledge.
That's what I want to discuss. (Sorry for spending so much time
introducing these MessageId implementations, but as you can see, what
a mess!) There was a proposal to add an overload of the acknowledge
API that accepts a map [7], and I saw an objection:
> There is no way to confirm that the MessageId really belongs to the
partition and you can pass whatever you want
Actually, for a `TopicMessageIdImpl` or `MultiMessageIdImpl`, there is
also no way to confirm that the topic it carries is one the consumer
subscribes to. Worse still:
- If the MessageId doesn't carry a topic name, `acknowledgeAsync`
throws an `IllegalArgumentException`.
- If the topic name in the `TopicMessageIdImpl` is invalid, no error
is returned.
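Both problems can be reproduced with a small simulation of how a multi-topics consumer routes an acknowledgment by the topic name stored inside the ID. Everything here (`AckId`, `PlainId`, `TopicAckId`, `MultiTopicsAckRouter`) is hypothetical and only mirrors the behavior described above; it is not the code of `MultiTopicsConsumerImpl`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical ID types: a plain position and a wrapper that also carries
// the topic name (the role TopicMessageIdImpl plays).
interface AckId {}
record PlainId(long ledgerId, long entryId) implements AckId {}
record TopicAckId(String topicName, PlainId inner) implements AckId {}

// Simulates routing: look up the internal per-topic consumer by the topic
// name found inside the ID.
final class MultiTopicsAckRouter {
    // topic name -> IDs acknowledged on that simulated internal consumer
    private final Map<String, List<PlainId>> acked = new HashMap<>();

    MultiTopicsAckRouter(List<String> topics) {
        topics.forEach(topic -> acked.put(topic, new ArrayList<>()));
    }

    void acknowledge(AckId id) {
        if (!(id instanceof TopicAckId topicId)) {
            // e.g. the ID returned by Producer#send: no topic name to route by.
            throw new IllegalArgumentException("ID does not carry a topic name: " + id);
        }
        List<PlainId> target = acked.get(topicId.topicName());
        if (target == null) {
            // Unknown topic: the ack goes nowhere and the caller is not told.
            return;
        }
        target.add(topicId.inner());
    }

    int ackedCount(String topic) {
        List<PlainId> ids = acked.get(topic);
        return ids == null ? 0 : ids.size();
    }
}
```

Passing a `PlainId` fails loudly, while a `TopicAckId` with a misspelled topic name is silently dropped, which matches the two issues listed above.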
IMO, TopicMessageIdImpl and MultiMessageIdImpl were designed without
enough careful consideration, and they should be eliminated. Passing a
map of (topic name, MessageId) would be a better way, because its
semantics are clear. So I'd like to open a discussion to determine
whether to start a PIP that adds `seek` and `acknowledge` APIs like:
```java
// Proposed additions to the Consumer API
Map<String, CompletableFuture<Void>> seekAsync(Map<String, MessageId> messageIdMap);

Map<String, CompletableFuture<Void>> acknowledgeAsync(Map<String, MessageId> messageIdMap);

Map<String, CompletableFuture<Void>> acknowledgeCumulativeAsync(Map<String, MessageId> messageIdMap);
```
If a key (topic or partition name) is not owned by the consumer, the
associated future will be completed exceptionally with a specific
exception.
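Here is a minimal sketch of the per-key semantics proposed above, assuming a consumer that knows its owned topic/partition names. `MapAckConsumer` and `TopicNotOwnedException` are hypothetical names (the proposal only says "a specific exception"), and the ID type is a type parameter so the sketch doesn't depend on Pulsar's `MessageId`.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Hypothetical exception for keys the consumer does not own.
class TopicNotOwnedException extends RuntimeException {
    TopicNotOwnedException(String topic) {
        super("Topic is not owned by this consumer: " + topic);
    }
}

// Hypothetical consumer owning a fixed set of topic/partition names.
// Not thread-safe; a real client would need concurrent structures.
final class MapAckConsumer<ID> {
    private final Set<String> ownedTopics;
    private final Map<String, ID> lastAcked = new HashMap<>();

    MapAckConsumer(Set<String> ownedTopics) {
        this.ownedTopics = Set.copyOf(ownedTopics);
    }

    // One future per key, so a bad key fails on its own without affecting the others.
    Map<String, CompletableFuture<Void>> acknowledgeAsync(Map<String, ID> messageIdMap) {
        Map<String, CompletableFuture<Void>> results = new LinkedHashMap<>();
        messageIdMap.forEach((topic, id) -> {
            if (!ownedTopics.contains(topic)) {
                results.put(topic, CompletableFuture.failedFuture(new TopicNotOwnedException(topic)));
                return;
            }
            // A real client would send the ACK to the broker here and complete
            // the future when the broker responds.
            lastAcked.put(topic, id);
            results.put(topic, CompletableFuture.completedFuture(null));
        });
        return results;
    }

    ID lastAcked(String topic) {
        return lastAcked.get(topic);
    }
}
```

Because the topic is an explicit key rather than a hidden field of the ID, the ownership check happens up front and an invalid key surfaces as a failed future instead of being ignored.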
Another benefit is that the MessageId implementations would be
simplified: we would have only 3 implementations (remember there is
also the chunked message ID). I think we could also merge
MessageIdImpl and BatchedMessageIdImpl into one implementation, but
that's not a point I want to discuss in this email.
[1] https://lists.apache.org/thread/rdkqnkohbmkjjs61hvoqplhhngr0b0sd
[2] https://github.com/apache/pulsar/wiki/PIP-36%3A-Max-Message-Size
[3] https://github.com/apache/pulsar/issues/12402
[4] https://github.com/apache/pulsar/issues/4940
[5] https://github.com/apache/pulsar/issues/16619
[6] https://github.com/apache/pulsar/issues/18409
[7] https://lists.apache.org/thread/3vd136jfhh7g0mhgwxrnqk7mx69qy17m
Thanks,
Yunze