Hi All,

I have a use case where I receive several different kinds of Avro records
from Kafka. The Avro schemas are stored in the schema registry, and a single
topic can contain more than one record type.

I have created a GenericRecord stream using KafkaAvroDeserializer by
defining a custom deserializer class like this:

    public GenericRecord deserialize(
            byte[] messageKey, byte[] message, String topic, int partition, long offset) {
        return (GenericRecord) inner.deserialize(topic, message);
    }

    private void checkInitialized() {
        if (inner == null) {
            Map<String, Object> props = new HashMap<>();
            props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
            SchemaRegistryClient client =
                    new CachedSchemaRegistryClient(
                            registryUrl, AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
            inner = new KafkaAvroDeserializer(client, props);
        }
    }

And this is my consumer code:

DataStreamSource<GenericRecord> input = env
                new FlinkKafkaConsumer010<GenericRecord>(topics,

Now I want to write this stream to S3 in Parquet format, partitioned as
*event_name="a"/year=/month=/day=*, so that I can expose Hive tables directly
on top of this data. event_name is a field common to all the record types I
receive. I am stuck because the Parquet writer needs a schema to write, but my
different records have different schemas. How do I write this stream to S3 in
the partition format above?
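
To make the target layout concrete, here is a minimal sketch of how I imagine the partition directory being derived for each record. The class name PartitionPaths, the method pathFor, and the use of an epoch-millisecond event timestamp interpreted in UTC are assumptions for illustration only; none of them come from Flink or Parquet. The event_name value is written without quotes, as Hive partition directories usually are.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.Locale;

// Hypothetical helper for illustration; not part of Flink, Avro, or Parquet.
final class PartitionPaths {
    private PartitionPaths() {}

    // Builds "event_name=<name>/year=<yyyy>/month=<MM>/day=<dd>" from the
    // record's event_name and an event timestamp in epoch milliseconds.
    // Assumption: the timestamp is interpreted in UTC.
    static String pathFor(String eventName, long epochMillis) {
        ZonedDateTime ts = Instant.ofEpochMilli(epochMillis).atZone(ZoneOffset.UTC);
        return String.format(
                Locale.ROOT,
                "event_name=%s/year=%04d/month=%02d/day=%02d",
                eventName, ts.getYear(), ts.getMonthValue(), ts.getDayOfMonth());
    }
}
```

This only covers the directory naming; it does not address the actual problem of giving the Parquet writer a schema when records of different types arrive on the same stream.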

Thanks & Regards,
Anuj Jain

