BTW: I also added it to a Google doc, in case someone prefers to add comments/suggestions there rather than inline here.

--
Matteo Merli
<mme...@apache.org>
On Fri, Mar 22, 2019 at 12:14 PM Matteo Merli <mme...@apache.org> wrote:

https://github.com/apache/pulsar/wiki/PIP-33%3A-Replicated-subscriptions

----------------------------------------------------------------------------------------

* **Status**: Proposal
* **Author**: Ivan Kelly, Matteo Merli
* **Pull Request**:
* **Mailing List discussion**:
* **Release**:

## Goal

Provide a mechanism to keep subscription state in sync, within a sub-second timeframe, in the context of a topic that is being asynchronously replicated across multiple geographical regions.

## Current state of affairs

Pulsar supports a geo-replication feature, in which a topic can be configured to be replicated across N regions (eg: `us-west`, `us-east` and `eu-central`).

The topic is presented as a virtual "global" entity in which messages can be published and consumed from any of the configured clusters.

The only limitation is that subscriptions are currently "local" to the cluster in which they are created. That is, no state for the subscription is transferred across regions.

If a consumer reconnects to a new region, it will trigger the creation of a new, unrelated subscription, albeit with the same name. This subscription will be created at the end of the topic in the new region (or at the beginning, depending on configuration) and, at the same time, the original subscription will be left dangling in the previous region.

The main problem is that the message ids of the messages are not consistent across different regions.

## Problem we're trying to solve

There are many scenarios in which it would be very convenient for an application to have the ability to fail over consumers from one region to another.

During this failover event, a consumer should be able to restart consuming from where it left off in the previous region.

Given that, by the very nature of async replication, having the exact position is impossible, in most cases restarting "close" to that point will already be good enough.

## Proposed solution

A Pulsar topic that is being geo-replicated can be seen as a collection of partially ordered logs.

Since producers can publish messages in each of the regions, each region can end up having a sequence of messages different from the others, though messages from one particular region will always be stored in order.

The main idea is to create a consistent distributed snapshot to establish an association between message ids from different clusters.

The snapshot of message ids will be constructed in a way such that:
* For a given message `M1-a` (written or replicated into region `a`)
* We have a set of associated message ids from other regions, eg: `M3-b` and `M1-c`
* When a consumer has acknowledged all messages <= `M1-a`, it will imply that it has also received (and acknowledged) all messages in region `b` with message id <= `M3-b` and all messages in region `c` with message id <= `M1-c`
* With this, when the "mark-delete" position (the cursor pointer) of a given subscription moves past the `M1-a` message in region `a`, the broker will be able to instruct brokers in `b` and `c` to update the subscription to `M3-b` and `M1-c` respectively.

These snapshots will be stored as "marker" messages in the topic itself and they will be filtered out by the broker before dispatching messages to consumers.
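To make that association concrete, here is a minimal Java sketch of the idea; the class and method names are invented for this illustration and are not Pulsar's actual internals:

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.TreeMap;

// Illustrative sketch only: a snapshot associates one local position with the
// equivalent positions in every remote region.
class SnapshotSketch {

    // Simplified (ledgerId, entryId) position, comparable in topic order.
    record Position(long ledgerId, long entryId) implements Comparable<Position> {
        public int compareTo(Position o) {
            int c = Long.compare(ledgerId, o.ledgerId);
            return c != 0 ? c : Long.compare(entryId, o.entryId);
        }
    }

    // One snapshot: "if everything <= localPosition is acknowledged in this
    // region, then everything <= remotePositions.get(cluster) is also
    // acknowledged in each remote region".
    record CursorSnapshot(Position localPosition, Map<String, Position> remotePositions) {}

    // Snapshots indexed by their local position, oldest to newest.
    private final NavigableMap<Position, CursorSnapshot> snapshots = new TreeMap<>();

    void addSnapshot(CursorSnapshot s) {
        snapshots.put(s.localPosition(), s);
    }

    // When the mark-delete position advances, find the newest snapshot whose
    // local position is <= the new mark-delete position. Its remote positions
    // are the ones the remote subscriptions can safely be advanced to.
    Optional<Map<String, Position>> remotePositionsToAdvance(Position markDeletePosition) {
        var entry = snapshots.floorEntry(markDeletePosition);
        return Optional.ofNullable(entry).map(e -> e.getValue().remotePositions());
    }
}
```

Keeping the snapshots ordered by their local position makes the lookup on each mark-delete advancement a cheap floor query, and makes it easy to prune snapshots the cursor has already moved past.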
Similarly to how they are stored, the snapshots themselves will be created by letting "marker" messages flow inline through the replication channel.

### Advantages

* The logic is quite simple and straightforward to implement
* Low and configurable overhead when enabled
* Zero overhead when disabled

### Limitations

* The snapshots are taken periodically. The proposed default is every 1 second. That means a consumer failing over to a different cluster can potentially receive up to 1 second worth of duplicates.
* For this proposal, we're only targeting syncing the "mark-delete" position (eg: the offset), without considering the messages deleted out of order after that point. These will appear as duplicates after a cluster failover. In the future, it might be possible to combine different techniques to track individually deleted messages as well.
* The snapshots can only be taken if all involved clusters are available. This is to ensure correctness and avoid skipping over messages published in remote clusters that were not yet seen by consumers. The practical implication of this is that this proposal is useful to either:
  - Support consumer failover across clusters when there are no failures
  - Support one cluster failure and allow consumers to immediately recover from a different cluster.

  Once a cluster is down, snapshots will no longer be taken, so it will not be possible to do another consumer failover to yet another cluster (and preserve the position) until the failed cluster is either brought back online or removed from the replication list.

## Proposed implementation

### Client API

Applications that want to enable the replicated subscription feature will be able to do so when creating a consumer. For example:

```java
Consumer<String> consumer = client.newConsumer(Schema.STRING)
        .topic("my-topic")
        .subscriptionName("my-subscription")
        .replicateSubscriptionState(true)
        .subscribe();
```

### Marker messages

Most of the implementation of replicated subscriptions is based on establishing a set of points in the topic storage of each region for which there is a strict relation with each region's message ids.

To achieve that, the communication between brokers needs to happen inline with the same flow of messages replicated across regions, and it has to establish a new message id that can be referenced from the other regions.

An additional usage of marker messages will be to store the snapshot information in a scalable way, so that we don't have to keep all the snapshots together but rather can reconstruct them while we fetch the entries from BookKeeper (eg: when a consumer comes back after a while and starts draining the backlog).

Essentially, marker messages will be a special class of messages that are used by Pulsar for internal purposes and are stored inline in the topic.

These messages will be identified in the `MessageMetadata` protobuf definition with one additional field:

```protobuf
// Contains the enum value with the type of marker.
// Each marker message will have a different format defined in protobuf.
optional int32 marker_type = 18;
```

### Constructing a cursor snapshot

When a Pulsar broker is serving a topic for which at least one subscription is "replicated", it will activate a periodic task to create the cursor snapshot.

The frequency of this snapshot will be configurable in `broker.conf` and, possibly, also as part of namespace policies (though that might not be necessary in the first implementation).
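As a rough, non-authoritative illustration of how the broker might drive that periodic task: the executor setup, the `snapshotFrequencyMillis` parameter and the `startSnapshot()` method below are placeholders for this sketch, not the actual broker code.

```java
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Illustrative only: a periodic task that kicks off a new snapshot for a
// topic that has at least one replicated subscription.
class SnapshotScheduler {

    private final ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor();

    void activate(long snapshotFrequencyMillis) {
        // Periodically start a new snapshot, identified by a random UUID.
        timer.scheduleAtFixedRate(
                () -> startSnapshot(UUID.randomUUID().toString()),
                snapshotFrequencyMillis, snapshotFrequencyMillis, TimeUnit.MILLISECONDS);
    }

    private void startSnapshot(String snapshotId) {
        // In the real implementation this would write the snapshot-request
        // marker into the local topic so that it flows through the replicators.
        System.out.println("starting snapshot " + snapshotId);
    }
}
```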
#### Snapshot steps

##### Source region starts a snapshot

The broker in region `a` will start a new snapshot by locally writing a marker message like:

```json
"ReplicatedSubscriptionsSnapshotRequest" : {
  "snapshot_id" : "444D3632-F96C-48D7-83DB-041C32164EC1",
  "source_cluster" : "a"
}
```

This marker will get replicated to all the other clusters, eg: `b` and `c`.

When the replicators in each of the other regions receive this marker message, they will reply by sending another marker back to region `a`:

```json
"ReplicatedSubscriptionsSnapshotResponse" : {
  "snapshot_id" : "444D3632-F96C-48D7-83DB-041C32164EC1",
  "cluster" : {
    "cluster" : "b",
    "message_id" : {
      "ledger_id" : 1234,
      "entry_id" : 45678
    }
  }
}
```

The broker in region `a` will wait to receive all the responses from `b` and `c`, and then it will finalize the snapshot.

The snapshot content will look like:

```json
{
  "snapshot_id" : "444D3632-F96C-48D7-83DB-041C32164EC1",
  "local_message_id" : {
    "ledger_id" : 192,
    "entry_id" : 123123
  },
  "clusters" : [
    {
      "cluster" : "b",
      "message_id" : {
        "ledger_id" : 1234,
        "entry_id" : 45678
      }
    },
    {
      "cluster" : "c",
      "message_id" : {
        "ledger_id" : 7655,
        "entry_id" : 13421
      }
    }
  ]
}
```

The `local_message_id` field will be set to the message id (in region `a`) of the last response that completed the snapshot.

Note that when there are more than 2 clusters involved, as in the above case with clusters `a`, `b` and `c`, a second round of request-response will be necessary, to ensure we are including all the messages that might have been exchanged between the remote clusters.

In this situation, for the snapshot we will be using:
* For the remote clusters, the message ids reported in the 1st round
* For `local_message_id`, the id of the last response from the 2nd round

Typically there will be only one (or a few) in-progress snapshots. If a region doesn't respond within a certain timeout period, the snapshot will be aborted.

The reason we cannot use a partial snapshot is that we might be missing some of the messages that originated from the missing region.

This is an example:
1. `a` starts a snapshot
2. A message `M1-b` from `b` is replicated into `c` (and possibly not into `a`, or with a bigger delay)
3. `c` returns a message id for the snapshot that includes `M1-b` (as replicated in `c`)

If `a` didn't wait for the snapshot response from `b`, it could then instruct `b` to skip over the `M1-b` message.

As the default behavior, to avoid any possible message loss, only completed snapshots will be applied. In the future, some configuration or operational tool could be provided to either:

* Create partial snapshots after a region has been disconnected for a certain amount of time. Eg: after a few hours, just move on and restart creating snapshots, on the assumption that the region might be completely lost.

* Have a tool to manually re-enable snapshot creation even in the presence of failures.
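To illustrate the flow described above, here is a simplified, hypothetical sketch of the bookkeeping the source broker could keep for in-progress snapshots. It ignores the second request-response round needed for more than 2 clusters, and none of these class or method names come from the actual codebase.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Illustrative sketch: collect one response per remote cluster, finalize the
// snapshot once all have replied, and abort it on timeout.
class PendingSnapshotTracker {

    record Position(long ledgerId, long entryId) {}

    static class PendingSnapshot {
        final Instant startedAt = Instant.now();
        final Map<String, Position> responses = new HashMap<>();
    }

    private final Set<String> remoteClusters;          // eg: {"b", "c"}
    private final Duration timeout;
    private final Map<String, PendingSnapshot> pending = new HashMap<>();

    PendingSnapshotTracker(Set<String> remoteClusters, Duration timeout) {
        this.remoteClusters = remoteClusters;
        this.timeout = timeout;
    }

    void onSnapshotStarted(String snapshotId) {
        pending.put(snapshotId, new PendingSnapshot());
    }

    // Called when a snapshot-response marker comes back through the
    // replication channel from one of the remote clusters.
    void onResponse(String snapshotId, String cluster, Position remotePosition,
                    Position localPositionOfResponse) {
        PendingSnapshot p = pending.get(snapshotId);
        if (p == null) {
            return; // already finalized or aborted
        }
        p.responses.put(cluster, remotePosition);
        if (p.responses.keySet().containsAll(remoteClusters)) {
            // All remote clusters replied: the snapshot associates the local
            // position of the last response with the reported remote positions.
            finalizeSnapshot(snapshotId, localPositionOfResponse, p.responses);
            pending.remove(snapshotId);
        }
    }

    // Periodically drop snapshots that did not complete within the timeout.
    void expireStaleSnapshots() {
        pending.entrySet().removeIf(e ->
                Duration.between(e.getValue().startedAt, Instant.now()).compareTo(timeout) > 0);
    }

    private void finalizeSnapshot(String snapshotId, Position localMessageId,
                                  Map<String, Position> clusters) {
        // In the real implementation this would write the finalized snapshot
        // as a marker message into the local topic.
    }
}
```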
### Storing snapshots

A topic with replicated subscriptions enabled will periodically create snapshots, for example every 1 or 10 seconds.

These snapshots need to be stored until all the subscriptions have moved past a certain point. Additionally, if time-based retention is enabled, they would need to be stored for the same amount of time as the underlying data.

In a normal scenario, when consumers are caught up with the publishers, the number of active snapshots will be small, though it will increase if a consumer starts lagging behind.

For this, the proposed solution is to store the snapshots as "marker" messages in the topic itself, inline with the regular data. Similarly to the other markers, these will not be propagated to clients and, in this case, they will not be replicated to other regions either.

Given this, each subscription will be able to keep a small cache of these snapshots (eg: 10 to 100 items) and keep updating it as the subscription read cursor progresses through the topic.

### Updating subscriptions in remote regions

When a subscription moves the cursor ("mark-delete" position) forward, it will look up, in the "replicated subscription snapshots cache", a snapshot whose associated message id is <= the current cursor position.

If a snapshot matching the criteria is found, the broker will publish a `ReplicatedSubscriptionsUpdate`:

```json
{
  "subscription_name" : "my-subscription",
  "clusters" : [
    {
      "cluster" : "b",
      "message_id" : {
        "ledger_id" : 1234,
        "entry_id" : 45678
      }
    },
    {
      "cluster" : "c",
      "message_id" : {
        "ledger_id" : 7655,
        "entry_id" : 13421
      }
    }
  ]
}
```

The "update" marker is written locally and replicated everywhere.

When a broker in the target region receives the marker message to update the subscription, it will move the mark-delete cursor to the new message id for that specific region.

If the subscription doesn't exist yet in that cluster, it will be automatically created by the "update" marker.


--
Matteo Merli
<mme...@apache.org>