...and link 
https://docs.google.com/document/d/1WLN9vbhcqv-ZQ-9udlUBkFURpGPO9GSB6GsI7guJvZQ/edit?usp=sharing
--
Matteo Merli
<mme...@apache.org>

On Fri, Mar 22, 2019 at 2:32 PM Matteo Merli <mme...@apache.org> wrote:
>
> BTW: I also added it to a Google doc if someone prefers to add
> comments/suggestions there rather than here inline.
> --
> Matteo Merli
> <mme...@apache.org>
>
> On Fri, Mar 22, 2019 at 12:14 PM Matteo Merli <mme...@apache.org> wrote:
> >
> > https://github.com/apache/pulsar/wiki/PIP-33%3A-Replicated-subscriptions
> >
> > ----------------------------------------------------------------------------------------
> >
> > * **Status**: Proposal
> > * **Author**: Ivan Kelly, Matteo Merli
> > * **Pull Request**:
> > * **Mailing List discussion**:
> > * **Release**:
> >
> > ## Goal
> >
> > Provide a mechanism to keep subscription state in-sync, within a
> > sub-second timeframe, in the context of a topic that is being
> > asynchronously replicated across multiple geographical regions.
> >
> > ## Current state of affairs
> >
> > Pulsar supports a geo-replication feature, in which a topic can be
> > configured to be replicated across N regions (eg: `us-west`, `us-east`
> > and `eu-central`).
> >
> > The topic is presented as a virtual "global" entity in which messages
> > can be published and consumed from any of the configured clusters.
> >
> > The only limitation is that subscriptions are currently "local" to the
> > cluster in which they are created. That is, no state for the subscription
> > is transferred across regions.
> >
> > If a consumer reconnects to a new region, it will trigger the creation of a
> > new unrelated subscription, albeit with the same name. This subscription
> > will be created at the end of the topic in the new region (or at the
> > beginning, depending on configuration) and at the same time, the
> > original subscription will be left dangling in the previous region.
> >
> > The main problem is that message ids are not consistent across
> > different regions.
> >
> > ## Problem we're trying to solve
> >
> > There are many scenarios in which it would be very convenient for
> > an application to have the ability to failover consumers from one
> > region to another.
> >
> > During this failover event, a consumer should be able to restart
> > consuming from where it left off in the previous region.
> >
> > Given that, by the very nature of async replication, having the
> > exact position is impossible, in most cases restarting "close"
> > to that point will already be good enough.
> >
> > ## Proposed solution
> >
> > A Pulsar topic that is being geo-replicated can be seen as a collection
> > of partially ordered logs.
> >
> > Since producers can publish messages in each of the regions, each region
> > can end up having a sequence of messages different from the others, though
> > messages from one particular region will always be stored in order.
> >
> > The main idea is to create a consistent distributed snapshot to establish
> > an association between message ids from different clusters.
> >
> > The snapshot of message ids will be constructed in a way such that:
> >  * For a given message `M1-a` (written or replicated into region `a`)
> >  * We have a set of associated message ids from other regions, eg. `M3-b`
> >    and `M1-c`
> >  * When a consumer has acknowledged all messages <= `M1-a`, it will
> >    imply that it has also received (and acknowledged) all messages in
> >    region `b` with message id <= `M3-b` and all messages in region `c`
> >    with message id <= `M1-c`
> >  * With this, when the "mark-delete" position (the cursor pointer) of
> >    a given subscription moves past the `M1-a` message in region `a`,
> >    the broker will be able to instruct brokers in `b` and `c` to
> >    update the subscription respectively to `M3-b` and `M1-c`.
> >
> > These snapshots will be stored as "marker" messages in the topic itself
> > and they will be filtered out by the broker before dispatching messages
> > to consumers.
> >
> > Similarly, the snapshots themselves will be created by letting "marker"
> > messages flow inline through the replication channel.
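> >
> > To make the association concrete, here is a minimal sketch of the data a
> > single snapshot carries and the invariant it encodes (class, method and
> > field names are hypothetical, for illustration only):
> >
> > ```java
> > import java.util.Map;
> >
> > // Hypothetical in-memory model of one snapshot: for a local position in
> > // region "a" it records, for every other region, the highest message id
> > // that is covered once the cursor moves past that local position.
> > public class CursorSnapshotSketch {
> >
> >     // Simple (ledgerId, entryId) pair used as a message id.
> >     public record Position(long ledgerId, long entryId) implements Comparable<Position> {
> >         @Override
> >         public int compareTo(Position o) {
> >             int c = Long.compare(ledgerId, o.ledgerId);
> >             return c != 0 ? c : Long.compare(entryId, o.entryId);
> >         }
> >     }
> >
> >     private final Position localMessageId;          // e.g. M1-a in region "a"
> >     private final Map<String, Position> remoteIds;  // e.g. {"b": M3-b, "c": M1-c}
> >
> >     public CursorSnapshotSketch(Position localMessageId, Map<String, Position> remoteIds) {
> >         this.localMessageId = localMessageId;
> >         this.remoteIds = remoteIds;
> >     }
> >
> >     // Invariant encoded by the snapshot: once the subscription's mark-delete
> >     // position is >= localMessageId, brokers in each remote cluster can
> >     // safely move that subscription to the associated remote position.
> >     public boolean coveredBy(Position markDeletePosition) {
> >         return markDeletePosition.compareTo(localMessageId) >= 0;
> >     }
> >
> >     public Map<String, Position> remotePositions() {
> >         return remoteIds;
> >     }
> > }
> > ```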
> >
> > ### Advantages
> >
> >  * The logic is quite simple and straightforward to implement
> >  * Low and configurable overhead when enabled
> >  * Zero overhead when disabled
> >
> > ### Limitations
> >
> >  * The snapshots are taken periodically. The proposed default is every
> >    1 second. That means a consumer failing over to a different cluster
> >    can potentially receive up to 1 second worth of duplicates.
> >  * For this proposal, we're only targeting to sync the "mark-delete"
> >    position (eg: the offset), without considering the messages deleted
> >    out of order after that point. These will appear as duplicates after
> >    a cluster failover. In the future, it might be possible to combine
> >    different techniques to track individually deleted messages as well.
> >  * The snapshots can only be taken if all involved clusters are
> >    available. This is to ensure correctness and avoid skipping over
> >    messages published in remote clusters that were not yet seen by
> >    consumers.
> >    The practical implication of this is that this proposal is useful to
> >    either:
> >     - Support consumer failover across clusters when there are no failures
> >     - Support one cluster failure and allow consumers to immediately
> >       recover from a different cluster.
> >    After one cluster is down, the snapshots will not be taken, so it
> >    will not be possible to do another consumer failover to yet another
> >    cluster (and preserve the position) until the failed cluster is
> >    either brought back online or removed from the replication list.
> >
> > ## Proposed implementation
> >
> > ### Client API
> >
> > Applications that want to enable the replicated subscription feature
> > will be able to configure it when creating a consumer. For example:
> >
> > ```java
> > Consumer<String> consumer = client.newConsumer(Schema.STRING)
> >             .topic("my-topic")
> >             .subscriptionName("my-subscription")
> >             .replicateSubscriptionState(true)
> >             .subscribe();
> > ```
> >
> > ### Marker messages
> >
> > Most of the implementation of replicated subscriptions is based on
> > establishing a set of points in the topic storage of each region
> > that have a strict relation with each region's message ids.
> >
> > To achieve that, the communication between brokers needs to happen
> > inline with the same flow of messages replicated across regions, and
> > it will have to establish a new message id that can be referenced
> > from the other regions.
> >
> > An additional usage of marker messages will be to store snapshot
> > information in a scalable way, so that we don't have to keep all
> > the snapshots together but rather we can reconstruct them while we
> > fetch the entries from BookKeeper (eg: when a consumer comes back
> > after a while and starts draining the backlog).
> >
> > Essentially, marker messages will be a special class of messages that
> > are used by Pulsar for internal purposes and are stored inline
> > in the topic.
> >
> > These messages will be identified in the `MessageMetadata` protobuf
> > definition by one additional field:
> >
> > ```protobuf
> > // Contains the enum value with the type of marker
> > // Each marker message will have a different format defined
> > // in protobuf
> > optional int32 marker_type = 18;
> > ```
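> >
> > For illustration, a minimal sketch of the dispatch-side filtering. The
> > types below are simplified stand-ins; in the broker the check would be
> > done on the protobuf-generated `MessageMetadata`, via the accessor
> > generated for the new optional field:
> >
> > ```java
> > import java.util.ArrayList;
> > import java.util.List;
> >
> > // Hypothetical sketch of dispatch-side filtering: marker messages carry
> > // internal protocol state and must never be delivered to consumers.
> > public class MarkerFilterSketch {
> >
> >     // Minimal stand-in for the parsed metadata of one entry; in the broker
> >     // this would be the protobuf-generated MessageMetadata class.
> >     public record EntryMetadata(Integer markerType, byte[] payload) {
> >         boolean hasMarkerType() {
> >             return markerType != null;
> >         }
> >     }
> >
> >     // Returns only the entries that should be dispatched to the consumer,
> >     // handling markers broker-side as they are encountered.
> >     public static List<EntryMetadata> filterForDispatch(List<EntryMetadata> entries) {
> >         List<EntryMetadata> toDispatch = new ArrayList<>();
> >         for (EntryMetadata entry : entries) {
> >             if (entry.hasMarkerType()) {
> >                 // Snapshot request/response, snapshot or update marker:
> >                 // process it internally instead of dispatching it.
> >                 handleMarker(entry);
> >             } else {
> >                 toDispatch.add(entry);
> >             }
> >         }
> >         return toDispatch;
> >     }
> >
> >     private static void handleMarker(EntryMetadata entry) {
> >         // Placeholder for broker-side marker handling.
> >     }
> > }
> > ```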
> >
> > ### Constructing a cursor snapshot
> >
> > When the Pulsar broker is serving a topic for which at least one
> > subscription is "replicated", it will activate a periodic task to
> > create the cursor snapshot.
> >
> > The frequency of this snapshot will be configurable in `broker.conf` and,
> > possibly, also as part of namespace policies (though that might not be
> > necessary in the first implementation).
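> >
> > As a sketch of the periodic task, assuming the frequency is exposed as a
> > single millisecond value (the class and wiring are illustrative, not part
> > of this proposal):
> >
> > ```java
> > import java.util.concurrent.Executors;
> > import java.util.concurrent.ScheduledExecutorService;
> > import java.util.concurrent.TimeUnit;
> >
> > // Hypothetical sketch: once a topic has at least one replicated
> > // subscription, schedule the snapshot creation task at the configured
> > // frequency.
> > public class SnapshotSchedulerSketch {
> >
> >     private final ScheduledExecutorService scheduler =
> >             Executors.newSingleThreadScheduledExecutor();
> >
> >     // Illustrative value, e.g. read from broker.conf; proposed default is 1 second.
> >     private final long snapshotFrequencyMillis;
> >
> >     public SnapshotSchedulerSketch(long snapshotFrequencyMillis) {
> >         this.snapshotFrequencyMillis = snapshotFrequencyMillis;
> >     }
> >
> >     // startNewSnapshot would write the ReplicatedSubscriptionsSnapshotRequest
> >     // marker described in the next section.
> >     public void start(Runnable startNewSnapshot) {
> >         scheduler.scheduleAtFixedRate(startNewSnapshot,
> >                 snapshotFrequencyMillis, snapshotFrequencyMillis, TimeUnit.MILLISECONDS);
> >     }
> > }
> > ```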
> >
> > #### Snapshot steps
> >
> > ##### Source region starts a snapshot
> >
> > The broker in region `a` will start a new snapshot by locally writing a
> > marker message like:
> >
> > ```json
> > "ReplicatedSubscriptionsSnapshotRequest" : {
> >     "snapshot_id" : "444D3632-F96C-48D7-83DB-041C32164EC1",
> >     "source_cluster" : "a"
> > }
> > ```
> >
> > This marker will get replicated to all other clusters, eg: `b` and `c`.
> >
> > When the replicators in each of the other regions receive this marker
> > message, they will reply by sending another marker back to region `a`:
> >
> > ```json
> > "ReplicatedSubscriptionsSnapshotResponse" : {
> >     "snapshot_id" : "444D3632-F96C-48D7-83DB-041C32164EC1",
> >     "cluster" : {
> >         "cluster" : "b",
> >         "message_id" : {
> >             "ledger_id" : 1234,
> >             "entry_id" : 45678
> >         }
> >     }
> > }
> > ```
> >
> > The broker in region `a` will wait until it has received the responses
> > from both `b` and `c`, and then it will finalize the snapshot.
> >
> > The snapshot content will be like:
> >
> > ```json
> > {
> >     "snapshot_id" : "444D3632-F96C-48D7-83DB-041C32164EC1",
> >     "local_message_id" : {
> >         "ledger_id" : 192,
> >         "entry_id" : 123123
> >     },
> >     "clusters" : [
> >         {
> >             "cluster" : "b",
> >             "message_id" : {
> >                 "ledger_id" : 1234,
> >                 "entry_id" : 45678
> >             }
> >         },
> >         {
> >             "cluster" : "c",
> >             "message_id" : {
> >                 "ledger_id" : 7655,
> >                 "entry_id" : 13421
> >             }
> >         }
> >     ]
> > }
> > ```
> >
> > The `local_message_id` field will be set to the message id (in region
> > `a`) of the last response that completed the snapshot.
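> >
> > A minimal sketch of the bookkeeping the source broker could use for one
> > pending snapshot, assuming one response per remote cluster and a timeout
> > after which the snapshot is aborted (names are hypothetical):
> >
> > ```java
> > import java.util.HashMap;
> > import java.util.Map;
> > import java.util.Set;
> >
> > // Hypothetical sketch of the source broker's bookkeeping for one pending
> > // snapshot: collect one response per remote cluster, then finalize.
> > public class PendingSnapshotSketch {
> >
> >     public record Position(long ledgerId, long entryId) {}
> >
> >     private final String snapshotId;
> >     private final Set<String> remoteClusters;             // e.g. {"b", "c"}
> >     private final Map<String, Position> responses = new HashMap<>();
> >     private final long startedAtMillis = System.currentTimeMillis();
> >
> >     public PendingSnapshotSketch(String snapshotId, Set<String> remoteClusters) {
> >         this.snapshotId = snapshotId;
> >         this.remoteClusters = remoteClusters;
> >     }
> >
> >     // Called when a ReplicatedSubscriptionsSnapshotResponse marker arrives.
> >     public void onResponse(String cluster, Position remoteMessageId) {
> >         responses.put(cluster, remoteMessageId);
> >     }
> >
> >     // The snapshot is complete only when every remote cluster has responded;
> >     // partial snapshots are never applied.
> >     public boolean isComplete() {
> >         return responses.keySet().containsAll(remoteClusters);
> >     }
> >
> >     // If a cluster is unavailable, the snapshot is eventually aborted.
> >     public boolean isTimedOut(long timeoutMillis) {
> >         return System.currentTimeMillis() - startedAtMillis > timeoutMillis;
> >     }
> >
> >     // The remote positions that go into the finalized snapshot; the
> >     // local_message_id is the position of the last response received.
> >     public Map<String, Position> finalizedRemotePositions() {
> >         if (!isComplete()) {
> >             throw new IllegalStateException("Snapshot " + snapshotId + " is incomplete");
> >         }
> >         return responses;
> >     }
> > }
> > ```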
> >
> > Note that when there are more than 2 clusters involved, like in the above
> > case with clusters `a`, `b` and `c`, a second round of request-response
> > will be necessary, to ensure we are including all the messages that might
> > have been exchanged between the remote clusters.
> >
> > In this situation, for the snapshot we will be using:
> >  * For the remote clusters, the message ids reported in the 1st round
> >  * For `local_message_id`, the id of the last response from the 2nd round
> >
> > Typically there will be only one (or a few) in-progress snapshots. If a
> > region doesn't respond within a certain timeout period, the snapshot
> > will be aborted.
> >
> > The reason we cannot use a partial snapshot is that we might be missing
> > some of the messages that originated from the missing region.
> >
> > This is an example:
> >  1. `a` starts a snapshot
> >  2. A message `M1-b` from `b` is replicated into `c` (but possibly not
> >      yet into `a`, or with a bigger delay)
> >  3. `c` returns a message id for the snapshot that includes `M1-b` (as
> >      replicated in `c`)
> >
> > If `a` doesn't wait for the snapshot response from `b`, it would then
> > instruct `b` to skip the `M1-b` message.
> >
> > As the default behavior, to avoid any possible message loss, only completed
> > snapshots will be applied. In the future, some configuration or operational
> > tool could be provided to either:
> >
> >  * Create partial snapshots after a region has been disconnected for a
> >    certain time. Eg: after a few hours, just move on and restart creating
> >    snapshots, on the assumption that the region might be completely lost.
> >
> >  * Have a tool to manually re-enable snapshot creation even in the
> >    presence of failures.
> >
> > ### Storing snapshots
> >
> > A topic with replicated subscriptions enabled will periodically create
> > snapshots, for example every 1 or 10 seconds.
> >
> > These snapshots need to be stored until all the subscriptions have
> > moved past a certain point. Additionally, if time-based retention is
> > enabled, they would need to be stored for the same time as the underlying
> > data.
> >
> > In the normal scenario, when consumers are caught up with the publishers,
> > the number of active snapshots will be small, though it will increase
> > if a consumer starts lagging behind.
> >
> > For this, the proposed solution is to store the snapshots as "marker"
> > messages in the topic itself, inline with the regular data. Like the
> > other markers, these will not be propagated to clients and, in this
> > case, they will not be replicated to other regions either.
> >
> > Given this, each subscription will be able to keep a small cache of
> > these snapshots (eg: 10 to 100 items) and keep updating it as the
> > subscription's read cursor progresses through the topic.
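> >
> > A minimal sketch of such a per-subscription cache, using a sorted map so
> > that the newest applicable snapshot can be found with a floor lookup when
> > the cursor advances (as described in the next section); the structure and
> > names are illustrative:
> >
> > ```java
> > import java.util.Map;
> > import java.util.TreeMap;
> >
> > // Hypothetical sketch of the per-subscription snapshot cache, keyed by the
> > // snapshot's local message id.
> > public class SnapshotCacheSketch {
> >
> >     public record Position(long ledgerId, long entryId) implements Comparable<Position> {
> >         @Override
> >         public int compareTo(Position o) {
> >             int c = Long.compare(ledgerId, o.ledgerId);
> >             return c != 0 ? c : Long.compare(entryId, o.entryId);
> >         }
> >     }
> >
> >     private final int maxSize;  // e.g. 10 to 100 items
> >     private final TreeMap<Position, Map<String, Position>> snapshots = new TreeMap<>();
> >
> >     public SnapshotCacheSketch(int maxSize) {
> >         this.maxSize = maxSize;
> >     }
> >
> >     // Called when a snapshot marker is read from the topic.
> >     public void add(Position localMessageId, Map<String, Position> remotePositions) {
> >         if (snapshots.size() >= maxSize) {
> >             snapshots.pollFirstEntry(); // drop the oldest snapshot
> >         }
> >         snapshots.put(localMessageId, remotePositions);
> >     }
> >
> >     // Returns the remote positions of the newest snapshot whose local
> >     // message id is <= the mark-delete position, evicting consumed entries.
> >     public Map<String, Position> advancedTo(Position markDeletePosition) {
> >         Map.Entry<Position, Map<String, Position>> match = snapshots.floorEntry(markDeletePosition);
> >         if (match != null) {
> >             snapshots.headMap(markDeletePosition, true).clear();
> >         }
> >         return match != null ? match.getValue() : null;
> >     }
> > }
> > ```
> >
> > Keeping the cache small and bounded works because snapshots can always be
> > reconstructed from the markers stored inline in the topic while a lagging
> > consumer drains the backlog.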
> >
> > ### Updating subscription in remote regions
> >
> > When a subscription moves the cursor ("mark-delete" position) forward,
> > it will look up, in the "replicated subscriptions snapshot cache", a
> > snapshot whose associated message id is <= the current cursor
> > position.
> >
> > If a snapshot matching the criteria is found, the broker will publish
> > a `ReplicatedSubscriptionsUpdate`:
> >
> > ```json
> > {
> >     "subscription_name" : "my-subscription",
> >     "clusters" : [
> >         {
> >             "cluster" : "b",
> >             "message_id" : {
> >                 "ledger_id" : 1234,
> >                 "entry_id" : 45678
> >             }
> >         },
> >         {
> >             "cluster" : "c",
> >             "message_id" : {
> >                 "ledger_id" : 7655,
> >                 "entry_id" : 13421
> >             }
> >         }
> >     ]
> > }
> > ```
> >
> > The "update" marker is written locally and replicated everywhere.
> >
> > When a broker in the target region receives the marker message to
> > update the subscription, it will move the mark-delete cursor to the new
> > message id for that specific region.
> >
> > If the subscription doesn't exist yet in that cluster, it will be
> > automatically created by the "update" marker.
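> >
> > As a sketch of what applying the update could look like in the receiving
> > cluster (the subscription map and the forward-only rule below are
> > simplifications of the broker's actual cursor handling):
> >
> > ```java
> > import java.util.Map;
> > import java.util.concurrent.ConcurrentHashMap;
> >
> > // Hypothetical sketch of applying a ReplicatedSubscriptionsUpdate marker in
> > // the receiving cluster: create the subscription if needed, then move its
> > // mark-delete position forward (never backward) to the reported message id.
> > public class SubscriptionUpdateSketch {
> >
> >     public record Position(long ledgerId, long entryId) implements Comparable<Position> {
> >         @Override
> >         public int compareTo(Position o) {
> >             int c = Long.compare(ledgerId, o.ledgerId);
> >             return c != 0 ? c : Long.compare(entryId, o.entryId);
> >         }
> >     }
> >
> >     // Local view of subscriptions on this topic: name -> mark-delete position.
> >     private final Map<String, Position> subscriptions = new ConcurrentHashMap<>();
> >
> >     public void applyUpdate(String subscriptionName, String localClusterName,
> >                             Map<String, Position> clusterPositions) {
> >         Position target = clusterPositions.get(localClusterName);
> >         if (target == null) {
> >             return; // this update carries no position for our cluster
> >         }
> >         // Creates the subscription entry if absent; otherwise only advances it.
> >         subscriptions.merge(subscriptionName, target,
> >                 (current, proposed) -> proposed.compareTo(current) > 0 ? proposed : current);
> >     }
> > }
> > ```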
> >
> >
> > --
> > Matteo Merli
> > <mme...@apache.org>
