Hi All,

I have a use case where I receive different kinds of Avro records from
Kafka. I use the Schema Registry to store the Avro schemas. A single topic
can also contain different types of records.

I have created a GenericRecord stream using KafkaAvroDeserializer by
defining a custom deserializer class like this:

@Override
public GenericRecord deserialize(
        byte[] messageKey, byte[] message, String topic, int partition, long offset) {
    checkInitialized();
    return (GenericRecord) inner.deserialize(topic, message);
}

// Lazily creates the Confluent deserializer on first use.
private void checkInitialized() {
    if (inner == null) {
        Map<String, Object> props = new HashMap<>();
        props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
        // false => decode to GenericRecord rather than generated specific classes
        props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
        SchemaRegistryClient client =
                new CachedSchemaRegistryClient(
                        registryUrl, AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
        inner = new KafkaAvroDeserializer(client, props);
    }
}


And this is my consumer code:

DataStreamSource<GenericRecord> input = env
        .addSource(
                new FlinkKafkaConsumer010<GenericRecord>(
                        topics,
                        new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
                        config).setStartFromEarliest());

Now I want to write this stream to S3 in Parquet format, partitioned as
*event_name="a"/year=/month=/day=*, so that I can expose Hive tables
directly on top of this data. event_name is a field common to all the
record types that I receive from Kafka.

I am stuck because the Parquet writer needs a schema to write with, but my
records have different schemas. How can I write this stream to S3 in the
partition layout above?
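
To make the target layout concrete, here is a minimal Java sketch of how the
partition path for one record could be derived. The class and method names
(PartitionPath, partitionPath) are hypothetical. I am also assuming that each
record carries an epoch-millisecond timestamp interpreted in UTC, and that the
directory is written as event_name=a without the quotes. It only illustrates
the directory layout and does not address the one-schema-per-writer problem.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;

// Hypothetical helper: builds a Hive-style partition path from two values
// that would be read from each record (event_name and an event timestamp).
final class PartitionPath {

    private PartitionPath() {}

    // Assumption: epochMillis is the event time in milliseconds since the
    // epoch, and partitions are cut on UTC calendar days.
    static String partitionPath(String eventName, long epochMillis) {
        ZonedDateTime t = Instant.ofEpochMilli(epochMillis).atZone(ZoneOffset.UTC);
        return String.format(
                "event_name=%s/year=%04d/month=%02d/day=%02d",
                eventName, t.getYear(), t.getMonthValue(), t.getDayOfMonth());
    }
}
```

If the sink is Flink's StreamingFileSink, logic like this would probably
belong in a custom BucketAssigner's getBucketId, with event_name and the
timestamp read from the GenericRecord.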


Thanks & Regards,
Anuj Jain



<http://www.cse.iitm.ac.in/%7Eanujjain/>
