Thanks, Arvid, for all the clarification. I will work on the approach you suggested.

Thanks,
Anuj

On Sat, Jan 18, 2020 at 10:17 PM Arvid Heise <ar...@ververica.com> wrote:

> Hi Anuj,
>
> I think that there may be a fundamental misunderstanding about the role of a schema registry in Kafka, so let me first clarify that.
> In each Avro/Parquet file, all records have the same schema. The schema is stored within the file, such that we can always retrieve the writer schema for the records.
> When Avro was first applied to Kafka, there was the basic question of how the writer schema for any record is known to the consumer. Storing the complete schema on each record would mean that each record would be much larger than needed. Hence, they added the schema registry, which assigns a unique id to each schema; that id is then embedded into the records.
> Now, whenever I update a schema in my producer, I would have old records with the old schema id and new records with the new schema id.
> In my consumer, I'd use a fixed reader schema, such that my application would not need to worry whether a record was written with the old or the new schema; my consumer would only see records with the reader schema.
>
> Given that background, you see that in general it is impossible, with a generic approach, to write the parquet with the same schema as it has been written with in Kafka: the parquet schema needs to be supplied statically during query compilation, while the Avro schema actually used in Kafka is only known when consuming the data.
>
> But looking further down the road:
> * since you need one schema to write the parquet files, you'd need to decide: do you want to write with the new or the old schema in case of a schema update? That should also be the reader schema of your application for a given event type.
> * this decision has further implications: your application needs to extract exactly one specific version of the schema from the schema registry at query compilation. That could be either a specific schema id or the latest schema for the event type.
> * that means that the output schema is locked until you restart your application and fetch a new latest schema in case of an update.
> * at that point, it might just be easier to use the approach that I outlined previously by bundling a specific schema with your application.
>
> If you want to extract the latest schema for a subject:
>
> var registryClient = new CachedSchemaRegistryClient(<schemaRepoUrls>, 1000);
> var versions = registryClient.getAllVersions(<subject>);
> var schema = registryClient.getSchemaById(versions.get(versions.size() - 1));
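A minimal sketch of that lookup with the Confluent client follows (the registry URL and subject name are placeholders). Note that getAllVersions returns subject version numbers rather than global schema ids, so getLatestSchemaMetadata is the more direct way to resolve the newest schema:

    import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
    import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
    import org.apache.avro.Schema;

    public class LatestSchemaFetcher {

        // Fetch and parse the newest schema version registered under the given subject.
        public static Schema fetchLatest(String registryUrl, String subject) throws Exception {
            CachedSchemaRegistryClient client = new CachedSchemaRegistryClient(registryUrl, 1000);
            SchemaMetadata latest = client.getLatestSchemaMetadata(subject);
            return new Schema.Parser().parse(latest.getSchema());
        }
    }

The parsed Schema can then be handed to the sink; as noted above, it stays fixed until the job is restarted.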
>
> On Sat, Jan 18, 2020 at 5:22 PM aj <ajainje...@gmail.com> wrote:
>
>> Thanks, Arvid.
>>
>> I do not fully understand the above approach, so currently I am thinking of going with the envelope approach that you suggested.
>>
>> One more question: even if it is a single envelope schema, I do not want to keep the schema in my consumer project. I want it to be fetched from the schema registry and passed to my parquet sink, so that I always use the same schema that is used by the producer. Can you provide sample code for how I can infer the schema from the generic record or get it from the schema registry?
>>
>> Regards,
>> Anuj
>>
>> On Sat, Jan 18, 2020 at 6:55 PM Arvid Heise <ar...@ververica.com> wrote:
>>
>>> (Readded user mailing list)
>>>
>>> Hi Anuj,
>>>
>>> since I'd still recommend going with distinct sources/sinks, let me try to solve your issues in this mail. If that doesn't work out, I'd address your concerns about the envelope format later.
>>>
>>> In Flink, you can have several subtopologies in the same application. Thus, for each event type, you can add
>>> AvroSource(eventType) -> generic transformation/validation -> AvroSink(eventType)
>>>
>>> I'd put all Avro schemas in one project and use an Avro plugin to generate the respective Java classes. Then I'd simply create a map of Avro schema (GeneratedClassA::SCHEMA, GeneratedClassB::SCHEMA, ...) to topic name (event-a, event-b, ...). Next, I'd iterate over the list to add the respective subtopologies to env. Finally, execute everything.
>>>
>>> You have one project where all validations reside, but you'd have almost no overhead to process a given source of eventType. The downside of that approach is, of course, that each new event type would require a redeployment, but that seems to be what you'd want to do anyhow.
>>>
>>> Best,
>>>
>>> Arvid
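As a rough sketch of the per-event-type wiring described above (the topic names, the generated classes EventA/EventB, the Kafka properties, and the choice of StreamingFileSink with ParquetAvroWriters as the Parquet sink are illustrative assumptions, not code from the thread):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.avro.Schema;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

    public class PerEventTypeJob {

        public static void main(String[] args) throws Exception {
            String schemaRegistryUrl = "http://registry:8081";             // placeholder
            Properties kafkaProps = new Properties();
            kafkaProps.setProperty("bootstrap.servers", "broker:9092");    // placeholder
            kafkaProps.setProperty("group.id", "parquet-writer");

            // Classes generated by the Avro plugin; EventA/EventB are placeholders.
            Map<String, Schema> schemaByTopic = new HashMap<>();
            schemaByTopic.put("event-a", EventA.getClassSchema());
            schemaByTopic.put("event-b", EventB.getClassSchema());

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            for (Map.Entry<String, Schema> e : schemaByTopic.entrySet()) {
                String topic = e.getKey();
                env.addSource(new FlinkKafkaConsumer010<>(topic,
                            new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl), kafkaProps))
                    // generic transformation/validation shared by all event types would go here
                    .addSink(StreamingFileSink
                        .forBulkFormat(new Path("s3://bucket/" + topic),
                            ParquetAvroWriters.forGenericRecord(e.getValue()))
                        .build());
            }
            env.execute("per-event-type avro-to-parquet");
        }
    }

Each subtopology stays independent, so adding an event type means adding one map entry and redeploying, which matches the trade-off described above.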
>>>
>>> On Sat, Jan 18, 2020 at 2:08 PM aj <ajainje...@gmail.com> wrote:
>>>
>>>> Thanks, Arvid.
>>>>
>>>> 1. I like your approach, as I can write a single consumer and put the data in S3 in parquet format. The only challenge is that there are extra columns that are always going to be null, since at a time I will get only one type of event.
>>>>
>>>> 2. If I go with a separate schema, I am not sure how I can solve it using a single generalized consumer. My understanding so far is that I have to write a consumer for each type of event. Each consumer will read the whole data, filter the respective events from it, and then pass this stream to a sink. But this does not look like a scalable solution: as new events keep growing, I have to write a consumer for each new type.
>>>>
>>>> DataStreamSource<GenericRecord> input = env
>>>>     .addSource(
>>>>         new FlinkKafkaConsumer010<GenericRecord>(topics,
>>>>             new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>>>             config).setStartFromEarliest());
>>>>
>>>> Example:
>>>>
>>>> *1st Consumer:*
>>>> DataStreamSource<GenericRecord> input = env.addSource(
>>>>     new FlinkKafkaConsumer010<GenericRecord>(topics,
>>>>         new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>>>         config).setStartFromEarliest());
>>>> *DataStream<GenericRecord> aInput = input.filter("event_name"= "a")*
>>>>
>>>> *2nd Consumer:*
>>>> DataStreamSource<GenericRecord> input = env.addSource(
>>>>     new FlinkKafkaConsumer010<GenericRecord>(topics,
>>>>         new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>>>         config).setStartFromEarliest());
>>>> *DataStream<GenericRecord> bInput = input.filter("event_name"= "b")*
>>>>
>>>> Can you help me with how to solve this using a single consumer, as I do not want to write a separate consumer for each type of schema?
>>>>
>>>> For example, this is my consumer that contains different types of records:
>>>>
>>>> DataStreamSource<GenericRecord> input = env
>>>>     .addSource(
>>>>         new FlinkKafkaConsumer010<GenericRecord>(topics,
>>>>             new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>>>             config).setStartFromEarliest());
>>>>
>>>> Now I cannot write this stream directly, as there is no common schema for the records in it. So the possible way I am thinking of is:
>>>>
>>>> 1. Can I create multiple streams from this stream using a keyBy on *"event_name"* and then write each stream separately? I just want to know whether this is possible.
>>>>
>>>> Thanks,
>>>> Anuj
>>>>
>>>> On Fri, Jan 17, 2020 at 3:20 PM Arvid Heise <ar...@ververica.com> wrote:
>>>>
>>>>> Hi Anuj,
>>>>>
>>>>> I originally understood that you would like to store data in the same Kafka topic and also want to store it in the same parquet file. In the past, I mostly used the schema registry with Kafka Streams, where you could only store a schema for a key and value respectively. To use different record types in the same Kafka topic, you had to disable schema compatibility checks and just store the schemas as different versions under the same subject.
>>>>>
>>>>> Your approach is much better: you can ensure full schema compatibility. Nevertheless, it still shares the same drawback that consumption is much harder (using GenericRecord is proof of that) if you want to read/write everything into the same place. Also, you will never be able to write one consistent file, as a file can only have one schema (both in Avro and in Parquet).
>>>>> So you only have two options:
>>>>> * keep schemas separated, but then you also need to write separate files per record type.
>>>>> * have a common schema (either my outlined approach or any other wrapper schema).
>>>>> The approach with a common schema only makes sense if you want to write everything into one table/Kafka topic.
>>>>>
>>>>> However, in the last mail you pointed out that you actually want to store the record types separately. Then you should keep everything separated and have a sink for each type, each getting the respective schema. Note that you'd need to fetch the schema manually from the schema registry when creating the query, as you would need to pass it to the sink.
>>>>>
>>>>> Btw, do you actually have a need to write all events into one Kafka topic? The only real use case is to preserve the time order per key. Everything else is much more complicated than storing events individually.
>>>>>
>>>>> Best,
>>>>>
>>>>> Arvid
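To make the "sink for each type" variant concrete with a single consumer, below is a sketch that splits one GenericRecord stream by event_name using side outputs; the event names, the schema map, and the StreamingFileSink choice are assumptions for illustration:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.OutputTag;

    public class EventSplitter {

        // Route each record to the side output matching its event_name, then attach
        // one Parquet sink per event type, each built with that type's Avro schema.
        public static void attachSinks(DataStream<GenericRecord> input, Map<String, Schema> schemaByEvent) {
            final Map<String, OutputTag<GenericRecord>> tags = new HashMap<>();
            for (String eventName : schemaByEvent.keySet()) {
                tags.put(eventName, new OutputTag<GenericRecord>(eventName) {});
            }

            SingleOutputStreamOperator<GenericRecord> routed = input.process(
                new ProcessFunction<GenericRecord, GenericRecord>() {
                    @Override
                    public void processElement(GenericRecord record, Context ctx, Collector<GenericRecord> out) {
                        OutputTag<GenericRecord> tag = tags.get(String.valueOf(record.get("event_name")));
                        if (tag != null) {
                            ctx.output(tag, record);  // records with an unknown event_name are dropped here
                        }
                    }
                });

            for (Map.Entry<String, Schema> e : schemaByEvent.entrySet()) {
                routed.getSideOutput(tags.get(e.getKey()))
                    .addSink(StreamingFileSink
                        .forBulkFormat(new Path("s3://bucket/event_name=" + e.getKey()),
                            ParquetAvroWriters.forGenericRecord(e.getValue()))
                        .build());
            }
        }
    }

A plain keyBy on event_name only partitions the stream; side outputs (or one filter per type) are what yield separately sinkable streams, and each sink still needs its own Avro schema, as Arvid points out.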
>>>>>
>>>>> On Thu, Jan 16, 2020 at 3:39 PM aj <ajainje...@gmail.com> wrote:
>>>>>
>>>>>> Hi Arvid,
>>>>>>
>>>>>> Thanks for the quick response. I am new to this Avro design, so can you please help me understand and design for my use case?
>>>>>>
>>>>>> I have a use case like this:
>>>>>>
>>>>>> 1. We have an app where a lot of actions happen from the user side.
>>>>>> 2. For each action we collect some set of information, defined using key-value pairs. We want to define this information with proper schemas so that we maintain the proper format and do not push random data.
>>>>>> 3. So we are defining a schema for each action and registering it in the schema registry, using topic+record.name as the subject.
>>>>>> 4. So I do not think the producer side has any issue, as whenever we push an event to Kafka we register a new schema with the above subject.
>>>>>>
>>>>>> Example:
>>>>>>
>>>>>> {
>>>>>>   event_name : "a"
>>>>>>   "timestamp":
>>>>>>   "properties" :[
>>>>>>     "key-1 : "val-1"
>>>>>>     "key-2 : "val-2"
>>>>>>   ]
>>>>>> }
>>>>>>
>>>>>> {
>>>>>>   event_name : "b"
>>>>>>   "timestamp":
>>>>>>   "properties" :[
>>>>>>     "key-3 : "val-3"
>>>>>>     "key-4 : "val-4"
>>>>>>   ]
>>>>>> }
>>>>>>
>>>>>> Now I have a consumer that will parse the data by fetching the schema from the schema registry and deserialize it into generic record streams.
>>>>>>
>>>>>> Why do you think it will break, as I am always deserializing with the writer schema only?
>>>>>>
>>>>>> You suggested keeping one envelope Avro schema rather than a separate schema for each type of event that I am generating. I have some doubts about that:
>>>>>>
>>>>>> 1. How do I enforce a schema on each event when it is a subtype in the main schema? That is, when I get a JSON event of type "a", how do I enforce it, convert it to the subschema of type "a", and push it to Kafka?
>>>>>> 2. I want to create a separate hive table for each of the events, so when I write this data, and let's say I have 20 events, then for 19 columns I always get null values in the data.
>>>>>>
>>>>>> Please help me do this the right way. It will be a great help and learning for me.
>>>>>>
>>>>>> Thanks,
>>>>>> Anuj
>>>>>>
>>>>>> On Thu, Jan 16, 2020 at 7:30 PM Arvid Heise <ar...@ververica.com> wrote:
>>>>>>
>>>>>>> Hi Anuj,
>>>>>>>
>>>>>>> you should always avoid having records with different schemas in the same topic/dataset. You will break the compatibility features of the schema registry, and your consumer/producer code is always hard to maintain.
>>>>>>>
>>>>>>> A common and scalable way to avoid it is to use some kind of envelope format.
>>>>>>>
>>>>>>> {
>>>>>>>   "namespace": "example",
>>>>>>>   "name": "Envelope",
>>>>>>>   "type": "record",
>>>>>>>   "fields": [
>>>>>>>     {
>>>>>>>       "name": "type1",
>>>>>>>       "type": ["null", {
>>>>>>>         "type": "record",
>>>>>>>         "fields": [ ... ]
>>>>>>>       }],
>>>>>>>       "default": null
>>>>>>>     },
>>>>>>>     {
>>>>>>>       "name": "type2",
>>>>>>>       "type": ["null", {
>>>>>>>         "type": "record",
>>>>>>>         "fields": [ ... ]
>>>>>>>       }],
>>>>>>>       "default": null
>>>>>>>     }
>>>>>>>   ]
>>>>>>> }
>>>>>>>
>>>>>>> This envelope is evolvable (arbitrary addition/removal of wrapped types, which by themselves can be evolved), and adds only a little overhead (1 byte per subtype). The downside is that you cannot enforce that exactly one of the subtypes is set.
>>>>>>>
>>>>>>> This schema is fully compatible with the schema registry, so there is no need to parse anything manually.
>>>>>>>
>>>>>>> This schema can easily be used with Parquet. If you can't change the input format anymore, you can at least use this approach on your output.
>>>>>>>
>>>>>>> Best,
>>>>>>>
>>>>>>> Arvid
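For illustration, a producer-side sketch of populating such an envelope, with hypothetical subtype field names since the subtype fields are elided above:

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;

    public class EnvelopeExample {

        // Wrap a "type1" event into the envelope; all other branches remain null.
        public static GenericRecord wrapType1(Schema envelopeSchema, String key1Value) {
            // The field's schema is a union ["null", <record>]; index 1 is the record branch.
            Schema type1Schema = envelopeSchema.getField("type1").schema().getTypes().get(1);

            GenericRecord type1 = new GenericData.Record(type1Schema);
            type1.put("key1", key1Value);          // hypothetical subtype field

            GenericRecord envelope = new GenericData.Record(envelopeSchema);
            envelope.put("type1", type1);          // "type2" defaults to null
            return envelope;
        }
    }

That exactly one branch is set remains a convention only; a consumer has to check which branch is non-null before unwrapping, which is the downside mentioned above.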
>>>>>>>
>>>>>>> On Thu, Jan 16, 2020 at 2:53 PM aj <ajainje...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi All,
>>>>>>>>
>>>>>>>> I have a use case where I am getting different sets of Avro records in Kafka. I am using the schema registry to store the Avro schemas. One topic can also have different types of records.
>>>>>>>>
>>>>>>>> Now I have created a GenericRecord stream using the KafkaAvroDeserializer by defining a custom deserializer class like this:
>>>>>>>>
>>>>>>>> @Override
>>>>>>>> public GenericRecord deserialize(
>>>>>>>>     byte[] messageKey, byte[] message, String topic, int partition, long offset) {
>>>>>>>>   checkInitialized();
>>>>>>>>   return (GenericRecord) inner.deserialize(topic, message);
>>>>>>>> }
>>>>>>>>
>>>>>>>> private void checkInitialized() {
>>>>>>>>   if (inner == null) {
>>>>>>>>     Map<String, Object> props = new HashMap<>();
>>>>>>>>     props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
>>>>>>>>     props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
>>>>>>>>     SchemaRegistryClient client =
>>>>>>>>         new CachedSchemaRegistryClient(
>>>>>>>>             registryUrl, AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
>>>>>>>>     inner = new KafkaAvroDeserializer(client, props);
>>>>>>>>   }
>>>>>>>> }
>>>>>>>>
>>>>>>>> And this is my consumer code:
>>>>>>>>
>>>>>>>> DataStreamSource<GenericRecord> input = env
>>>>>>>>     .addSource(
>>>>>>>>         new FlinkKafkaConsumer010<GenericRecord>(topics,
>>>>>>>>             new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>>>>>>>             config).setStartFromEarliest());
>>>>>>>>
>>>>>>>> Now I want to write this stream partitioned on *event_name="a"/year=/month=/day=* in parquet format so that I can expose hive tables directly on top of this data. event_name is a common field for all types of records that I am getting in Kafka.
>>>>>>>> I am stuck because the parquet writer needs a schema to write, but my different records have different schemas. So how do I write this stream to S3 in the above partition format?
>>>>>>>>
>>>>>>>> Thanks & Regards,
>>>>>>>> Anuj Jain
>>>>>>>>
>>>>>>>> <http://www.cse.iitm.ac.in/%7Eanujjain/>

--
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877
Skype : anuj.jain07
<http://www.oracle.com/>

<http://www.cse.iitm.ac.in/%7Eanujjain/>
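For the event_name=/year=/month=/day= layout asked about in the original mail, one possible approach (a sketch assuming a StreamingFileSink and records that carry an epoch-millis timestamp field) is a custom BucketAssigner:

    import java.time.Instant;
    import java.time.LocalDate;
    import java.time.ZoneOffset;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.core.io.SimpleVersionedSerializer;
    import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
    import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

    public class EventNameDateBucketAssigner implements BucketAssigner<GenericRecord, String> {

        @Override
        public String getBucketId(GenericRecord record, Context context) {
            // Assumes every record carries event_name and an epoch-millis timestamp field.
            LocalDate day = Instant.ofEpochMilli(((Number) record.get("timestamp")).longValue())
                    .atZone(ZoneOffset.UTC)
                    .toLocalDate();
            return "event_name=" + record.get("event_name")
                    + "/year=" + day.getYear()
                    + "/month=" + day.getMonthValue()
                    + "/day=" + day.getDayOfMonth();
        }

        @Override
        public SimpleVersionedSerializer<String> getSerializer() {
            return SimpleVersionedStringSerializer.INSTANCE;
        }
    }

It would be attached via withBucketAssigner(...) on the sink builder; the schema question itself still has to be answered per event type or with the envelope, as discussed in the thread.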