Hi, Yes, you are right the schema in the forGeneric is the readerSchema and at the same time the schema that Flink will be working with in the pipeline. It will be the schema used to serialize and deserialize records between different TaskManagers. Between the Flink TaskManagers that schema plays the role of both the reader and the writer schema.
The way that Avro works is that you must provide both the writer and the reader schema. Otherwise it simply does not work. If you provide just the writer schema, the reader schema is assumed to be the same. Without the writer schema it is not possible to deserialize Avro. See extract from Avro spec: [1] Binary encoded Avro data does not include type information or field names. The benefit is that the serialized data is small, but as a result a schema must always be used in order to read Avro data correctly. The best way to ensure that the schema is structurally identical to the one used to write the data is to use the exact same schema. Therefore, files or systems that store Avro data should always include the writer's schema for that data. Avro-based remote procedure call (RPC) systems must also guarantee that remote recipients of data have a copy of the schema used to write that data. In general, it is advisable that any reader of Avro data should use a schema that is the same (as defined more fully in Parsing Canonical Form for Schemas <https://avro.apache.org/docs/1.10.2/spec.html#Parsing+Canonical+Form+for+Schemas>) as the schema that was used to write the data in order to deserialize it correctly. Deserializing data into a newer schema is accomplished by specifying an additional schema, the results of which are described in Schema Resolution <https://avro.apache.org/docs/1.10.2/spec.html#Schema+Resolution>. Now to the question why do we need to pass a schema as a parameter for the forGeneric method. For the DeserializationSchema, so when reading e.g. from Kafka it is indeed used as the reader schema. However as I said earlier it is also used when serializing between Flink's TaskManagers. In this scenario it is used both as the writer and as the reader schema. The design here is that we do not want to query the schema registry from every TaskManager. You can say that we read multiple different versions from e.g. Kafka and normalize it to that provided schema that's required across the pipeline. The reason why you don't need to provide any schema in the KafkaAvroDeserializer is that it is ever used in a single parallel instance and it is not sent over a network again. So basically there you use the writer schema retrieved from schema registry as the reader schema. I hope this answers your questions. Best, Dawid [1] https://avro.apache.org/docs/1.10.2/spec.html On 09/07/2021 03:09, M Singh wrote: > Hi: > > I am trying to read avro encoded messages from Kafka with schema > registered in schema registry. > > I am using the class > (https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroDeserializationSchema.html > <https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroDeserializationSchema.html>) > using the method: > > |static ConfluentRegistryAvroDeserializationSchema > <https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroDeserializationSchema.html> > <org.apache.avro.generic.GenericRecord>forGeneric(...)| > > > The arguments for this method are: > > |forGeneric > <https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroDeserializationSchema.html#forGeneric-org.apache.avro.Schema-java.lang.String->(org.apache.avro.Schema > schema, String > <http://docs.oracle.com/javase/7/docs/api/java/lang/String.html?is-external=true> > url)| > > Creates |ConfluentRegistryAvroDeserializationSchema| > <https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroDeserializationSchema.html> > that > produces |GenericRecord| using the provided reader schema and looks up > the writer schema in the Confluent Schema Registry. > > As I understand, the schema is the reader schema and the url is the > schema registry url used to retrieve writer schema. > > I have a few questions: > > 1. Why do we need the writer schema from the registry ? Is it to > validate that the reader schema is same as writer schema ? > 2. Since the url of the schema registry is provided, why do we need to > provide the reader schema ? Can the schema be retrieved at run time > from the avro message metadata dynamically and then cache it (as shown > in the example snippet from confluent below) ? > > The confluent consumer example > (https://docs.confluent.io/5.0.0/schema-registry/docs/serializer-formatter.html > <https://docs.confluent.io/5.0.0/schema-registry/docs/serializer-formatter.html>) > has the following example snippet where the schema.registry.url is > provided to the consumer and the message can be converted to generic > record using the KafkaAvroDeserializer without the need to pass the > reader schema. > > <snip> > > Properties props = new Properties(); > > props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); > props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1"); > > > props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, > "org.apache.kafka.common.serialization.StringDeserializer"); > props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, > "io.confluent.kafka.serializers.KafkaAvroDeserializer"); > props.put("schema.registry.url", "http://localhost:8081"); > > props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); > > String topic = "topic1"; > final Consumer<String, GenericRecord> consumer = new > KafkaConsumer<String, String>(props); > consumer.subscribe(Arrays.asList(topic)); > > try { > while (true) { > ConsumerRecords<String, String> records = consumer.poll(100); > for (ConsumerRecord<String, String> record : records) { > System.out.printf("offset = %d, key = %s, value = %s \n", > record.offset(), record.key(), record.value()); > } > } > } finally { > consumer.close(); > } > > > <snip> > > Please let me know if I have missed anything and there is a way to > read avro encoded messages from kafka with schema registry without > requiring reader schema. > > Thanks > > > > >
OpenPGP_signature
Description: OpenPGP digital signature