Thanks Enrico.

I have updated the wiki page
https://github.com/apache/pulsar/wiki/PIP-88%3A-Replicate-schemas-across-multiple.
Due to a PIP number conflict with a concurrent proposal, I have changed the 
PIP number to 88.

Thanks,
Penghui
On Jul 14, 2021, 10:07 PM +0800, PengHui Li <peng...@apache.org>, wrote:
>
> ### Background
>
> Currently, if data replication is enabled between different clusters and the 
> data is written with a schema, the data will not be available for reading at 
> the remote cluster because the schemas of the topic are not replicated to the 
> remote cluster; the consumer or reader will encounter a schema-not-found 
> exception when reading data from the topic.
>
> So the purpose of this proposal is to support reading messages with schema 
> from the remote cluster. Before starting to discuss how to achieve this 
> purpose, I will first discuss the schema version in Pulsar, because it is an 
> important concept repeatedly mentioned in the subsequent discussion.
>
> In Pulsar, the schema version is incremental within a cluster: a newly added 
> schema is stored in the schema registry with a schema version 
> (last schema version + 1). When a producer publishes messages to a topic, the 
> producer tries to register the schema in the schema registry and receives the 
> schema version in the response; of course, if the schema already exists, it 
> is not put into the schema registry again. The returned schema version is 
> then recorded in the message metadata. A consumer fetches the schema by the 
> schema version carried in the message metadata and uses it to deserialize 
> the message.
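>
> As an illustration, here is a minimal sketch using the Pulsar Java client 
> (the service URL, topic, and subscription names are hypothetical) that shows 
> where the schema version surfaces on the consumer side:
>
> ```java
> import org.apache.pulsar.client.api.*;
>
> public class SchemaVersionExample {
>     public static void main(String[] args) throws Exception {
>         PulsarClient client = PulsarClient.builder()
>                 .serviceUrl("pulsar://localhost:6650") // hypothetical cluster
>                 .build();
>
>         // Registering the schema happens as a side effect of creating the
>         // producer; the broker responds with the schema version that is
>         // stamped into the metadata of every message this producer sends.
>         Producer<String> producer = client.newProducer(Schema.STRING)
>                 .topic("my-topic")
>                 .create();
>         producer.send("hello");
>
>         Consumer<String> consumer = client.newConsumer(Schema.STRING)
>                 .topic("my-topic")
>                 .subscriptionName("my-sub")
>                 .subscribe();
>
>         Message<String> msg = consumer.receive();
>         // The client fetches the schema for this version from the registry
>         // and uses it to deserialize the payload.
>         byte[] schemaVersion = msg.getSchemaVersion();
>         System.out.println("schema version bytes: " + schemaVersion.length);
>         consumer.acknowledge(msg);
>         client.close();
>     }
> }
> ```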
>
> For sharing the schema between different clusters, there are several options:
>
> - A centralized schema registry that can be accessed by each cluster
> - Fetch the schema from the remote cluster directly
> - Replicate the schema to the remote clusters
>
> All of them can share the schema between different clusters, but considering 
> the deployment complexity, the maintenance cost, and the principle that each 
> cluster should be able to run independently, the last one is the proposed 
> solution for achieving schema sharing.
>
> For the first approach, a centralized schema registry would be a required 
> service for every cross-region cluster. This adds deployment and maintenance 
> complexity. A centralized schema registry also means a topic that spans 
> multiple clusters must have the same schema sequence everywhere, but in some 
> cases the topic might have different schema sequences in different clusters, 
> such as when replicating data from an edge Pulsar service to a data center. 
> Keeping the schema system independent per cluster is also more in line with 
> the principle that each cluster can run separately.
> For the second approach, we don't need a centralized schema registry or to 
> replicate schemas across clusters, but it violates the principle that each 
> cluster can run separately, because reading data from a topic in one cluster 
> would depend on schemas stored in another cluster. It also makes the schema 
> compatibility check more complex, because a cluster might not have all the 
> schemas of the existing data for a topic; it would need to fetch all schemas 
> from the remote cluster and reconstruct the order in which they were added.
>
> So, let us discuss the details of the third approach in the next section.
>
> ### Solution
>
> In this section, we will focus on the third approach introduced in the 
> previous section. The key points are how to replicate the schema to the 
> remote cluster and how to deal with the impact of this on the current schema 
> compatibility check.
>
> The replicator uses a durable cursor to read data (entries/batches) from the 
> ManagedLedger and a producer to publish the data to the remote cluster. This 
> solution is very similar to the AUTO_PRODUCE_BYTES schema introduced by PIP 43 
> https://github.com/apache/pulsar/wiki/PIP-43%3A-producer-send-message-with-different-schema.
> If a message carries a schema that has not yet been registered in the remote 
> cluster, the producer will register the schema in the remote cluster first and 
> then use the schema version of the newly added schema to publish the message 
> to the remote cluster.
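>
> Conceptually, the remote producer behaves like a PIP 43 style 
> AUTO_PRODUCE_BYTES producer. A minimal sketch (the remote service URL and 
> topic name are hypothetical):
>
> ```java
> import org.apache.pulsar.client.api.*;
>
> public class ReplicatorProducerSketch {
>     public static void main(String[] args) throws Exception {
>         PulsarClient remoteClient = PulsarClient.builder()
>                 .serviceUrl("pulsar://remote-cluster:6650") // hypothetical remote cluster
>                 .build();
>
>         // AUTO_PRODUCE_BYTES sends raw bytes while validating them against
>         // the schema known to the target cluster, which is essentially what
>         // the replicator does when it forwards entries.
>         Producer<byte[]> replicatorProducer = remoteClient
>                 .newProducer(Schema.AUTO_PRODUCE_BYTES())
>                 .topic("my-topic")
>                 .create();
>
>         byte[] rawPayload = "hello".getBytes(); // stands in for an entry read by the cursor
>         replicatorProducer.send(rawPayload);
>         remoteClient.close();
>     }
> }
> ```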
>
> So, the schema version of a message replicated to the remote cluster will be 
> rewritten during the replication process. Since the replicator already 
> deserializes the message metadata to set the replicated_from field, rewriting 
> the schema version will not bring more resource consumption. As a result, the 
> same message in different clusters might have different schema versions for 
> the same schema; of course, this does not happen in the common case.
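>
> To make the rewrite concrete, here is a conceptual sketch (not the actual 
> broker code; all names are hypothetical, and the in-memory counter stands in 
> for the remote schema registry) of caching the local-to-remote version 
> mapping on the replication path:
>
> ```java
> import java.nio.ByteBuffer;
> import java.util.Map;
> import java.util.concurrent.ConcurrentHashMap;
> import java.util.concurrent.atomic.AtomicLong;
>
> // Conceptual sketch: cache the mapping from a local schema version to the
> // version assigned by the remote cluster, so the schema_version field in
> // the already-deserialized message metadata can be rewritten cheaply.
> class SchemaVersionRewriter {
>     private final Map<Long, byte[]> localToRemote = new ConcurrentHashMap<>();
>     private final AtomicLong remoteRegistry = new AtomicLong(); // stand-in for the remote registry
>
>     byte[] remoteVersionFor(long localVersion) {
>         return localToRemote.computeIfAbsent(localVersion, v ->
>                 // In the real replicator this would register the schema at the
>                 // remote cluster and use the version from its response.
>                 ByteBuffer.allocate(Long.BYTES)
>                         .putLong(remoteRegistry.getAndIncrement())
>                         .array());
>     }
> }
> ```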
>
> Schema versions can also become inconsistent across clusters if some messages 
> that do not carry the latest schema have not yet been replicated to the 
> remote cluster when a new schema is added there, or if messages are 
> replicated from multiple clusters with different schemas.
>
> For schema evolution, users should take the data replication state into 
> account, according to the schema compatibility check strategy of the topic, 
> when they add a new schema to the topic, to make sure the new schema will not 
> prevent data replication from the remote cluster. A simple way is to check 
> whether all the schemas from the remote clusters have been replicated to the 
> local cluster before adding a new schema for the topic in the local cluster; 
> of course, if the schema compatibility check strategy for the topic is 
> ALWAYS_COMPATIBLE, this pre-check is redundant.
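>
> A sketch of such a pre-check using the Pulsar admin client (the admin URLs 
> and topic name are hypothetical; getAllSchemas returns the full schema 
> history of the topic):
>
> ```java
> import java.util.List;
> import org.apache.pulsar.client.admin.PulsarAdmin;
> import org.apache.pulsar.common.schema.SchemaInfo;
>
> public class SchemaReplicationPreCheck {
>     public static void main(String[] args) throws Exception {
>         PulsarAdmin localAdmin = PulsarAdmin.builder()
>                 .serviceHttpUrl("http://local-cluster:8080").build();  // hypothetical
>         PulsarAdmin remoteAdmin = PulsarAdmin.builder()
>                 .serviceHttpUrl("http://remote-cluster:8080").build(); // hypothetical
>
>         String topic = "persistent://my-tenant/my-ns/my-topic";
>         List<SchemaInfo> localSchemas = localAdmin.schemas().getAllSchemas(topic);
>         List<SchemaInfo> remoteSchemas = remoteAdmin.schemas().getAllSchemas(topic);
>
>         // Only add a new schema once every schema known to the remote
>         // cluster has already been replicated to the local cluster.
>         boolean safeToEvolve = localSchemas.containsAll(remoteSchemas);
>         System.out.println("Safe to add a new schema: " + safeToEvolve);
>     }
> }
> ```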
>
> If the replicator encounters a schema incompatibility issue during 
> replication, it will stop the data replication until the problem is resolved. 
> Alternatively, if users want to skip the incompatible data, they just need to 
> reset the cursor for the replicator, but this is dangerous because the 
> skipped data will be lost from the remote cluster.
>
> ### Implementation
>
> Just follow the PIP 43 implementation to enhance the replicator. Most of the 
> current components can be reused, such as the schema cache on the client side.
>
> ### Compatibility
>
> The approach will not introduce any compatibility issues.
>
> ### Tests Plan
>
> - Make sure messages with schema can be consumed at the remote cluster
> - Topic policies are stored using the Avro schema, so the topic policies 
> should be available at the remote cluster
> - Make sure the replicator retries replicating the schema if it gets an 
> exception from the remote cluster, except for the incompatibility exception
