Thanks, Arvid, for all the clarifications. I will work on the approach you
suggested.

Thanks,
Anuj

On Sat, Jan 18, 2020 at 10:17 PM Arvid Heise <ar...@ververica.com> wrote:

> Hi Anuj,
>
> I think that there may be a fundamental misunderstanding about the role of
> a schema registry in Kafka. So let me first clarify that.
> In each Avro/Parquet file, all records have the same schema. The schema is
> stored within the file, such that we can always retrieve the writer schema
> for the records.
> When Avro was first applied to Kafka, there was the basic question of how
> the writer schema for any record becomes known to the consumer. Storing the
> complete schema on each record would make each record much larger than
> needed. Hence, they added the schema registry, which assigns a unique id to
> each schema; that id is then embedded into the records.
> Now, whenever I update a schema in my producer, I have old records with the
> old schema id and new records with the new schema id.
> In my consumer, I'd use a fixed reader schema, so that my application does
> not need to worry whether a record was written with the old or the new
> schema; my consumer only ever sees records with the reader schema.
>
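> To make the reader schema part concrete, here is a minimal sketch with
> plain Avro (method and variable names are placeholders; in practice the
> writer schema comes from the registry via the embedded id):
>
> // uses org.apache.avro.Schema, org.apache.avro.generic.*, org.apache.avro.io.DecoderFactory
> static GenericRecord readWithFixedReaderSchema(
>         byte[] payload, Schema writerSchema, Schema readerSchema) throws IOException {
>     // Avro resolves a record written with writerSchema into readerSchema,
>     // so the application only ever deals with the reader schema.
>     GenericDatumReader<GenericRecord> reader =
>             new GenericDatumReader<>(writerSchema, readerSchema);
>     return reader.read(null, DecoderFactory.get().binaryDecoder(payload, null));
> }
>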
> Given that background, you can see that in general it's impossible, with a
> generic approach, to write the Parquet files with the same schema the data
> was written with in Kafka: the Parquet schema needs to be supplied
> statically during query compilation, while the Avro schema actually used in
> Kafka is only known when the data is consumed.
>
> But looking further down the road:
> * since you need one schema to write the parquet files, you'd need to
> decide: do you want to write with the new or the old schema in case of a
> schema update? That should also be the reader schema of your application
> for a given event type.
> * this decision has further implications: your application needs to extract
> exactly one specific version of the schema from the schema registry at
> query compilation. That could be either a specific schema id or the latest
> schema for the event type.
> * that means that the output schema is locked until you restart your
> application and fetch a new latest schema in case of an update.
> * at that point, it might just be easier to use the approach that I
> outlined previously by bundling a specific schema with your application.
>
> If you want to extract the latest schema for a subject:
>
> var registryClient = new CachedSchemaRegistryClient(<schemaRepoUrls>, 1000);
> var versions = registryClient.getAllVersions(<subject>);
> // note: getAllVersions returns version numbers, not global schema ids,
> // so fetch the schema by subject + version and parse it
> var metadata = registryClient.getSchemaMetadata(<subject>, versions.get(versions.size() - 1));
> var schema = new Schema.Parser().parse(metadata.getSchema());
>
>
> On Sat, Jan 18, 2020 at 5:22 PM aj <ajainje...@gmail.com> wrote:
>
>> Thanks, Arvid.
>>
>> I do not fully understand the above approach, so currently I am thinking
>> of going with the envelope approach that you suggested.
>>
>> One more question: I do not want to keep the schema in my consumer
>> project, even if it is a single envelope schema. I want it to be fetched
>> from the schema registry and passed to my Parquet sink so that I always
>> use the same schema that the producer used. Can you provide sample code
>> showing how I can infer the schema from the generic record or get it from
>> the schema registry?
>>
>>
>> Regards,
>> Anuj
>>
>>
>>
>> On Sat, Jan 18, 2020 at 6:55 PM Arvid Heise <ar...@ververica.com> wrote:
>>
>>> (Readded user mailing list)
>>>
>>> Hi Anuj,
>>>
>>> since I'd still recommend going with distinct sources/sinks, let me try
>>> to solve your issues in this mail. If that doesn't work out, I'd address
>>> your concerns about the envelope format later.
>>>
>>> In Flink, you can have several subtopologies in the same application.
>>>
>>> Thus, for each event type, you can add
>>> AvroSource(eventType) -> generic transformation/validation ->
>>> AvroSink(eventType)
>>> to the same job.
>>>
>>> I'd put all Avro schemas in one project and use an Avro plugin to
>>> generate the respective Java classes. Then I'd simply create a map of Avro
>>> schema (GeneratedClassA::SCHEMA, GeneratedClassB::SCHEMA, ...) to topic name
>>> (event-a, event-b, ...).
>>> Next, I'd iterate over that map to add the respective subtopologies to
>>> env.
>>> Finally, execute everything (see the sketch below).
>>>
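>>> A rough sketch of what I mean (buildParquetSink and the generated classes
>>> are placeholders, not actual code):
>>>
>>> Map<String, Schema> topicToSchema = new HashMap<>();
>>> topicToSchema.put("event-a", GeneratedClassA.getClassSchema());
>>> topicToSchema.put("event-b", GeneratedClassB.getClassSchema());
>>>
>>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>> for (Map.Entry<String, Schema> entry : topicToSchema.entrySet()) {
>>>     env.addSource(new FlinkKafkaConsumer010<GenericRecord>(entry.getKey(),
>>>                 new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl), config)
>>>             .setStartFromEarliest())
>>>         // generic transformation/validation shared by all event types
>>>         .filter(record -> record != null)
>>>         // sink parameterized with exactly this event type's schema
>>>         .addSink(buildParquetSink(entry.getValue()));
>>> }
>>> env.execute("per-event-type-pipelines");
>>>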
>>> You'd have one project where all validations reside, yet almost no
>>> overhead to process a given source of a given eventType. The downside of
>>> that approach is, of course, that each new event type requires a
>>> redeployment, but that seems to be what you'd want to do anyhow.
>>>
>>> Best,
>>>
>>> Arvid
>>>
>>> On Sat, Jan 18, 2020 at 2:08 PM aj <ajainje...@gmail.com> wrote:
>>>
>>>> Thanks, Arvid.
>>>>
>>>> 1. I like your approach, as I can write a single consumer and put the
>>>> data in S3 in Parquet format. The only challenge is that there are extra
>>>> columns that are always going to be null, since at any given time I will
>>>> get only one type of event.
>>>>
>>>> 2. If I go with separate schemas, I am not sure how I can solve it
>>>> using a single generalized consumer. My understanding so far is that I
>>>> have to write a consumer for each type of event. Each consumer would read
>>>> all the data, filter out the respective events, and then pass this stream
>>>> to a sink. But this does not look like a scalable solution: as new events
>>>> keep getting added, I would have to write a consumer for each new type.
>>>>
>>>>
>>>> DataStreamSource<GenericRecord> input = env
>>>>         .addSource(
>>>>                 new FlinkKafkaConsumer010<GenericRecord>(topics,
>>>>                         new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>>>                         config).setStartFromEarliest());
>>>>
>>>> Example:
>>>>
>>>> *1st consumer:*
>>>>         DataStreamSource<GenericRecord> input = env.addSource(
>>>>                 new FlinkKafkaConsumer010<GenericRecord>(topics,
>>>>                         new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>>>                         config).setStartFromEarliest());
>>>>         DataStream<GenericRecord> aInput =
>>>>                 input.filter(r -> "a".equals(r.get("event_name").toString()));
>>>>
>>>> *2nd consumer:*
>>>>         DataStreamSource<GenericRecord> input = env.addSource(
>>>>                 new FlinkKafkaConsumer010<GenericRecord>(topics,
>>>>                         new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>>>                         config).setStartFromEarliest());
>>>>         DataStream<GenericRecord> bInput =
>>>>                 input.filter(r -> "b".equals(r.get("event_name").toString()));
>>>>
>>>>
>>>> Can you help me with how I can solve this using a single consumer, as I
>>>> do not want to write a separate consumer for each type of schema?
>>>>
>>>> For example, this is my consumer, which receives different types of
>>>> records:
>>>>
>>>> DataStreamSource<GenericRecord> input = env
>>>>         .addSource(
>>>>                 new FlinkKafkaConsumer010<GenericRecord>(topics,
>>>>                         new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>>>                         config).setStartFromEarliest());
>>>>
>>>> Now I cannot write this stream directly, as there is no common schema
>>>> for the records in this stream. So the possible way I am thinking of is:
>>>>
>>>> 1. Can I create multiple streams from this stream by keying on
>>>> *"event_name"* and then write each stream separately?
>>>>
>>>> I just want to know whether this is possible.
>>>>
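>>>> Or, if filtering twice is not the right way, I imagine something like
>>>> side outputs could split the single stream (this is just my rough,
>>>> untested sketch):
>>>>
>>>> final OutputTag<GenericRecord> aTag = new OutputTag<GenericRecord>("event-a") {};
>>>> final OutputTag<GenericRecord> bTag = new OutputTag<GenericRecord>("event-b") {};
>>>>
>>>> SingleOutputStreamOperator<GenericRecord> routed = input
>>>>         .process(new ProcessFunction<GenericRecord, GenericRecord>() {
>>>>             @Override
>>>>             public void processElement(GenericRecord value, Context ctx,
>>>>                     Collector<GenericRecord> out) {
>>>>                 // route each record to the side output of its event type
>>>>                 String eventName = value.get("event_name").toString();
>>>>                 if ("a".equals(eventName)) {
>>>>                     ctx.output(aTag, value);
>>>>                 } else if ("b".equals(eventName)) {
>>>>                     ctx.output(bTag, value);
>>>>                 }
>>>>             }
>>>>         });
>>>>
>>>> DataStream<GenericRecord> aInput = routed.getSideOutput(aTag);
>>>> DataStream<GenericRecord> bInput = routed.getSideOutput(bTag);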
>>>>
>>>> Thanks,
>>>> Anuj
>>>>
>>>> On Fri, Jan 17, 2020 at 3:20 PM Arvid Heise <ar...@ververica.com>
>>>> wrote:
>>>>
>>>>> Hi Anuj,
>>>>>
>>>>> I originally understood that you would like to store data in the same
>>>>> Kafka topic and also want to store it in the same Parquet file. In the
>>>>> past, I mostly used the schema registry with Kafka Streams, where you
>>>>> could only store one schema for the key and one for the value. To use
>>>>> different record types in the same Kafka topic, you had to disable schema
>>>>> compatibility checks and just store the schemas as different versions
>>>>> under the same subject.
>>>>>
>>>>> Your approach is much better: you can ensure full schema
>>>>> compatibility. Nevertheless, it still shares the same drawback that
>>>>> consumption is much harder (using GenericRecord is proof of that) if you
>>>>> want to read/write everything into the same place. Also, you will never
>>>>> be able to write one consistent file, as a file can only have one schema
>>>>> (both in Avro and in Parquet).
>>>>> So you only have two options:
>>>>> * keep schemas separate, but then you also need to write separate
>>>>> files per record type.
>>>>> * have a common schema (either my outlined approach or any other
>>>>> wrapper schema).
>>>>> The approach with a common schema only makes sense if you want to
>>>>> write everything into one table/Kafka topic.
>>>>>
>>>>> However, in the last mail you pointed out that you actually want to
>>>>> store the record types separately. In that case, you should keep
>>>>> everything separate and have one sink per type, each getting the
>>>>> respective schema. Note that you'd need to fetch the schema manually from
>>>>> the schema registry when constructing the job, as you need to pass it to
>>>>> the sink.
>>>>>
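>>>>> For illustration, such a sink could look roughly like this (subject,
>>>>> path, and the stream variable are placeholders; treat it as a sketch,
>>>>> not as tested code):
>>>>>
>>>>> // fetch the schema once while constructing the job
>>>>> SchemaRegistryClient registryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 1000);
>>>>> Schema schema = new Schema.Parser().parse(
>>>>>         registryClient.getLatestSchemaMetadata("<subject-for-event-a>").getSchema());
>>>>>
>>>>> // build a Parquet bulk sink with exactly that schema
>>>>> StreamingFileSink<GenericRecord> sink = StreamingFileSink
>>>>>         .forBulkFormat(new Path("s3://<bucket>/events/event-a"),
>>>>>                 ParquetAvroWriters.forGenericRecord(schema))
>>>>>         .build();
>>>>> eventAStream.addSink(sink);
>>>>>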
>>>>> Btw, do you actually need to write all events into one Kafka topic?
>>>>> The only real use case is to preserve the time order per key. Everything
>>>>> else is much more complicated than storing the events individually.
>>>>>
>>>>> Best,
>>>>>
>>>>> Arvid
>>>>>
>>>>>
>>>>> On Thu, Jan 16, 2020 at 3:39 PM aj <ajainje...@gmail.com> wrote:
>>>>>
>>>>>> Hi Arvid,
>>>>>> Thanks for the quick response. I am new to this Avro design, so could
>>>>>> you please help me understand and design this for my use case?
>>>>>>
>>>>>> I have a use case like this:
>>>>>>
>>>>>> 1. We have an app where a lot of actions happen on the user side.
>>>>>> 2. For each action, we collect a set of information defined using
>>>>>> key-value pairs. We want to define this information with proper schemas
>>>>>> so that we maintain the proper format and do not push random data.
>>>>>> 3. So we define a schema for each action and register it in the
>>>>>> schema registry using topic+record.name as the subject.
>>>>>> 4. So I do not think the producer side has any issue, as whenever we
>>>>>> push an event to Kafka we register its schema under the above subject.
>>>>>>
>>>>>> Example:
>>>>>>
>>>>>> {
>>>>>>   "event_name": "a",
>>>>>>   "timestamp": ...,
>>>>>>   "properties": [
>>>>>>     "key-1": "val-1",
>>>>>>     "key-2": "val-2"
>>>>>>   ]
>>>>>> }
>>>>>>
>>>>>> {
>>>>>>   "event_name": "b",
>>>>>>   "timestamp": ...,
>>>>>>   "properties": [
>>>>>>     "key-3": "val-3",
>>>>>>     "key-4": "val-4"
>>>>>>   ]
>>>>>> }
>>>>>>
>>>>>> Now I have a consumer that parses the data by fetching the schema
>>>>>> from the schema registry and deserializes it into a GenericRecord
>>>>>> stream.
>>>>>>
>>>>>> Why do you think it will break, given that I am always deserializing
>>>>>> with the writer schema only?
>>>>>>
>>>>>> You suggested keeping an envelope Avro schema rather than a separate
>>>>>> schema for each type of event that I am generating. I have some doubts
>>>>>> about that:
>>>>>>
>>>>>> 1. How do I enforce a schema on each event when it is a subtype in the
>>>>>> main schema? That is, when I get a JSON event of type "a", how do I
>>>>>> enforce and convert it to the "a" sub-schema and push it to Kafka?
>>>>>> 2. I want to create a separate Hive table for each of the events, so
>>>>>> when I write this data and let's say I have 20 events, then 19 columns
>>>>>> will always contain null values in the data.
>>>>>>
>>>>>> Please help me do this the right way. It will be a great help and a
>>>>>> great learning experience for me.
>>>>>>
>>>>>> Thanks,
>>>>>> Anuj
>>>>>>
>>>>>>
>>>>>> On Thu, Jan 16, 2020 at 7:30 PM Arvid Heise <ar...@ververica.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Anuj,
>>>>>>>
>>>>>>> you should always avoid having records with different schemas in the
>>>>>>> same topic/dataset. You will break the compatibility features of the
>>>>>>> schema registry, and your consumer/producer code becomes hard to
>>>>>>> maintain.
>>>>>>>
>>>>>>> A common and scalable way to avoid it is to use some kind of
>>>>>>> envelope format.
>>>>>>>
>>>>>>> {
>>>>>>>   "namespace": "example",
>>>>>>>   "name": "Envelope",
>>>>>>>   "type": "record",
>>>>>>>   "fields": [
>>>>>>>     {
>>>>>>>       "name": "type1",
>>>>>>>       "type": ["null", {
>>>>>>>         "type": "record",
>>>>>>>         "fields": [ ... ]
>>>>>>>       }],
>>>>>>>       "default": null
>>>>>>>     },
>>>>>>>     {
>>>>>>>       "name": "type2",
>>>>>>>       "type": ["null", {
>>>>>>>         "type": "record",
>>>>>>>         "fields": [ ... ]
>>>>>>>       }],
>>>>>>>       "default": null
>>>>>>>     }
>>>>>>>   ]
>>>>>>> }
>>>>>>>
>>>>>>> This envelope is evolvable (arbitrary addition/removal of wrapped
>>>>>>> types, which by themselves can be evolved), and adds only a little 
>>>>>>> overhead
>>>>>>> (1 byte per subtype). The downside is that you cannot enforce that 
>>>>>>> exactly
>>>>>>> one of the subtypes is set.
>>>>>>>
>>>>>>> This schema is fully compatible with the schema registry, so no need
>>>>>>> to parse anything manually.
>>>>>>>
>>>>>>> This schema can easily be used with Parquet. If you can't change the
>>>>>>> input format anymore, you can at least use that approach on your output.
>>>>>>>
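>>>>>>> On the producer side, wrapping an event into the envelope could look
>>>>>>> roughly like this (GenericData API; the concrete subtype fields depend
>>>>>>> on your schemas, so this is only a sketch):
>>>>>>>
>>>>>>> Schema envelopeSchema = new Schema.Parser().parse(envelopeSchemaJson); // the schema above
>>>>>>> // the subtype schema is the non-null branch of the union ["null", record]
>>>>>>> Schema type1Schema = envelopeSchema.getField("type1").schema().getTypes().get(1);
>>>>>>>
>>>>>>> GenericRecord type1 = new GenericData.Record(type1Schema);
>>>>>>> // ... populate the fields of the concrete subtype here ...
>>>>>>>
>>>>>>> GenericRecord envelope = new GenericData.Record(envelopeSchema);
>>>>>>> envelope.put("type1", type1); // all other subtype fields keep their null default
>>>>>>>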
>>>>>>> Best,
>>>>>>>
>>>>>>> Arvid
>>>>>>>
>>>>>>> On Thu, Jan 16, 2020 at 2:53 PM aj <ajainje...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi All,
>>>>>>>>
>>>>>>>> I have a use case where I am getting different sets of Avro records
>>>>>>>> in Kafka. I am using the schema registry to store the Avro schemas,
>>>>>>>> and one topic can also contain different types of records.
>>>>>>>>
>>>>>>>> Now I have created a GenericRecord stream using KafkaAvroDeserializer
>>>>>>>> by defining a custom deserializer class like this:
>>>>>>>>
>>>>>>>> @Override
>>>>>>>> public GenericRecord deserialize(
>>>>>>>>         byte[] messageKey, byte[] message, String topic, int partition, long offset) {
>>>>>>>>     checkInitialized();
>>>>>>>>     return (GenericRecord) inner.deserialize(topic, message);
>>>>>>>> }
>>>>>>>>
>>>>>>>> private void checkInitialized() {
>>>>>>>>     if (inner == null) {
>>>>>>>>         Map<String, Object> props = new HashMap<>();
>>>>>>>>         props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
>>>>>>>>         props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
>>>>>>>>         SchemaRegistryClient client = new CachedSchemaRegistryClient(
>>>>>>>>                 registryUrl, AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
>>>>>>>>         inner = new KafkaAvroDeserializer(client, props);
>>>>>>>>     }
>>>>>>>> }
>>>>>>>>
>>>>>>>>
>>>>>>>> And this is my consumer code:
>>>>>>>>
>>>>>>>> DataStreamSource<GenericRecord> input = env
>>>>>>>>         .addSource(
>>>>>>>>                 new FlinkKafkaConsumer010<GenericRecord>(topics,
>>>>>>>>                         new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>>>>>>>                         config).setStartFromEarliest());
>>>>>>>>
>>>>>>>> Now I want to write this stream partitioned by
>>>>>>>> *event_name="a"/year=/month=/day=* in Parquet format so that I can
>>>>>>>> expose Hive tables directly on top of this data.
>>>>>>>> event_name is a common field for all types of records that I am
>>>>>>>> getting in Kafka.
>>>>>>>> I am stuck because the Parquet writer needs a schema to write, but my
>>>>>>>> different records have different schemas. So how do I write this
>>>>>>>> stream to S3 in the above partition format?
>>>>>>>>
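>>>>>>>> Roughly, the partitioning I imagine would need something like a custom
>>>>>>>> bucket assigner on the file sink (just my guess at how it could look,
>>>>>>>> not working code):
>>>>>>>>
>>>>>>>> BucketAssigner<GenericRecord, String> assigner = new BucketAssigner<GenericRecord, String>() {
>>>>>>>>     @Override
>>>>>>>>     public String getBucketId(GenericRecord element, Context context) {
>>>>>>>>         // derive year/month/day from processing time and prepend the event name
>>>>>>>>         LocalDate date = Instant.ofEpochMilli(context.currentProcessingTime())
>>>>>>>>                 .atZone(ZoneOffset.UTC).toLocalDate();
>>>>>>>>         return "event_name=" + element.get("event_name")
>>>>>>>>                 + "/year=" + date.getYear()
>>>>>>>>                 + "/month=" + date.getMonthValue()
>>>>>>>>                 + "/day=" + date.getDayOfMonth();
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     @Override
>>>>>>>>     public SimpleVersionedSerializer<String> getSerializer() {
>>>>>>>>         return SimpleVersionedStringSerializer.INSTANCE;
>>>>>>>>     }
>>>>>>>> };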
>>>>>>>>
>>>>>>>> Thanks & Regards,
>>>>>>>> Anuj Jain
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>> --
>>>>>> Thanks & Regards,
>>>>>> Anuj Jain
>>>>>> Mob. : +91- 8588817877
>>>>>> Skype : anuj.jain07
>>>>>>
>>>>>
>>>>
>>>> --
>>>> Thanks & Regards,
>>>> Anuj Jain
>>>> Mob. : +91- 8588817877
>>>> Skype : anuj.jain07
>>>>
>>>
>>
>> --
>> Thanks & Regards,
>> Anuj Jain
>> Mob. : +91- 8588817877
>> Skype : anuj.jain07
>>
>

-- 
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877
Skype : anuj.jain07
