Hello Dawid:

Thanks for your answers and references. I do have a few questions:

1. Is there any scenario where the reader and writer schemas should differ?
2. How is a mismatch between the two schemas (one passed as an argument and the other retrieved from the schema registry) resolved at run time?
3. As mentioned - "If you provide just the writer schema, the reader schema is assumed to be the same." - Is it possible in Flink to use just the schema registry to retrieve a schema that can be used for both reading and writing?
4. Regarding "You can say that we read multiple different versions from e.g. Kafka and normalize it to that provided schema that's required across the pipeline." - Does this mean that if the reader (passed as argument) and writer (retrieved from the registry) schemas differ, then Flink will normalize the differences? If so, are there any guidelines as to how the fields are normalized?
5. Regarding "the forGeneric is the readerSchema and at the same time the schema that Flink will be working with in the pipeline. It will be the schema used to serialize and deserialize records between different TaskManagers" - If the reader schema (passed as argument) is used for reading and writing between TaskManagers, then what role does the schema from the registry play? Does it have to do with the "normalization" you've mentioned?

Thanks again for your time.

Mans

On Tuesday, July 13, 2021, 10:22:32 AM EDT, Dawid Wysakowicz <dwysakow...@apache.org> wrote:

Hi,

Yes, you are right the schema in the forGeneric is the readerSchema and at the same time the schema that Flink will be working with in the pipeline. It will be the schema used to serialize and deserialize records between different TaskManagers. Between the Flink TaskManagers that schema plays the role of both the reader and the writer schema.

The way that Avro works is that you must provide both the writer and the reader schema. Otherwise it simply does not work.
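To make the writer/reader distinction concrete, here is a minimal, stdlib-only sketch. It is a toy model and not Avro's actual implementation or API: the class `ToySchemaResolution`, the `Field` descriptor, and the two-type encoding are all hypothetical names chosen for illustration. It only mimics two ideas: the bytes carry no field names or types, so decoding needs the writer schema, and the decoded record is then projected onto the reader schema by field name (extra writer fields are dropped, missing reader fields take their default).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ToySchemaResolution {

    /** Hypothetical field descriptor: name, type tag ("int" or "string"), optional default. */
    static final class Field {
        final String name;
        final String type;
        final Object defaultValue; // null means "no default"

        Field(String name, String type, Object defaultValue) {
            this.name = name;
            this.type = type;
            this.defaultValue = defaultValue;
        }
    }

    /** Writes the values in writer-schema order; no names or types end up in the bytes. */
    static byte[] encode(List<Field> writerSchema, Map<String, Object> values) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            for (Field f : writerSchema) {
                if (f.type.equals("int")) {
                    out.writeInt((Integer) values.get(f.name));
                } else {
                    out.writeUTF((String) values.get(f.name));
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Decodes with the writer schema, then projects the result onto the reader schema. */
    static Map<String, Object> decode(byte[] data, List<Field> writerSchema, List<Field> readerSchema) {
        // Step 1: the byte layout is only known through the writer schema.
        Map<String, Object> written = new LinkedHashMap<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            for (Field f : writerSchema) {
                if (f.type.equals("int")) {
                    written.put(f.name, in.readInt());
                } else {
                    written.put(f.name, in.readUTF());
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // Step 2: match fields by name; use defaults for fields the writer did not have.
        Map<String, Object> result = new LinkedHashMap<>();
        for (Field f : readerSchema) {
            if (written.containsKey(f.name)) {
                result.put(f.name, written.get(f.name));
            } else if (f.defaultValue != null) {
                result.put(f.name, f.defaultValue);
            } else {
                throw new IllegalStateException("Reader field without default: " + f.name);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Field> writer = List.of(
                new Field("id", "int", null),
                new Field("legacy", "string", null),
                new Field("name", "string", null));
        List<Field> reader = List.of(
                new Field("id", "int", null),
                new Field("name", "string", null),
                new Field("country", "string", "unknown"));

        byte[] data = encode(writer, Map.of("id", 7, "legacy", "x", "name", "Mans"));
        System.out.println(decode(data, writer, reader)); // prints {id=7, name=Mans, country=unknown}
    }
}
```

In this sketch, decoding the same bytes with only the reader schema would misread them, because the "legacy" field sits between "id" and "name" in the byte stream.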
If you provide just the writer schema, the reader schema is assumed to be the same. Without the writer schema it is not possible to deserialize Avro. See this extract from the Avro spec [1]:

"Binary encoded Avro data does not include type information or field names. The benefit is that the serialized data is small, but as a result a schema must always be used in order to read Avro data correctly. The best way to ensure that the schema is structurally identical to the one used to write the data is to use the exact same schema. Therefore, files or systems that store Avro data should always include the writer's schema for that data. Avro-based remote procedure call (RPC) systems must also guarantee that remote recipients of data have a copy of the schema used to write that data. In general, it is advisable that any reader of Avro data should use a schema that is the same (as defined more fully in Parsing Canonical Form for Schemas) as the schema that was used to write the data in order to deserialize it correctly. Deserializing data into a newer schema is accomplished by specifying an additional schema, the results of which are described in Schema Resolution."

Now to the question of why we need to pass a schema as a parameter to the forGeneric method. For the DeserializationSchema, i.e. when reading e.g. from Kafka, it is indeed used as the reader schema. However, as I said earlier, it is also used when serializing between Flink's TaskManagers. In this scenario it is used both as the writer and as the reader schema. The design here is that we do not want to query the schema registry from every TaskManager. You can say that we read multiple different versions from e.g. Kafka and normalize it to that provided schema that's required across the pipeline.

The reason why you don't need to provide any schema in the KafkaAvroDeserializer is that it is only ever used in a single parallel instance and its output is not sent over a network again.
So basically there you use the writer schema retrieved from schema registry as the reader schema.

I hope this answers your questions.

Best,

Dawid

[1] https://avro.apache.org/docs/1.10.2/spec.html

On 09/07/2021 03:09, M Singh wrote:

Hi:

I am trying to read Avro-encoded messages from Kafka with the schema registered in a schema registry. I am using the class (https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroDeserializationSchema.html) with the method:

static ConfluentRegistryAvroDeserializationSchema<org.apache.avro.generic.GenericRecord> forGeneric(...)
The arguments for this method are:

forGeneric(org.apache.avro.Schema schema, String url)
Creates ConfluentRegistryAvroDeserializationSchema that produces GenericRecord using the provided reader schema and looks up the writer schema in the Confluent Schema Registry.

As I understand it, the schema is the reader schema and the url is the schema registry URL used to retrieve the writer schema.

I have a few questions:

1. Why do we need the writer schema from the registry? Is it to validate that the reader schema is the same as the writer schema?
2. Since the URL of the schema registry is provided, why do we need to provide the reader schema? Can the schema be retrieved at run time from the Avro message metadata dynamically and then cached (as shown in the example snippet from Confluent below)?

The Confluent consumer example (https://docs.confluent.io/5.0.0/schema-registry/docs/serializer-formatter.html) has the following snippet, where schema.registry.url is provided to the consumer and the message can be converted to a generic record using the KafkaAvroDeserializer without the need to pass the reader schema.
<snip>
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroDeserializer");
props.put("schema.registry.url", "http://localhost:8081");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

String topic = "topic1";
// Values are deserialized to GenericRecord, so the consumer and records are typed accordingly.
final Consumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));

try {
    while (true) {
        ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, GenericRecord> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
    }
} finally {
    consumer.close();
}
<snip>

Please let me know if I have missed anything and whether there is a way to read Avro-encoded messages from Kafka with a schema registry without requiring a reader schema.

Thanks
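On the question of retrieving the schema from the message itself: messages produced with the Confluent serializers do not embed the schema, but they are framed with a 5-byte header (a magic byte of 0 followed by a 4-byte big-endian schema ID), and the deserializer uses that ID to look up the writer schema in the registry. The following is a minimal sketch of reading that header using only the JDK. The class name `ConfluentHeaderSketch` and its helper methods are hypothetical, and the payload bytes in `main` are placeholders rather than a real Avro record.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class ConfluentHeaderSketch {

    static final byte MAGIC_BYTE = 0x0;
    static final int HEADER_LENGTH = 5; // 1 magic byte + 4-byte schema ID

    /** Returns the schema ID stored in bytes 1..4 of a Confluent-framed message. */
    static int schemaId(byte[] message) {
        ByteBuffer buffer = ByteBuffer.wrap(message); // ByteBuffer is big-endian by default
        if (message.length < HEADER_LENGTH || buffer.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Not a Confluent-framed Avro message");
        }
        return buffer.getInt();
    }

    /** Returns the Avro binary payload that follows the header. */
    static byte[] payload(byte[] message) {
        schemaId(message); // reuse the header validation
        return Arrays.copyOfRange(message, HEADER_LENGTH, message.length);
    }

    public static void main(String[] args) {
        // Build a fake message: magic byte, schema ID 42, then two placeholder payload bytes.
        byte[] message = ByteBuffer.allocate(7)
                .put(MAGIC_BYTE)
                .putInt(42)
                .put((byte) 0x0E)
                .put((byte) 0x01)
                .array();

        System.out.println(schemaId(message));       // prints 42
        System.out.println(payload(message).length); // prints 2
    }
}
```

The schema ID only identifies the writer schema. Whether a separate reader schema is also supplied is up to the consumer of the bytes, which is the difference between KafkaAvroDeserializer and forGeneric discussed in this thread.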