Hi Joe,

I think the most controversial point is what a MessageId should be used for.
In your opinion, it should only be used as an opaque, comparable object
that represents the position of a message [1]. My view is that
MessageId should be a wrapper of the MessageIdData in PulsarApi.proto [2].
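
To make the two views concrete, here is a minimal sketch. It is not the Pulsar API: `OpaqueId` and `WrappedId` are hypothetical names, the byte layout is a made-up fixed-width format rather than the protobuf encoding, and the `toString` layout only imitates the "0:1:-1:-1" tuple discussed in this thread. The interface is what an application would see under the opaque view, and the getters are what the wrapper view would add.

```java
import java.nio.ByteBuffer;
import java.util.OptionalInt;

// Hypothetical "opaque" view: callers can only compare and serialize.
interface OpaqueId extends Comparable<OpaqueId> {
    byte[] toByteArray();
}

// Hypothetical wrapper view: getters mirror some MessageIdData fields.
final class WrappedId implements OpaqueId {
    private final long ledgerId;
    private final long entryId;
    private final int partition;  // -1 means "not set", like the proto default
    private final int batchIndex; // -1 means "not set", like the proto default

    WrappedId(long ledgerId, long entryId, int partition, int batchIndex) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
        this.partition = partition;
        this.batchIndex = batchIndex;
    }

    long ledgerId() { return ledgerId; }
    long entryId() { return entryId; }

    OptionalInt partition() {
        return partition < 0 ? OptionalInt.empty() : OptionalInt.of(partition);
    }

    OptionalInt batchIndex() {
        return batchIndex < 0 ? OptionalInt.empty() : OptionalInt.of(batchIndex);
    }

    // Assumed fixed-width layout, for illustration only.
    @Override
    public byte[] toByteArray() {
        return ByteBuffer.allocate(2 * Long.BYTES + 2 * Integer.BYTES)
                .putLong(ledgerId).putLong(entryId)
                .putInt(partition).putInt(batchIndex)
                .array();
    }

    static WrappedId fromByteArray(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        return new WrappedId(buf.getLong(), buf.getLong(), buf.getInt(), buf.getInt());
    }

    // Orders by (ledger id, entry id, batch index); the partition is ignored.
    @Override
    public int compareTo(OpaqueId other) {
        WrappedId o = (WrappedId) other; // the sketch has a single implementation
        int c = Long.compare(ledgerId, o.ledgerId);
        if (c != 0) {
            return c;
        }
        c = Long.compare(entryId, o.entryId);
        if (c != 0) {
            return c;
        }
        return Integer.compare(batchIndex, o.batchIndex);
    }

    @Override
    public String toString() {
        return ledgerId + ":" + entryId + ":" + partition + ":" + batchIndex;
    }
}
```

Code that only stores and compares positions can depend on `OpaqueId` alone, while code that needs the fields can ask for a `WrappedId`.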

I agree that, on the application side, there are not many cases that need
to look into the details of a specific MessageId implementation. However,
these "internal fields" are de facto concepts in Pulsar. Yes, ledger id and
entry id are concepts from BK and they might change, but I doubt there is
any Pulsar application developer who doesn't know these two concepts.

Let's look at the clients for other languages, including C++ [3], Golang [4],
Python [5], Rust [6][7], and C# [8]. Only the official Java client doesn't
expose these fields.

I'm also okay with not changing the MessageId interface in the
pulsar-client-api module, because the motivation is to simplify the messy
MessageId implementations in the pulsar-client module.

We can add a MessageIdPojo class in the pulsar-common module or elsewhere.
In the Pulsar repo, all MessageId implementations would then have to extend
the MessageIdPojo class.

```java
@Data // Lombok: generates getters, equals(), hashCode() and toString()
class MessageIdPojo implements MessageId { // MessageId is an interface
    private final long ledgerId;
    // ...
}
```
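
As a rough illustration of the idea (not code from the Pulsar repo), the sketch below writes out by hand roughly what Lombok's `@Data` would generate, so it compiles without Lombok or Pulsar on the classpath. `PositionPojo` and `BatchPositionPojo` are hypothetical names, and the choice of fields is an assumption.

```java
import java.util.Objects;

// Hypothetical base class: getters, equals() and hashCode() written by hand,
// roughly what Lombok's @Data would generate for these two fields.
class PositionPojo {
    private final long ledgerId;
    private final long entryId;

    PositionPojo(long ledgerId, long entryId) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
    }

    long getLedgerId() { return ledgerId; }
    long getEntryId() { return entryId; }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        PositionPojo that = (PositionPojo) o;
        return ledgerId == that.ledgerId && entryId == that.entryId;
    }

    @Override
    public int hashCode() {
        return Objects.hash(ledgerId, entryId);
    }
}

// Hypothetical subclass that adds the batch index on top of the base fields.
final class BatchPositionPojo extends PositionPojo {
    private final int batchIndex;

    BatchPositionPojo(long ledgerId, long entryId, int batchIndex) {
        super(ledgerId, entryId);
        this.batchIndex = batchIndex;
    }

    int getBatchIndex() { return batchIndex; }

    @Override
    public boolean equals(Object o) {
        // super.equals() already checks that both objects have the same class.
        return super.equals(o) && batchIndex == ((BatchPositionPojo) o).batchIndex;
    }

    @Override
    public int hashCode() {
        return 31 * super.hashCode() + batchIndex;
    }
}
```

With value-based `equals` and `hashCode`, the triple (ledger id, entry id, batch index) can be used directly as a map key, and any code that receives the base type can read the ledger id without casting to a specific implementation.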

[1] https://pulsar.apache.org/docs/concepts-messaging#messages
[2] https://github.com/apache/pulsar/blob/b31c5a6a325728b5dc5faebd1a33386952d733d5/pulsar-common/src/main/proto/PulsarApi.proto#L57
[3] https://github.com/apache/pulsar-client-cpp/blob/main/include/pulsar/MessageId.h
[4] https://github.com/apache/pulsar-client-go/blob/d9c8b0ab9c14e8d571b632c93002ea20db1a2c16/pulsar/message.go#L147
[5] https://github.com/apache/pulsar-client-python/blob/75a57b427d4c6944c49f4b712344107b5444aa36/pulsar/__init__.py#L84
[6] https://github.com/streamnative/pulsar-rs/blob/de59974080daa248bfdeaea8510eb72ec8f30bac/src/consumer.rs#L1984
[7] https://github.com/streamnative/pulsar-rs/blob/de59974080daa248bfdeaea8510eb72ec8f30bac/src/consumer.rs#L1353
[8] https://github.com/apache/pulsar-dotpulsar/blob/0590b1ad6c4474d425662352ba62abb41bfb9f0a/src/DotPulsar/MessageId.cs#L56

Thanks,
Yunze

On Wed, Nov 9, 2022 at 8:24 PM Joe F <joefranc...@gmail.com> wrote:
>
> Messageid is an identifier which identifies a message.  How that id is
> constructed, or what it contains should not  matter to an application,  and
> an application should not assume anything about the implementation of that
> id.
>
> >What about the partition index? We have a `TopicMetadata` interface that
> returns the number of partitions.
>
> Partitioning is a first class concept, and is  designed to be used by
> application.  How a partition is implemented  should not be used by the
> application .
>
>  [ People violate this all the time, and I regret that Pulsar did not
> provide get_Nth_topicpartion(), which led to people hardcoding it  as
> topicname-N. and using that directly.  Now we are stuck with it.]
>
>  Similarly batch index and batch size. Those are all logical concepts
> exposed to the user.  For eg: batch size is something the app is allowed to
> tune
>
> >Even for ledger id and entry id, this pair represents a logic storage
> position like the offset concept in Kafka
> These are not equivalent.   In Pulsar these are implementation details,
> while in Kafka those are logical concepts.
>
> One might think that these are logical concepts in Pulsar, because if you
> reverse engineer the current msgid implementation, you observe some
> "properties".
>
> Ledger id/entry id are logical concepts in __Bookkeeper__ , not  in Pulsar.
> There is the Managed Ledger abstraction on top of BK, and then there is
> Pulsar on top of ML. You will break two levels of abstraction to expose
> ledger/entryid to an application
>
> An application  should only care about the  operations that  can be done
> with a messageId
>
> - getmsgid() to return the message id  as an opaque object
>
> [Operators   using  one messageId ]
> -serde,   like tostring(). for storage/retrieval of message identifier
> -getter/setter on logical properties of the message (partition id etc...)
> -increment/decrement
>
> [Operators that take multiple messageIds]
> -comparator
> -range
>
> Those are the kind of operators Pulsar should provide to a user.
> Applications should not implement these operators on their own by reverse
> engineering the msgId. No application should be directly using ledgerid or
> entryid for doing anything (math or logic),
>
>   As long as Pulsar provides  these operations  with msgid to the
> application,  it should not care whether it's represented as "0:1:-1:-1"
> or  "a:b:-b-b", or   "#xba4231!haxcy1826923f" or as a serialized binary
> object or..whatever it may be.
>
> >>But it would be harder to know a tuple like "0:1:-1:-1" means.
>
> A user shouldn't have to know what this means. That's the point.
>
> Pulsar itself changed the messageId multiple times as it added
> partitioning, batching and so on, and it might do so again. And bookkeeper
> could change its representation of  ledgers, (for eg,  to uuids and byte
> offsets)  ML could replace BK with something else  (for eg.  a table in a
> db.)  Anything is possible - Pulsar would then just have to change the
> implementation of the operator functions, and no application needs to be
> rewritten.
>
> -j
>
> On Tue, Nov 8, 2022 at 6:05 PM Yunze Xu <y...@streamnative.io.invalid>
> wrote:
>
> > Hi Joe,
> >
> > Then what would we expect users to do with the MessageId? It should only
> > be passed to Consumer#seek or ReaderBuilder#startMessageId?
> >
> > What about the partition index? We have a `TopicMetadata` interface that
> > returns the number of partitions. If the partition is also an
> > "implementation detail", should we expose this interface? Or should we
> > support customizing a MessageRouter, given that it returns the partition
> > index?
> >
> > What about the batch index and batch size? For example, we have an
> > enableBatchIndexAcknowledgment method to enable batch index ACK. If the
> > batch index is also an "implementation detail", how could users know what
> > "batch index ack" means?
> >
> > Even for ledger id and entry id, this pair represents a logical storage
> > position like the offset concept in Kafka (though each offset represents a
> > message while each entry represents a batch). If you look at the Message
> > API, it also exposes many attributes. IMO, in MessageIdData, only the
> > ack_set (a long array serialized from the BitSet) is an implementation
> > detail.
> >
> > The MessageId API should be flexible, not an abstract one. If not, why
> > do we still implement the toString() method? We should not encourage users
> > to print the MessageId. It would be easy to know what "ledger id is 0,
> > entry id is 1" means; users only need to know the concepts of ledger id
> > and entry id. But it would be harder to know what a tuple like
> > "0:1:-1:-1" means.
> >
> > Thanks,
> > Yunze
> >
> > On Tue, Nov 8, 2022 at 11:16 PM Joe F <joefranc...@gmail.com> wrote:
> > >
> > > >Maybe this design is to hidden some details, but if
> > > users don't know the details like ledger id and entry id, how could
> > > you know what does "0:0:-1:0" mean?
> > >
> > >  Abstractions exist for a reason. Ledgerid and entryid are implementation
> > > details, and an application should not be interpreting that at all.
> > > -j
> > >
> > >
> > > On Tue, Nov 8, 2022 at 3:43 AM Yunze Xu <y...@streamnative.io.invalid>
> > > wrote:
> > >
> > > > I didn't look into these two methods at the moment. But I think it's
> > > > possible to
> > > > retain only the `fromByteArray`.
> > > >
> > > > Thanks,
> > > > Yunze
> > > >
> > > > On Tue, Nov 8, 2022 at 7:02 PM Enrico Olivelli <eolive...@gmail.com>
> > > > wrote:
> > > > >
> > > > > Il giorno mar 8 nov 2022 alle ore 11:52 Yunze Xu
> > > > > <y...@streamnative.io.invalid> ha scritto:
> > > > > >
> > > > > > Hi Enrico,
> > > > > >
> > > > > > > We also need a way to represent this as a String or a byte[]
> > > > > >
> > > > > > We already have the `toByteArray` method, right?
> > > > >
> > > > > Yes, correct. So we are fine. I forgot about it and I answered too
> > > > > quickly.
> > > > >
> > > > > I am not sure if this can be in the scope of this initiative, but we
> > > > > should somehow get rid of
> > > > > stuff like "fromByteArrayWithTopic" vs "fromByteArray".
> > > > >
> > > > > Thanks
> > > > > Enrico
> > > > >
> > > > > >
> > > > > > Thanks,
> > > > > > Yunze
> > > > > >
> > > > > > On Tue, Nov 8, 2022 at 6:43 PM Enrico Olivelli <
> > eolive...@gmail.com>
> > > > wrote:
> > > > > > >
> > > > > > > Il giorno mar 8 nov 2022 alle ore 11:25 Yunze Xu
> > > > > > > <y...@streamnative.io.invalid> ha scritto:
> > > > > > > >
> > > > > > > > Hi all,
> > > > > > > >
> > > > > > > > Currently we have the following 5 implementations of MessageId:
> > > > > > > >
> > > > > > > > > - MessageIdImpl: (ledger id, entry id, partition index)
> > > > > > > > >   - BatchMessageIdImpl: adds (batch index, batch size, acker),
> > > > > > > > >     where acker is a wrapper of a BitSet.
> > > > > > > > >   - ChunkMessageIdImpl: adds another MessageIdImpl that
> > > > > > > > >     represents the message id of the first chunk.
> > > > > > > > >   - MultiMessageIdImpl: adds a map that maps the topic name to
> > > > > > > > >     the MessageId.
> > > > > > > > > - TopicMessageIdImpl: adds the topic name and the partition
> > > > > > > > >   name
> > > > > > > >
> > > > > > > > > These implementations are such a mess. For example, when users
> > > > > > > > > get a MessageId from `Producer#send`:
> > > > > > > >
> > > > > > > > ```java
> > > > > > > > var id = producer.send("msg");
> > > > > > > > ```
> > > > > > > >
> > > > > > > > > There is no getter for specific fields like the ledger id. You
> > > > > > > > > can only see a representation from the `toString` method and get
> > > > > > > > > output like "0:0:-1:0". Maybe this design is meant to hide some
> > > > > > > > > details, but if users don't know details like the ledger id and
> > > > > > > > > entry id, how could they know what "0:0:-1:0" means? What if the
> > > > > > > > > implementation of `MessageId#toString` changed? Should that be
> > > > > > > > > treated as a breaking change?
> > > > > > > >
> > > > > > > > > The original definition of the underlying MessageIdData is much
> > > > > > > > > clearer:
> > > > > > > >
> > > > > > > > ```proto
> > > > > > > > message MessageIdData {
> > > > > > > >     required uint64 ledgerId = 1;
> > > > > > > >     required uint64 entryId  = 2;
> > > > > > > >     optional int32 partition = 3 [default = -1];
> > > > > > > >     optional int32 batch_index = 4 [default = -1];
> > > > > > > >     repeated int64 ack_set = 5;
> > > > > > > >     optional int32 batch_size = 6;
> > > > > > > >
> > > > > > > >     // For the chunk message id, we need to specify the first
> > > > chunk message id.
> > > > > > > >     optional MessageIdData first_chunk_message_id = 7;
> > > > > > > > }
> > > > > > > > ```
> > > > > > > >
> > > > > > > > IMO, MessageId should be a wrapper of MessageIdData. It's more
> > > > natural
> > > > > > > > to have an interface like:
> > > > > > > >
> > > > > > > > ```java
> > > > > > > > interface MessageId {
> > > > > > > >     long ledgerId();
> > > > > > > >     long entryId();
> > > > > > > >     Optional<Integer> partition();
> > > > > > > >     Optional<Integer> batchIndex();
> > > > > > > >     // ...
> > > > > > > > ```
> > > > > > >
> > > > > > > This is very good for client applications.
> > > > > > > We also need a way to represent this as a String or a byte[], so
> > > > > > > that client applications have a standard way to store message
> > > > > > > offsets in an external system (for instance when you want to use
> > > > > > > the Reader API and keep track of the position yourself).
> > > > > > >
> > > > > > > Enrico
> > > > > > >
> > > > > > > >
> > > > > > > > > Additionally, there are many places that use only the triple of
> > > > > > > > > (ledger id, entry id, batch index) as the key to represent the
> > > > > > > > > position. Currently, this is done by adding a conversion from
> > > > > > > > > BatchMessageIdImpl to MessageIdImpl. However, it's more intuitive
> > > > > > > > > to write something like:
> > > > > > > >
> > > > > > > > ```java
> > > > > > > > > class MessageIdPosition implements Comparable<MessageIdPosition> {
> > > > > > > > >     private final MessageId messageId;
> > > > > > > > >     // TODO: compare only the triple (ledger, entry, batch)
> > > > > > > > > }
> > > > > > > > ```
> > > > > > > >
> > > > > > > > > Therefore, I'm going to write a proposal to redesign the MessageId
> > > > > > > > > interface only by adding some getters. Regarding the 5 existing
> > > > > > > > > implementations, I think we can drop them because they are a part
> > > > > > > > > of `pulsar-client`, not `pulsar-client-api`.
> > > > > > > >
> > > > > > > > Please feel free to share your points.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Yunze
> > > >
> >
