Hi Arvid,
Thanks for the quick response. I am new to Avro schema design, so could you
please help me understand it and design something for my use case?

My use case is this:

1. We have an app in which users perform a lot of actions.
2. For each action we collect a set of information defined as key-value
pairs. We want to describe this information with proper schemas so that we
maintain a consistent format and do not push random data.
3. So we define a schema for each action and register it in the schema
registry using topic+record.name as the subject.
4. I do not think the producer side has any issue, since whenever we push
an event to Kafka we register a new schema under the above subject.
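
To make point 3 concrete, here is a minimal sketch of the subject naming I mean. It assumes the Confluent Avro serializer with its TopicRecordNameStrategy, which as far as I know builds the subject as "<topic>-<fully qualified record name>". The class SubjectNaming, its methods, and the topic and record names used in main are only illustrative.

```java
import java.util.Properties;

// Sketch only: this class and its method names are illustrative helpers,
// not part of Kafka or the Confluent libraries.
class SubjectNaming {

    // Mirrors the "<topic>-<record full name>" subject layout that
    // Confluent's TopicRecordNameStrategy is documented to produce.
    static String subjectFor(String topic, String recordFullName) {
        return topic + "-" + recordFullName;
    }

    // Producer settings that ask the Confluent Avro serializer to use
    // the topic+record name strategy for value schemas.
    static Properties producerProps(String registryUrl) {
        Properties props = new Properties();
        props.put("schema.registry.url", registryUrl);
        props.put("value.subject.name.strategy",
                "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy");
        return props;
    }

    public static void main(String[] args) {
        // "user-events" and "example.EventA" are hypothetical names.
        System.out.println(subjectFor("user-events", "example.EventA"));
    }
}
```

With this strategy each event type gets its own subject, so compatibility is checked per event type rather than across the whole topic.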

Example :

{
  "event_name": "a",
  "timestamp": ...,
  "properties": {
    "key-1": "val-1",
    "key-2": "val-2"
  }
}

{
  "event_name": "b",
  "timestamp": ...,
  "properties": {
    "key-3": "val-3",
    "key-4": "val-4"
  }
}

Now I have a consumer that parses the data by fetching the schema from the
schema registry and deserializes it into a GenericRecord stream.

Why do you think it will break, given that I always deserialize with the
writer schema?
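
To show what I mean by "writer schema": as far as I understand, the Confluent serializer prefixes every message with a magic byte (0) and a 4-byte big-endian schema ID, and the deserializer uses that ID to fetch the writer schema from the registry. The sketch below only reads that header. The class name WireFormat and the method schemaId are my own illustrative names, and the registry lookup itself is left out.

```java
import java.nio.ByteBuffer;

// Sketch only: reads the schema ID from a message framed in the Confluent
// wire format (1 magic byte, 4-byte schema ID, then the Avro payload).
class WireFormat {

    static final byte MAGIC_BYTE = 0x0;

    // Returns the schema ID stored in bytes 1..4 of the message.
    static int schemaId(byte[] message) {
        if (message == null || message.length < 5) {
            throw new IllegalArgumentException("message too short for a schema ID header");
        }
        ByteBuffer buffer = ByteBuffer.wrap(message); // big-endian by default
        if (buffer.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("unknown magic byte");
        }
        return buffer.getInt();
    }

    public static void main(String[] args) {
        // Header for schema ID 42 followed by two placeholder payload bytes.
        byte[] message = {0, 0, 0, 0, 42, 1, 2};
        System.out.println(schemaId(message));
    }
}
```

So every record carries the ID of the exact schema it was written with, regardless of how many record types share the topic.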

You suggested keeping a single envelope Avro schema instead of a separate
schema for each type of event I generate. I have some doubts about that:

1. How do I enforce a schema on each event when it is a subtype inside the
main schema? When I get a JSON event of type "a", how do I validate it,
convert it to the subschema for "a", and push it to Kafka?
2. I want to create a separate Hive table for each event. If I write this
data and have, say, 20 event types, then 19 of the columns will always be
null in every row.
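
For reference, the per-event layout I am after is the one from my first mail, event_name=.../year=/month=/day=, with one Hive table per event_name directory. Below is a minimal sketch of deriving that directory for a record. It assumes the timestamp is epoch milliseconds interpreted in UTC, which is my assumption here, and the class PartitionPath and method forEvent are illustrative names only.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.Locale;

// Sketch only: builds the Hive-style partition directory for one event.
// Assumes the event timestamp is epoch milliseconds, interpreted in UTC.
class PartitionPath {

    static String forEvent(String eventName, long epochMillis) {
        ZonedDateTime time = Instant.ofEpochMilli(epochMillis).atZone(ZoneOffset.UTC);
        // Locale.ROOT keeps the digits ASCII regardless of the JVM default locale.
        return String.format(Locale.ROOT,
                "event_name=%s/year=%04d/month=%02d/day=%02d",
                eventName, time.getYear(), time.getMonthValue(), time.getDayOfMonth());
    }

    public static void main(String[] args) {
        // 1579132800000L is 2020-01-16T00:00:00Z.
        System.out.println(forEvent("a", 1579132800000L));
    }
}
```

With a layout like this each event type lands in its own directory, which is why I would prefer each table to carry only the columns of its own event.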

Please help me do this the right way. It would be a great help and a good
learning experience for me.

Thanks,
Anuj

On Thu, Jan 16, 2020 at 7:30 PM Arvid Heise <ar...@ververica.com> wrote:

> Hi Anuj,
>
> you should always avoid having records with different schemas in the same
> topic/dataset. You will break the compatibility features of the schema
> registry and your consumer/producer code is always hard to maintain.
>
> A common and scalable way to avoid it is to use some kind of envelope
> format.
>
> {
>   "namespace": "example",
>   "name": "Envelope",
>   "type": "record",
>   "fields": [
>     {
>       "name": "type1",
>       "type": ["null", {
>         "type": "record",
>         "fields": [ ... ]
>       }],
>       "default": null
>     },
>     {
>       "name": "type2",
>       "type": ["null", {
>         "type": "record",
>         "fields": [ ... ]
>       }],
>       "default": null
>     }
>   ]
> }
>
> This envelope is evolvable (arbitrary addition/removal of wrapped types,
> which by themselves can be evolved), and adds only a little overhead (1
> byte per subtype). The downside is that you cannot enforce that exactly one
> of the subtypes is set.
>
> This schema is fully compatible with the schema registry, so no need to
> parse anything manually.
>
> This schema can easily be used with Parquet. If you can't change the input
> format anymore, you can at least use that approach on your output.
>
> Best,
>
> Arvid
>
> On Thu, Jan 16, 2020 at 2:53 PM aj <ajainje...@gmail.com> wrote:
>
>> Hi All,
>>
>> I have a use case where I receive different sets of Avro records in
>> Kafka. I am using the schema registry to store the Avro schemas. One topic
>> can also contain different types of records.
>>
>> I have created a GenericRecord stream using KafkaAvroDeserializer by
>> defining a custom deserializer class like this:
>>
>> @Override
>> public GenericRecord deserialize(
>>         byte[] messageKey, byte[] message, String topic, int partition, long offset) {
>>     checkInitialized();
>>     return (GenericRecord) inner.deserialize(topic, message);
>> }
>>
>> private void checkInitialized() {
>>     if (inner == null) {
>>         Map<String, Object> props = new HashMap<>();
>>         props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
>>         props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
>>         SchemaRegistryClient client =
>>                 new CachedSchemaRegistryClient(
>>                         registryUrl,
>>                         AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
>>         inner = new KafkaAvroDeserializer(client, props);
>>     }
>> }
>>
>>
>> And this is my consumer code :
>>
>> DataStreamSource<GenericRecord> input = env
>>         .addSource(
>>                 new FlinkKafkaConsumer010<GenericRecord>(topics,
>>                         new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>                         config).setStartFromEarliest());
>>
>> Now I want to write this stream partitioned by
>> *event_name="a"/year=/month=/day=* in Parquet format so that I can
>> expose Hive tables directly on top of this data.
>> event_name is a common field in all the record types that I get from
>> Kafka.
>> I am stuck because the Parquet writer needs a schema to write, but my
>> different records have different schemas. So how do I write this stream to
>> S3 in the above partition format?
>>
>>
>> Thanks & Regards,
>> Anuj Jain
>>
>>
>>
>> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>>
>

-- 
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877
Skype : anuj.jain07
<http://www.oracle.com/>


<http://www.cse.iitm.ac.in/%7Eanujjain/>
