Hi Penghui,

> Can we use `message_id` directly? It's a message ID from the source
> cluster. It can be used by the shadow topic feature, but it can also be
> used by other features in the future.
> 

Sure, `message_id` is better.
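
For reference, the field from the PIP's `CommandSend` change would then just
be renamed, e.g. (sketch only; the field number is the one from the PIP draft
and may change in the final patch):

```
message CommandSend {
    // ...
    // message id of this message in the source cluster, set by the replicator
    optional MessageIdData message_id = 9;
}
```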

> > there is not much difference between resetting the cursor in the
> > persistent replicator or in the raw reader.
> 
> Do you want to control it in the shadow topic, or leave it to users?
> Maybe we can make some changes to the replicator to support skipping backlogs.
> 

+1. We can add a new configuration like
`shadowReplicatorAutoResetBacklogEntries`.
Once the number of backlog entries exceeds this threshold, the shadow
replicator will automatically reset its cursor to LATEST.
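
To make that concrete, here is a rough sketch of the check (pseudo-Java
against the replicator's ManagedCursor; `maybeSkipBacklog` and
`resetCursorToLatest` are only illustrative names, not existing Pulsar APIs):

```
// Illustrative sketch only, not actual Pulsar code.
// Called when the shadow replicator starts, or before it reads a new batch.
void maybeSkipBacklog(ManagedCursor cursor, long autoResetBacklogEntries) {
    long backlog = cursor.getNumberOfEntriesInBacklog(false /* isPrecise */);
    if (autoResetBacklogEntries > 0 && backlog > autoResetBacklogEntries) {
        // Skip the old backlog: move the replication cursor to the latest
        // position so only newly produced messages are shadow-replicated.
        resetCursorToLatest(cursor); // hypothetical helper
    }
}
```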

Thanks,
Haiting


On 2022/07/18 00:38:37 PengHui Li wrote:
> Hi Haiting,
> 
> > For this point, like I wrote in the reply to Asaf, I still think the cost
> > of maintaining the consumers would be a little too high, especially the
> > lifecycle management part. It's already quite complicated in the replicator.
> 
> Yes, I see it. Makes sense. Without the replicator, it looks like we are
> trying to implement a very similar function to replicate data. The only
> difference is which broker maintains the consumer.
> 
> Can we use `message_id` directly? It's a message ID from the source
> cluster. It can be used by the shadow topic feature, but it can also be
> used by other features in the future.
> 
> > there is not much difference between resetting the cursor in the
> > persistent replicator or in the raw reader.
> 
> Do you want to control it in the shadow topic, or leave it to users?
> Maybe we can make some changes to the replicator to support skipping backlogs.
> 
> Thanks,
> Penghui
> 
> On Sun, Jul 17, 2022 at 10:57 PM Haiting Jiang <jianghait...@apache.org>
> wrote:
> 
> > Hi Penghui,
> >
> > > I had a conversation with Asaf, and it looks like we can use the RawReader
> > > for a shadow topic internally to consume messages from the original topic,
> > > so that we don't need to introduce `shadow_message_id` or changes to the
> > > replicator.
> >
> > For this point, like I wrote in the reply to Asaf, I still think the cost
> > of maintaining the consumers would be a little too high, especially the
> > lifecycle management part. It's already quite complicated in the replicator.
> >
> >
> > >
> > > And using a persistent replicator might cause very old messages to be
> > > added to the entry cache of the shadow topic?
> > > Maybe there are some backlogs of the replicator.
> >
> > Yes, we should skip old backlogs for the shadow replicator. But it seems
> > that there is not much difference between resetting the cursor in the
> > persistent replicator or in the raw reader.
> >
> > BR,
> > Haiting
> >
> >
> > On 2022/07/13 08:29:34 PengHui Li wrote:
> > > Hi Haiting,
> > >
> > > I had a conversation with Asaf, and it looks like we can use the RawReader
> > > for a shadow topic internally to consume messages from the original topic,
> > > so that we don't need to introduce `shadow_message_id` or changes to the
> > > replicator.
> > >
> > > And using a persistent replicator might cause very old messages to be
> > > added to the entry cache of the shadow topic?
> > > Maybe there are some backlogs of the replicator.
> > >
> > > Thanks,
> > > Penghui
> > >
> > > On Wed, Jul 13, 2022 at 4:16 PM Asaf Mesika <asaf.mes...@gmail.com>
> > wrote:
> > >
> > > > Thanks for the detailed answer. I have a few follow-up questions:
> > > >
> > > > 1. Can you clarify how the data flows exactly, specifically from the
> > > > source topic broker to the shadow topic broker? I tried to follow the PIP
> > > > again to understand that, especially the overview, but couldn't figure it
> > > > out.
> > > > Also, while you're at it, how do you do flow control? I mean, you stream
> > > > all messages from the source topic to the shadow topic broker - there
> > > > will be some cases where it won't have any more room in memory.
> > > > Once data arrives at the shadow topic, where does it go exactly?
> > > >
> > > > 2. You wrote it was the easiest way to populate the entry cache, but
> > > > there is a very big penalty to pay here: contaminating the protocol.
> > > > The way I see it, the protocol should be sacred.
> > > > Most importantly, the internal details of broker-to-broker communication
> > > > should never leak to the client.
> > > > In this case, CommandSend, which is what producer clients use, now has an
> > > > extra field called shadow_message_id. This field is an internal detail of
> > > > broker-to-broker communication and thus should never leak outside.
> > > >
> > > > I'm not fluent enough in the Pulsar codebase to give a decent
> > > > alternative implementation. I can only point out that this IMO is too
> > > > high a price to pay.
> > > >
> > > > One possible solution comes to mind for broker-to-broker RPC without
> > > > interfering with the protocol: open a consumer in the shadow topic
> > > > broker, consume the messages, and from there fill up the cache of the
> > > > managed ledger?
> > > >
> > > >
> > > > 3. I understand that you want to minimize consumption latency, hence
> > > > you're copying produced messages from the source topic broker's topic
> > > > cache (memory) to the shadow topic broker's cache (memory).
> > > > Since those produced messages are fairly new, if you are reading from
> > > > BK, won't those messages appear in the BK cache? How bad percentage-wise
> > > > is reading from the BK cache relative to reading from the Pulsar cache -
> > > > both on a remote machine?
> > > >
> > > > Geo-replication, from my understanding, is a fairly simple idea: the
> > > > source topic is in charge of pushing out messages (replicating) to a
> > > > remote cluster topic, using the normal client (producer) for this.
> > > > If I understand correctly, here we re-use the replicator, which pushes
> > > > out messages to a topic, but those messages won't really be written to
> > > > BK.
> > > > I'm afraid that creates complicated code that will be hard to read later
> > > > and ties up one area with another, which will be very hard to untangle
> > > > later.
> > > >
> > > >
> > > > Thanks!
> > > >
> > > >
> > > > On Tue, Jul 12, 2022 at 5:35 PM Haiting Jiang <jianghait...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi Asaf,
> > > > >
> > > > > On 2022/07/11 13:08:52 Asaf Mesika wrote:
> > > > > > On Thu, Jun 23, 2022 at 6:26 AM Haiting Jiang <
> > jianghait...@apache.org
> > > > >
> > > > > > wrote:
> > > > > >
> > > > > > > Hi Asaf,
> > > > > > >
> > > > > > > > I did a quick reading and I couldn't understand the gist of this
> > > > > > > > change:
> > > > > > > > The shadow topic doesn't really have its own messages or its own
> > > > > > > > ledgers, right? When it reads messages, it reads from the original
> > > > > > > > topic ledgers. So the only thing you need to do is sync the
> > > > > > > > "metadata" - the ledger list?
> > > > > > >
> > > > > > > Yes, mostly ledger id list and LAC of the last ledger.
> > > > > >
> > > > > >
> > > > > > > > One question comes to mind here: why not simply read the ledger
> > > > > > > > information from the original topic, without copying it?
> > > > > > >
> > > > > > > Yes, old ledger information will be read from the metadata store
> > > > > > > when ShadowManagedLedger initializes. The replicator is only for
> > > > > > > new messages, to reduce the consume latency of subscriptions in the
> > > > > > > shadow topic. And the reason we also replicate message data is to
> > > > > > > populate the entry cache when the shadow topic has many active
> > > > > > > subscriptions.
> > > > > > >
> > > > > > > One optimization we can do: there would not be much help in having
> > > > > > > the shadow replicator replicate messages in the backlog. We can
> > > > > > > come up with some policy to reset the shadow replicator cursor in a
> > > > > > > future PR.
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > > > I'm not sure I'm following you.
> > > > > > What do you mean by old ledger information and new ledger
> > > > > > information?
> > > > > >
> > > > >
> > > > > "Old" and "new" are relative to the moment of shadow topic
> > > > > initialization.
> > > > > Because the shadow topic and its source topic are usually in different
> > > > > brokers, when clients keep writing messages to the source topic, "new"
> > > > > ledgers will be created in the source topic, and the shadow topic must
> > > > > have some way to know about these "new" ledgers.
> > > > >
> > > > >
> > > > > > What I'm trying to understand is: why do you need to copy the source
> > > > > > topic metadata (the ledger ID list and the LAC of the last ledger)?
> > > > > > Why can't you just use the original topic metadata?
> > > > > >
> > > > >
> > > > > Maybe you are missing that the source topic and the shadow topic are
> > > > > normally not in the same broker? There won't be any way to just USE
> > > > > the original topic metadata, so RPC is always required.
> > > > >
> > > > > For the ledger id list: I am not copying the whole ledger id list; it
> > > > > can be read from the metadata store (zk) directly.
> > > > >
> > > > > As for the LAC, this info is only stored in the source topic's BK
> > > > > client and synced to the BK server in a piggyback way. So updating the
> > > > > shadow topic's LAC with the message id brought by the shadow
> > > > > replicator is an easy and direct way.
> > > > >
> > > > > >
> > > > > >
> > > > > > >
> > > > > > > > Another question - I couldn't understand why you need to change
> > > > > > > > the protocol to introduce the shadow message id. Can you please
> > > > > > > > explain that to me?
> > > > > > > > Is CommandSend used only internally between Pulsar clusters, or
> > > > > > > > is it used by a Pulsar client?
> > > > > > >
> > > > > > > CommandSend is designed for the Pulsar producer client first, and
> > > > > > > geo-replication reuses the producer client to replicate messages
> > > > > > > between Pulsar clusters.
> > > > > > >
> > > > > > > The shadow message id contains the ledger id and entry id of the
> > > > > > > message. When the shadow topic receives the message id, it is able
> > > > > > > to update `lastConfirmedEntry` directly, so that subscriptions can
> > > > > > > consume this new message.
> > > > > > > Also, the shadow topic can tell whether the message is from the
> > > > > > > shadow replicator and reject it otherwise.
> > > > > > >
> > > > > > >
> > > > > > I think the flow of information is the part I don't understand.
> > > > > >
> > > > > > In the PIP you write "The message sync procedure of shadow topic is
> > > > > > supported by shadow replication, which is very like geo-replication,
> > > > > > with these differences:"
> > > > > > What I don't understand is that you write that this is a read-only
> > > > > > topic, so why replicate/sync messages?
> > > > > >
> > > > >
> > > > > The "read-only" is a bit misleading here, and this is the reason why
> > > > > I introduced the concept of a "shadow" topic. From a consumer's view,
> > > > > the shadow topic is not "read-only"; there will still be new messages
> > > > > written into this topic. But from a producer's view, the shadow topic
> > > > > can't be written to directly. All messages come from the source topic.
> > > > > Just like a shadow.
> > > > >
> > > > > > I managed to understand that you want to populate the BK entry
> > > > > > cache of the topic ledgers in the shadow topic broker. Instead of
> > > > > > reading from BK and storing it in the cache, you favor copying from
> > > > > > the source topic broker's cache memory to the shadow topic broker's
> > > > > > cache. Is this to save the bandwidth of BK? I presume the most
> > > > > > recent messages of BK would be in memory anyway, no?
> > > > > >
> > > > > >
> > > > >
> > > > > The reason I prefer copying is not to save bandwidth or disk IO usage
> > > > > of BK.
> > > > >
> > > > > The main concern here is to minimize consuming latency and
> > > > > implementation complexity.
> > > > >
> > > > > For consuming latency, if we try to populate messages from BK in the
> > > > > shadow topic, then we must have a trigger to do so, and I can't think
> > > > > of any other way with smaller consuming latency than this shadow
> > > > > replicator.
> > > > >
> > > > > For implementation complexity: currently the entry cache in the
> > > > > broker can only be populated by message producing. By reusing this
> > > > > write path, we can avoid solving a lot of race conditions which may be
> > > > > introduced by adding another entry write path. And it's already stable
> > > > > and easy to maintain.
> > > > >
> > > > >
> > > > > >
> > > > > >
> > > > > > > Thanks,
> > > > > > > Haiting
> > > > > > >
> > > > > > > On 2022/06/22 15:57:11 Asaf Mesika wrote:
> > > > > > > > Hi,
> > > > > > > >
> > > > > > > > I did a quick reading and I couldn't understand the gist of this
> > > > > > > > change:
> > > > > > > > The shadow topic doesn't really have its own messages or its own
> > > > > > > > ledgers, right? When it reads messages, it reads from the original
> > > > > > > > topic ledgers. So the only thing you need to do is sync the
> > > > > > > > "metadata" - the ledger list?
> > > > > > > > One question comes to mind here: why not simply read the ledger
> > > > > > > > information from the original topic, without copying it?
> > > > > > > >
> > > > > > > > Another question - I couldn't understand why you need to change
> > > > > > > > the protocol to introduce the shadow message id. Can you please
> > > > > > > > explain that to me?
> > > > > > > > Is CommandSend used only internally between Pulsar clusters, or
> > > > > > > > is it used by a Pulsar client?
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > >
> > > > > > > > Asaf
> > > > > > > >
> > > > > > > > On Tue, Jun 21, 2022 at 11:00 AM Haiting Jiang <
> > > > > jianghait...@apache.org>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi Pulsar community:
> > > > > > > > >
> > > > > > > > > I have opened a PIP to discuss "Shadow Topic, an alternative
> > > > > > > > > way to support readonly topic ownership."
> > > > > > > > >
> > > > > > > > > Proposal Link: https://github.com/apache/pulsar/issues/16153
> > > > > > > > >
> > > > > > > > > ---
> > > > > > > > >
> > > > > > > > > ## Motivation
> > > > > > > > >
> > > > > > > > > The motivation is the same as PIP-63[1], with a new broadcast
> > > > > > > > > use case of supporting 100K subscriptions in a single topic.
> > > > > > > > > 1. The bandwidth of a broker limits the number of
> > > > > > > > >    subscriptions for a single topic.
> > > > > > > > > 2. Subscriptions are competing for the network bandwidth on
> > > > > > > > >    brokers. Different subscriptions might have different levels
> > > > > > > > >    of severity.
> > > > > > > > > 3. When synchronizing cross-city message reading, cross-city
> > > > > > > > >    access needs to be minimized.
> > > > > > > > > 4. [New] Broadcast with 100K subscriptions. There is a
> > > > > > > > >    limitation on the number of subscriptions of a single topic.
> > > > > > > > >    Hongjie from NTT Lab tested that with 40K subscriptions in a
> > > > > > > > >    single topic, the client needs about 20 min to start all
> > > > > > > > >    client connections, and under a 1 msg/s producer rate, the
> > > > > > > > >    average end-to-end latency is about 2.9s. For 100K
> > > > > > > > >    subscriptions, the connection start time and E2E latency are
> > > > > > > > >    beyond consideration.
> > > > > > > > >
> > > > > > > > > However, it's too complicated to implement with the original
> > > > > > > > > PIP-63 proposal; the changed code is already over 3K+ lines,
> > > > > > > > > see PR#11960[2], and there are still some problems left:
> > > > > > > > > 1. The LAC in the readonly topic is updated in a polling
> > > > > > > > >    pattern, which increases the bookie load.
> > > > > > > > > 2. The message data of the readonly topic won't be cached in
> > > > > > > > >    the broker. This increases the network usage between broker
> > > > > > > > >    and bookie when more than one subscriber is tail-reading.
> > > > > > > > > 3. All the subscriptions are managed in the original writable
> > > > > > > > >    topic, so the maximum supported number of subscriptions is
> > > > > > > > >    not scalable.
> > > > > > > > >
> > > > > > > > > This PIP tries to come up with a simpler solution to support
> > > > > > > > > readonly topic ownership and solve the problems the previous PR
> > > > > > > > > left. The main idea of this solution is to reuse the
> > > > > > > > > geo-replication feature, but instead of duplicating storage, it
> > > > > > > > > shares the underlying bookie ledgers between different topics.
> > > > > > > > >
> > > > > > > > > ## Goal
> > > > > > > > >
> > > > > > > > > The goal is to introduce **Shadow Topic** as a new type of
> > > > > > > > > topic to support readonly topic ownership. Just as its name
> > > > > > > > > implies, a shadow topic is the shadow of some normal persistent
> > > > > > > > > topic (let's call it the source topic here). The source topic
> > > > > > > > > and the shadow topic must have the same number of partitions or
> > > > > > > > > both be non-partitioned. Multiple shadow topics can be created
> > > > > > > > > from a source topic.
> > > > > > > > >
> > > > > > > > > A shadow topic shares the underlying bookie ledgers of its
> > > > > > > > > source topic. Users can't produce any messages to a shadow
> > > > > > > > > topic directly, and a shadow topic doesn't create any new
> > > > > > > > > ledgers for messages; all messages in the shadow topic come
> > > > > > > > > from the source topic.
> > > > > > > > >
> > > > > > > > > A shadow topic has its own subscriptions and doesn't share them
> > > > > > > > > with its source topic. This means the shadow topic has its own
> > > > > > > > > cursor ledger to store persistent mark-delete info for each
> > > > > > > > > persistent subscription.
> > > > > > > > >
> > > > > > > > > The message sync procedure of a shadow topic is supported by
> > > > > > > > > shadow replication, which is very like geo-replication, with
> > > > > > > > > these differences:
> > > > > > > > > 1. Geo-replication only works between topics with the same name
> > > > > > > > >    in different broker clusters. But shadow topics have no
> > > > > > > > >    naming limitation, and they can be in the same cluster.
> > > > > > > > > 2. Geo-replication duplicates data storage, but shadow topics
> > > > > > > > >    don't.
> > > > > > > > > 3. Geo-replication replicates data in both directions (it's
> > > > > > > > >    bidirectional), but shadow replication has only a one-way
> > > > > > > > >    data flow.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > ## API Changes
> > > > > > > > >
> > > > > > > > > 1. PulsarApi.proto.
> > > > > > > > >
> > > > > > > > > The shadow topic needs to know the original message id of the
> > > > > > > > > replicated messages, in order to update the new ledger and LAC.
> > > > > > > > > So we need to add a `shadow_message_id` field to CommandSend
> > > > > > > > > for the replicator.
> > > > > > > > >
> > > > > > > > > ```
> > > > > > > > > message CommandSend {
> > > > > > > > >     // ...
> > > > > > > > >     // message id for shadow topic
> > > > > > > > >     optional MessageIdData shadow_message_id = 9;
> > > > > > > > > }
> > > > > > > > > ```
> > > > > > > > >
> > > > > > > > > 2. Admin API for creating a shadow topic with a source topic
> > > > > > > > > ```
> > > > > > > > >    admin.topics().createShadowTopic(source-topic-name, shadow-topic-name)
> > > > > > > > > ```
> > > > > > > > >
> > > > > > > > > ## Implementation
> > > > > > > > >
> > > > > > > > > A picture showing the key component relations is added in the
> > > > > > > > > GitHub issue [3].
> > > > > > > > >
> > > > > > > > > There are two key changes for the implementation:
> > > > > > > > > 1. How to replicate messages to shadow topics.
> > > > > > > > > 2. How the shadow topic manages shared ledger info.
> > > > > > > > >
> > > > > > > > > ### 1. How to replicate messages to shadow topics.
> > > > > > > > >
> > > > > > > > > This part is mostly implemented by `ShadowReplicator`, which
> > > > > extends
> > > > > > > > > `PersistentReplicator` introduced in geo-replication. The
> > shadow
> > > > > topic
> > > > > > > list
> > > > > > > > > is added as a new topic policy of the source topic. Source
> > topic
> > > > > > > manage the
> > > > > > > > > lifecycle of all the replicators. The key is to add
> > > > > `shadow_message_id`
> > > > > > > > > when
> > > > > > > > > produce message to shadow topics.
> > > > > > > > >
> > > > > > > > > ### 2. How the shadow topic manages shared ledger info.
> > > > > > > > >
> > > > > > > > > This part is mostly implemented by `ShadowManagedLedger`,
> > > > > > > > > which extends the current `ManagedLedgerImpl` with two key
> > > > > > > > > overridden methods.
> > > > > > > > >
> > > > > > > > > 1. `initialize(..)`
> > > > > > > > > a. Fetch the ManagedLedgerInfo of the source topic instead of
> > > > > > > > >    the current shadow topic. The source topic name is stored in
> > > > > > > > >    the topic policy of the shadow topic.
> > > > > > > > > b. Open the last ledger and read the explicit LAC from bookie,
> > > > > > > > >    instead of creating a new ledger. Reading the LAC here
> > > > > > > > >    requires that the source topic enable the explicit LAC
> > > > > > > > >    feature by setting `bookkeeperExplicitLacIntervalInMills` to
> > > > > > > > >    a non-zero value in broker.conf.
> > > > > > > > > c. Do not start checkLedgerRollTask, which tries to roll over
> > > > > > > > >    the ledger periodically.
> > > > > > > > >
> > > > > > > > > 2. `internalAsyncAddEntry()` Instead of writing entry data to
> > > > > > > > >    bookie, it only updates the ledger metadata, like
> > > > > > > > >    `currentLedger` and `lastConfirmedEntry`, and puts the
> > > > > > > > >    replicated message into the `EntryCache`.
> > > > > > > > >
> > > > > > > > > Besides, some other problems need to be taken care of.
> > > > > > > > > - Any ledger metadata updates need to be synced to the shadow
> > > > > > > > >   topic, including ledger offloading or ledger deletion. The
> > > > > > > > >   shadow topic needs to watch the ledger info updates in the
> > > > > > > > >   metadata store and update in time.
> > > > > > > > > - The locally cached LAC of `LedgerHandle` won't be updated in
> > > > > > > > >   time, so we need to refresh the LAC when a managed cursor
> > > > > > > > >   requests entries beyond the known LAC.
> > > > > > > > >
> > > > > > > > > ## Rejected Alternatives
> > > > > > > > >
> > > > > > > > > See PIP-63[1].
> > > > > > > > >
> > > > > > > > > ## Reference
> > > > > > > > > [1]
> > > > > > > > > https://github.com/apache/pulsar/wiki/PIP-63%3A-Readonly-Topic-Ownership-Support
> > > > > > > > > [2] https://github.com/apache/pulsar/pull/11960
> > > > > > > > > [3] https://github.com/apache/pulsar/issues/16153
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > BR,
> > > > > > > > > Haiting Jiang
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > > BR,
> > > > > Haiting
> > > > >
> > > >
> > >
> >
> 
