Hi Penghui,

> Can we use `message_id` directly? It's a message ID from the source cluster, it can be used by the shadow topic feature but can also be used by other features in the future.
Sure, `message_id` is better.

> > there is not much difference to reset the cursor in persistent replicator or in the raw reader.
>
> Do you want to control it in the shadow topic? or by users?
> Maybe we can make some changes to the replicator to support skip backlogs.

+1. We can add a new configuration like `shadowReplicatorAutoResetBacklogEntries`. Once the backlog entry count exceeds this threshold, the shadow replicator will reset the cursor to LATEST automatically.

Thanks,
Haiting

On 2022/07/18 00:38:37 PengHui Li wrote:
> Hi Haiting,
>
> > For this point, like I wrote in the reply to Asaf, I still think the cost of maintaining the consumers would be a little too high, especially the lifecycle management part. It's quite complicated in the replicator already.
>
> Yes, I see it. Makes sense. Without the replicator, it looks like we are trying to implement a very similar function to replicate data. The only difference is which broker maintains the consumer.
>
> Can we use `message_id` directly? It's a message ID from the source cluster, it can be used by the shadow topic feature but can also be used by other features in the future.
>
> > there is not much difference to reset the cursor in persistent replicator or in the raw reader.
>
> Do you want to control it in the shadow topic? or by users?
> Maybe we can make some changes to the replicator to support skip backlogs.
>
> Thanks,
> Penghui
>
> On Sun, Jul 17, 2022 at 10:57 PM Haiting Jiang <jianghait...@apache.org> wrote:
>
> > Hi Penghui,
> >
> > > I had a conversation with Asaf, looks like we can use the RawReader for a shadow topic internally to consume messages from the original topic. So that we don't need to introduce `shadow_message_id`, and don't need to introduce changes to the replicator.
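To make the proposed auto-reset behavior concrete, here is a minimal sketch of the decision logic. The setting name `shadowReplicatorAutoResetBacklogEntries` is only proposed in this thread, and this class is hypothetical; it models the threshold check, not real Pulsar broker code.

```java
/**
 * Hypothetical sketch of the auto-reset policy discussed above.
 * Models only the decision logic for the proposed
 * `shadowReplicatorAutoResetBacklogEntries` setting.
 */
public class ShadowBacklogResetPolicy {

    private final long autoResetBacklogEntries;

    public ShadowBacklogResetPolicy(long autoResetBacklogEntries) {
        this.autoResetBacklogEntries = autoResetBacklogEntries;
    }

    /**
     * The replicator cursor should jump to LATEST only when the number of
     * entries still to replicate exceeds the configured threshold.
     * A threshold of 0 (or less) disables the auto-reset behavior.
     */
    public boolean shouldResetToLatest(long backlogEntries) {
        return autoResetBacklogEntries > 0
                && backlogEntries > autoResetBacklogEntries;
    }
}
```

A check like this could run when the replicator observes its cursor backlog, skipping old entries that would otherwise pollute the shadow topic's entry cache.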
> >
> > For this point, like I wrote in the reply to Asaf, I still think the cost of maintaining the consumers would be a little too high, especially the lifecycle management part. It's quite complicated in the replicator already.
> >
> > > And using a persistent replicator might cause the very old messages to add to the entry cache of the shadow topic?
> > > Maybe there are some backlogs of the replicator.
> >
> > Yes, we should skip old backlogs for the shadow replicator. But it seems that there is not much difference to reset the cursor in the persistent replicator or in the raw reader.
> >
> > BR,
> > Haiting
> >
> > On 2022/07/13 08:29:34 PengHui Li wrote:
> > > Hi Haiting,
> > >
> > > I had a conversation with Asaf, looks like we can use the RawReader for a shadow topic internally to consume messages from the original topic. So that we don't need to introduce `shadow_message_id`, and don't need to introduce changes to the replicator.
> > >
> > > And using a persistent replicator might cause the very old messages to add to the entry cache of the shadow topic?
> > > Maybe there are some backlogs of the replicator.
> > >
> > > Thanks,
> > > Penghui
> > >
> > > On Wed, Jul 13, 2022 at 4:16 PM Asaf Mesika <asaf.mes...@gmail.com> wrote:
> > >
> > > > Thanks for the detailed answer. I have a few follow-up questions:
> > > >
> > > > 1. Can you clarify how data is flowing exactly, specifically from the source topic broker to the shadow topic broker? I tried to follow the PIP again to understand that, especially the overview, but couldn't figure it out.
> > > > Also, while you're at it, how do you do flow control? I mean, you stream all messages from the source topic to the shadow topic broker - there will be some cases where it will not have any more room in memory.
> > > > Once data arrives at the shadow topic, where does it go exactly?
> > > >
> > > > 2.
> > > > You wrote it was the easiest way to populate the entry cache, but there is a very big penalty to pay here: contaminating the protocol.
> > > > The way I see it, the protocol should be sacred.
> > > > Most importantly, the internal details of broker-to-broker communication should never leak to the client.
> > > > In this case, the CommandSend which the producer clients are using now has an extra field, called shadow_message_id. This field is an internal detail of broker-to-broker communication and thus should never leak outside.
> > > >
> > > > I'm not fluent enough in the Pulsar codebase to give a decent alternative implementation. I can only point out that this IMO is too high a price to pay.
> > > >
> > > > One possible solution comes to mind for broker-to-broker RPC without interfering with the protocol: open a consumer in the shadow topic broker, consume the messages, and from there fill up the cache of the managed ledger?
> > > >
> > > > 3. I understand that you want to minimize consumption latency, hence you're copying produced messages from the source topic broker's cache (memory) to the shadow topic broker's cache (memory).
> > > > Since those produced messages are fairly new, if you are reading from BK, won't those messages appear in the BK cache? How bad percentage-wise is reading from the BK cache relative to reading from the Pulsar cache - both on a remote machine?
> > > >
> > > > Geo-replication, from my understanding, is a fairly simple idea: the source topic is in charge of pushing out messages (replicating) to a remote cluster topic, using the normal client (producer) for this.
> > > > If I understand correctly, here we re-use the replicator, which pushes out messages to a topic, but those messages won't really be written to BK.
> > > > I'm afraid that creates complicated code that will be hard to read later and ties up one area with another, which will be very hard to untangle later.
> > > >
> > > > Thanks!
> > > >
> > > > On Tue, Jul 12, 2022 at 5:35 PM Haiting Jiang <jianghait...@gmail.com> wrote:
> > > >
> > > > > Hi Asaf,
> > > > >
> > > > > On 2022/07/11 13:08:52 Asaf Mesika wrote:
> > > > > > On Thu, Jun 23, 2022 at 6:26 AM Haiting Jiang <jianghait...@apache.org> wrote:
> > > > > >
> > > > > > > Hi Asaf,
> > > > > > >
> > > > > > > > I did a quick reading and I couldn't understand the gist of this change: The shadow topic doesn't really have its own messages, or its own ledgers, right? When it reads messages, it reads from the original topic ledgers. So the only thing you need to do is sync the "metadata" - the ledgers list?
> > > > > > >
> > > > > > > Yes, mostly the ledger id list and the LAC of the last ledger.
> > > > > > >
> > > > > > > > One question comes to mind here: Why not simply read the ledger information from the original topic, without copying?
> > > > > > >
> > > > > > > Yes, old ledger information will be read from the metadata store when ShadowManagedLedger initializes. The replicator is only for new messages, to reduce the consume latency of subscriptions in the shadow topic. And the reason we also replicate message data is to populate the entry cache when the shadow topic has many active subscriptions.
> > > > > > >
> > > > > > > One optimization we can do: there would be not much help for the shadow replicator to replicate messages in the backlog.
> > > > > > > We can come up with some policy to reset the shadow replicator cursor in a future PR.
> > > > > >
> > > > > > I'm not sure I'm following you. What do you mean by old ledger information and new ledger information?
> > > > >
> > > > > The old and new are relative to the moment of shadow topic initialization. Because the shadow topic and its source topic are usually in different brokers, when clients keep writing messages to the source topic, "new" ledgers will be created in the source topic, and the shadow topic must have some way to know about the "new" ledgers.
> > > > >
> > > > > > What I'm trying to understand is: why do you need to copy the source topic metadata - the ledgers ID list and the LAC of the last ledger? Why can't you just use the original topic metadata?
> > > > >
> > > > > Maybe you are missing that the source topic and the shadow topic are normally not in the same broker? There won't be any way to just USE the original topic metadata, so RPC is always required.
> > > > >
> > > > > For the ledger id list: I am not copying all of it; these can be read from the metadata store (zk) directly.
> > > > >
> > > > > As for the LAC, this info is only stored in the source topic's BK client and synced to the BK server in a piggyback way. So updating the shadow topic LAC with the message id brought by the shadow replicator is an easy and direct way.
> > > > >
> > > > > > > > Another question - I couldn't understand why you need to change the protocol to introduce shadow message id. Can you please explain that to me?
> > > > > > > > Is CommandSend used only internally between Pulsar clusters, or also used by a Pulsar client?
> > > > > > >
> > > > > > > CommandSend is designed for the pulsar producer client first, and geo-replication reuses the producer client to replicate messages between pulsar clusters.
> > > > > > >
> > > > > > > The shadow message id contains the ledger id and entry id of the message. When the shadow topic receives the message id, it is able to update `lastConfirmedEntry` directly, so that subscriptions can consume this new message.
> > > > > > > Also, the shadow topic can tell if the message is from the shadow replicator and reject it otherwise.
> > > > > >
> > > > > > I think the flow of information is the part I don't understand.
> > > > > >
> > > > > > In the PIP you write "The message sync procedure of shadow topic is supported by shadow replication, which is very like geo-replication, with these differences:"
> > > > > > What I don't understand is that you write that this is a read-only topic, so why replicate/sync messages?
> > > > >
> > > > > The "read-only" is a bit misleading here, and this is the reason why I introduced the concept of a "shadow" topic. From a consumer's view, the shadow topic is not "read-only"; there will still be new messages written into this topic. But from a producer's view, the shadow topic can't be written directly. All messages come from the source topic. Just like a shadow.
> > > > >
> > > > > > I managed to understand that you want to populate the BK entry cache of the topic ledgers in the shadow topic broker.
> > > > > > Instead of reading from BK and storing it in the cache, you favor copying from the source topic broker's cache memory to the shadow topic broker's cache. Is this to save the bandwidth of BK? I presume the most recent messages of BK would be in memory anyway, no?
> > > > >
> > > > > The reason I prefer copying is not to save bandwidth or disk IO usage of BK.
> > > > >
> > > > > The main concern here is to minimize consuming latency and implementation complexity.
> > > > >
> > > > > For consuming latency: if we try to populate messages from BK in the shadow topic, then we must have a trigger to do so, and I can't think of any other way with smaller consuming latency than this shadow replicator.
> > > > >
> > > > > For implementation complexity: currently the entry cache in the broker can only be populated by message producing. By reusing this write path, we can avoid solving a lot of race conditions which may be introduced by adding another entry write path. And it's already stable and easy to maintain.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Haiting
> > > > > > >
> > > > > > > On 2022/06/22 15:57:11 Asaf Mesika wrote:
> > > > > > > > Hi,
> > > > > > > >
> > > > > > > > I did a quick reading and I couldn't understand the gist of this change: The shadow topic doesn't really have its own messages, or its own ledgers, right? When it reads messages, it reads from the original topic ledgers. So the only thing you need to do is sync the "metadata" - the ledgers list?
> > > > > > > > One question comes to mind here: Why not simply read the ledger information from the original topic, without copying?
> > > > > > > >
> > > > > > > > Another question - I couldn't understand why you need to change the protocol to introduce shadow message id. Can you please explain that to me?
> > > > > > > > Is CommandSend used only internally between Pulsar clusters, or also used by a Pulsar client?
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > >
> > > > > > > > Asaf
> > > > > > > >
> > > > > > > > On Tue, Jun 21, 2022 at 11:00 AM Haiting Jiang <jianghait...@apache.org> wrote:
> > > > > > > >
> > > > > > > > > Hi Pulsar community:
> > > > > > > > >
> > > > > > > > > I opened a PIP to discuss "Shadow Topic, an alternative way to support readonly topic ownership."
> > > > > > > > >
> > > > > > > > > Proposal Link: https://github.com/apache/pulsar/issues/16153
> > > > > > > > >
> > > > > > > > > ---
> > > > > > > > >
> > > > > > > > > ## Motivation
> > > > > > > > >
> > > > > > > > > The motivation is the same as PIP-63[1], with a new broadcast use case of supporting 100K subscriptions in a single topic.
> > > > > > > > > 1. The bandwidth of a broker limits the number of subscriptions for a single topic.
> > > > > > > > > 2. Subscriptions are competing for the network bandwidth on brokers. Different subscriptions might have different levels of severity.
> > > > > > > > > 3. When synchronizing cross-city message reading, cross-city access needs to be minimized.
> > > > > > > > > 4. [New] Broadcast with 100K subscriptions.
> > > > > > > > >    There is a limitation on the subscription number of a single topic. It was tested by Hongjie from NTT Lab that with 40K subscriptions in a single topic, the client needs about 20min to start all client connections, and under a 1 msg/s producer rate, the average end-to-end latency is about 2.9s. And for 100K subscriptions, the connection start time and E2E latency are beyond consideration.
> > > > > > > > >
> > > > > > > > > However, it's too complicated to implement with the original PIP-63 proposal; the changed code is already over 3K+ lines, see PR#11960[2], and there are still some problems left:
> > > > > > > > > 1. The LAC in the readonly topic is updated in a polling pattern, which increases the bookie load.
> > > > > > > > > 2. The message data of the readonly topic won't be cached in the broker. This increases the network usage between broker and bookie when more than one subscriber is tail-reading.
> > > > > > > > > 3. All the subscriptions are managed in the original writable topic, so the maximum supported subscription number is not scalable.
> > > > > > > > >
> > > > > > > > > This PIP tries to come up with a simpler solution to support readonly topic ownership and solve the problems the previous PR left.
> > > > > > > > > The main idea of this solution is to reuse the feature of geo-replication, but instead of duplicating storage, it shares the underlying bookie ledgers between different topics.
> > > > > > > > >
> > > > > > > > > ## Goal
> > > > > > > > >
> > > > > > > > > The goal is to introduce **Shadow Topic** as a new type of topic to support readonly topic ownership. Just as its name implies, a shadow topic is the shadow of some normal persistent topic (let's call it the source topic here). The source topic and the shadow topic must have the same number of partitions or both be non-partitioned. Multiple shadow topics can be created from a source topic.
> > > > > > > > >
> > > > > > > > > A shadow topic shares the underlying bookie ledgers of its source topic. Users can't produce any messages to a shadow topic directly, and a shadow topic doesn't create any new ledger for messages; all messages in a shadow topic come from its source topic.
> > > > > > > > >
> > > > > > > > > A shadow topic has its own subscriptions and doesn't share them with its source topic. This means the shadow topic has its own cursor ledger to store persistent mark-delete info for each persistent subscription.
> > > > > > > > >
> > > > > > > > > The message sync procedure of the shadow topic is supported by shadow replication, which is very like geo-replication, with these differences:
> > > > > > > > > 1. Geo-replication only works between topics with the same name in different broker clusters, but a shadow topic has no naming limitation and can be in the same cluster as its source.
> > > > > > > > > 2. Geo-replication duplicates data storage, but shadow topics don't.
> > > > > > > > > 3. Geo-replication replicates data in both directions (it's bidirectional), but shadow replication has only a one-way data flow.
> > > > > > > > >
> > > > > > > > > ## API Changes
> > > > > > > > >
> > > > > > > > > 1. PulsarApi.proto
> > > > > > > > >
> > > > > > > > > The shadow topic needs to know the original message id of the replicated messages, in order to update the new ledger and LAC. So we need to add a `shadow_message_id` in CommandSend for the replicator.
> > > > > > > > >
> > > > > > > > > ```
> > > > > > > > > message CommandSend {
> > > > > > > > >     // ...
> > > > > > > > >     // message id for shadow topic
> > > > > > > > >     optional MessageIdData shadow_message_id = 9;
> > > > > > > > > }
> > > > > > > > > ```
> > > > > > > > >
> > > > > > > > > 2. Admin API for creating a shadow topic from a source topic
> > > > > > > > >
> > > > > > > > > ```
> > > > > > > > > admin.topics().createShadowTopic(source-topic-name, shadow-topic-name)
> > > > > > > > > ```
> > > > > > > > >
> > > > > > > > > ## Implementation
> > > > > > > > >
> > > > > > > > > A picture showing the key component relations is added in the github issue [3].
> > > > > > > > >
> > > > > > > > > There are two key changes for the implementation:
> > > > > > > > > 1. How to replicate messages to shadow topics.
> > > > > > > > > 2. How the shadow topic manages shared ledger info.
> > > > > > > > >
> > > > > > > > > ### 1. How to replicate messages to shadow topics
> > > > > > > > > This part is mostly implemented by `ShadowReplicator`, which extends the `PersistentReplicator` introduced in geo-replication. The shadow topic list is added as a new topic policy of the source topic. The source topic manages the lifecycle of all the replicators. The key is to add the `shadow_message_id` when producing messages to shadow topics.
> > > > > > > > >
> > > > > > > > > ### 2. How the shadow topic manages shared ledger info
> > > > > > > > >
> > > > > > > > > This part is mostly implemented by `ShadowManagedLedger`, which extends the current `ManagedLedgerImpl` with two key overridden methods.
> > > > > > > > >
> > > > > > > > > 1. `initialize(..)`
> > > > > > > > >    a. Fetch the ManagedLedgerInfo of the source topic instead of the current shadow topic. The source topic name is stored in the topic policy of the shadow topic.
> > > > > > > > >    b. Open the last ledger and read the explicit LAC from bookie, instead of creating a new ledger. Reading the LAC here requires that the source topic enables the explicit LAC feature by setting `bookkeeperExplicitLacIntervalInMills` to a non-zero value in broker.conf.
> > > > > > > > >    c. Do not start checkLedgerRollTask, which tries to roll over the ledger periodically.
> > > > > > > > >
> > > > > > > > > 2. `internalAsyncAddEntry()`: Instead of writing the entry data to bookie, it only updates the metadata of the ledgers, like `currentLedger` and `lastConfirmedEntry`, and puts the replicated message into the `EntryCache`.
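The `internalAsyncAddEntry()` override described in step 2 above can be sketched as a standalone model. Field names mirror the PIP (`currentLedger`, `lastConfirmedEntry`, entry cache), but this is an illustrative sketch under those assumptions, not the actual `ShadowManagedLedger` implementation.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Standalone model of the metadata-only write path: instead of writing the
 * entry to BookKeeper, only advance the ledger metadata and populate the
 * cache, so tail-reading subscriptions are served from memory.
 */
public class ShadowLedgerModel {

    private volatile long currentLedger = -1;
    private volatile String lastConfirmedEntry = null;
    // Entries keyed as "ledgerId:entryId", standing in for the broker entry cache.
    private final Map<String, byte[]> entryCache = new ConcurrentHashMap<>();

    /** No bookie write: track the source topic's ledger and advance the LAC pointer. */
    public void addReplicatedEntry(long ledgerId, long entryId, byte[] payload) {
        currentLedger = ledgerId;
        lastConfirmedEntry = ledgerId + ":" + entryId;
        entryCache.put(lastConfirmedEntry, payload);
    }

    public String getLastConfirmedEntry() {
        return lastConfirmedEntry;
    }

    /** Returns the cached payload, or null when the entry was never replicated. */
    public byte[] readCached(long ledgerId, long entryId) {
        return entryCache.get(ledgerId + ":" + entryId);
    }
}
```

In the real broker the payload would come from the replicated message carrying `shadow_message_id`, which supplies the ledger id and entry id used here.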
> > > > > > > > > Besides, some other problems need to be taken care of:
> > > > > > > > > - Any ledger metadata updates need to be synced to the shadow topic, including ledger offloading or ledger deletion. The shadow topic needs to watch the ledger info updates with the metadata store and update in time.
> > > > > > > > > - The locally cached LAC of `LedgerHandle` won't be updated in time, so we need to refresh the LAC when a managed cursor requests entries beyond the known LAC.
> > > > > > > > >
> > > > > > > > > ## Rejected Alternatives
> > > > > > > > >
> > > > > > > > > See PIP-63[1].
> > > > > > > > >
> > > > > > > > > ## References
> > > > > > > > > [1] https://github.com/apache/pulsar/wiki/PIP-63%3A-Readonly-Topic-Ownership-Support
> > > > > > > > > [2] https://github.com/apache/pulsar/pull/11960
> > > > > > > > > [3] https://github.com/apache/pulsar/issues/16153
> > > > > > > > >
> > > > > > > > > BR,
> > > > > > > > > Haiting Jiang
> > > > >
> > > > > BR,
> > > > > Haiting