Yunze,
Great explanation. I fully support your proposals.

Enrico

On Thu, Nov 10, 2022 at 13:17, Yunze Xu
<y...@streamnative.io.invalid> wrote:
>
> Hi all,
>
> From the previous discussion [1], we reached a consensus that
> MessageId should only be a comparable and serializable opaque object
> (only in Java client though). However, the APIs that return or accept
> a MessageId have some hidden requirements.
>
> Let's introduce the 5 implementations first because they are very
> messy.
> - ChunkedMessageIdImpl: a special MessageId for chunked messages,
>   see PIP-36 [2] and PIP-107 [3]. I won't discuss it much here.
> - MessageIdImpl: the (ledger, entry, partition) triple that represents
>   a BK position of a message.
> - BatchedMessageIdImpl: adds the batch fields to MessageIdImpl to
>   distinguish the messages in the same entry.
>
> Let's talk a little more about the other two implementations.
>
> ```java
> public class TopicMessageIdImpl implements MessageId {
>     private final String topicPartitionName;
>     private final String topicName;
>     private final MessageId messageId;
>     // ... remaining members omitted
> }
> ```
>
> It's a proxy of an existing MessageId implementation like
> MessageIdImpl. The `equals`/`compareTo`/`hashCode`/`toByteArray`
> methods simply delegate to the same-named methods of the underlying
> `messageId`, so it acts like another MessageId implementation.
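
To make the delegation concrete, here is a minimal sketch of the proxy pattern described above. It is not the actual Pulsar code: `SimpleMessageId`, `PositionId`, and `TopicId` are hypothetical stand-ins for `MessageId`, `MessageIdImpl`, and `TopicMessageIdImpl`, and the 20-byte serialization is an assumption made only for illustration.

```java
import java.nio.ByteBuffer;
import java.util.Objects;

// Hypothetical, simplified stand-in for Pulsar's MessageId interface.
interface SimpleMessageId extends Comparable<SimpleMessageId> {
    byte[] toByteArray();
}

// Stand-in for MessageIdImpl: a (ledger, entry, partition) triple.
final class PositionId implements SimpleMessageId {
    final long ledgerId;
    final long entryId;
    final int partition;

    PositionId(long ledgerId, long entryId, int partition) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
        this.partition = partition;
    }

    @Override
    public int compareTo(SimpleMessageId other) {
        // Sketch only: assumes the other ID is also a PositionId.
        PositionId o = (PositionId) other;
        int c = Long.compare(ledgerId, o.ledgerId);
        if (c != 0) {
            return c;
        }
        c = Long.compare(entryId, o.entryId);
        if (c != 0) {
            return c;
        }
        return Integer.compare(partition, o.partition);
    }

    @Override
    public boolean equals(Object obj) {
        return obj instanceof PositionId && compareTo((PositionId) obj) == 0;
    }

    @Override
    public int hashCode() {
        return Objects.hash(ledgerId, entryId, partition);
    }

    @Override
    public byte[] toByteArray() {
        // Assumed layout: 8 bytes ledger, 8 bytes entry, 4 bytes partition.
        return ByteBuffer.allocate(20)
                .putLong(ledgerId).putLong(entryId).putInt(partition).array();
    }
}

// Stand-in for TopicMessageIdImpl: carries a topic name, delegates the rest.
final class TopicId implements SimpleMessageId {
    private final String topicName;
    private final SimpleMessageId inner;

    TopicId(String topicName, SimpleMessageId inner) {
        this.topicName = topicName;
        this.inner = inner;
    }

    String getTopicName() {
        return topicName;
    }

    // Unwrap another TopicId so the wrapped IDs are compared directly.
    private static Object unwrap(Object obj) {
        return (obj instanceof TopicId) ? ((TopicId) obj).inner : obj;
    }

    @Override
    public int compareTo(SimpleMessageId other) {
        return inner.compareTo((SimpleMessageId) unwrap(other));
    }

    @Override
    public boolean equals(Object obj) {
        return inner.equals(unwrap(obj));
    }

    @Override
    public int hashCode() {
        return inner.hashCode();
    }

    @Override
    public byte[] toByteArray() {
        return inner.toByteArray();
    }
}
```

Because every method forwards to the wrapped ID, the topic name never takes part in equality, ordering, or serialization in this sketch, which is why the wrapper behaves like just another MessageId implementation.
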
>
> ```java
> public class MultiMessageIdImpl implements MessageId {
>     private Map<String, MessageId> map;
>     // ... remaining members omitted
> }
> ```
>
> It's an incomplete implementation [4] that represents multiple MessageId
> instances. The `equals`/`compareTo` methods can only accept another
> `MultiMessageIdImpl` instance, and the `toByteArray` method just throws
> an exception.
>
> Now, let's see the public APIs that return or accept a MessageId
> instance. For simplicity, only the synchronous versions are considered:
>
> Returns a MessageId:
> - `Producer#send`
> - `Message#getMessageId`: the `Message` object can be retrieved by
>   `Consumer#receive` API or inside a `MessageListener`
> - `Consumer#getLastMessageId`
>
> Accepts a MessageId:
> - `Consumer#seek`
> - `Consumer#acknowledge` (or `acknowledgeCumulative`)
>
> So I made two tables:
>
> | Instance | Description | Method | MessageId |
> | :- | :- | :- | :- |
> | Producer | Batched      | send | BatchedMessageIdImpl |
> | Producer | Non-Batched  | send | MessageIdImpl |
> | Consumer |              | receive | TopicMessageIdImpl |
> | Consumer | Single Topic | getLastMessageId | MessageIdImpl |
> | Consumer | Multi Topics | getLastMessageId | MultiMessageIdImpl |
>
> The underlying MessageId of a TopicMessageIdImpl is the same as what
> `send` returns. And since BatchedMessageIdImpl inherits MessageIdImpl,
> `getLastMessageId` could also return a BatchedMessageIdImpl.
>
> | Description | Method | Required |
> | :- | :- | :- |
> | Single Topic | seek | MessageIdImpl |
> | Multi Topics | seek | MessageIdImpl, TopicMessageIdImpl |
> | Single Topic | acknowledge | MessageIdImpl |
> | Multi Topics | acknowledge | TopicMessageIdImpl |
>
> From the two tables, we can draw the following conclusions:
> 1. MultiMessageIdImpl is totally meaningless at the moment. [6]
> 2. A MessageId returned by Producer#send cannot be passed to
>   MultiTopicsConsumerImpl#acknowledge.
>
> That's what I want to discuss. (Sorry for taking too much time to
> introduce these MessageId implementations, but as you see, what a
> mess!) There was a proposal to add an overload of
> the acknowledge API that accepts a map [7]. I saw an objection:
>
> > There is no way to confirm that the MessageId really belongs to the
> > partition and you can pass whatever you want
>
> Actually, for a `TopicMessageIdImpl` or `MultiMessageIdImpl`, there is
> also no way to confirm that the topic is one the consumer has
> subscribed to. What's worse:
> - If the MessageId didn't carry a topic name, `acknowledgeAsync` would
>   throw an `IllegalArgumentException`.
> - If the topic name in the `TopicMessageIdImpl` was invalid, no error
>   would be returned.
>
> IMO, TopicMessageIdImpl and MultiMessageIdImpl were designed without
> enough careful consideration and should be eliminated. Instead,
> passing a map of (topic name, MessageId) would be a better way, since
> the semantics are clear. So I decided to open a discussion to determine
> whether to start a PIP to add `seek` and `acknowledge` APIs like:
>
> ```java
> Map<String, CompletableFuture<Void>> seekAsync(
>         Map<String, MessageId> messageIdMap);
> Map<String, CompletableFuture<Void>> acknowledgeAsync(
>         Map<String, MessageId> messageIdMap);
> Map<String, CompletableFuture<Void>> acknowledgeCumulativeAsync(
>         Map<String, MessageId> messageIdMap);
> ```
>
> If the key (topic/partition name) is not owned by a consumer, the
> associated future will be completed exceptionally with a specific
> exception.
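
The per-topic failure behavior described above could look roughly like the following sketch. Everything here is an assumption for illustration: `MapAckSketch` is a hypothetical stand-in for a multi-topics consumer, `TopicNotOwnedException` is an invented name for the "specific exception", and message IDs are plain `Object`s so the example stays self-contained.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Hypothetical exception type; the proposal only says "a specific exception".
class TopicNotOwnedException extends RuntimeException {
    TopicNotOwnedException(String topic) {
        super("Topic is not owned by this consumer: " + topic);
    }
}

// Hypothetical stand-in for a multi-topics consumer.
class MapAckSketch {
    private final Set<String> ownedTopics;

    MapAckSketch(Set<String> ownedTopics) {
        this.ownedTopics = ownedTopics;
    }

    // Returns one future per key, so each topic succeeds or fails on its own.
    Map<String, CompletableFuture<Void>> acknowledgeAsync(
            Map<String, Object> messageIdMap) {
        Map<String, CompletableFuture<Void>> results = new HashMap<>();
        for (Map.Entry<String, Object> entry : messageIdMap.entrySet()) {
            String topic = entry.getKey();
            if (ownedTopics.contains(topic)) {
                // A real client would send the acknowledgment to the broker
                // here; the sketch just reports immediate success.
                results.put(topic, CompletableFuture.<Void>completedFuture(null));
            } else {
                CompletableFuture<Void> failed = new CompletableFuture<>();
                failed.completeExceptionally(new TopicNotOwnedException(topic));
                results.put(topic, failed);
            }
        }
        return results;
    }
}
```

With this shape, a caller can inspect each future independently, so an unknown topic name surfaces as an explicit error for that key instead of being silently ignored.
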
>
> Another benefit is that the MessageId implementations would be
> simplified. We would have only 3 implementations (remember there is
> a chunked message ID). Actually I think we can also merge
> MessageIdImpl and BatchedMessageIdImpl into one implementation, but
> it's not a point I want to discuss in this email.
>
> [1] https://lists.apache.org/thread/rdkqnkohbmkjjs61hvoqplhhngr0b0sd
> [2] https://github.com/apache/pulsar/wiki/PIP-36%3A-Max-Message-Size
> [3] https://github.com/apache/pulsar/issues/12402
> [4] https://github.com/apache/pulsar/issues/4940
> [5] https://github.com/apache/pulsar/issues/16619
> [6] https://github.com/apache/pulsar/issues/18409
> [7] https://lists.apache.org/thread/3vd136jfhh7g0mhgwxrnqk7mx69qy17m
>
> Thanks,
> Yunze
