FWIW, the Flink Pulsar connector hackily parses the message ID internals to compute the next message ID: https://github.com/apache/flink/blob/421f057a7488fd64854a82424755f76b89561a0b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/MessageIdUtils.java
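To make concrete what computing a "next" message ID from the internals involves, here is a minimal sketch. It assumes a hypothetical `Position` record standing in for the (ledger id, entry id, partition) fields. It is not the Flink connector's code, only an illustration of the kind of storage arithmetic the rest of this thread argues applications should not do themselves:

```java
/** Hypothetical stand-in for (ledger id, entry id, partition); not a Pulsar or Flink class. */
record Position(long ledgerId, long entryId, int partition) {

    /**
     * Returns the position right after this one in the same ledger.
     * Assumption of this sketch: a negative entryId means "before the first entry" and maps to 0.
     * It ignores ledger rollover: after the last entry of a closed ledger, the real next
     * message lives in a different ledger, which this arithmetic cannot know.
     */
    Position next() {
        long nextEntry = entryId < 0 ? 0 : entryId + 1;
        return new Position(ledgerId, nextEntry, partition);
    }
}
```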
On Thu, Nov 10, 2022 at 01:03 Enrico Olivelli <eolive...@gmail.com> wrote:
> After reading Joe's comments I have changed my mind.
> Actually, it is better not to expose "ledgerId" and "entryId" to client applications.
> They are useless pieces of information.
> Also, if in the future we want to change the way we internally address a message, we will always have to support these fields.
>
> toByteArray() is enough for an application to save the ID into an external database and then to recover a Subscription (or a Reader) from a certain point.
> toString() is good only for debug/logs. We could change it, but it is better not to touch it and to add "tests".
>
> It is better that the MessageId API stays as opaque as possible.
>
> Enrico
>
> On Wed, Nov 9, 2022 at 15:50 Yunze Xu <y...@streamnative.io.invalid> wrote:
> >
> > Hi Jiaqi,
> >
> > > I don't think `tostring` should be used in any serious case because it has no standard.
> >
> > I agree. But it's better to keep it unchanged. As I said in my previous reply, it might be a de facto standard because `toString()`-like methods are used in logging, not only for debugging. For example, there is a getLastMessageId API in the consumer, and users might log the last message ID.
> >
> > Different representations across different languages are not a big issue, but they could confuse users and administrators.
> >
> > Here is an example: the C++ client supports constructing a MessageId with 4 arguments, but the 1st argument is the partition, not the ledger id. However, the string representation is still "<ledger-id>:<entry-id>:<partition>:<batch-index>", whereas in the Java client a non-batched message ID doesn't have the ":<batch-index>" suffix.
> >
> > Thanks,
> > Yunze
> >
> > On Wed, Nov 9, 2022 at 9:13 PM Jiaqi Shen <gleiphir2...@gmail.com> wrote:
> > >
> > > Thanks, this is very inspiring to me.
> > >
> > > But I have a different opinion on `tostring`.
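Yunze's point about string forms can be made concrete. This is a sketch assuming a hypothetical `IdFields` record, following only the two formats described in this thread: all four fields always, versus no `:<batch-index>` suffix for a non-batched ID. The sketch assumes a negative batch index marks "not batched". It is not client code:

```java
/** Hypothetical holder for the four fields discussed in the thread (not a Pulsar class). */
record IdFields(long ledgerId, long entryId, int partition, int batchIndex) {

    /** Always emits all four fields: "<ledger-id>:<entry-id>:<partition>:<batch-index>". */
    String fourFieldForm() {
        return ledgerId + ":" + entryId + ":" + partition + ":" + batchIndex;
    }

    /** Omits the batch suffix for a non-batched ID (assumed here: batchIndex < 0). */
    String suffixOnlyWhenBatchedForm() {
        String base = ledgerId + ":" + entryId + ":" + partition;
        return batchIndex >= 0 ? base + ":" + batchIndex : base;
    }
}
```

For the same non-batched ID `new IdFields(0, 1, -1, -1)`, the two forms yield `"0:1:-1:-1"` and `"0:1:-1"`. Code that parses `toString()` output would therefore have to know which client produced the string.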
> > >
> > > >>You can only see a representation from the `toString` method and get some output like "0:0:-1:0".
> > >
> > > I don't think `tostring` should be used in any serious case because it has no standard. There is no constraint on how the messageId should be converted to a string. For example, the Go client does not support `tostring` now. If the Go client implements a `tostring` method, must the Go client's `tostring` follow the Java implementation, like "0:0:-1:0"?
> > >
> > > If users do need a string/[]byte to record a messageId, `toByteArray` will be enough. On the user side, most of the time, I think users don't really care whether the "messageId string" is meaningful. I think `tostring` should only be used for debugging.
> > >
> > > Thanks,
> > > Jiaqi Shen
> > >
> > >
> > > On Wed, Nov 9, 2022 at 20:25 Joe F <joefranc...@gmail.com> wrote:
> > > >
> > > > A MessageId is an identifier that identifies a message. How that id is constructed, or what it contains, should not matter to an application, and an application should not assume anything about the implementation of that id.
> > > >
> > > > >What about the partition index? We have a `TopicMetadata` interface that returns the number of partitions.
> > > >
> > > > Partitioning is a first-class concept, and is designed to be used by applications. How a partition is implemented should not be used by the application.
> > > >
> > > > [People violate this all the time, and I regret that Pulsar did not provide get_Nth_topicpartion(), which led to people hardcoding it as topicname-N and using that directly. Now we are stuck with it.]
> > > >
> > > > Similarly for batch index and batch size. Those are all logical concepts exposed to the user.
> > > > For example, batch size is something the app is allowed to tune.
> > > >
> > > > >Even for ledger id and entry id, this pair represents a logical storage position like the offset concept in Kafka
> > > > These are not equivalent. In Pulsar these are implementation details, while in Kafka those are logical concepts.
> > > >
> > > > One might think that these are logical concepts in Pulsar, because if you reverse-engineer the current msgid implementation, you observe some "properties".
> > > >
> > > > Ledger id/entry id are logical concepts in __BookKeeper__, not in Pulsar. There is the Managed Ledger abstraction on top of BK, and then there is Pulsar on top of ML. You would break two levels of abstraction to expose ledger/entryid to an application.
> > > >
> > > > An application should only care about the operations that can be done with a messageId:
> > > >
> > > > - getmsgid() to return the message id as an opaque object
> > > >
> > > > [Operators using one messageId]
> > > > - serde, like tostring(), for storage/retrieval of the message identifier
> > > > - getter/setter on logical properties of the message (partition id, etc.)
> > > > - increment/decrement
> > > >
> > > > [Operators that take multiple messageIds]
> > > > - comparator
> > > > - range
> > > >
> > > > Those are the kind of operators Pulsar should provide to a user. Applications should not implement these operators on their own by reverse-engineering the msgId. No application should be directly using ledgerid or entryid for doing anything (math or logic).
> > > >
> > > > As long as Pulsar provides these operations with msgid to the application, it should not matter whether it's represented as "0:1:-1:-1" or "a:b:-b-b", or "#xba4231!haxcy1826923f", or as a serialized binary object, or whatever it may be.
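Joe's list of operators can be sketched as a Java interface. This minimal sketch assumes a hypothetical `OpaqueId` interface and a hypothetical `SeqId` implementation backed by a per-partition sequence number. Neither is a Pulsar API, and refusing cross-partition comparison is an assumption of this sketch. Callers only use `serialize`, `compareTo`, `next`/`previous`, and `inRange`, so the hidden representation could change without breaking them:

```java
import java.nio.ByteBuffer;

/** Hypothetical opaque message ID exposing only the operators Joe lists (not a Pulsar API). */
interface OpaqueId extends Comparable<OpaqueId> {
    byte[] serialize();   // serde: storage/retrieval of the identifier
    int partition();      // getter on a logical property
    OpaqueId next();      // increment
    OpaqueId previous();  // decrement

    /** Range operator over IDs: true if from <= id < to. */
    static boolean inRange(OpaqueId id, OpaqueId from, OpaqueId to) {
        return id.compareTo(from) >= 0 && id.compareTo(to) < 0;
    }
}

/** One possible hidden representation: a per-partition sequence number (hypothetical). */
record SeqId(int partition, long seq) implements OpaqueId {

    @Override
    public byte[] serialize() {
        return ByteBuffer.allocate(Integer.BYTES + Long.BYTES)
                .putInt(partition)
                .putLong(seq)
                .array();
    }

    static SeqId deserialize(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        int partition = buf.getInt();
        long seq = buf.getLong();
        return new SeqId(partition, seq);
    }

    @Override
    public OpaqueId next() {
        return new SeqId(partition, seq + 1);
    }

    @Override
    public OpaqueId previous() {
        return new SeqId(partition, seq - 1);
    }

    @Override
    public int compareTo(OpaqueId other) {
        // Sketch assumption: a single implementation, ordered only within one partition.
        SeqId o = (SeqId) other;
        if (partition != o.partition) {
            throw new IllegalArgumentException("IDs from different partitions have no defined order");
        }
        return Long.compare(seq, o.seq);
    }
}
```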
> > > >
> > > > >>But it would be harder to know what a tuple like "0:1:-1:-1" means.
> > > >
> > > > A user shouldn't have to know what this means. That's the point.
> > > >
> > > > Pulsar itself changed the messageId multiple times as it added partitioning, batching and so on, and it might do so again. BookKeeper could change its representation of ledgers (e.g., to UUIDs and byte offsets), and ML could replace BK with something else (e.g., a table in a DB). Anything is possible; Pulsar would then just have to change the implementation of the operator functions, and no application would need to be rewritten.
> > > >
> > > > -j
> > > >
> > > > On Tue, Nov 8, 2022 at 6:05 PM Yunze Xu <y...@streamnative.io.invalid> wrote:
> > > > >
> > > > > Hi Joe,
> > > > >
> > > > > Then what would we expect users to do with the MessageId? Should it only be passed to Consumer#seek or ReaderBuilder#startMessageId?
> > > > >
> > > > > What about the partition index? We have a `TopicMetadata` interface that returns the number of partitions. If the partition is also an "implementation detail", should we expose this interface? Or should we support customizing a MessageRouter, given that it returns the partition index?
> > > > >
> > > > > What about the batch index and batch size? For example, we have an enableBatchIndexAcknowledgment method to enable batch index ACK. If the batch index is also an "implementation detail", how could users know what "batch index ack" means?
> > > > >
> > > > > Even for ledger id and entry id, this pair represents a logical storage position like the offset concept in Kafka (though each offset represents a message while each entry represents a batch). If you look at the Message API, it also exposes many attributes.
> > > > > IMO, for the MessageIdData, only the ack_set (a long array serialized from the BitSet) is an implementation detail.
> > > > >
> > > > > The MessageId API should be flexible, not an abstract one. If not, why do we still implement the toString() method? We should not encourage users to print the MessageId. It would be easy to know what "ledger id is 0, entry id is 1" means; users only need to know the concepts of ledger id and entry id. But it would be harder to know what a tuple like "0:1:-1:-1" means.
> > > > >
> > > > > Thanks,
> > > > > Yunze
> > > > >
> > > > > On Tue, Nov 8, 2022 at 11:16 PM Joe F <joefranc...@gmail.com> wrote:
> > > > > >
> > > > > > >Maybe this design is to hide some details, but if users don't know the details like ledger id and entry id, how could you know what "0:0:-1:0" means?
> > > > > >
> > > > > > Abstractions exist for a reason. Ledgerid and entryid are implementation details, and an application should not be interpreting them at all.
> > > > > > -j
> > > > > >
> > > > > >
> > > > > > On Tue, Nov 8, 2022 at 3:43 AM Yunze Xu <y...@streamnative.io.invalid> wrote:
> > > > > > >
> > > > > > > I haven't looked into these two methods yet. But I think it's possible to retain only `fromByteArray`.
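Yunze's remark that `ack_set` is "a long array serialized from the BitSet" maps directly onto `java.util.BitSet#toLongArray` and `BitSet.valueOf(long[])`. This sketch assumes a hypothetical `BatchAckSet` class. It also assumes that a set bit marks a batch index that is still unacknowledged. That convention belongs to this sketch only and is not a statement about the wire format:

```java
import java.util.BitSet;

/** Hypothetical per-batch ack tracker (not a Pulsar class). */
final class BatchAckSet {
    // Sketch convention: a set bit means the message at that batch index is NOT yet acknowledged.
    private final BitSet pending;

    BatchAckSet(int batchSize) {
        pending = new BitSet(batchSize);
        pending.set(0, batchSize); // nothing acknowledged yet
    }

    private BatchAckSet(BitSet pending) {
        this.pending = pending;
    }

    void ack(int batchIndex) {
        pending.clear(batchIndex);
    }

    boolean isAcked(int batchIndex) {
        return !pending.get(batchIndex);
    }

    boolean fullyAcked() {
        return pending.isEmpty();
    }

    /** Serialized form, comparable in shape to a repeated int64 field. */
    long[] toAckSet() {
        return pending.toLongArray();
    }

    static BatchAckSet fromAckSet(long[] ackSet) {
        return new BatchAckSet(BitSet.valueOf(ackSet));
    }
}
```

For a batch of 3 with index 1 acknowledged, the remaining bits 0 and 2 serialize to the single long `5`.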
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Yunze
> > > > > > >
> > > > > > > On Tue, Nov 8, 2022 at 7:02 PM Enrico Olivelli <eolive...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > On Tue, Nov 8, 2022 at 11:52 Yunze Xu <y...@streamnative.io.invalid> wrote:
> > > > > > > > >
> > > > > > > > > Hi Enrico,
> > > > > > > > >
> > > > > > > > > > We also need a way to represent this as a String or a byte[]
> > > > > > > > >
> > > > > > > > > We already have the `toByteArray` method, right?
> > > > > > > >
> > > > > > > > Yes, correct. So we are fine. I forgot about it and answered too quickly.
> > > > > > > >
> > > > > > > > I am not sure if this can be in the scope of this initiative, but we should somehow get rid of stuff like "fromByteArrayWithTopic" vs "fromByteArray".
> > > > > > > >
> > > > > > > > Thanks
> > > > > > > > Enrico
> > > > > > > >
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Yunze
> > > > > > > > >
> > > > > > > > > On Tue, Nov 8, 2022 at 6:43 PM Enrico Olivelli <eolive...@gmail.com> wrote:
> > > > > > > > > >
> > > > > > > > > > On Tue, Nov 8, 2022 at 11:25 Yunze Xu <y...@streamnative.io.invalid> wrote:
> > > > > > > > > > >
> > > > > > > > > > > Hi all,
> > > > > > > > > > >
> > > > > > > > > > > Currently we have the following 5 implementations of MessageId:
> > > > > > > > > > >
> > > > > > > > > > > - MessageIdImpl: (ledger id, entry id, partition index)
> > > > > > > > > > > - BatchMessageIdImpl: adds (batch index, batch size, acker), where acker is a wrapper of a BitSet.
> > > > > > > > > > > - ChunkMessageIdImpl: adds another MessageIdImpl that represents the first chunk of a chunked message.
> > > > > > > > > > > - MultiMessageIdImpl: adds a map that maps the topic name to the MessageId.
> > > > > > > > > > > - TopicMessageIdImpl: adds the topic name and the partition name.
> > > > > > > > > > >
> > > > > > > > > > > These implementations are such a mess. For example, when users get a MessageId from `Producer#send`:
> > > > > > > > > > >
> > > > > > > > > > > ```java
> > > > > > > > > > > var id = producer.send("msg");
> > > > > > > > > > > ```
> > > > > > > > > > >
> > > > > > > > > > > There is no getter for specific fields like the ledger id. You can only see a representation from the `toString` method and get some output like "0:0:-1:0". Maybe this design is to hide some details, but if users don't know the details like ledger id and entry id, how could you know what "0:0:-1:0" means? What if `MessageId#toString`'s implementation changed? Should it be treated as a breaking change?
> > > > > > > > > > >
> > > > > > > > > > > The original definition of the underlying MessageIdData is much clearer:
> > > > > > > > > > >
> > > > > > > > > > > ```proto
> > > > > > > > > > > message MessageIdData {
> > > > > > > > > > >     required uint64 ledgerId = 1;
> > > > > > > > > > >     required uint64 entryId = 2;
> > > > > > > > > > >     optional int32 partition = 3 [default = -1];
> > > > > > > > > > >     optional int32 batch_index = 4 [default = -1];
> > > > > > > > > > >     repeated int64 ack_set = 5;
> > > > > > > > > > >     optional int32 batch_size = 6;
> > > > > > > > > > >
> > > > > > > > > > >     // For the chunk message id, we need to specify the first chunk message id.
> > > > > > > > > > >     optional MessageIdData first_chunk_message_id = 7;
> > > > > > > > > > > }
> > > > > > > > > > > ```
> > > > > > > > > > >
> > > > > > > > > > > IMO, MessageId should be a wrapper of MessageIdData. It's more natural to have an interface like:
> > > > > > > > > > >
> > > > > > > > > > > ```java
> > > > > > > > > > > import java.util.Optional;
> > > > > > > > > > >
> > > > > > > > > > > interface MessageId {
> > > > > > > > > > >     long ledgerId();
> > > > > > > > > > >     long entryId();
> > > > > > > > > > >     Optional<Integer> partition();
> > > > > > > > > > >     Optional<Integer> batchIndex();
> > > > > > > > > > >     // ...
> > > > > > > > > > > }
> > > > > > > > > > > ```
> > > > > > > > > >
> > > > > > > > > > This is very good for client applications.
> > > > > > > > > > We also need a way to represent this as a String or a byte[], so that client applications have a standard way to store message offsets in an external system (for instance, when you want to use the Reader API and keep track of the position yourself).
> > > > > > > > > >
> > > > > > > > > > Enrico
> > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > Additionally, there are many places that use only the triple of (ledger id, entry id, batch index) as the key to represent the position. Currently, this is done by adding a conversion from BatchMessageIdImpl to MessageIdImpl.
> > > > > > > > > > > However, it's more intuitive to write something like:
> > > > > > > > > > >
> > > > > > > > > > > ```java
> > > > > > > > > > > class MessageIdPosition implements Comparable<MessageIdPosition> {
> > > > > > > > > > >     private final MessageId messageId;
> > > > > > > > > > >     // TODO: compare only the triple (ledger, entry, batch)
> > > > > > > > > > > }
> > > > > > > > > > > ```
> > > > > > > > > > >
> > > > > > > > > > > Therefore, I'm going to write a proposal to redesign the MessageId interface by only adding some getters. Regarding the 5 existing implementations, I think we can drop them because they are a part of `pulsar-client`, not `pulsar-client-api`.
> > > > > > > > > > >
> > > > > > > > > > > Please feel free to share your points.
> > > > > > > > > > >
> > > > > > > > > > > Thanks,
> > > > > > > > > > > Yunze

--
Best,
tison.
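The `MessageIdPosition` TODO above (compare only the triple of ledger id, entry id, and batch index) can be sketched with `java.util.Comparator`. This assumes a hypothetical `PositionKey` record holding just those three fields. Joe and Enrico argue that ledger and entry ids should stay hidden, so code like this would belong inside the client rather than in applications:

```java
import java.util.Comparator;

/** Hypothetical position key holding only (ledger id, entry id, batch index); not a Pulsar class. */
record PositionKey(long ledgerId, long entryId, int batchIndex) implements Comparable<PositionKey> {

    // Lexicographic order: ledger first, then entry, then batch index.
    private static final Comparator<PositionKey> ORDER =
            Comparator.comparingLong(PositionKey::ledgerId)
                    .thenComparingLong(PositionKey::entryId)
                    .thenComparingInt(PositionKey::batchIndex);

    @Override
    public int compareTo(PositionKey other) {
        return ORDER.compare(this, other);
    }
}
```

The record's generated `equals` compares the same three fields, so this ordering is consistent with equals and `PositionKey` can serve as a `TreeMap` or `TreeSet` key.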