Hi Penghui,

> I had a conversation with Asaf, looks like we can use the RawReader for a
> shadow topic internally to consume
> messages from the original topic. So that we don't need to introduce
> `shadow_message_id`, and don't need to introduce
> changes to the replicator.

For this point, like I wrote in the reply to Asaf, I still think the cost of
maintaining the consumers would be a little too high, especially the lifecycle
management part. It's quite complicated in replicator already.


> 
> And using a persistent replicator might cause the very old messages to add
> to the entry cache of the shadow topic?
> Maybe there are some backlogs of the replicator.

Yes, we should skip old backlogs for shadow replicator. But it seems that
there are not much difference to reset the cursor in persistent replicator or
in the raw reader.

BR,
Haiting


On 2022/07/13 08:29:34 PengHui Li wrote:
> Hi Haiting,
> 
> I had a conversation with Asaf, looks like we can use the RawReader for a
> shadow topic internally to consume
> messages from the original topic. So that we don't need to introduce
> `shadow_message_id`, and don't need to introduce
> changes to the replicator.
> 
> And using a persistent replicator might cause the very old messages to add
> to the entry cache of the shadow topic?
> Maybe there are some backlogs of the replicator.
> 
> Thanks,
> Penghui
> 
> On Wed, Jul 13, 2022 at 4:16 PM Asaf Mesika <asaf.mes...@gmail.com> wrote:
> 
> > Thanks for the detailed answer. I have a few follow-up questions:
> >
> > 1. Can you clarify how data is flowing exactly. Specifically, from source
> > topic broker to shadow topic broker? I tried to follow the PIP again to
> > understand that, especially the overview but couldn't figure it out.
> > Also if you can while you're at it, how do you do flow control? I mean, you
> > stream all messages from source topic to shadow topic broker - there will
> > be some cases it will not have any more room in memory.
> > Once data arrives at the shadow topic, where does it go exactly?
> >
> > 2. You wrote it was the easiest way to populate the entry cache, but there
> > is a very big penalty to pay here: contaminating the protocol.
> > The way I see it, the protocol should be sacred.
> > Most importantly, the internal details of a broker-to-broker should never
> > leak to the client.
> > In this case, the Command Send which is what the producer's clients are
> > using now has an extra field, called shadow_message_id. This field is an
> > internal detail of broker-to-broker communication and thus should never
> > leak outside.
> >
> > I'm not that fluent in the Pulsar codebase to give a decent alternative of
> > implementation. I can only point out that this IMO is too high a price to
> > pay.
> >
> > One possible solution comes to mind for brokers to broker RPC without
> > interfering with the protocol: open a consumer in shadow topic broker and
> > consume the messages and from there fill up the cache of managed ledger?
> >
> >
> > 3. I understand that you want to minimize consumption latency, hence you're
> > copying produced messages from the source topic broker topic cache (memory)
> > to the shadow topic broker cache (memory).
> > Since those produced messages are fairly new, if you are reading from BK,
> > won't those messages appear in the BK cache? How bad percentage-wise is
> > reading from BK cache relative to reading from Pulsar cache - both on a
> > remote machine?
> >
> > Geo-replication, from my understanding, is a fairly simple idea: the source
> > topic is in charge to push out messages (replicate) to a remote cluster
> > topic, using the normal client (producer) for this.
> > If I understand correctly, here, we re-use the replicator, which pushes out
> > messages to a topic, but those messages won't really be written to BK.
> > I'm afraid that creates a complicated code that will be hard to read later
> > and ties up one area with another which will be very hard later to
> > untangle.
> >
> >
> > Thanks!
> >
> >
> > On Tue, Jul 12, 2022 at 5:35 PM Haiting Jiang <jianghait...@gmail.com>
> > wrote:
> >
> > > Hi Asaf,
> > >
> > > On 2022/07/11 13:08:52 Asaf Mesika wrote:
> > > > On Thu, Jun 23, 2022 at 6:26 AM Haiting Jiang <jianghait...@apache.org
> > >
> > > > wrote:
> > > >
> > > > > Hi Asaf,
> > > > >
> > > > > > I did a quick reading and I couldn't understand the gist of this
> > > change:
> > > > > > The shadow topic doesn't really have it's own messages, or it's own
> > > > > ledgers
> > > > > > right? When it reads messages, it reads from the original topic
> > > ledgers.
> > > > > So
> > > > > > the only thing you need to do is sync the "metadata" - ledgers
> > list?
> > > > >
> > > > > Yes, mostly ledger id list and LAC of the last ledger.
> > > >
> > > >
> > > > > > One question comes to mind here: Why not simply read the ledger
> > > > > information
> > > > > > from original topic, without copy?
> > > > >
> > > > > Yes, old ledger information will be read from metadata store when
> > > > > ShadowManagedLedger initializes. The replicator is only for new
> > > messages,
> > > > > to
> > > > > reduce the consume latency of subscription in shadow topic. And the
> > > reason
> > > > > we also replicates message data is to populates the entry cache when
> > > shadow
> > > > > topic have many active subscriptions.
> > > > >
> > > > > One optimization we can do is that, there would be not much help for
> > > shadow
> > > > > replicator to replicate message in backlog. We can come up with some
> > > > > policy to
> > > > > reset shadow replicator cursor in future PR.
> > > > >
> > > >
> > >
> > >
> > > > I'm not sure I'm following you.
> > > > What do you mean by old ledger information and new ledger information?
> > > >
> > >
> > > The old and new is said relative to the moment of shadow topic
> > > initialization.
> > > Because shadow topic and its source topic are usually in different
> > brokers,
> > > when clients keep writing messages to the source topic, "new" ledgers
> > will
> > > be
> > > created in source topic, and the shadow topic must have some way to know
> > > the
> > > "new" ledgers.
> > >
> > >
> > > > What I'm trying to understand is: why do you need to copy the source
> > > topic
> > > > metadata: Ledgers ID list and LAC of the last ledger? Why can't you
> > just
> > > > use the original topic metadata?
> > > >
> > >
> > > Maybe you are missing that the source topic and the shadow topic is
> > > normally
> > > not in the same broker? There won't be any way to just USE the original
> > > topic
> > > metadata. so RPC is always required.
> > >
> > > For ledgers id list: I am not copying all the ledger id list, these can
> > be
> > > read from metadata store (zk) directly.
> > >
> > > As for LAC, this info is only stored in source topic BK client and synced
> > > to
> > > BK server in a piggyback way. So updates shadow topic LAC with the
> > message
> > > id
> > > brought by shadow replicator is an easy and direct way.
> > >
> > > >
> > > >
> > > > >
> > > > > > Another question - I couldn't understand why you need to change the
> > > > > > protocol to introduce shadow message id. Can you please explain
> > that
> > > to
> > > > > me?
> > > > > > Is CommandSend used only internally between Pulsar Clusters or used
> > > by a
> > > > > > Pulsar Client?
> > > > >
> > > > > CommandSend is designed for pulsar producer client first, and
> > > > > geo-replication
> > > > > reuse producer client to replicate messages between pulsar clusters.
> > > > >
> > > > > The shadow message id contains the ledger id and entry id of this
> > > message.
> > > > > When shadow topic receive the message id, it is able to update
> > > > > `lastConfirmedEntry` directly, so that subscription can consume this
> > > this
> > > > > new
> > > > > message.
> > > > > Also shadow topic can tell if the message is from shadow replicator
> > and
> > > > > reject
> > > > > otherwise.
> > > > >
> > > > >
> > > > I think the flow of information is the part I don't understand.
> > > >
> > > > In the PIP you write "The message sync procedure of shadow topic is
> > > > supported by shadow replication, which is very like geo-replication,
> > with
> > > > these differences:"
> > > > What I don't understand is that you write that this is a read-only
> > topic,
> > > > so why replicate/sync messages?
> > > >
> > >
> > > The "read-only" is a bit mis-leading here, and this is the reason why I
> > > introduced the concept of  "shadow" topic. From a consumer's view, the
> > > shadow
> > > topic is not "read-only", there still will be new messages writing into
> > > this
> > > topic. But from a  producer's view, the shadow topic can't be written
> > > directly. All messages goes from source topic. Just like a shadow.
> > >
> > > > I managed to understand that you want to populate the BK entry cache of
> > > the
> > > > topic ledgers in the shadow topic broker. Instead of reading from BK
> > and
> > > > storing it in the cache, you favor copying from the source topic broker
> > > > cache memory to the shadow topic broker cache. Is this to save the
> > > > bandwidth of BK? I presume the most recent messages of BK would be in
> > > > memory anyway, no?
> > > >
> > > >
> > >
> > > The reason I prefer to use copying is not to save bandwidth or disk io
> > > usage of BK.
> > >
> > > The main concern here is to minimize consuming latency and implementation
> > > complexity.
> > >
> > > For consuming latency, if we try to populte message from BK in shadow
> > > topic,
> > > then we must have a trigger to do so, and I can't think of any other ways
> > > with
> > > smaller consuming latency than this shadow replicator.
> > >
> > > For implementation complexity. Currently the entry cache in broker can
> > only
> > > be
> > > populated by message producing. By reusing this write path, we can avoid
> > > solving a lot of race conditions which maybe introduced by adding another
> > > entry write path. And it's already stable and easy to maintain.
> > >
> > >
> > > >
> > > >
> > > > > Thanks,
> > > > > Haiting
> > > > >
> > > > > On 2022/06/22 15:57:11 Asaf Mesika wrote:
> > > > > > Hi,
> > > > > >
> > > > > > I did a quick reading and I couldn't understand the gist of this
> > > change:
> > > > > > The shadow topic doesn't really have it's own messages, or it's own
> > > > > ledgers
> > > > > > right? When it reads messages, it reads from the original topic
> > > ledgers.
> > > > > So
> > > > > > the only thing you need to do is sync the "metadata" - ledgers
> > list?
> > > > > > One question comes to mind here: Why not simply read the ledger
> > > > > information
> > > > > > from original topic, without copy?
> > > > > >
> > > > > > Another question - I couldn't understand why you need to change the
> > > > > > protocol to introduce shadow message id. Can you please explain
> > that
> > > to
> > > > > me?
> > > > > > Is CommandSend used only internally between Pulsar Clusters or used
> > > by a
> > > > > > Pulsar Client?
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Asaf
> > > > > >
> > > > > > On Tue, Jun 21, 2022 at 11:00 AM Haiting Jiang <
> > > jianghait...@apache.org>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi Pulsar community:
> > > > > > >
> > > > > > > I open a pip to discuss "Shadow Topic, an alternative way to
> > > support
> > > > > > > readonly topic ownership."
> > > > > > >
> > > > > > > Proposal Link: https://github.com/apache/pulsar/issues/16153
> > > > > > >
> > > > > > > ---
> > > > > > >
> > > > > > > ## Motivation
> > > > > > >
> > > > > > > The motivation is the same as PIP-63[1], with a new broadcast use
> > > case
> > > > > of
> > > > > > > supporting 100K subscriptions in a single topic.
> > > > > > > 1. The bandwidth of a broker limits the number of subscriptions
> > for
> > > a
> > > > > > > single
> > > > > > >    topic.
> > > > > > > 2. Subscriptions are competing for the network bandwidth on
> > > brokers.
> > > > > > > Different
> > > > > > >    subscriptions might have different levels of severity.
> > > > > > > 3. When synchronizing cross-city message reading, cross-city
> > access
> > > > > needs
> > > > > > > to
> > > > > > >    be minimized.
> > > > > > > 4. [New] Broadcast with 100K subscriptions. There is a limitation
> > > of
> > > > > the
> > > > > > >    subscription number of a single topic. It's tested by Hongjie
> > > from
> > > > > NTT
> > > > > > > Lab
> > > > > > >    that with 40K subscriptions in a single topic, the client
> > needs
> > > > > about
> > > > > > > 20min
> > > > > > >    to start all client connections, and under 1 msg/s message
> > > producer
> > > > > > > rate,
> > > > > > >    the average end to end latency is about 2.9s. And for 100K
> > > > > > > subscriptions,
> > > > > > >    the time of start connection and E2E latency is beyond
> > > > > consideration.
> > > > > > >
> > > > > > > However, it's too complicated to implement with original PIP-63
> > > > > proposal,
> > > > > > > the
> > > > > > > changed code is already over 3K+ lines, see PR#11960[2], and
> > there
> > > are
> > > > > > > still
> > > > > > > some problems left,
> > > > > > > 1. The LAC in readonly topic is updated in a polling pattern,
> > which
> > > > > > > increases
> > > > > > >    the bookie load bookie.
> > > > > > > 2. The message data of readonly topic won't be cached in broker.
> > > > > Increase
> > > > > > > the
> > > > > > >    network usage between broker and bookie when there are more
> > than
> > > one
> > > > > > >    subscriber is tail-reading.
> > > > > > > 3. All the subscriptions is managed in original writable-topic,
> > so
> > > the
> > > > > > > support
> > > > > > >    max subscription number is not scaleable.
> > > > > > >
> > > > > > > This PIP tries to come up with a simpler solution to support
> > > readonly
> > > > > topic
> > > > > > > ownership and solve the problems the previous PR left. The main
> > > idea of
> > > > > > > this
> > > > > > > solution is to reuse the feature of geo-replication, but instead
> > of
> > > > > > > duplicating storage, it shares underlying bookie ledgers between
> > > > > different
> > > > > > > topics.
> > > > > > >
> > > > > > > ## Goal
> > > > > > >
> > > > > > > The goal is to introduce **Shadow Topic** as a new type of topic
> > to
> > > > > support
> > > > > > > readonly topic ownership. Just as its name implies, a shadow
> > topic
> > > is
> > > > > the
> > > > > > > shadow of some normal persistent topic (let's call it source
> > topic
> > > > > here).
> > > > > > > The
> > > > > > > source topic and the shadow topic must have the same number of
> > > > > partitions
> > > > > > > or
> > > > > > > both non-partitioned. Multiply shadow topics can be created from
> > a
> > > > > source
> > > > > > > topic.
> > > > > > >
> > > > > > > Shadow topic shares the underlying bookie ledgers from its source
> > > > > topic.
> > > > > > > User
> > > > > > > can't produce any messages to shadow topic directly and shadow
> > > topic
> > > > > don't
> > > > > > > create any new ledger for messages, all messages in shadow topic
> > > come
> > > > > from
> > > > > > > source topic.
> > > > > > >
> > > > > > > Shadow topic have its own subscriptions and don't share with its
> > > source
> > > > > > > topic.
> > > > > > > This means the shadow topic have its own cursor ledger to store
> > > > > persistent
> > > > > > > mark-delete info for each persistent subscriptions.
> > > > > > >
> > > > > > > The message sync procedure of shadow topic is supported by shadow
> > > > > > > replication,
> > > > > > > which is very like geo-replication, with these difference:
> > > > > > > 1. Geo-replication only works between topic with the same name in
> > > > > different
> > > > > > >    broker clusters. But shadow topic have no naming limitation
> > and
> > > they
> > > > > > > can be
> > > > > > >    in the same cluster.
> > > > > > > 2. Geo-replication duplicates data storage, but shadow topic
> > don't.
> > > > > > > 3. Geo-replication replicates data from each other, it's
> > > > > bidirectional, but
> > > > > > >    shadow replication only have one way data flow.
> > > > > > >
> > > > > > >
> > > > > > > ## API Changes
> > > > > > >
> > > > > > > 1. PulsarApi.proto.
> > > > > > >
> > > > > > > Shadow topic need to know the original message id of the
> > replicated
> > > > > > > messages,
> > > > > > > in order to update new ledger and lac. So we need add a
> > > > > > > `shadow_message_id` in
> > > > > > > CommandSend for replicator.
> > > > > > >
> > > > > > > ```
> > > > > > > message CommandSend { // ... // message id for shadow topic
> > > optional
> > > > > > >    MessageIdData shadow_message_id = 9; }
> > > > > > > ```
> > > > > > >
> > > > > > > 2. Admin API for creating shadow topic with source topic
> > > > > > > ```
> > > > > > >    admin.topics().createShadowTopic(source-topic-name,
> > > > > shadow-topic-name)
> > > > > > > ```
> > > > > > >
> > > > > > > ## Implementation
> > > > > > >
> > > > > > > A picture showing key components relations is added in github
> > issue
> > > > > [3].
> > > > > > >
> > > > > > > There are two key changes for implementation.
> > > > > > > 1. How to replicate messages to shadow topics.
> > > > > > > 2. How shadow topic manage shared ledgers info.
> > > > > > >
> > > > > > > ### 1. How to replicate messages to shadow topics.
> > > > > > >
> > > > > > > This part is mostly implemented by `ShadowReplicator`, which
> > > extends
> > > > > > > `PersistentReplicator` introduced in geo-replication. The shadow
> > > topic
> > > > > list
> > > > > > > is added as a new topic policy of the source topic. Source topic
> > > > > manage the
> > > > > > > lifecycle of all the replicators. The key is to add
> > > `shadow_message_id`
> > > > > > > when
> > > > > > > produce message to shadow topics.
> > > > > > >
> > > > > > > ### 2. How shadow topic manage shared ledgers info.
> > > > > > >
> > > > > > > This part is mostly implemented by `ShadowManagedLedger`, which
> > > extends
> > > > > > > current `ManagedLedgerImpl` with two key override methods.
> > > > > > >
> > > > > > > 1. `initialize(..)`
> > > > > > > a. Fetch ManagedLedgerInfo of source topic instead of current
> > > shadow
> > > > > topic.
> > > > > > >    The source topic name is stored in the topic policy of the
> > > shadow
> > > > > topic.
> > > > > > > b. Open the last ledger and read the explicit LAC from bookie,
> > > instead
> > > > > of
> > > > > > >    creating new ledger. Reading LAC here requires that the source
> > > topic
> > > > > > > must
> > > > > > >    enable explicit LAC feature by set
> > > > > > > `bookkeeperExplicitLacIntervalInMills`
> > > > > > >    to non-zero value in broker.conf.
> > > > > > > c. Do not start checkLedgerRollTask, which tries roll over ledger
> > > > > > > periodically
> > > > > > >
> > > > > > > 2. `internalAsyncAddEntry()` Instead of write entry data to
> > bookie,
> > > It
> > > > > only
> > > > > > >    update metadata of ledgers, like `currentLedger`,
> > > > > `lastConfirmedEntry`
> > > > > > > and
> > > > > > >    put the replicated message into `EntryCache`.
> > > > > > >
> > > > > > > Besides, some other problems need to be taken care of.
> > > > > > > - Any ledger metadata updates need to be synced to shadow topic,
> > > > > including
> > > > > > >   ledger offloading or ledger deletion. Shadow topic needs to
> > watch
> > > the
> > > > > > > ledger
> > > > > > >   info updates with metadata store and update in time.
> > > > > > > - The local cached LAC of `LedgerHandle` won't updated in time,
> > so
> > > we
> > > > > need
> > > > > > >   refresh LAC when a managed cursor requests entries beyond known
> > > LAC.
> > > > > > >
> > > > > > > ## Reject Alternatives
> > > > > > >
> > > > > > > See PIP-63[1].
> > > > > > >
> > > > > > > ## Reference
> > > > > > > [1]
> > > > > > >
> > > > >
> > >
> > >
> > https://github.com/apache/pulsar/wiki/PIP-63%3A-Readonly-Topic-Ownership-Support
> > > > > > > [2] https://github.com/apache/pulsar/pull/11960
> > > > > > > [3] https://github.com/apache/pulsar/issues/16153
> > > > > > >
> > > > > > >
> > > > > > > BR,
> > > > > > > Haiting Jiang
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> > >
> > > BR,
> > > Haiting
> > >
> >
> 

Reply via email to