Hi Penghui,

> One question about the schema.
> How can the consumer get the schema from the shadow topic during
> consumption?
> We should add this part in the proposal.
> 

Thanks for the reminder.

From what I see, the schema is part of a topic's metadata, so a shadow topic
won't have its own schema; it shares the schema info of its source topic.

For consumers, we need to support the `GetSchema` command for shadow topics,
and there are two interfaces for this.

1. Binary protocol, where `CommandGetSchema` is handled in
   `ServerCnx#handleGetSchema`. We only need to replace the requested shadow
   topic's `schemaName` with the `schemaName` of the source topic; the
   underlying read operation is already supported by
   `SchemaRegistry#getSchema(String, SchemaVersion)`.

2. HTTP protocol, which is handled in `SchemasResource#getSchema(...)`.
   Similar to the binary protocol approach, replace the `schemaId` with that
   of the source topic in `SchemasResourceBase#getSchemaId`.

For admins, we can support the other "read" ops besides `getSchema`,
including `getAllSchemas` and `getVersionBySchema`, all of which can be
supported in the same way as `getSchema`.
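In all three paths the change boils down to the same name redirection before
hitting the schema registry. A minimal sketch of the idea (the class, field,
and topic names below are made up for illustration, not actual Pulsar code):

```java
import java.util.Map;
import java.util.Optional;

public class SchemaRedirect {
    // Hypothetical lookup from shadow topic to source topic, standing in
    // for the source-topic name stored in the shadow topic's policy.
    private final Map<String, String> shadowToSource;

    public SchemaRedirect(Map<String, String> shadowToSource) {
        this.shadowToSource = shadowToSource;
    }

    // Resolve the schema id used for the registry read: a shadow topic
    // resolves to its source topic's schema id, any other topic to itself.
    public String resolveSchemaName(String topic) {
        return Optional.ofNullable(shadowToSource.get(topic)).orElse(topic);
    }
}
```

The registry read itself is untouched; only the schema id handed to it changes,
which is why `getAllSchemas` and `getVersionBySchema` fall out of the same fix.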


Thanks,
Haiting


On 2022/07/21 02:13:08 PengHui Li wrote:
> Hi Haiting,
> 
> One question about the schema.
> How can the consumer get the schema from the shadow topic during
> consumption?
> We should add this part in the proposal.
> 
> Thanks,
> Penghui
> 
> On Mon, Jul 11, 2022 at 9:09 PM Asaf Mesika <asaf.mes...@gmail.com> wrote:
> 
> > On Thu, Jun 23, 2022 at 6:26 AM Haiting Jiang <jianghait...@apache.org>
> > wrote:
> >
> > > Hi Asaf,
> > >
> > > > I did a quick reading and I couldn't understand the gist of this
> > change:
> > > > The shadow topic doesn't really have it's own messages, or it's own
> > > ledgers
> > > > right? When it reads messages, it reads from the original topic
> > ledgers.
> > > So
> > > > the only thing you need to do is sync the "metadata" - ledgers list?
> > >
> > > Yes, mostly ledger id list and LAC of the last ledger.
> >
> >
> > > > One question comes to mind here: Why not simply read the ledger
> > > information
> > > > from original topic, without copy?
> > >
> > > Yes, old ledger information will be read from metadata store when
> > > ShadowManagedLedger initializes. The replicator is only for new messages,
> > > to
> > > reduce the consume latency of subscription in shadow topic. And the
> > reason
> > > we also replicates message data is to populates the entry cache when
> > shadow
> > > topic have many active subscriptions.
> > >
> > > One optimization we can do is that, there would be not much help for
> > shadow
> > > replicator to replicate message in backlog. We can come up with some
> > > policy to
> > > reset shadow replicator cursor in future PR.
> > >
> >
> > I'm not sure I'm following you.
> > What do you mean by old ledger information and new ledger information?
> >
> > What I'm trying to understand is: why do you need to copy the source topic
> > metadata: Ledgers ID list and LAC of the last ledger? Why can't you just
> > use the original topic metadata?
> >
> >
> >
> > >
> > > > Another question - I couldn't understand why you need to change the
> > > > protocol to introduce shadow message id. Can you please explain that to
> > > me?
> > > > Is CommandSend used only internally between Pulsar Clusters or used by
> > a
> > > > Pulsar Client?
> > >
> > > CommandSend is designed for pulsar producer client first, and
> > > geo-replication
> > > reuse producer client to replicate messages between pulsar clusters.
> > >
> > > The shadow message id contains the ledger id and entry id of this
> > message.
> > > When shadow topic receive the message id, it is able to update
> > > `lastConfirmedEntry` directly, so that subscription can consume this this
> > > new
> > > message.
> > > Also shadow topic can tell if the message is from shadow replicator and
> > > reject
> > > otherwise.
> > >
> > >
> > I think the flow of information is the part I don't understand.
> >
> > In the PIP you write "The message sync procedure of shadow topic is
> > supported by shadow replication, which is very like geo-replication, with
> > these differences:"
> > What I don't understand is that you write that this is a read-only topic,
> > so why replicate/sync messages?
> >
> > I managed to understand that you want to populate the BK entry cache of the
> > topic ledgers in the shadow topic broker. Instead of reading from BK and
> > storing it in the cache, you favor copying from the source topic broker
> > cache memory to the shadow topic broker cache. Is this to save the
> > bandwidth of BK? I presume the most recent messages of BK would be in
> > memory anyway, no?
> >
> >
> >
> >
> > > Thanks,
> > > Haiting
> > >
> > > On 2022/06/22 15:57:11 Asaf Mesika wrote:
> > > > Hi,
> > > >
> > > > I did a quick reading and I couldn't understand the gist of this
> > change:
> > > > The shadow topic doesn't really have it's own messages, or it's own
> > > ledgers
> > > > right? When it reads messages, it reads from the original topic
> > ledgers.
> > > So
> > > > the only thing you need to do is sync the "metadata" - ledgers list?
> > > > One question comes to mind here: Why not simply read the ledger
> > > information
> > > > from original topic, without copy?
> > > >
> > > > Another question - I couldn't understand why you need to change the
> > > > protocol to introduce shadow message id. Can you please explain that to
> > > me?
> > > > Is CommandSend used only internally between Pulsar Clusters or used by
> > a
> > > > Pulsar Client?
> > > >
> > > > Thanks,
> > > >
> > > > Asaf
> > > >
> > > > On Tue, Jun 21, 2022 at 11:00 AM Haiting Jiang <
> > jianghait...@apache.org>
> > > > wrote:
> > > >
> > > > > Hi Pulsar community:
> > > > >
> > > > > I open a pip to discuss "Shadow Topic, an alternative way to support
> > > > > readonly topic ownership."
> > > > >
> > > > > Proposal Link: https://github.com/apache/pulsar/issues/16153
> > > > >
> > > > > ---
> > > > >
> > > > > ## Motivation
> > > > >
> > > > > The motivation is the same as PIP-63[1], with a new broadcast use
> > case
> > > of
> > > > > supporting 100K subscriptions in a single topic.
> > > > > 1. The bandwidth of a broker limits the number of subscriptions for a
> > > > > single
> > > > >    topic.
> > > > > 2. Subscriptions are competing for the network bandwidth on brokers.
> > > > > Different
> > > > >    subscriptions might have different levels of severity.
> > > > > 3. When synchronizing cross-city message reading, cross-city access
> > > needs
> > > > > to
> > > > >    be minimized.
> > > > > 4. [New] Broadcast with 100K subscriptions. There is a limitation of
> > > the
> > > > >    subscription number of a single topic. It's tested by Hongjie from
> > > NTT
> > > > > Lab
> > > > >    that with 40K subscriptions in a single topic, the client needs
> > > about
> > > > > 20min
> > > > >    to start all client connections, and under 1 msg/s message
> > producer
> > > > > rate,
> > > > >    the average end to end latency is about 2.9s. And for 100K
> > > > > subscriptions,
> > > > >    the time of start connection and E2E latency is beyond
> > > consideration.
> > > > >
> > > > > However, it's too complicated to implement with original PIP-63
> > > proposal,
> > > > > the
> > > > > changed code is already over 3K+ lines, see PR#11960[2], and there
> > are
> > > > > still
> > > > > some problems left,
> > > > > 1. The LAC in readonly topic is updated in a polling pattern, which
> > > > > increases
> > > > >    the bookie load bookie.
> > > > > 2. The message data of readonly topic won't be cached in broker.
> > > Increase
> > > > > the
> > > > >    network usage between broker and bookie when there are more than
> > one
> > > > >    subscriber is tail-reading.
> > > > > 3. All the subscriptions is managed in original writable-topic, so
> > the
> > > > > support
> > > > >    max subscription number is not scaleable.
> > > > >
> > > > > This PIP tries to come up with a simpler solution to support readonly
> > > topic
> > > > > ownership and solve the problems the previous PR left. The main idea
> > of
> > > > > this
> > > > > solution is to reuse the feature of geo-replication, but instead of
> > > > > duplicating storage, it shares underlying bookie ledgers between
> > > different
> > > > > topics.
> > > > >
> > > > > ## Goal
> > > > >
> > > > > The goal is to introduce **Shadow Topic** as a new type of topic to
> > > support
> > > > > readonly topic ownership. Just as its name implies, a shadow topic is
> > > the
> > > > > shadow of some normal persistent topic (let's call it source topic
> > > here).
> > > > > The
> > > > > source topic and the shadow topic must have the same number of
> > > partitions
> > > > > or
> > > > > both non-partitioned. Multiply shadow topics can be created from a
> > > source
> > > > > topic.
> > > > >
> > > > > Shadow topic shares the underlying bookie ledgers from its source
> > > topic.
> > > > > User
> > > > > can't produce any messages to shadow topic directly and shadow topic
> > > don't
> > > > > create any new ledger for messages, all messages in shadow topic come
> > > from
> > > > > source topic.
> > > > >
> > > > > Shadow topic have its own subscriptions and don't share with its
> > source
> > > > > topic.
> > > > > This means the shadow topic have its own cursor ledger to store
> > > persistent
> > > > > mark-delete info for each persistent subscriptions.
> > > > >
> > > > > The message sync procedure of shadow topic is supported by shadow
> > > > > replication,
> > > > > which is very like geo-replication, with these difference:
> > > > > 1. Geo-replication only works between topic with the same name in
> > > different
> > > > >    broker clusters. But shadow topic have no naming limitation and
> > they
> > > > > can be
> > > > >    in the same cluster.
> > > > > 2. Geo-replication duplicates data storage, but shadow topic don't.
> > > > > 3. Geo-replication replicates data from each other, it's
> > > bidirectional, but
> > > > >    shadow replication only have one way data flow.
> > > > >
> > > > >
> > > > > ## API Changes
> > > > >
> > > > > 1. PulsarApi.proto.
> > > > >
> > > > > Shadow topic need to know the original message id of the replicated
> > > > > messages,
> > > > > in order to update new ledger and lac. So we need add a
> > > > > `shadow_message_id` in
> > > > > CommandSend for replicator.
> > > > >
> > > > > ```
> > > > > message CommandSend { // ... // message id for shadow topic optional
> > > > >    MessageIdData shadow_message_id = 9; }
> > > > > ```
> > > > >
> > > > > 2. Admin API for creating shadow topic with source topic
> > > > > ```
> > > > >    admin.topics().createShadowTopic(source-topic-name,
> > > shadow-topic-name)
> > > > > ```
> > > > >
> > > > > ## Implementation
> > > > >
> > > > > A picture showing key components relations is added in github issue
> > > [3].
> > > > >
> > > > > There are two key changes for implementation.
> > > > > 1. How to replicate messages to shadow topics.
> > > > > 2. How shadow topic manage shared ledgers info.
> > > > >
> > > > > ### 1. How to replicate messages to shadow topics.
> > > > >
> > > > > This part is mostly implemented by `ShadowReplicator`, which extends
> > > > > `PersistentReplicator` introduced in geo-replication. The shadow
> > topic
> > > list
> > > > > is added as a new topic policy of the source topic. Source topic
> > > manage the
> > > > > lifecycle of all the replicators. The key is to add
> > `shadow_message_id`
> > > > > when
> > > > > produce message to shadow topics.
> > > > >
> > > > > ### 2. How shadow topic manage shared ledgers info.
> > > > >
> > > > > This part is mostly implemented by `ShadowManagedLedger`, which
> > extends
> > > > > current `ManagedLedgerImpl` with two key override methods.
> > > > >
> > > > > 1. `initialize(..)`
> > > > > a. Fetch ManagedLedgerInfo of source topic instead of current shadow
> > > topic.
> > > > >    The source topic name is stored in the topic policy of the shadow
> > > topic.
> > > > > b. Open the last ledger and read the explicit LAC from bookie,
> > instead
> > > of
> > > > >    creating new ledger. Reading LAC here requires that the source
> > topic
> > > > > must
> > > > >    enable explicit LAC feature by set
> > > > > `bookkeeperExplicitLacIntervalInMills`
> > > > >    to non-zero value in broker.conf.
> > > > > c. Do not start checkLedgerRollTask, which tries roll over ledger
> > > > > periodically
> > > > >
> > > > > 2. `internalAsyncAddEntry()` Instead of write entry data to bookie,
> > It
> > > only
> > > > >    update metadata of ledgers, like `currentLedger`,
> > > `lastConfirmedEntry`
> > > > > and
> > > > >    put the replicated message into `EntryCache`.
> > > > >
> > > > > Besides, some other problems need to be taken care of.
> > > > > - Any ledger metadata updates need to be synced to shadow topic,
> > > including
> > > > >   ledger offloading or ledger deletion. Shadow topic needs to watch
> > the
> > > > > ledger
> > > > >   info updates with metadata store and update in time.
> > > > > - The local cached LAC of `LedgerHandle` won't updated in time, so we
> > > need
> > > > >   refresh LAC when a managed cursor requests entries beyond known
> > LAC.
> > > > >
> > > > > ## Reject Alternatives
> > > > >
> > > > > See PIP-63[1].
> > > > >
> > > > > ## Reference
> > > > > [1]
> > > > >
> > >
> > https://github.com/apache/pulsar/wiki/PIP-63%3A-Readonly-Topic-Ownership-Support
> > > > > [2] https://github.com/apache/pulsar/pull/11960
> > > > > [3] https://github.com/apache/pulsar/issues/16153
> > > > >
> > > > >
> > > > > BR,
> > > > > Haiting Jiang
> > > > >
> > > >
> > >
> >
> 
