Hi all,

From the previous discussion [1], we reached a consensus that
MessageId should only be a comparable and serializable opaque object
(only in Java client though). However, the APIs that return or accept
a MessageId have some hidden requirements.

Let's introduce the 5 implementations first because they are very
messy.
- ChunkedMessageIdImpl: a special MessageId for chunked messages; see
  PIP-36 [2] and PIP-107 [3]. I won't discuss it much here.
- MessageIdImpl: the (ledger, entry, partition) triple that represents
  the BK position of a message.
- BatchedMessageIdImpl: adds batch fields to MessageIdImpl to
  distinguish messages in the same entry.

Let's talk a little more about the other two implementations.

```java
public class TopicMessageIdImpl implements MessageId {
    private final String topicPartitionName;
    private final String topicName;
    private final MessageId messageId;
    // ...
}
```

It's a proxy of an existing MessageId implementation like
MessageIdImpl. The `equals`/`compareTo`/`hashCode`/`toByteArray`
methods simply delegate to the same-named methods of the underlying
`messageId`, so it acts like another MessageId implementation.
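
To make the delegation concrete, here is a minimal sketch of such a proxy. All `Sketch*` names are made up for illustration, the interface is a reduced stand-in for the real `MessageId`, and the byte encoding is arbitrary rather than the real wire format.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Objects;

// Hypothetical stand-in for the real MessageId interface, reduced to what the sketch needs.
interface SketchMessageId extends Comparable<SketchMessageId> {
    byte[] toByteArray();
}

// Hypothetical (ledger, entry, partition) position, loosely modeled on MessageIdImpl.
final class SketchPositionId implements SketchMessageId {
    private final long ledgerId;
    private final long entryId;
    private final int partition;

    SketchPositionId(long ledgerId, long entryId, int partition) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
        this.partition = partition;
    }

    // Strip the proxy, if any, so wrapped and unwrapped IDs are treated alike.
    private static Object unwrap(Object o) {
        return (o instanceof SketchTopicId) ? ((SketchTopicId) o).inner() : o;
    }

    @Override
    public int compareTo(SketchMessageId o) {
        SketchPositionId other = (SketchPositionId) unwrap(o);
        int c = Long.compare(ledgerId, other.ledgerId);
        if (c == 0) {
            c = Long.compare(entryId, other.entryId);
        }
        return c != 0 ? c : Integer.compare(partition, other.partition);
    }

    @Override
    public boolean equals(Object o) {
        Object raw = unwrap(o);
        if (!(raw instanceof SketchPositionId)) {
            return false;
        }
        SketchPositionId other = (SketchPositionId) raw;
        return ledgerId == other.ledgerId && entryId == other.entryId && partition == other.partition;
    }

    @Override
    public int hashCode() {
        return Objects.hash(ledgerId, entryId, partition);
    }

    @Override
    public byte[] toByteArray() {
        // Arbitrary fixed-width encoding for the sketch, not the real wire format.
        return ByteBuffer.allocate(20).putLong(ledgerId).putLong(entryId).putInt(partition).array();
    }
}

// Proxy in the style of TopicMessageIdImpl: every method forwards to the wrapped ID.
final class SketchTopicId implements SketchMessageId {
    private final String topicPartitionName;
    private final SketchMessageId messageId;

    SketchTopicId(String topicPartitionName, SketchMessageId messageId) {
        this.topicPartitionName = topicPartitionName;
        this.messageId = messageId;
    }

    String topicPartitionName() { return topicPartitionName; }
    SketchMessageId inner() { return messageId; }

    @Override public int compareTo(SketchMessageId o) { return messageId.compareTo(o); }
    @Override public boolean equals(Object o) { return messageId.equals(o); }
    @Override public int hashCode() { return messageId.hashCode(); }
    @Override public byte[] toByteArray() { return messageId.toByteArray(); }
}

class TopicIdProxyDemo {
    public static void main(String[] args) {
        SketchPositionId raw = new SketchPositionId(1L, 2L, 0);
        SketchTopicId wrapped = new SketchTopicId("my-topic-partition-0", raw);
        System.out.println(wrapped.equals(raw)); // true
        System.out.println(Arrays.equals(wrapped.toByteArray(), raw.toByteArray())); // true
    }
}
```

In this sketch the topic name takes no part in equality, ordering, or serialization, so a serialized proxy cannot be told apart from its wrapped ID.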

```java
public class MultiMessageIdImpl implements MessageId {
    private Map<String, MessageId> map;
    // ...
}
```

It's an incomplete implementation [4] that represents multiple MessageId
instances. The `equals`/`compareTo` methods can only accept another
`MultiMessageIdImpl` instance, and the `toByteArray` method just throws
an exception.

Now, let's see the public APIs that return or accept a MessageId
instance. For simplicity, only the synchronous versions are considered:

Returns a MessageId:
- `Producer#send`
- `Message#getMessageId`: the `Message` object can be retrieved by
  `Consumer#receive` API or inside a `MessageListener`
- `Consumer#getLastMessageId`

Accepts a MessageId:
- `Consumer#seek`
- `Consumer#acknowledge` (or `acknowledgeCumulative`)

So I made two tables:

| Instance | Description | Method | MessageId |
| :- | :- | :- | :- |
| Producer | Batched      | send | BatchedMessageIdImpl |
| Producer | Non-Batched  | send | MessageIdImpl |
| Consumer |              | receive | TopicMessageIdImpl |
| Consumer | Single Topic | getLastMessageId | MessageIdImpl |
| Consumer | Multi Topics | getLastMessageId | MultiMessageIdImpl |

The underlying MessageId of a TopicMessageIdImpl is the same as what
`send` returns. And since BatchedMessageIdImpl inherits MessageIdImpl,
`getLastMessageId` could also return a BatchedMessageIdImpl.

| Description | Method | Required |
| :- | :- | :- |
| Single Topic | seek | MessageIdImpl |
| Multi Topics | seek | MessageIdImpl, TopicMessageIdImpl |
| Single Topic | acknowledge | MessageIdImpl |
| Multi Topics | acknowledge | TopicMessageIdImpl |

From the two tables, we can draw the following conclusions:
1. MultiMessageIdImpl is totally meaningless at the moment. [6]
2. A MessageId returned by Producer#send cannot be passed to
  MultiTopicsConsumerImpl#acknowledge.

That's what I want to discuss. (Sorry for taking so long to
introduce these MessageId implementations, but as you can see, what a
mess!) There was a proposal to add an overload of the acknowledge API
that accepts a map [7]. I saw an objection:

> There is no way to confirm that the MessageId really belongs to the
> partition and you can pass whatever you want

Actually, for a `TopicMessageIdImpl` or `MultiMessageIdImpl`, there is
also no way to confirm that its topic is one the consumer has
subscribed to. What's worse:
- If the MessageId didn't carry a topic name, `acknowledgeAsync` would
  throw an `IllegalArgumentException`.
- If the topic name in the `TopicMessageIdImpl` was invalid, no error
  would be returned.
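
A toy model of these two behaviors, as I described them above, might look like the following. It is not the code of `MultiTopicsConsumerImpl`; all `Model*` names and `LegacyAckModel` are hypothetical, and the IDs are reduced to the fields the check needs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Hypothetical marker interface standing in for MessageId.
interface ModelId {}

// Hypothetical ID without a topic name, like what Producer#send returns.
final class ModelPlainId implements ModelId {
    final long ledgerId;
    final long entryId;

    ModelPlainId(long ledgerId, long entryId) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
    }
}

// Hypothetical ID that carries a topic name, in the style of TopicMessageIdImpl.
final class ModelTopicId implements ModelId {
    final String topicPartitionName;
    final ModelId inner;

    ModelTopicId(String topicPartitionName, ModelId inner) {
        this.topicPartitionName = topicPartitionName;
        this.inner = inner;
    }
}

// Toy multi-topics consumer that mimics the two behaviors described above.
class LegacyAckModel {
    private final Set<String> subscribedTopics;
    private final List<ModelId> acked = new ArrayList<>();

    LegacyAckModel(Set<String> subscribedTopics) {
        this.subscribedTopics = new HashSet<>(subscribedTopics);
    }

    CompletableFuture<Void> acknowledgeAsync(ModelId id) {
        // Case 1: no topic name, so the call throws instead of returning a failed future.
        if (!(id instanceof ModelTopicId)) {
            throw new IllegalArgumentException("MessageId does not carry a topic name");
        }
        ModelTopicId topicId = (ModelTopicId) id;
        if (subscribedTopics.contains(topicId.topicPartitionName)) {
            acked.add(topicId.inner);
        }
        // Case 2: an unknown topic acknowledges nothing, yet the caller sees success.
        return CompletableFuture.completedFuture(null);
    }

    int ackedCount() {
        return acked.size();
    }
}

class LegacyAckDemo {
    public static void main(String[] args) {
        LegacyAckModel consumer =
                new LegacyAckModel(new HashSet<>(Arrays.asList("my-topic-partition-0")));
        ModelId bogus = new ModelTopicId("no-such-topic", new ModelPlainId(1L, 2L));
        System.out.println(consumer.acknowledgeAsync(bogus).isCompletedExceptionally()); // false
        System.out.println(consumer.ackedCount()); // 0
    }
}
```

The model shows why the caller cannot tell a successful acknowledgment from a silently ignored one when the topic name is wrong.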

IMO, TopicMessageIdImpl and MultiMessageIdImpl were designed without
enough careful consideration and should be eliminated. Passing a map
of (topic name, MessageId) would be a better way because the
semantics are clear. So I decided to open a discussion to determine
whether to start a PIP to add `seek` and `acknowledge` APIs like:

```java
Map<String, CompletableFuture<Void>> seekAsync(Map<String, MessageId> messageIdMap);
Map<String, CompletableFuture<Void>> acknowledgeAsync(Map<String, MessageId> messageIdMap);
Map<String, CompletableFuture<Void>> acknowledgeCumulativeAsync(Map<String, MessageId> messageIdMap);
```

If the key (topic/partition name) is not owned by a consumer, the
associated future will be completed exceptionally with a specific
exception.
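
As a rough sketch of these semantics (not a proposed implementation), the per-key futures could behave as follows. `MapAckSketch`, `TopicNotOwnedException`, and `lastAcked` are hypothetical names, and the type parameter `M` stands in for `MessageId` so the example does not depend on the Pulsar client.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Hypothetical exception type; the proposal only says "a specific exception".
class TopicNotOwnedException extends RuntimeException {
    TopicNotOwnedException(String topic) {
        super("Topic is not owned by this consumer: " + topic);
    }
}

// Hypothetical model of a multi-topics consumer. M stands in for MessageId
// so that the sketch does not depend on the Pulsar client library.
class MapAckSketch<M> {
    private final Set<String> ownedTopics;
    private final Map<String, M> lastAcked = new LinkedHashMap<>();

    MapAckSketch(Set<String> ownedTopics) {
        this.ownedTopics = new HashSet<>(ownedTopics);
    }

    Map<String, CompletableFuture<Void>> acknowledgeAsync(Map<String, M> messageIdMap) {
        Map<String, CompletableFuture<Void>> results = new LinkedHashMap<>();
        for (Map.Entry<String, M> entry : messageIdMap.entrySet()) {
            String topic = entry.getKey();
            CompletableFuture<Void> future = new CompletableFuture<>();
            if (ownedTopics.contains(topic)) {
                // Stands in for the real per-topic acknowledgment.
                lastAcked.put(topic, entry.getValue());
                future.complete(null);
            } else {
                // Unknown key: fail only this entry, not the whole call.
                future.completeExceptionally(new TopicNotOwnedException(topic));
            }
            results.put(topic, future);
        }
        return results;
    }

    M lastAcked(String topic) {
        return lastAcked.get(topic);
    }
}

class MapAckDemo {
    public static void main(String[] args) {
        MapAckSketch<String> consumer =
                new MapAckSketch<>(new HashSet<>(Arrays.asList("my-topic-partition-0")));
        Map<String, String> ids = new LinkedHashMap<>();
        ids.put("my-topic-partition-0", "1:2");
        ids.put("other-topic", "3:4");
        consumer.acknowledgeAsync(ids).forEach((topic, future) ->
                System.out.println(topic + " failed=" + future.isCompletedExceptionally()));
    }
}
```

Returning one future per key lets the caller handle a bad topic name for that entry alone, while the other entries still complete normally.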

Another benefit is that the MessageId implementations would be
simplified. We would have only 3 implementations (remember there is
a chunked message ID). Actually I think we can also merge
MessageIdImpl and BatchedMessageIdImpl into one implementation, but
it's not a point I want to discuss in this email.

[1] https://lists.apache.org/thread/rdkqnkohbmkjjs61hvoqplhhngr0b0sd
[2] https://github.com/apache/pulsar/wiki/PIP-36%3A-Max-Message-Size
[3] https://github.com/apache/pulsar/issues/12402
[4] https://github.com/apache/pulsar/issues/4940
[5] https://github.com/apache/pulsar/issues/16619
[6] https://github.com/apache/pulsar/issues/18409
[7] https://lists.apache.org/thread/3vd136jfhh7g0mhgwxrnqk7mx69qy17m

Thanks,
Yunze
