Hi Penghui,

> One question about the schema.
> How can the consumer get the schema from the shadow topic during
> consumption?
> We should add this part in the proposal.
Thanks for the reminder.

From what I see, Schema is part of a topic's metadata, so a shadow topic won't have its own schema; it shares the schema info of its source topic. For consumers, we need to support the `GetSchema` command for shadow topics, and there are two interfaces for this.

1. Binary protocol, where `CommandGetSchema` is handled in `ServerCnx#handleGetSchema`. We only need to replace the requested shadow topic's `schemaName` with the `schemaName` of the source topic; the underlying read operation is already supported by `SchemaRegistry#getSchema(String, SchemaVersion)`.

2. HTTP protocol, which is handled in `SchemasResource#getSchema(...)`. Similar to the approach in the binary protocol, we replace the `schemaId` with the source topic's in `SchemasResourceBase#getSchemaId`.

For admins, we can support other "read" ops besides `getSchema`, including `getAllSchemas` and `getVersionBySchema`, all of which can be supported in the same way as `getSchema`.
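To make this more concrete, the substitution in both paths boils down to resolving the schema id against the source topic. A rough sketch only, not actual broker code; the class name and the `lookupSourceTopic` function are made up for illustration:

```java
import java.util.function.Function;

/**
 * Sketch of the schema-id substitution described above (hypothetical names,
 * not actual Pulsar broker code). Both ServerCnx#handleGetSchema and
 * SchemasResourceBase#getSchemaId would apply the same rule: if the requested
 * topic is a shadow topic, query the schema registry with the source topic's
 * schema id instead of the shadow topic's own.
 */
public class ShadowSchemaIdResolver {

    // Assumed to return the source topic name stored in the shadow topic's
    // topic policies, or null if the requested topic is not a shadow topic.
    private final Function<String, String> lookupSourceTopic;

    public ShadowSchemaIdResolver(Function<String, String> lookupSourceTopic) {
        this.lookupSourceTopic = lookupSourceTopic;
    }

    public String resolveSchemaId(String requestedTopic) {
        String sourceTopic = lookupSourceTopic.apply(requestedTopic);
        String effectiveTopic = sourceTopic != null ? sourceTopic : requestedTopic;
        // In the broker this would be TopicName.get(effectiveTopic).getSchemaName();
        // here we just strip the scheme prefix as an approximation.
        return effectiveTopic.replaceFirst("persistent://", "");
    }
}
```

The same resolved id could then be passed to `SchemaRegistry#getSchema`, and reused for `getAllSchemas` and `getVersionBySchema`.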
Thanks,
Haiting

On 2022/07/21 02:13:08 PengHui Li wrote:
> Hi Haiting,
>
> One question about the schema.
> How can the consumer get the schema from the shadow topic during
> consumption?
> We should add this part in the proposal.
>
> Thanks,
> Penghui
>
> On Mon, Jul 11, 2022 at 9:09 PM Asaf Mesika <asaf.mes...@gmail.com> wrote:
>
> > On Thu, Jun 23, 2022 at 6:26 AM Haiting Jiang <jianghait...@apache.org>
> > wrote:
> >
> > > Hi Asaf,
> > >
> > > > I did a quick reading and I couldn't understand the gist of this change:
> > > > The shadow topic doesn't really have it's own messages, or it's own ledgers
> > > > right? When it reads messages, it reads from the original topic ledgers. So
> > > > the only thing you need to do is sync the "metadata" - ledgers list?
> > >
> > > Yes, mostly ledger id list and LAC of the last ledger.
> > >
> > > > One question comes to mind here: Why not simply read the ledger information
> > > > from original topic, without copy?
> > >
> > > Yes, old ledger information will be read from metadata store when
> > > ShadowManagedLedger initializes. The replicator is only for new messages, to
> > > reduce the consume latency of subscription in shadow topic. And the reason
> > > we also replicates message data is to populates the entry cache when shadow
> > > topic have many active subscriptions.
> > >
> > > One optimization we can do is that, there would be not much help for shadow
> > > replicator to replicate message in backlog. We can come up with some policy to
> > > reset shadow replicator cursor in future PR.
> >
> > I'm not sure I'm following you.
> > What do you mean by old ledger information and new ledger information?
> >
> > What I'm trying to understand is: why do you need to copy the source topic
> > metadata: Ledgers ID list and LAC of the last ledger? Why can't you just
> > use the original topic metadata?
> >
> > > > Another question - I couldn't understand why you need to change the
> > > > protocol to introduce shadow message id. Can you please explain that to me?
> > > > Is CommandSend used only internally between Pulsar Clusters or used by a
> > > > Pulsar Client?
> > >
> > > CommandSend is designed for pulsar producer client first, and geo-replication
> > > reuse producer client to replicate messages between pulsar clusters.
> > >
> > > The shadow message id contains the ledger id and entry id of this message.
> > > When shadow topic receive the message id, it is able to update
> > > `lastConfirmedEntry` directly, so that subscription can consume this this new
> > > message.
> > > Also shadow topic can tell if the message is from shadow replicator and reject
> > > otherwise.
> >
> > I think the flow of information is the part I don't understand.
> >
> > In the PIP you write "The message sync procedure of shadow topic is
> > supported by shadow replication, which is very like geo-replication, with
> > these differences:"
> > What I don't understand is that you write that this is a read-only topic,
> > so why replicate/sync messages?
> >
> > I managed to understand that you want to populate the BK entry cache of the
> > topic ledgers in the shadow topic broker. Instead of reading from BK and
> > storing it in the cache, you favor copying from the source topic broker
> > cache memory to the shadow topic broker cache. Is this to save the
> > bandwidth of BK? I presume the most recent messages of BK would be in
> > memory anyway, no?
> >
> > > Thanks,
> > > Haiting
> > >
> > > On 2022/06/22 15:57:11 Asaf Mesika wrote:
> > > > Hi,
> > > >
> > > > I did a quick reading and I couldn't understand the gist of this change:
> > > > The shadow topic doesn't really have it's own messages, or it's own ledgers
> > > > right? When it reads messages, it reads from the original topic ledgers. So
> > > > the only thing you need to do is sync the "metadata" - ledgers list?
> > > > One question comes to mind here: Why not simply read the ledger information
> > > > from original topic, without copy?
> > > >
> > > > Another question - I couldn't understand why you need to change the
> > > > protocol to introduce shadow message id. Can you please explain that to me?
> > > > Is CommandSend used only internally between Pulsar Clusters or used by a
> > > > Pulsar Client?
> > > >
> > > > Thanks,
> > > >
> > > > Asaf
> > > >
> > > > On Tue, Jun 21, 2022 at 11:00 AM Haiting Jiang <jianghait...@apache.org>
> > > > wrote:
> > > >
> > > > > Hi Pulsar community:
> > > > >
> > > > > I open a pip to discuss "Shadow Topic, an alternative way to support
> > > > > readonly topic ownership."
> > > > >
> > > > > Proposal Link: https://github.com/apache/pulsar/issues/16153
> > > > >
> > > > > ---
> > > > >
> > > > > ## Motivation
> > > > >
> > > > > The motivation is the same as PIP-63[1], with a new broadcast use case of
> > > > > supporting 100K subscriptions in a single topic.
> > > > > 1. The bandwidth of a broker limits the number of subscriptions for a
> > > > >    single topic.
> > > > > 2. Subscriptions are competing for the network bandwidth on brokers.
> > > > >    Different subscriptions might have different levels of severity.
> > > > > 3. When synchronizing cross-city message reading, cross-city access needs
> > > > >    to be minimized.
> > > > > 4. [New] Broadcast with 100K subscriptions. There is a limitation of the
> > > > >    subscription number of a single topic. It's tested by Hongjie from NTT
> > > > >    Lab that with 40K subscriptions in a single topic, the client needs
> > > > >    about 20min to start all client connections, and under 1 msg/s message
> > > > >    producer rate, the average end to end latency is about 2.9s. And for
> > > > >    100K subscriptions, the time of start connection and E2E latency is
> > > > >    beyond consideration.
> > > > >
> > > > > However, it's too complicated to implement with original PIP-63 proposal,
> > > > > the changed code is already over 3K+ lines, see PR#11960[2], and there are
> > > > > still some problems left,
> > > > > 1. The LAC in readonly topic is updated in a polling pattern, which
> > > > >    increases the bookie load bookie.
> > > > > 2. The message data of readonly topic won't be cached in broker. Increase
> > > > >    the network usage between broker and bookie when there are more than one
> > > > >    subscriber is tail-reading.
> > > > > 3. All the subscriptions is managed in original writable-topic, so the
> > > > >    support max subscription number is not scaleable.
> > > > >
> > > > > This PIP tries to come up with a simpler solution to support readonly topic
> > > > > ownership and solve the problems the previous PR left. The main idea of this
> > > > > solution is to reuse the feature of geo-replication, but instead of
> > > > > duplicating storage, it shares underlying bookie ledgers between different
> > > > > topics.
> > > > >
> > > > > ## Goal
> > > > >
> > > > > The goal is to introduce **Shadow Topic** as a new type of topic to support
> > > > > readonly topic ownership. Just as its name implies, a shadow topic is the
> > > > > shadow of some normal persistent topic (let's call it source topic here).
> > > > > The source topic and the shadow topic must have the same number of
> > > > > partitions or both non-partitioned. Multiply shadow topics can be created
> > > > > from a source topic.
> > > > >
> > > > > Shadow topic shares the underlying bookie ledgers from its source topic.
> > > > > User can't produce any messages to shadow topic directly and shadow topic
> > > > > don't create any new ledger for messages, all messages in shadow topic come
> > > > > from source topic.
> > > > >
> > > > > Shadow topic have its own subscriptions and don't share with its source
> > > > > topic. This means the shadow topic have its own cursor ledger to store
> > > > > persistent mark-delete info for each persistent subscriptions.
> > > > >
> > > > > The message sync procedure of shadow topic is supported by shadow
> > > > > replication, which is very like geo-replication, with these difference:
> > > > > 1. Geo-replication only works between topic with the same name in different
> > > > >    broker clusters. But shadow topic have no naming limitation and they
> > > > >    can be in the same cluster.
> > > > > 2. Geo-replication duplicates data storage, but shadow topic don't.
> > > > > 3. Geo-replication replicates data from each other, it's bidirectional, but
> > > > >    shadow replication only have one way data flow.
> > > > >
> > > > > ## API Changes
> > > > >
> > > > > 1. PulsarApi.proto.
> > > > >
> > > > > Shadow topic need to know the original message id of the replicated
> > > > > messages, in order to update new ledger and lac. So we need add a
> > > > > `shadow_message_id` in CommandSend for replicator.
> > > > >
> > > > > ```
> > > > > message CommandSend {
> > > > >   // ...
> > > > >   // message id for shadow topic
> > > > >   optional MessageIdData shadow_message_id = 9;
> > > > > }
> > > > > ```
> > > > >
> > > > > 2. Admin API for creating shadow topic with source topic
> > > > > ```
> > > > > admin.topics().createShadowTopic(source-topic-name, shadow-topic-name)
> > > > > ```
> > > > >
> > > > > ## Implementation
> > > > >
> > > > > A picture showing key components relations is added in github issue [3].
> > > > >
> > > > > There are two key changes for implementation.
> > > > > 1. How to replicate messages to shadow topics.
> > > > > 2. How shadow topic manage shared ledgers info.
> > > > >
> > > > > ### 1. How to replicate messages to shadow topics.
> > > > >
> > > > > This part is mostly implemented by `ShadowReplicator`, which extends
> > > > > `PersistentReplicator` introduced in geo-replication. The shadow topic list
> > > > > is added as a new topic policy of the source topic. Source topic manage the
> > > > > lifecycle of all the replicators. The key is to add `shadow_message_id` when
> > > > > produce message to shadow topics.
> > > > >
> > > > > ### 2. How shadow topic manage shared ledgers info.
> > > > >
> > > > > This part is mostly implemented by `ShadowManagedLedger`, which extends
> > > > > current `ManagedLedgerImpl` with two key override methods.
> > > > >
> > > > > 1. `initialize(..)`
> > > > >    a. Fetch ManagedLedgerInfo of source topic instead of current shadow
> > > > >       topic. The source topic name is stored in the topic policy of the
> > > > >       shadow topic.
> > > > >    b. Open the last ledger and read the explicit LAC from bookie, instead
> > > > >       of creating new ledger. Reading LAC here requires that the source
> > > > >       topic must enable explicit LAC feature by set
> > > > >       `bookkeeperExplicitLacIntervalInMills` to non-zero value in
> > > > >       broker.conf.
> > > > >    c. Do not start checkLedgerRollTask, which tries roll over ledger
> > > > >       periodically
> > > > >
> > > > > 2. `internalAsyncAddEntry()` Instead of write entry data to bookie, It only
> > > > >    update metadata of ledgers, like `currentLedger`, `lastConfirmedEntry`
> > > > >    and put the replicated message into `EntryCache`.
> > > > >
> > > > > Besides, some other problems need to be taken care of.
> > > > > - Any ledger metadata updates need to be synced to shadow topic, including
> > > > >   ledger offloading or ledger deletion. Shadow topic needs to watch the
> > > > >   ledger info updates with metadata store and update in time.
> > > > > - The local cached LAC of `LedgerHandle` won't updated in time, so we need
> > > > >   refresh LAC when a managed cursor requests entries beyond known LAC.
> > > > >
> > > > > ## Reject Alternatives
> > > > >
> > > > > See PIP-63[1].
> > > > >
> > > > > ## Reference
> > > > > [1] https://github.com/apache/pulsar/wiki/PIP-63%3A-Readonly-Topic-Ownership-Support
> > > > > [2] https://github.com/apache/pulsar/pull/11960
> > > > > [3] https://github.com/apache/pulsar/issues/16153
> > > > >
> > > > > BR,
> > > > > Haiting Jiang
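P.S. For reference, a rough usage sketch of the admin API proposed in the PIP quoted above. `createShadowTopic` is only the proposed method name and argument order from the PIP text; the final signature may differ once implemented.

```java
import org.apache.pulsar.client.admin.PulsarAdmin;

public class CreateShadowTopicExample {
    public static void main(String[] args) throws Exception {
        // Connect to the broker's admin endpoint (URL is just an example).
        PulsarAdmin admin = PulsarAdmin.builder()
                .serviceHttpUrl("http://localhost:8080")
                .build();

        String sourceTopic = "persistent://public/default/source-topic";
        String shadowTopic = "persistent://public/default/shadow-topic";

        // Proposed API from the PIP: create a shadow topic that shares the
        // source topic's bookie ledgers instead of creating its own.
        admin.topics().createShadowTopic(sourceTopic, shadowTopic);

        admin.close();
    }
}
```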