Hi Joe,

I think the most controversial point is what a MessageId should be used for. In your opinion, it should only be used as an opaque, comparable object that represents the position of a message [1]. My view is that MessageId should be a wrapper of the MessageIdData in PulsarApi.proto [2].
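
To make the "opaque" reading concrete, here is a minimal sketch of an id that an application can only compare and serialize. `OpaqueId`, its `of` factory, and its 16-byte layout are hypothetical names and choices made up for illustration; this is not the Pulsar `MessageId` API or its wire format.

```java
import java.nio.ByteBuffer;

// Hypothetical opaque identifier, for illustration only (not Pulsar's MessageId).
final class OpaqueId implements Comparable<OpaqueId> {
    // Internal fields: callers get no getters for these.
    private final long ledgerId;
    private final long entryId;

    private OpaqueId(long ledgerId, long entryId) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
    }

    // Stand-in for the client library handing out ids (for example after a send).
    static OpaqueId of(long ledgerId, long entryId) {
        return new OpaqueId(ledgerId, entryId);
    }

    // Serialize for external storage; the 16-byte layout is an assumption of this sketch.
    byte[] toByteArray() {
        return ByteBuffer.allocate(2 * Long.BYTES).putLong(ledgerId).putLong(entryId).array();
    }

    // Restore an id from bytes previously produced by toByteArray().
    static OpaqueId fromByteArray(byte[] data) {
        if (data.length != 2 * Long.BYTES) {
            throw new IllegalArgumentException("unexpected length: " + data.length);
        }
        ByteBuffer buf = ByteBuffer.wrap(data);
        return new OpaqueId(buf.getLong(), buf.getLong());
    }

    // Order by ledger first, then by entry.
    @Override
    public int compareTo(OpaqueId other) {
        int byLedger = Long.compare(ledgerId, other.ledgerId);
        return byLedger != 0 ? byLedger : Long.compare(entryId, other.entryId);
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof OpaqueId && compareTo((OpaqueId) o) == 0;
    }

    @Override
    public int hashCode() {
        return Long.hashCode(ledgerId) * 31 + Long.hashCode(entryId);
    }
}
```

With an id like this, an application can store the bytes externally and compare restored ids, but it never learns which fields back the ordering. A wrapper of MessageIdData would expose those fields through getters.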

I agree that on the application side there are few cases that need to look into the details of a specific MessageId implementation. However, these "internal fields" are de facto concepts in Pulsar. Yes, ledger id and entry id are concepts from BK and they might change, but I doubt there is any Pulsar application developer who doesn't know these two concepts. Look at the clients for other languages, including C++ [3], Golang [4], Python [5], Rust [6][7], and C# [8]: only the official Java client doesn't expose these fields.

I'm also okay with not changing the MessageId interface in the pulsar-client-api module, because the motivation is to simplify the messy implementations of MessageId in the pulsar-client module. We can add a MessageIdPojo class in the pulsar-common module or elsewhere. In the Pulsar repo, all MessageId implementations must extend the MessageIdPojo class.

```java
@Data
class MessageIdPojo implements MessageId {
    private final long ledgerId;
    // ...
}
```

[1] https://pulsar.apache.org/docs/concepts-messaging#messages
[2] https://github.com/apache/pulsar/blob/b31c5a6a325728b5dc5faebd1a33386952d733d5/pulsar-common/src/main/proto/PulsarApi.proto#L57
[3] https://github.com/apache/pulsar-client-cpp/blob/main/include/pulsar/MessageId.h
[4] https://github.com/apache/pulsar-client-go/blob/d9c8b0ab9c14e8d571b632c93002ea20db1a2c16/pulsar/message.go#L147
[5] https://github.com/apache/pulsar-client-python/blob/75a57b427d4c6944c49f4b712344107b5444aa36/pulsar/__init__.py#L84
[6] https://github.com/streamnative/pulsar-rs/blob/de59974080daa248bfdeaea8510eb72ec8f30bac/src/consumer.rs#L1984
[7] https://github.com/streamnative/pulsar-rs/blob/de59974080daa248bfdeaea8510eb72ec8f30bac/src/consumer.rs#L1353
[8] https://github.com/apache/pulsar-dotpulsar/blob/0590b1ad6c4474d425662352ba62abb41bfb9f0a/src/DotPulsar/MessageId.cs#L56

Thanks,
Yunze

On Wed, Nov 9, 2022 at 8:24 PM Joe F <joefranc...@gmail.com> wrote:
>
> Messageid is an identifier which identifies a message.
> How that id is constructed, or what it contains should not matter to an
> application, and an application should not assume anything about the
> implementation of that id.
>
> >What about the partition index? We have a `TopicMetadata` interface that
> >returns the number of partitions.
>
> Partitioning is a first class concept, and is designed to be used by
> application. How a partition is implemented should not be used by the
> application.
>
> [ People violate this all the time, and I regret that Pulsar did not
> provide get_Nth_topicpartion(), which led to people hardcoding it as
> topicname-N. and using that directly. Now we are stuck with it.]
>
> Similarly batch index and batch size. Those are all logical concepts
> exposed to the user. For eg: batch size is something the app is allowed to
> tune
>
> >Even for ledger id and entry id, this pair represents a logic storage
> >position like the offset concept in Kafka
>
> These are not equivalent. In Pulsar these are implementation details,
> while in Kafka those are logical concepts.
>
> One might think that these are logical concepts in Pulsar, because if you
> reverse engineer the current msgid implementation, you observe some
> "properties".
>
> Ledger id/entry id are logical concepts in __Bookkeeper__, not in Pulsar.
> There is the Managed Ledger abstraction on top of BK, and then there is
> Pulsar on top of ML. You will break two levels of abstraction to expose
> ledger/entryid to an application
>
> An application should only care about the operations that can be done
> with a messageId
>
> - getmsgid() to return the message id as an opaque object
>
> [Operators using one messageId]
> -serde, like tostring(). for storage/retrieval of message identifier
> -getter/setter on logical properties of the message (partition id etc...)
> -increment/decrement
>
> [Operators that take multiple messageIds]
> -comparator
> -range
>
> Those are the kind of operators Pulsar should provide to a user.
> Applications should not implement these operators on their own by reverse
> engineering the msgId. No application should be directly using ledgerid or
> entryid for doing anything (math or logic),
>
> As long as Pulsar provides these operations with msgid to the
> application, it should not care whether it's represented as "0:1:-1:-1"
> or "a:b:-b-b", or "#xba4231!haxcy1826923f" or as a serialized binary
> object or..whatever it may be.
>
> >>But it would be harder to know a tuple like "0:1:-1:-1" means.
>
> A user shouldn't have to know what this means. That's the point.
>
> Pulsar itself changed the messageId multiple times as it added
> partitioning, batching and so on, and it might do so again. And bookkeeper
> could change its representation of ledgers, (for eg, to uuids and byte
> offsets) ML could replace BK with something else (for eg. a table in a
> db.) Anything is possible - Pulsar would then just have to change the
> implementation of the operator functions, and no application needs to be
> rewritten.
>
> -j
>
> On Tue, Nov 8, 2022 at 6:05 PM Yunze Xu <y...@streamnative.io.invalid>
> wrote:
>
> > Hi Joe,
> >
> > Then what would we expect users to do with the MessageId? It should only
> > be passed to Consumer#seek or ReaderBuilder#startMessageId?
> >
> > What about the partition index? We have a `TopicMetadata` interface that
> > returns the number of partitions. If the partition is also
> > "implementation details", should we expose this interface? Or should we
> > support customizing a MessageRouter because it returns the partition
> > index?
> >
> > What about the batch index and batch size? For example, we have an
> > enableBatchIndexAcknowledgment method to enable batch index ACK. If batch
> > index is also "implementation details", how could users know what does
> > "batch index ack" mean?
> >
> > Even for ledger id and entry id, this pair represents a logic storage
> > position like the offset concept in Kafka (though each offset represents
> > a message while each entry represents a batch). If you see the Message
> > API, it also exposes many attributes. IMO, for the MessageIdData, only
> > the ack_set (a long array serialized from the BitSet) is the
> > implementation detail.
> >
> > The MessageId API should be flexible, not an abstract one. If not, why
> > do we still implement the toString() method? We should not encourage
> > users to print the MessageId. It would be easy to know what "ledger is 0,
> > entry id is 1" means, users only need to know the concepts of ledger id
> > and entry id. But it would be harder to know a tuple like "0:1:-1:-1"
> > means.
> >
> > Thanks,
> > Yunze
> >
> > On Tue, Nov 8, 2022 at 11:16 PM Joe F <joefranc...@gmail.com> wrote:
> > >
> > > >Maybe this design is to hidden some details, but if
> > > >users don't know the details like ledger id and entry id, how could
> > > >you know what does "0:0:-1:0" mean?
> > >
> > > Abstractions exist for a reason. Ledgerid and entryid are
> > > implementation details, and an application should not be interpreting
> > > that at all.
> > >
> > > -j
> > >
> > > On Tue, Nov 8, 2022 at 3:43 AM Yunze Xu <y...@streamnative.io.invalid>
> > > wrote:
> > >
> > > > I didn't look into these two methods at the moment. But I think it's
> > > > possible to retain only the `fromByteArray`.
> > > >
> > > > Thanks,
> > > > Yunze
> > > >
> > > > On Tue, Nov 8, 2022 at 7:02 PM Enrico Olivelli <eolive...@gmail.com>
> > > > wrote:
> > > > >
> > > > > On Tue, Nov 8, 2022 at 11:52, Yunze Xu
> > > > > <y...@streamnative.io.invalid> wrote:
> > > > > >
> > > > > > Hi Enrico,
> > > > > >
> > > > > > > We also need a way to represent this as a String or a byte[]
> > > > > >
> > > > > > We already have the `toByteArray` method, right?
> > > > >
> > > > > Yes, correct.
> > > > > So we are fine. I forgot about it and I answered too quickly.
> > > > >
> > > > > I am not sure if this can be in the scope of this initiative, but we
> > > > > should somehow get rid of stuff like "fromByteArrayWithTopic" vs
> > > > > "fromByteArray".
> > > > >
> > > > > Thanks
> > > > > Enrico
> > > > >
> > > > > >
> > > > > > Thanks,
> > > > > > Yunze
> > > > > >
> > > > > > On Tue, Nov 8, 2022 at 6:43 PM Enrico Olivelli
> > > > > > <eolive...@gmail.com> wrote:
> > > > > > >
> > > > > > > On Tue, Nov 8, 2022 at 11:25, Yunze Xu
> > > > > > > <y...@streamnative.io.invalid> wrote:
> > > > > > > >
> > > > > > > > Hi all,
> > > > > > > >
> > > > > > > > Currently we have the following 5 implementations of MessageId:
> > > > > > > >
> > > > > > > > - MessageIdImpl: (ledger id, entry id, partition index)
> > > > > > > > - BatchMessageIdImpl: adds (batch index, batch size, acker),
> > > > > > > >   where acker is a wrapper of a BitSet.
> > > > > > > > - ChunkMessageIdImpl: adds another MessageIdImpl that represents
> > > > > > > >   the first MessageIdImpl of a BitSet.
> > > > > > > > - MultiMessageIdImpl: adds a map that maps the topic name to the
> > > > > > > >   MessageId.
> > > > > > > > - TopicMessageIdImpl: adds the topic name and the partition name
> > > > > > > >
> > > > > > > > These implementations are such a mess. For example, when users
> > > > > > > > get a MessageId from `Producer#send`:
> > > > > > > >
> > > > > > > > ```java
> > > > > > > > var id = producer.send("msg");
> > > > > > > > ```
> > > > > > > >
> > > > > > > > There is no getter to get some specific fields like ledger id.
> > > > > > > > You can only see a representation from `toString` method and got
> > > > > > > > some output like "0:0:-1:0".
> > > > > > > > Maybe this design is to hidden some details, but if users don't
> > > > > > > > know the details like ledger id and entry id, how could you know
> > > > > > > > what does "0:0:-1:0" mean? What if `MessageId#toString`'s
> > > > > > > > implementation changed? Should it be treated as a breaking
> > > > > > > > change?
> > > > > > > >
> > > > > > > > The original definition of the underlying MessageIdData is much
> > > > > > > > more clear:
> > > > > > > >
> > > > > > > > ```proto
> > > > > > > > message MessageIdData {
> > > > > > > >   required uint64 ledgerId = 1;
> > > > > > > >   required uint64 entryId = 2;
> > > > > > > >   optional int32 partition = 3 [default = -1];
> > > > > > > >   optional int32 batch_index = 4 [default = -1];
> > > > > > > >   repeated int64 ack_set = 5;
> > > > > > > >   optional int32 batch_size = 6;
> > > > > > > >
> > > > > > > >   // For the chunk message id, we need to specify the first chunk message id.
> > > > > > > >   optional MessageIdData first_chunk_message_id = 7;
> > > > > > > > }
> > > > > > > > ```
> > > > > > > >
> > > > > > > > IMO, MessageId should be a wrapper of MessageIdData. It's more
> > > > > > > > natural to have an interface like:
> > > > > > > >
> > > > > > > > ```java
> > > > > > > > interface MessageId {
> > > > > > > >     long ledgerId();
> > > > > > > >     long entryId();
> > > > > > > >     Optional<Integer> partition();
> > > > > > > >     Optional<Integer> batchIndex();
> > > > > > > >     // ...
> > > > > > > > ```
> > > > > > >
> > > > > > > This is very good for client applications.
> > > > > > > We also need a way to represent this as a String or a byte[],
> > > > > > > this way client applications have a standard way to store
> > > > > > > message offsets into an external system (for instance when you
> > > > > > > want to user the Reader API and keep track of the position by
> > > > > > > yourself)
> > > > > > >
> > > > > > > Enrico
> > > > > > >
> > > > > > > >
> > > > > > > > Additionally, there are many places that use only the triple of
> > > > > > > > (ledger id, entry id, batch index) as the key to represent the
> > > > > > > > position. Currently, they are done by adding a conversion from
> > > > > > > > BatchMessageIdImpl to MessageIdImpl. However, it's more
> > > > > > > > intuitive to write something like:
> > > > > > > >
> > > > > > > > ```java
> > > > > > > > class MessageIdPosition implements Comparable<MessageIdPosition> {
> > > > > > > >     private final MessageId messageId;
> > > > > > > >     // TODO: compare only the triple (ledger, entry, batch)
> > > > > > > > ```
> > > > > > > >
> > > > > > > > Therefore, I'm going to write a proposal to redesign the
> > > > > > > > MessageId interface only by adding some getters. Regarding the 5
> > > > > > > > existing implementations, I think we can drop them because they
> > > > > > > > are a part of `pulsar-client`, not `pulsar-client-api`.
> > > > > > > >
> > > > > > > > Please feel free to share your points.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Yunze
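
For reference, the triple-based key from the original proposal above could be sketched roughly as follows. This is only an illustration under stated assumptions: `PositionedId` and `SimpleId` are hypothetical stand-ins for the proposed getters, not the real Pulsar `MessageId` interface, and treating a batch index of -1 as "not batched" (so it sorts before batch index 0 of the same entry) is a choice made for this sketch.

```java
import java.util.Comparator;
import java.util.Objects;

// Hypothetical stand-in for the proposed getters; not the real Pulsar MessageId.
interface PositionedId {
    long ledgerId();
    long entryId();
    int batchIndex(); // assumption: -1 when the message is not part of a batch
}

// Minimal hypothetical implementation, used only to exercise the comparator.
final class SimpleId implements PositionedId {
    private final long ledgerId;
    private final long entryId;
    private final int batchIndex;

    SimpleId(long ledgerId, long entryId, int batchIndex) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
        this.batchIndex = batchIndex;
    }

    @Override public long ledgerId() { return ledgerId; }
    @Override public long entryId() { return entryId; }
    @Override public int batchIndex() { return batchIndex; }
}

// Key that compares only the triple (ledger, entry, batch) and ignores everything else.
final class MessageIdPosition implements Comparable<MessageIdPosition> {
    private static final Comparator<PositionedId> BY_TRIPLE =
            Comparator.comparingLong(PositionedId::ledgerId)
                    .thenComparingLong(PositionedId::entryId)
                    .thenComparingInt(PositionedId::batchIndex);

    private final PositionedId id;

    MessageIdPosition(PositionedId id) {
        this.id = Objects.requireNonNull(id);
    }

    @Override
    public int compareTo(MessageIdPosition other) {
        return BY_TRIPLE.compare(id, other.id);
    }

    // equals and hashCode use the same triple, so the key is safe in maps and sets.
    @Override
    public boolean equals(Object o) {
        return o instanceof MessageIdPosition && compareTo((MessageIdPosition) o) == 0;
    }

    @Override
    public int hashCode() {
        return Objects.hash(id.ledgerId(), id.entryId(), id.batchIndex());
    }
}
```

Because the key reads the triple only through getters, any id implementation that exposes those three values could be wrapped without a conversion between implementation classes.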