Hi Asaf,

On 2022/07/11 13:08:52 Asaf Mesika wrote:
> On Thu, Jun 23, 2022 at 6:26 AM Haiting Jiang <jianghait...@apache.org> wrote:
> > Hi Asaf,
> >
> > > I did a quick reading and I couldn't understand the gist of this change: The shadow topic doesn't really have its own messages, or its own ledgers, right? When it reads messages, it reads from the original topic ledgers. So the only thing you need to do is sync the "metadata" - the ledger list?
> >
> > Yes, mostly the ledger id list and the LAC of the last ledger.
> >
> > > One question comes to mind here: Why not simply read the ledger information from the original topic, without copying?
> >
> > Yes, old ledger information will be read from the metadata store when ShadowManagedLedger initializes. The replicator is only for new messages, to reduce the consume latency of subscriptions in the shadow topic. And the reason we also replicate message data is to populate the entry cache when the shadow topic has many active subscriptions.
> >
> > One optimization we can do: there is not much benefit in having the shadow replicator replicate messages that are already in the backlog. We can come up with a policy to reset the shadow replicator cursor in a future PR.
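To make the above concrete, here is a minimal, self-contained sketch of the state a shadow topic keeps about its source topic (all class and method names are hypothetical illustrations for this discussion, not the actual Pulsar internals): the existing ledger id list is loaded once from the metadata store at initialization, and afterwards only the last confirmed position is advanced as replicated messages arrive.

```
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical model of what a shadow topic tracks about its source topic.
public class ShadowTopicState {

    // Ledger ids of the source topic, read once from the metadata store (e.g. ZooKeeper) at init.
    private final List<Long> sourceLedgerIds = new CopyOnWriteArrayList<>();

    // Last confirmed position, advanced only by messages coming through the shadow replicator.
    private volatile long lastLedgerId = -1L;
    private volatile long lastEntryId = -1L;

    // Called when the shadow topic initializes: "old" ledger info needs no replication at all.
    public void initFromMetadata(List<Long> ledgerIds, long lastLedgerId, long lastEntryId) {
        sourceLedgerIds.addAll(ledgerIds);
        this.lastLedgerId = lastLedgerId;
        this.lastEntryId = lastEntryId;
    }

    // Called per replicated message: a "new" ledger is discovered from the message id itself.
    public void advance(long ledgerId, long entryId) {
        if (!sourceLedgerIds.contains(ledgerId)) {
            sourceLedgerIds.add(ledgerId);
        }
        this.lastLedgerId = ledgerId;
        this.lastEntryId = entryId;
    }

    public String lastConfirmedEntry() {
        return lastLedgerId + ":" + lastEntryId;
    }
}
```

The point is that only the tail has to be kept in sync actively; everything older is already discoverable from the metadata store.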
> I'm not sure I'm following you.
> What do you mean by old ledger information and new ledger information?

"Old" and "new" are relative to the moment of shadow topic initialization. Because a shadow topic and its source topic are usually on different brokers, when clients keep writing messages to the source topic, "new" ledgers will be created in the source topic, and the shadow topic must have some way to learn about those "new" ledgers.

> What I'm trying to understand is: why do you need to copy the source topic metadata: the ledger ID list and the LAC of the last ledger? Why can't you just use the original topic metadata?

Maybe you are missing that the source topic and the shadow topic are normally not on the same broker? There is no way to just USE the original topic metadata, so an RPC is always required. For the ledger id list: I am not copying the whole ledger id list; it can be read from the metadata store (ZooKeeper) directly. As for the LAC, this info is only kept in the source topic's BK client and synced to the BK server in a piggyback way. So updating the shadow topic's LAC with the message id carried by the shadow replicator is an easy and direct way.

> > > Another question - I couldn't understand why you need to change the protocol to introduce shadow message id. Can you please explain that to me? Is CommandSend used only internally between Pulsar Clusters or used by a Pulsar Client?
> >
> > CommandSend is designed for the Pulsar producer client first, and geo-replication reuses the producer client to replicate messages between Pulsar clusters.
> >
> > The shadow message id contains the ledger id and entry id of the message. When the shadow topic receives the message id, it is able to update `lastConfirmedEntry` directly, so that subscriptions can consume this new message. The shadow topic can also tell whether a message comes from the shadow replicator and reject it otherwise.

> I think the flow of information is the part I don't understand.
> In the PIP you write "The message sync procedure of shadow topic is supported by shadow replication, which is very like geo-replication, with these differences:"
> What I don't understand is that you write that this is a read-only topic, so why replicate/sync messages?

The "read-only" is a bit misleading here, and this is the reason why I introduced the concept of a "shadow" topic. From a consumer's view, the shadow topic is not "read-only": new messages will still be written into this topic. But from a producer's view, the shadow topic can't be written to directly; all messages come from the source topic. Just like a shadow.

> I managed to understand that you want to populate the BK entry cache of the topic ledgers in the shadow topic broker. Instead of reading from BK and storing it in the cache, you favor copying from the source topic broker cache memory to the shadow topic broker cache. Is this to save the bandwidth of BK? I presume the most recent messages of BK would be in memory anyway, no?

The reason I prefer copying is not to save bandwidth or disk IO on BK. The main concern is to minimize consume latency and implementation complexity. For consume latency: if we tried to populate messages from BK on the shadow topic side, we would need some trigger to do so, and I can't think of any other way with lower consume latency than this shadow replicator. For implementation complexity: currently the entry cache in the broker can only be populated on the message produce path. By reusing this write path, we avoid a lot of race conditions that another entry write path might introduce, and it is already stable and easy to maintain.
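Since `shadow_message_id` and the reused write path came up a few times above, here is a rough sketch of what the receive side could do with it (again, hypothetical names, not the actual `ShadowManagedLedger` code): a produce request without a shadow message id is rejected, and one that carries it only advances the last confirmed position and warms an in-memory cache, instead of writing a new entry to BookKeeper.

```
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the shadow-topic write path: no BookKeeper write, only metadata + cache.
public class ShadowWritePath {

    record Position(long ledgerId, long entryId) {}

    private final Map<Position, byte[]> entryCache = new ConcurrentHashMap<>();
    private volatile Position lastConfirmedEntry = new Position(-1L, -1L);

    // Called for every incoming produce request on the shadow topic.
    public void handleSend(Position shadowMessageId, byte[] payload) {
        if (shadowMessageId == null) {
            // Only the shadow replicator carries the source message id; direct producers are rejected.
            throw new IllegalStateException("shadow topic does not accept direct writes");
        }
        // The entry already exists in the source topic's ledger, so skip the bookie write:
        // just advance the LAC and populate the cache for tailing subscribers.
        lastConfirmedEntry = shadowMessageId;
        entryCache.put(shadowMessageId, payload);
    }

    public Position lastConfirmedEntry() {
        return lastConfirmedEntry;
    }
}
```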
> > Thanks,
> > Haiting
> >
> > On 2022/06/22 15:57:11 Asaf Mesika wrote:
> > > Hi,
> > >
> > > I did a quick reading and I couldn't understand the gist of this change: The shadow topic doesn't really have its own messages, or its own ledgers, right? When it reads messages, it reads from the original topic ledgers. So the only thing you need to do is sync the "metadata" - the ledger list? One question comes to mind here: Why not simply read the ledger information from the original topic, without copying?
> > >
> > > Another question - I couldn't understand why you need to change the protocol to introduce shadow message id. Can you please explain that to me? Is CommandSend used only internally between Pulsar Clusters or used by a Pulsar Client?
> > >
> > > Thanks,
> > >
> > > Asaf
> > >
> > > On Tue, Jun 21, 2022 at 11:00 AM Haiting Jiang <jianghait...@apache.org> wrote:
> > > >
> > > > Hi Pulsar community:
> > > >
> > > > I open a PIP to discuss "Shadow Topic, an alternative way to support readonly topic ownership."
> > > >
> > > > Proposal Link: https://github.com/apache/pulsar/issues/16153
> > > >
> > > > ---
> > > >
> > > > ## Motivation
> > > >
> > > > The motivation is the same as PIP-63[1], with a new broadcast use case of supporting 100K subscriptions in a single topic.
> > > > 1. The bandwidth of a broker limits the number of subscriptions for a single topic.
> > > > 2. Subscriptions compete for the network bandwidth on brokers. Different subscriptions might have different levels of severity.
> > > > 3. When synchronizing cross-city message reading, cross-city access needs to be minimized.
> > > > 4. [New] Broadcast with 100K subscriptions. There is a limit on the subscription number of a single topic. Hongjie from NTT Lab tested that with 40K subscriptions in a single topic, the client needs about 20 min to start all client connections, and at a produce rate of 1 msg/s the average end-to-end latency is about 2.9 s. For 100K subscriptions, the connection start time and E2E latency are beyond consideration.
> > > >
> > > > However, it's too complicated to implement with the original PIP-63 proposal; the changed code is already over 3K+ lines, see PR#11960[2], and there are still some problems left:
> > > > 1. The LAC in the readonly topic is updated in a polling pattern, which increases the bookie load.
> > > > 2. The message data of the readonly topic isn't cached in the broker, which increases the network usage between broker and bookie when more than one subscriber is tail-reading.
> > > > 3. All the subscriptions are managed in the original writable topic, so the maximum supported subscription number is not scalable.
> > > >
> > > > This PIP tries to come up with a simpler solution to support readonly topic ownership and solve the problems the previous PR left.
> > > > The main idea of this solution is to reuse the feature of geo-replication, but instead of duplicating storage, it shares the underlying bookie ledgers between different topics.
> > > >
> > > > ## Goal
> > > >
> > > > The goal is to introduce **Shadow Topic** as a new type of topic to support readonly topic ownership. Just as its name implies, a shadow topic is the shadow of some normal persistent topic (let's call it the source topic here). The source topic and the shadow topic must have the same number of partitions, or both be non-partitioned. Multiple shadow topics can be created from one source topic.
> > > >
> > > > A shadow topic shares the underlying bookie ledgers of its source topic. Users can't produce any messages to a shadow topic directly, and a shadow topic doesn't create any new ledgers for messages; all messages in a shadow topic come from its source topic.
> > > >
> > > > A shadow topic has its own subscriptions and doesn't share them with its source topic. This means the shadow topic has its own cursor ledger to store the persistent mark-delete info of each persistent subscription.
> > > >
> > > > The message sync procedure of a shadow topic is supported by shadow replication, which is very much like geo-replication, with these differences:
> > > > 1. Geo-replication only works between topics with the same name in different broker clusters, while shadow topics have no naming limitation and can be in the same cluster.
> > > > 2. Geo-replication duplicates data storage, but shadow topics don't.
> > > > 3. Geo-replication replicates data in both directions, but shadow replication has a one-way data flow.
> > > >
> > > > ## API Changes
> > > >
> > > > 1. PulsarApi.proto
> > > >
> > > > The shadow topic needs to know the original message id of the replicated messages in order to update the new ledger and LAC. So we need to add a `shadow_message_id` to CommandSend for the replicator.
> > > >
> > > > ```
> > > > message CommandSend {
> > > >     // ...
> > > >     // message id for shadow topic
> > > >     optional MessageIdData shadow_message_id = 9;
> > > > }
> > > > ```
> > > >
> > > > 2. Admin API for creating a shadow topic from a source topic:
> > > >
> > > > ```
> > > > admin.topics().createShadowTopic(source-topic-name, shadow-topic-name)
> > > > ```
> > > >
> > > > ## Implementation
> > > >
> > > > A picture showing the relations of the key components is added in the github issue [3].
> > > >
> > > > There are two key changes in the implementation:
> > > > 1. How to replicate messages to shadow topics.
> > > > 2. How the shadow topic manages shared ledger info.
> > > >
> > > > ### 1. How to replicate messages to shadow topics.
> > > >
> > > > This part is mostly implemented by `ShadowReplicator`, which extends the `PersistentReplicator` introduced for geo-replication. The shadow topic list is added as a new topic policy of the source topic, and the source topic manages the lifecycle of all the replicators. The key is to add the `shadow_message_id` when producing messages to the shadow topics.
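For illustration, the replicator side described above might look roughly like the sketch below (hypothetical stand-in types, not the real `ShadowReplicator` or producer classes): when re-producing an entry read from the source topic, the replicator attaches the entry's own (ledgerId, entryId) so the shadow broker can use it as the `shadow_message_id` of CommandSend.

```
// Hypothetical sketch of the replicator side: forward each source entry together with its own id.
public class ShadowReplicatorSketch {

    // Stand-in for an entry read from the source topic's managed ledger.
    record SourceEntry(long ledgerId, long entryId, byte[] data) {}

    // Stand-in for the producer connection the replicator holds towards the shadow topic.
    interface ShadowProducer {
        // In the proposal this maps to CommandSend.shadow_message_id (PulsarApi.proto).
        void send(byte[] data, long shadowLedgerId, long shadowEntryId);
    }

    private final ShadowProducer producer;

    public ShadowReplicatorSketch(ShadowProducer producer) {
        this.producer = producer;
    }

    // Called for every entry the replicator's cursor reads from the source topic.
    public void replicate(SourceEntry entry) {
        // The payload is forwarded unchanged; the source position rides along as the shadow message id.
        producer.send(entry.data(), entry.ledgerId(), entry.entryId());
    }
}
```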
> > > > ### 2. How the shadow topic manages shared ledger info.
> > > >
> > > > This part is mostly implemented by `ShadowManagedLedger`, which extends the current `ManagedLedgerImpl` and overrides two key methods:
> > > >
> > > > 1. `initialize(..)`
> > > >    a. Fetch the ManagedLedgerInfo of the source topic instead of the current shadow topic. The source topic name is stored in the topic policy of the shadow topic.
> > > >    b. Open the last ledger and read the explicit LAC from the bookie, instead of creating a new ledger. Reading the LAC here requires that the source topic enables the explicit LAC feature by setting `bookkeeperExplicitLacIntervalInMills` to a non-zero value in broker.conf.
> > > >    c. Do not start checkLedgerRollTask, which tries to roll over the ledger periodically.
> > > >
> > > > 2. `internalAsyncAddEntry()`: instead of writing the entry data to a bookie, it only updates the ledger metadata, such as `currentLedger` and `lastConfirmedEntry`, and puts the replicated message into the `EntryCache`.
> > > >
> > > > Besides, some other problems need to be taken care of:
> > > > - Any ledger metadata updates need to be synced to the shadow topic, including ledger offloading and ledger deletion. The shadow topic needs to watch ledger info updates in the metadata store and apply them in time.
> > > > - The locally cached LAC of the `LedgerHandle` won't be updated in time, so we need to refresh the LAC when a managed cursor requests entries beyond the known LAC.
> > > >
> > > > ## Rejected Alternatives
> > > >
> > > > See PIP-63[1].
> > > >
> > > > ## References
> > > >
> > > > [1] https://github.com/apache/pulsar/wiki/PIP-63%3A-Readonly-Topic-Ownership-Support
> > > > [2] https://github.com/apache/pulsar/pull/11960
> > > > [3] https://github.com/apache/pulsar/issues/16153
> > > >
> > > > BR,
> > > > Haiting Jiang

BR,
Haiting