Hello Dawid:
Thanks for your answers and references.
I do have a few questions:
1. Is there any scenario where the reader and writer schemas should differ?
2. How is a mismatch between the two schemas (one passed as an argument and 
the other retrieved from the schema registry) resolved at run time?
3. As mentioned - "If you provide just the writer schema, the reader schema is 
assumed to be the same." - Is it possible in Flink to use only the schema 
registry to retrieve a schema that is then used for both reading and writing?
4. Regarding "You can say that we read multiple different versions from e.g. 
Kafka and normalize it to that provided schema that's required across the 
pipeline." - Does this mean that if the reader (passed as argument) and writer 
(retrieved from the registry) schemas differ, then Flink will normalize the 
differences? If so, are there any guidelines as to how the fields are 
normalized?
5. Regarding: "the forGeneric is the readerSchema and at the same time the 
schema that Flink will be working with in the pipeline. It will be the schema 
used to serialize and deserialize records between different TaskManagers" - If 
the reader schema (passed as argument) is used for reading and writing between 
TaskManagers, then what role does the schema from the registry play? Does it 
have to do with the "normalization" you've mentioned?
Thanks again for your time.
Mans
    On Tuesday, July 13, 2021, 10:22:32 AM EDT, Dawid Wysakowicz 
<dwysakow...@apache.org> wrote:  
 
  
Hi,
 
Yes, you are right: the schema in the forGeneric is the readerSchema and at the 
same time the schema that Flink will be working with in the pipeline. It will 
be the schema used to serialize and deserialize records between different 
TaskManagers. Between the Flink TaskManagers that schema plays the role of both 
the reader and the writer schema.
 
The way that Avro works is that you must provide both the writer and the reader 
schema. Otherwise it simply does not work. If you provide just the writer 
schema, the reader schema is assumed to be the same. Without the writer schema 
it is not possible to deserialize Avro. See extract from Avro spec: [1]
 
 
Binary encoded Avro data does not include type information or field names. The 
benefit is that the serialized data is small, but as a result a schema must 
always be used in order to read Avro data correctly. The best way to ensure 
that the schema is structurally identical to the one used to write the data is 
to use the exact same schema.
   
Therefore, files or systems that store Avro data should always include the 
writer's schema for that data. Avro-based remote procedure call (RPC) systems 
must also guarantee that remote recipients of data have a copy of the schema 
used to write that data. In general, it is advisable that any reader of Avro 
data should use a schema that is the same (as defined more fully in Parsing 
Canonical Form for Schemas) as the schema that was used to write the data in 
order to deserialize it correctly. Deserializing data into a newer schema is 
accomplished by specifying an additional schema, the results of which are 
described in Schema Resolution.
 
 
Now to the question of why we need to pass a schema as a parameter to the 
forGeneric method. For the DeserializationSchema, i.e. when reading from e.g. 
Kafka, it is indeed used as the reader schema. However, as I said earlier, it is 
also used when serializing between Flink's TaskManagers. In this scenario it is 
used both as the writer and as the reader schema. The design here is that we do 
not want to query the schema registry from every TaskManager. You can say that 
we read multiple different versions from e.g. Kafka and normalize it to that 
provided schema that's required across the pipeline.
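
To make the "normalize" step concrete, here is a minimal sketch of how Avro's schema resolution treats record fields, as described in the Schema Resolution section of the spec [1]: fields are matched by name, fields that only the writer has are dropped, and fields that only the reader has take the reader's default (resolution fails if there is none). This is a simplified, stdlib-only model and not the Avro API; the class SchemaResolutionSketch and its ReaderField helper are hypothetical names, and type promotion, aliases, and nested types are left out.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Simplified, stdlib-only model of Avro record resolution (NOT the Avro API). */
final class SchemaResolutionSketch {

    /** One field of the (hypothetical) reader schema, with an optional default. */
    static final class ReaderField {
        final String name;
        final boolean hasDefault;
        final Object defaultValue;

        private ReaderField(String name, boolean hasDefault, Object defaultValue) {
            this.name = name;
            this.hasDefault = hasDefault;
            this.defaultValue = defaultValue;
        }

        static ReaderField required(String name) {
            return new ReaderField(name, false, null);
        }

        static ReaderField withDefault(String name, Object defaultValue) {
            return new ReaderField(name, true, defaultValue);
        }
    }

    /**
     * Projects a record that was decoded with the writer schema onto the
     * reader schema. The result contains exactly the reader's fields.
     */
    static Map<String, Object> resolve(Map<String, Object> written,
                                       List<ReaderField> readerFields) {
        Map<String, Object> result = new LinkedHashMap<>();
        for (ReaderField field : readerFields) {
            if (written.containsKey(field.name)) {
                // Field exists in both schemas: matched by name, value kept.
                result.put(field.name, written.get(field.name));
            } else if (field.hasDefault) {
                // Field exists only in the reader schema: use its default.
                result.put(field.name, field.defaultValue);
            } else {
                // Reader-only field without a default cannot be resolved.
                throw new IllegalArgumentException(
                        "Reader field '" + field.name + "' has no default");
            }
        }
        // Fields that only the writer had are never copied, so they are dropped.
        return result;
    }
}
```

For example, resolving a written record with name=Ada and age=36 against the reader fields name (required) and country (default "unknown") yields a record containing only name and country. In the real library the same projection is done while decoding, by handing both the writer and the reader schema to the datum reader.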
 
The reason why you don't need to provide any schema in the 
KafkaAvroDeserializer is that it is only ever used in a single parallel 
instance and the records are not sent over a network again. So basically there 
you use the writer schema retrieved from schema registry as the reader schema.
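
As background on how the writer schema can be found without being passed in: Confluent's serializers prefix each Avro payload with a small header, one magic byte (0) followed by a 4-byte big-endian schema ID, and the deserializer uses that ID to fetch the writer schema from the registry. The sketch below only extracts that ID from a message; the class name ConfluentHeaderSketch is hypothetical, and the registry lookup and the Avro decoding are omitted.

```java
import java.nio.ByteBuffer;

/** Reads the schema ID from a Confluent-framed message (sketch, no registry call). */
final class ConfluentHeaderSketch {

    /** First byte of every Confluent-framed message. */
    static final byte MAGIC_BYTE = 0x0;

    /** Header size: 1 magic byte + 4 bytes of schema ID. */
    static final int HEADER_LENGTH = 5;

    /** Returns the schema ID stored in bytes 1..4 of the message. */
    static int schemaId(byte[] message) {
        if (message == null || message.length < HEADER_LENGTH) {
            throw new IllegalArgumentException("Message too short for a header");
        }
        // ByteBuffer reads multi-byte values in big-endian order by default.
        ByteBuffer buffer = ByteBuffer.wrap(message);
        if (buffer.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown magic byte");
        }
        return buffer.getInt();
    }
}
```

Everything after the first five bytes is the Avro binary payload, which still needs the writer schema identified by that ID in order to be decoded.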
 
I hope this answers your questions.
 
Best,
 
Dawid
 
 
[1] https://avro.apache.org/docs/1.10.2/spec.html
 
 On 09/07/2021 03:09, M Singh wrote:
  
 
 Hi: 
  I am trying to read Avro-encoded messages from Kafka, with the schema 
registered in the schema registry. 
  I am using the class ConfluentRegistryAvroDeserializationSchema 
(https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroDeserializationSchema.html)
 with the method: 
    
    static ConfluentRegistryAvroDeserializationSchema<org.apache.avro.generic.GenericRecord> forGeneric(...)

  The arguments for this method are: 
   forGeneric(org.apache.avro.Schema schema, String url): Creates 
ConfluentRegistryAvroDeserializationSchema that produces GenericRecord using 
the provided reader schema and looks up the writer schema in the Confluent 
Schema Registry. 
   As I understand it, the schema is the reader schema and the url is the 
schema registry URL used to retrieve the writer schema.   
  I have a few questions: 
  1. Why do we need the writer schema from the registry? Is it to validate 
that the reader schema is the same as the writer schema? 
  2. Since the URL of the schema registry is provided, why do we need to 
provide the reader schema? Can the schema be retrieved dynamically at run time 
from the Avro message metadata and then cached (as shown in the example 
snippet from Confluent below)?   
  The Confluent consumer example 
(https://docs.confluent.io/5.0.0/schema-registry/docs/serializer-formatter.html)
 has the following snippet, where the schema.registry.url is provided to the 
consumer and the message can be converted to a generic record using the 
KafkaAvroDeserializer without the need to pass the reader schema. 
  <snip> 
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringDeserializer");
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        "io.confluent.kafka.serializers.KafkaAvroDeserializer");
    // The deserializer looks up each message's writer schema in this registry.
    props.put("schema.registry.url", "http://localhost:8081");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    String topic = "topic1";
    final Consumer<String, GenericRecord> consumer =
        new KafkaConsumer<String, GenericRecord>(props);
    consumer.subscribe(Arrays.asList(topic));

    try {
      while (true) {
        ConsumerRecords<String, GenericRecord> records = consumer.poll(100);
        for (ConsumerRecord<String, GenericRecord> record : records) {
          System.out.printf("offset = %d, key = %s, value = %s \n",
              record.offset(), record.key(), record.value());
        }
      }
    } finally {
      consumer.close();
    }
  
  <snip> 
  Please let me know if I have missed anything and whether there is a way to 
read Avro-encoded messages from Kafka with the schema registry without 
requiring a reader schema. 
  Thanks 
  
  
  
  
     
