Hi All,

I have a use case where I am receiving different types of Avro records from Kafka. I am using the Schema Registry to store the Avro schemas, and a single topic can also carry records of different types.
Now I have created a GenericRecord stream using the KafkaAvroDeserializer, by defining a custom deserialization schema class like this:

    @Override
    public GenericRecord deserialize(
        byte[] messageKey, byte[] message, String topic, int partition, long offset) {
      checkInitialized();
      return (GenericRecord) inner.deserialize(topic, message);
    }

    private void checkInitialized() {
      if (inner == null) {
        Map<String, Object> props = new HashMap<>();
        props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
        props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
        SchemaRegistryClient client = new CachedSchemaRegistryClient(
            registryUrl, AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
        inner = new KafkaAvroDeserializer(client, props);
      }
    }

And this is my consumer code:

    DataStreamSource<GenericRecord> input = env.addSource(
        new FlinkKafkaConsumer010<GenericRecord>(
                topics,
                new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
                config)
            .setStartFromEarliest());

Now I want to write this stream to S3, partitioned as *event_name="a"/year=/month=/day=*, in Parquet format, so that I can expose Hive tables directly on top of this data. event_name is a field common to all the record types that I am getting from Kafka.

I am stuck because the Parquet writer needs a schema to write, but my different records have different schemas. So how do I write this stream to S3 in the above partition layout?

Thanks & Regards,
Anuj Jain
<http://www.cse.iitm.ac.in/%7Eanujjain/>
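
P.S. To make the target layout concrete, this is roughly the kind of BucketAssigner I have in mind for a StreamingFileSink. It is only a sketch, not working code: it assumes every record has an "event_name" field, that the Kafka record timestamp (context.timestamp()) is the time to partition on, and the class name EventPartitionBucketAssigner is just something I made up.

    import java.time.Instant;
    import java.time.ZoneOffset;
    import java.time.format.DateTimeFormatter;

    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.core.io.SimpleVersionedSerializer;
    import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
    import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

    /**
     * Builds bucket paths of the form event_name=<name>/year=yyyy/month=MM/day=dd,
     * so the files land in the Hive-style partition directories described above.
     */
    public class EventPartitionBucketAssigner implements BucketAssigner<GenericRecord, String> {

        private static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("'year='yyyy'/month='MM'/day='dd").withZone(ZoneOffset.UTC);

        @Override
        public String getBucketId(GenericRecord element, BucketAssigner.Context context) {
            // event_name is assumed to be present on every record type.
            String eventName = String.valueOf(element.get("event_name"));

            // Use the Kafka record timestamp if available, otherwise fall back
            // to processing time. A timestamp field inside the record could be
            // used here instead.
            Long ts = context.timestamp();
            long millis = (ts != null) ? ts : context.currentProcessingTime();

            return "event_name=" + eventName + "/" + FMT.format(Instant.ofEpochMilli(millis));
        }

        @Override
        public SimpleVersionedSerializer<String> getSerializer() {
            return SimpleVersionedStringSerializer.INSTANCE;
        }
    }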
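
P.P.S. One direction I have been considering (not sure if it is the right approach) is to split the stream by event_name and give each branch its own sink, built with that event type's schema, so each Parquet writer only ever sees a single schema. Very rough sketch; knownEventNames and latestSchemaFor(...) are placeholders for however the schemas would be obtained from the Schema Registry, and the S3 path is made up:

    import java.util.List;

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

    // One sink per event type; each sink is created with that type's schema,
    // so the Parquet writer never has to handle mixed schemas.
    void addSinks(DataStream<GenericRecord> input, List<String> knownEventNames) {
        for (String eventName : knownEventNames) {
            // Placeholder: fetch the latest schema for this event type,
            // e.g. via a CachedSchemaRegistryClient.
            Schema schema = latestSchemaFor(eventName);

            StreamingFileSink<GenericRecord> sink = StreamingFileSink
                .forBulkFormat(new Path("s3://my-bucket/events"),
                               ParquetAvroWriters.forGenericRecord(schema))
                .withBucketAssigner(new EventPartitionBucketAssigner())  // assigner from the P.S. above
                .build();

            // Route only this event type's records to this sink.
            input.filter(r -> eventName.equals(String.valueOf(r.get("event_name"))))
                 .addSink(sink);
        }
    }

The part I am unsure about is whether this scales when the set of event types is large or not known up front.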