Hi Anuj,

you should always avoid having records with different schemas in the same topic/dataset. You will break the compatibility features of the schema registry, and your consumer/producer code will always be hard to maintain.
A common and scalable way to avoid this is to use some kind of envelope format:

{
  "namespace": "example",
  "name": "Envelope",
  "type": "record",
  "fields": [
    {
      "name": "type1",
      "type": ["null", { "type": "record", "fields": [ ... ] }],
      "default": null
    },
    {
      "name": "type2",
      "type": ["null", { "type": "record", "fields": [ ... ] }],
      "default": null
    }
  ]
}

This envelope is evolvable (arbitrary addition/removal of wrapped types, which can themselves be evolved), and adds only a little overhead (1 byte per subtype). The downside is that you cannot enforce that exactly one of the subtypes is set.

The schema is fully compatible with the schema registry, so there is no need to parse anything manually, and it can easily be used with Parquet. If you can't change the input format anymore, you can at least use that approach on your output. (Rough sketches of the envelope and of a partitioned Parquet sink are appended below the quoted mail.)

Best,

Arvid

On Thu, Jan 16, 2020 at 2:53 PM aj <ajainje...@gmail.com> wrote:

> Hi All,
>
> I have a use case where I am getting different types of Avro records in
> Kafka. I am using the schema registry to store the Avro schemas, and one
> topic can contain more than one type of record.
>
> I have created a GenericRecord stream using KafkaAvroDeserializer by
> defining a custom deserialization schema like this:
>
> @Override
> public GenericRecord deserialize(
>     byte[] messageKey, byte[] message, String topic, int partition, long offset) {
>   checkInitialized();
>   return (GenericRecord) inner.deserialize(topic, message);
> }
>
> private void checkInitialized() {
>   if (inner == null) {
>     Map<String, Object> props = new HashMap<>();
>     props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
>     props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
>     SchemaRegistryClient client =
>         new CachedSchemaRegistryClient(
>             registryUrl, AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
>     inner = new KafkaAvroDeserializer(client, props);
>   }
> }
>
> And this is my consumer code:
>
> DataStreamSource<GenericRecord> input = env
>     .addSource(
>         new FlinkKafkaConsumer010<GenericRecord>(topics,
>             new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>             config).setStartFromEarliest());
>
> Now I want to write this stream, partitioned on
> *event_name="a"/year=/month=/day=*, in Parquet format so that I can expose
> Hive tables directly on top of this data. event_name is a common field for
> all types of records that I am getting in Kafka.
>
> I am stuck because the Parquet writer needs a schema to write, but my
> records have different schemas. So how do I write this stream to S3 in the
> above partition format?
>
> Thanks & Regards,
> Anuj Jain
>
> <http://www.cse.iitm.ac.in/%7Eanujjain/>
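
A minimal sketch of the envelope approach in Java, using Avro's Schema.Parser and GenericRecordBuilder; the subtype names (OrderCreated, OrderShipped), their fields, and the wrap() helper are made up for illustration and would be replaced by the real record types from the topic:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;

public class EnvelopeExample {

  // One nullable branch per wrapped type, each defaulting to null.
  // Avro cannot enforce that exactly one branch is set (see above).
  static final Schema ENVELOPE = new Schema.Parser().parse(
      "{\"type\":\"record\",\"name\":\"Envelope\",\"namespace\":\"example\",\"fields\":["
          + "{\"name\":\"orderCreated\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"OrderCreated\",\"fields\":["
          + "{\"name\":\"event_name\",\"type\":\"string\"},{\"name\":\"order_id\",\"type\":\"long\"}]}],\"default\":null},"
          + "{\"name\":\"orderShipped\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"OrderShipped\",\"fields\":["
          + "{\"name\":\"event_name\",\"type\":\"string\"},{\"name\":\"carrier\",\"type\":\"string\"}]}],\"default\":null}]}");

  // Wraps an incoming record into the envelope, picking the branch by schema name.
  static GenericRecord wrap(GenericRecord payload) {
    GenericRecordBuilder builder = new GenericRecordBuilder(ENVELOPE);
    switch (payload.getSchema().getName()) {
      case "OrderCreated":
        return builder.set("orderCreated", payload).build();
      case "OrderShipped":
        return builder.set("orderShipped", payload).build();
      default:
        throw new IllegalArgumentException(
            "Unknown record type: " + payload.getSchema().getFullName());
    }
  }
}

Adding a new type later just means adding another nullable branch with a null default, which stays backward and forward compatible for the registry.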
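
And a rough sketch of the partitioned Parquet output on the Flink side, assuming the flink-parquet module (ParquetAvroWriters) and a StreamingFileSink with a custom BucketAssigner; the bucket layout, the use of processing time for the date, and the event_name field inside each branch are assumptions:

import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

// Derives event_name=<...>/year=<...>/month=<...>/day=<...> from each envelope record.
public class EventNameBucketAssigner implements BucketAssigner<GenericRecord, String> {

  private static final DateTimeFormatter DATE_PATH =
      DateTimeFormatter.ofPattern("'year='yyyy'/month='MM'/day='dd").withZone(ZoneOffset.UTC);

  @Override
  public String getBucketId(GenericRecord envelope, Context context) {
    // The single non-null branch carries the actual event; read its event_name field.
    String eventName = "unknown";
    for (Schema.Field field : envelope.getSchema().getFields()) {
      Object branch = envelope.get(field.name());
      if (branch instanceof GenericRecord) {
        eventName = String.valueOf(((GenericRecord) branch).get("event_name"));
        break;
      }
    }
    return "event_name=" + eventName + "/"
        + DATE_PATH.format(Instant.ofEpochMilli(context.currentProcessingTime()));
  }

  @Override
  public SimpleVersionedSerializer<String> getSerializer() {
    return SimpleVersionedStringSerializer.INSTANCE;
  }
}

Wiring it up (envelopeStream is an assumed DataStream<GenericRecord> of wrapped records, s3://my-bucket/events a placeholder path; additionally imports org.apache.flink.core.fs.Path, org.apache.flink.formats.parquet.avro.ParquetAvroWriters, and org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink):

StreamingFileSink<GenericRecord> sink = StreamingFileSink
    .forBulkFormat(new Path("s3://my-bucket/events"),
        ParquetAvroWriters.forGenericRecord(EnvelopeExample.ENVELOPE))
    .withBucketAssigner(new EventNameBucketAssigner())
    .build();

envelopeStream.addSink(sink);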