Thanks Enrico. I have updated the wiki page https://github.com/apache/pulsar/wiki/PIP-88%3A-Replicate-schemas-across-multiple. Due to a conflict with a concurrently claimed PIP number, I have changed the PIP number to 88.

Thanks,
Penghui

On Jul 14, 2021, 10:07 PM +0800, PengHui Li <peng...@apache.org>, wrote:
>
> ### Background
>
> Currently, if data replication is enabled between different clusters and
> the data is written with a schema, the data is not available for reading
> at the remote cluster, because the schemas of the topic are not replicated
> to the remote cluster; a consumer or reader will encounter a
> schema-not-found exception when reading data from the topic.
>
> So the purpose of this proposal is to support reading messages with
> schemas from the remote cluster. Before discussing how to achieve this, I
> will first discuss the schema version in Pulsar, because it is an
> important concept that is mentioned repeatedly in the subsequent
> discussion.
>
> In Pulsar, the schema version is incremental within a cluster: a newly
> added schema is stored in the schema registry with a new schema version
> (last schema version + 1). When a producer publishes messages to a topic,
> the producer tries to register the schema with the schema registry and
> gets a schema version in response; of course, if the schema already
> exists, it is not put into the schema registry again. The returned schema
> version is then used in the message metadata. A consumer fetches the
> schema by the schema version carried in the message and uses it to
> deserialize the message.
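>
> To make this concrete, here is a minimal sketch using the Java client. The
> `User` class, topic name, subscription name, and service URL are
> placeholders for illustration only, not part of the proposal:
>
> ```java
> import org.apache.pulsar.client.api.*;
> import org.apache.pulsar.client.api.schema.GenericRecord;
>
> public class SchemaVersionExample {
>     // Hypothetical record type, used only for this sketch.
>     public static class User {
>         public String name;
>         public int age;
>     }
>
>     public static void main(String[] args) throws Exception {
>         PulsarClient client = PulsarClient.builder()
>                 .serviceUrl("pulsar://localhost:6650")
>                 .build();
>
>         // AUTO_CONSUME looks up the writer schema by the version carried
>         // in each message's metadata; this lookup is exactly what fails
>         // today on a remote cluster that never received the schema.
>         Consumer<GenericRecord> consumer = client
>                 .newConsumer(Schema.AUTO_CONSUME())
>                 .topic("persistent://public/default/users")
>                 .subscriptionName("example-sub")
>                 .subscribe();
>
>         // Creating the producer registers the Avro schema with the local
>         // schema registry (or resolves the version of an existing schema).
>         Producer<User> producer = client.newProducer(Schema.AVRO(User.class))
>                 .topic("persistent://public/default/users")
>                 .create();
>
>         User user = new User();
>         user.name = "alice";
>         user.age = 30;
>         // The resolved schema version travels in the message metadata.
>         producer.send(user);
>
>         Message<GenericRecord> msg = consumer.receive();
>         System.out.println(msg.getValue().getField("name"));
>         consumer.acknowledge(msg);
>         client.close();
>     }
> }
> ```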
>
> For sharing schemas between different clusters, we have several options:
>
> - A centralized schema registry that can be accessed by each cluster
> - Fetching the schema from the remote cluster directly
> - Replicating the schema to the remote clusters
>
> All of them can share schemas between different clusters, but considering
> the deployment complexity and maintenance cost, and the requirement that
> each cluster should be able to run independently, the last one is the
> proposed solution for achieving schema sharing.
>
> For the first approach, a centralized schema registry would be a required
> service for every cross-region cluster. This adds complexity to the schema
> registry and its maintenance. A centralized schema registry also means a
> topic across multiple clusters must have the same schema sequence, but in
> some cases the topic might have different schema sequences in different
> clusters, such as when replicating data from an edge Pulsar service to a
> data center. Keeping the schema system independent per cluster is also
> more in line with the principle that each cluster can run separately.
>
> For the second approach, we would not need a centralized schema registry
> or schema replication across clusters, but it would violate the principle
> that each cluster can run separately, because reading data from a topic in
> one cluster would depend on schemas stored in another cluster. It would
> also make the schema compatibility check more complex, because a cluster
> might not have all the schemas of the existing data of a topic; it would
> need to fetch all schemas from the remote cluster and restore the order in
> which they were added.
>
> So, let us discuss the details of the third approach in the next section.
>
> ### Solution
>
> In this section, we will focus on the third approach introduced in the
> previous section. The key points we will discuss are how to replicate the
> schema to the remote cluster and how to deal with the impact of this on
> the current schema compatibility check.
>
> The replicator uses a durable cursor for reading the data (entry/batch)
> from the ManagedLedger and then uses a producer to publish the data to the
> remote cluster. This solution is very similar to the AUTO_PRODUCE_BYTES
> schema introduced by PIP 43
> https://github.com/apache/pulsar/wiki/PIP-43%3A-producer-send-message-with-different-schema.
> If a message carries a new schema that has not been registered at the
> remote cluster, the producer will first create the schema at the remote
> cluster and then use the schema version of the newly added schema to
> publish the message (a minimal client-side sketch of this publish path
> appears after the test plan below).
>
> So, the schema version of a message that is replicated to the remote
> cluster will be rewritten during the replication process. Since we already
> deserialize the message metadata to set the replicate_from field,
> rewriting the schema version will not bring more resource consumption. As
> a result, the same message in different clusters might have different
> schema versions for the same schema; of course, this does not happen in
> the normal case.
>
> If some messages have not yet been replicated to the remote clusters and
> those messages do not carry the latest schema while a new schema is added
> at the remote cluster, or if messages are replicated from multiple
> clusters with different schemas, this will also lead to inconsistent
> schema versions across clusters.
>
> For schema evolution, users might need to consider the data replication
> state, according to the schema compatibility check strategy of the topic,
> when they add a new schema to the topic, to make sure the new schema will
> not prevent data replication from the remote cluster. A simple way is to
> check whether all the schemas from the remote clusters have been
> replicated to the local cluster before adding a new schema to the local
> cluster for a topic; of course, if the schema compatibility check strategy
> for the topic is ALWAYS_COMPATIBLE, this pre-check is redundant.
>
> If the replicator encounters a schema incompatibility issue during
> replication, it will stop the data replication until the problem is
> resolved. If users want to skip the incompatible data instead, they just
> need to reset the cursor of the replicator, but this is dangerous because
> the skipped data will be lost at the remote cluster.
>
> ### Implementation
>
> Just follow the PIP 43 implementation to enhance the replicator. Most of
> the existing components, such as the schema cache on the client side, can
> be reused.
>
> ### Compatibility
>
> The approach will not introduce any compatibility issues.
>
> ### Tests Plan
>
> - Make sure a message with a schema can be consumed at the remote cluster
> - The topic policies use the Avro schema, so the topic policies should be
>   available at the remote cluster
> - Make sure the replicator retries replicating the schema if it gets an
>   exception from the remote cluster, except for the incompatible-schema
>   exception
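>
> As referenced above, here is a minimal client-side sketch of the publish
> path that the enhanced replicator would perform internally, using the
> AUTO_PRODUCE_BYTES schema from PIP 43. This is an illustration under
> assumptions, not the replicator implementation itself: the service URLs,
> topic name, and subscription name are placeholders, and the
> broker-internal schema-version rewrite described above is not visible at
> this level.
>
> ```java
> import org.apache.pulsar.client.api.*;
>
> public class ForwardWithAutoProduce {
>     public static void main(String[] args) throws Exception {
>         // Placeholder service URLs for the local and remote clusters.
>         PulsarClient local = PulsarClient.builder()
>                 .serviceUrl("pulsar://cluster-a:6650").build();
>         PulsarClient remote = PulsarClient.builder()
>                 .serviceUrl("pulsar://cluster-b:6650").build();
>
>         // Read raw payloads locally without deserializing them.
>         Consumer<byte[]> consumer = local.newConsumer(Schema.BYTES)
>                 .topic("persistent://public/default/users")
>                 .subscriptionName("forwarder")
>                 .subscribe();
>
>         // AUTO_PRODUCE_BYTES validates each raw payload against the
>         // schema of the target topic before publishing it remotely.
>         Producer<byte[]> producer = remote
>                 .newProducer(Schema.AUTO_PRODUCE_BYTES())
>                 .topic("persistent://public/default/users")
>                 .create();
>
>         while (true) {
>             Message<byte[]> msg = consumer.receive();
>             producer.send(msg.getValue());
>             consumer.acknowledge(msg);
>         }
>     }
> }
> ```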
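>
> Finally, for the first item in the test plan above, one quick way to
> verify that a schema actually reached the remote cluster is to ask the
> remote admin endpoint for the topic's schema. Again a sketch; the admin
> URL and topic name are placeholders:
>
> ```java
> import org.apache.pulsar.client.admin.PulsarAdmin;
> import org.apache.pulsar.common.schema.SchemaInfo;
>
> public class VerifyReplicatedSchema {
>     public static void main(String[] args) throws Exception {
>         // Placeholder admin endpoint of the remote cluster.
>         PulsarAdmin remoteAdmin = PulsarAdmin.builder()
>                 .serviceHttpUrl("http://cluster-b:8080")
>                 .build();
>
>         // If the schema was never replicated, this lookup fails with a
>         // not-found error; after replication, the writer schema should
>         // be resolvable here.
>         SchemaInfo info = remoteAdmin.schemas()
>                 .getSchemaInfo("persistent://public/default/users");
>         System.out.println(info.getSchemaDefinition());
>         remoteAdmin.close();
>     }
> }
> ```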