Thanks, Arvid. I do not fully understand the above approach, so currently I am thinking of going with the envelope approach that you suggested.

One more question: even though it is a single envelope schema, I do not want to keep the schema in my consumer project. I want it to be fetched from the schema registry and passed to my Parquet sink, so that I always use the same schema that is used by the producer. Can you provide sample code showing how I can infer the schema from the generic record or get it from the schema registry?
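Roughly, something like the following is what I have in mind -- just a sketch, where the registry URL and the subject name ("events-value") are placeholders:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

public class EnvelopeSchemaLookup {

    // Option 1: take the writer schema straight from an already deserialized record.
    public static Schema fromRecord(GenericRecord record) {
        return record.getSchema();
    }

    // Option 2: fetch the latest schema for a subject from the schema registry
    // when building the job, so the Parquet sink always gets the producer's schema.
    public static Schema fromRegistry(String registryUrl, String subject) throws Exception {
        SchemaRegistryClient client = new CachedSchemaRegistryClient(registryUrl, 100);
        SchemaMetadata metadata = client.getLatestSchemaMetadata(subject);
        return new Schema.Parser().parse(metadata.getSchema());
    }

    public static void main(String[] args) throws Exception {
        // placeholder URL and subject, e.g. "<topic>-value" for the envelope schema
        Schema envelopeSchema = fromRegistry("http://localhost:8081", "events-value");
        System.out.println(envelopeSchema.toString(true));
    }
}

Is that roughly the right way to do it, or is there a better pattern for passing the fetched schema to the sink?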
Regards,
Anuj

On Sat, Jan 18, 2020 at 6:55 PM Arvid Heise <ar...@ververica.com> wrote:

> (Re-added user mailing list)
>
> Hi Anuj,
>
> since I'd still recommend going with distinct sources/sinks, let me try to solve your issues in this mail. If that doesn't work out, I'd address your concerns about the envelope format later.
>
> In Flink, you can have several subtopologies in the same application. Thus, for each event type, you can add
> AvroSource(eventType) -> generic transformation/validation -> AvroSink(eventType)
>
> I'd put all Avro schemas in one project and use an Avro plugin to generate the respective Java classes. Then I'd simply create a map of Avro schema (GeneratedClassA::SCHEMA, GeneratedClassB::SCHEMA, ...) to topic name (event-a, event-b, ...). Next, I'd iterate over the map to add the respective subtopologies to env. Finally, execute everything.
>
> You have one project where all validations reside, but you'd have almost no overhead to process a given source of eventType. The downside of that approach is, of course, that each new event type would require a redeployment, but that seems like what you'd want to do anyhow.
>
> Best,
>
> Arvid
>
> On Sat, Jan 18, 2020 at 2:08 PM aj <ajainje...@gmail.com> wrote:
>
>> Thanks, Arvid.
>>
>> 1. I like your approach, as I can write a single consumer and put the data in S3 in Parquet format. The only challenge is that there are extra columns that are always going to be null, since at a time I will get only one type of event.
>>
>> 2. If I go with separate schemas, I am not sure how I can solve it using a single generalized consumer. My understanding so far is that I have to write a consumer for each type of event. Each consumer will read the whole data, filter the respective events from it, and then pass this stream to a sink. But this does not look like a scalable solution, because as new event types keep growing I have to write a consumer for each new type.
>>
>> DataStreamSource<GenericRecord> input = env
>>         .addSource(
>>                 new FlinkKafkaConsumer010<GenericRecord>(topics,
>>                         new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>                         config).setStartFromEarliest());
>>
>> Example:
>>
>> 1st consumer:
>> DataStreamSource<GenericRecord> input = env.addSource(
>>         new FlinkKafkaConsumer010<GenericRecord>(topics,
>>                 new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>                 config).setStartFromEarliest());
>> DataStream<GenericRecord> aInput = input.filter(r -> "a".equals(r.get("event_name").toString()));
>>
>> 2nd consumer:
>> DataStreamSource<GenericRecord> input = env.addSource(
>>         new FlinkKafkaConsumer010<GenericRecord>(topics,
>>                 new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>                 config).setStartFromEarliest());
>> DataStream<GenericRecord> bInput = input.filter(r -> "b".equals(r.get("event_name").toString()));
>>
>> Can you help me with how I can solve this using a single consumer, as I do not want to write a separate consumer for each type of schema?
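For illustration, a rough sketch of the single-consumer variant: one Kafka source, then one branch per event type via filters. It reuses the KafkaGenericAvroDeserializationSchema from the snippets above; the topic name, broker address, and group id are placeholders.

import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

public class SingleConsumerSplitSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        List<String> topics = Arrays.asList("events");        // placeholder topic
        String schemaRegistryUrl = "http://localhost:8081";    // placeholder registry URL
        Properties config = new Properties();
        config.setProperty("bootstrap.servers", "localhost:9092");
        config.setProperty("group.id", "event-consumer");

        // One source for all event types, deserialized to GenericRecord
        // via the KafkaGenericAvroDeserializationSchema shown in this thread.
        DataStream<GenericRecord> input = env.addSource(
                new FlinkKafkaConsumer010<GenericRecord>(topics,
                        new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
                        config).setStartFromEarliest());

        // Each event type gets its own branch from the same stream.
        DataStream<GenericRecord> aEvents =
                input.filter(r -> "a".equals(r.get("event_name").toString()));
        DataStream<GenericRecord> bEvents =
                input.filter(r -> "b".equals(r.get("event_name").toString()));

        // aEvents.addSink(...);  // e.g. one Parquet sink per event type
        // bEvents.addSink(...);

        env.execute("single-consumer-split");
    }
}

This is only a sketch of the question being asked, not a recommendation; the per-type sources/sinks described above avoid reading every event in every branch.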
>> For example, this is my consumer that contains different types of records:
>>
>> DataStreamSource<GenericRecord> input = env
>>         .addSource(
>>                 new FlinkKafkaConsumer010<GenericRecord>(topics,
>>                         new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>                         config).setStartFromEarliest());
>>
>> Now I cannot write this stream directly, as there is no common schema for the records in this stream. So the possible way I am thinking of is:
>>
>> 1. Can I create multiple streams from this stream using a keyBy on "event_name" and then write each stream separately?
>>
>> I just want to know whether this is possible.
>>
>> Thanks,
>> Anuj
>>
>> On Fri, Jan 17, 2020 at 3:20 PM Arvid Heise <ar...@ververica.com> wrote:
>>
>>> Hi Anuj,
>>>
>>> I originally understood that you would like to store data in the same Kafka topic and also want to store it in the same Parquet file. In the past, I mostly used the schema registry with Kafka Streams, where you could only store a schema for a key and a value respectively. To use different record types in the same Kafka topic, you had to disable schema compatibility checks and just store the schemas as different versions under the same subject.
>>>
>>> Your approach is much better: you can ensure full schema compatibility. Nevertheless, it still shares the same drawback that consumption is much harder (using GenericRecord is proof of that) if you want to read/write everything into the same place. Also, you will never be able to write one consistent file, as files can only have one schema (both in Avro and Parquet).
>>>
>>> So you only have two options:
>>> * keep schemas separated, but then you also need to write separate files per record type;
>>> * have a common schema (either my outlined approach or any other wrapper schema).
>>> The approach with a common schema only makes sense if you want to write everything into one table/Kafka topic.
>>>
>>> However, in the last mail you pointed out that you actually want to store the record types separately. Then you should keep everything separated and have a sink for each type, each getting the respective schema. Note that you'd need to fetch the schema manually from the schema registry when creating the query, as you would need to pass it to the sink.
>>>
>>> Btw, do you actually have a need to write all events into one Kafka topic? The only real use case is to preserve the time order per key. Everything else is much more complicated than storing events individually.
>>>
>>> Best,
>>>
>>> Arvid
>>>
>>> On Thu, Jan 16, 2020 at 3:39 PM aj <ajainje...@gmail.com> wrote:
>>>
>>>> Hi Arvid,
>>>> Thanks for the quick response. I am new to this Avro design, so can you please help me understand and design for my use case?
>>>>
>>>> I have a use case like this:
>>>>
>>>> 1. We have an app where a lot of actions happen from the user side.
>>>> 2. For each action we collect a set of information that is defined using key-value pairs. We want to define this information as proper schemas so that we maintain the proper format and do not push random data.
>>>> 3. So we are defining a schema for each action and registering it in the schema registry using topic+record.name as the subject (see the config sketch right after this list).
>>>> 4. So I do not think the producer side has any issue, as whenever we push an event to Kafka we register a new schema with the above subject.
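For reference, a rough sketch of that producer-side setup with the Confluent Avro serializer; the topic+record.name subject naming described above sounds like Confluent's TopicRecordNameStrategy. Broker and registry addresses are placeholders, and buildEventA() is a hypothetical helper.

import java.util.Properties;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class EventProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");           // placeholder
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put("schema.registry.url", "http://localhost:8081");  // placeholder
        // subject becomes <topic>-<fully qualified record name>, i.e. topic+record.name
        props.put("value.subject.name.strategy",
                "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy");

        try (KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(props)) {
            GenericRecord event = buildEventA();                     // hypothetical helper
            producer.send(new ProducerRecord<>("events", event));
        }
    }

    // hypothetical: builds a GenericRecord for event "a" against its registered schema
    private static GenericRecord buildEventA() {
        throw new UnsupportedOperationException("placeholder");
    }
}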
>>>> Example:
>>>>
>>>> {
>>>>   "event_name": "a",
>>>>   "timestamp": ...,
>>>>   "properties": [
>>>>     "key-1": "val-1",
>>>>     "key-2": "val-2"
>>>>   ]
>>>> }
>>>>
>>>> {
>>>>   "event_name": "b",
>>>>   "timestamp": ...,
>>>>   "properties": [
>>>>     "key-3": "val-3",
>>>>     "key-4": "val-4"
>>>>   ]
>>>> }
>>>>
>>>> Now I have a consumer that will parse the data by fetching the schema from the schema registry and deserialize it into generic record streams.
>>>>
>>>> Why do you think it will break, as I am always deserializing with the writer schema only?
>>>>
>>>> You suggested keeping an envelope Avro schema rather than a separate schema for each type of event that I am generating. I have some doubts about that:
>>>>
>>>> 1. How do I enforce a schema on each event when it is a subtype in the main schema? When I get a JSON event of type "a", how do I enforce it, convert it to the subschema of type "a", and push it to Kafka?
>>>> 2. I want to create a separate Hive table for each of the events, so when I write this data, and let's say I have 20 event types, then 19 columns always get null values in the data.
>>>>
>>>> Please help me do this the right way. It will be a great help and learning for me.
>>>>
>>>> Thanks,
>>>> Anuj
>>>>
>>>> On Thu, Jan 16, 2020 at 7:30 PM Arvid Heise <ar...@ververica.com> wrote:
>>>>
>>>>> Hi Anuj,
>>>>>
>>>>> you should always avoid having records with different schemas in the same topic/dataset. You will break the compatibility features of the schema registry, and your consumer/producer code is always hard to maintain.
>>>>>
>>>>> A common and scalable way to avoid it is to use some kind of envelope format.
>>>>>
>>>>> {
>>>>>   "namespace": "example",
>>>>>   "name": "Envelope",
>>>>>   "type": "record",
>>>>>   "fields": [
>>>>>     {
>>>>>       "name": "type1",
>>>>>       "type": ["null", {
>>>>>         "type": "record",
>>>>>         "fields": [ ... ]
>>>>>       }],
>>>>>       "default": null
>>>>>     },
>>>>>     {
>>>>>       "name": "type2",
>>>>>       "type": ["null", {
>>>>>         "type": "record",
>>>>>         "fields": [ ... ]
>>>>>       }],
>>>>>       "default": null
>>>>>     }
>>>>>   ]
>>>>> }
>>>>>
>>>>> This envelope is evolvable (arbitrary addition/removal of wrapped types, which by themselves can be evolved), and it adds only a little overhead (1 byte per subtype). The downside is that you cannot enforce that exactly one of the subtypes is set.
>>>>>
>>>>> This schema is fully compatible with the schema registry, so there is no need to parse anything manually.
>>>>>
>>>>> This schema can easily be used with Parquet. If you can't change the input format anymore, you can at least use that approach on your output.
>>>>>
>>>>> Best,
>>>>>
>>>>> Arvid
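A rough illustration of what wrapping an event into such an envelope could look like with Avro's GenericData API; the Envelope schema and the "type1" field are the placeholder names from the example above, not a confirmed design.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public class EnvelopeWrapper {

    // Wraps one concrete event into the envelope; all other branches stay null.
    public static GenericRecord wrap(Schema envelopeSchema, String fieldName, GenericRecord event) {
        GenericRecord envelope = new GenericData.Record(envelopeSchema);
        envelope.put(fieldName, event);
        return envelope;
    }

    // The non-null branch of a ["null", record] union field, e.g. to build and
    // validate the sub-record for events of type "a" before wrapping them.
    public static Schema subSchema(Schema envelopeSchema, String fieldName) {
        return envelopeSchema.getField(fieldName).schema().getTypes().get(1);
    }
}

A JSON event of type "a" would then first be converted into a GenericRecord against that sub-schema, and the wrapped envelope is what gets serialized to Kafka.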
>>>>> On Thu, Jan 16, 2020 at 2:53 PM aj <ajainje...@gmail.com> wrote:
>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>> I have a use case where I am getting different sets of Avro records in Kafka. I am using the schema registry to store the Avro schemas. One topic can also have different types of records.
>>>>>>
>>>>>> Now I have created a GenericRecord stream using the KafkaAvroDeserializer by defining a custom deserializer class like this:
>>>>>>
>>>>>> @Override
>>>>>> public GenericRecord deserialize(
>>>>>>         byte[] messageKey, byte[] message, String topic, int partition, long offset) {
>>>>>>     checkInitialized();
>>>>>>     return (GenericRecord) inner.deserialize(topic, message);
>>>>>> }
>>>>>>
>>>>>> private void checkInitialized() {
>>>>>>     if (inner == null) {
>>>>>>         Map<String, Object> props = new HashMap<>();
>>>>>>         props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
>>>>>>         props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
>>>>>>         SchemaRegistryClient client =
>>>>>>                 new CachedSchemaRegistryClient(
>>>>>>                         registryUrl,
>>>>>>                         AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
>>>>>>         inner = new KafkaAvroDeserializer(client, props);
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>> And this is my consumer code:
>>>>>>
>>>>>> DataStreamSource<GenericRecord> input = env
>>>>>>         .addSource(
>>>>>>                 new FlinkKafkaConsumer010<GenericRecord>(topics,
>>>>>>                         new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>>>>>                         config).setStartFromEarliest());
>>>>>>
>>>>>> Now I want to write this stream partitioned as event_name="a"/year=/month=/day= in Parquet format, so that I can expose Hive tables directly on top of this data. event_name is a common field for all the types of records that I am getting in Kafka.
>>>>>>
>>>>>> I am stuck because the Parquet writer needs a schema to write, but my different records have different schemas. So how do I write this stream to S3 in the above partition format?
>>>>>>
>>>>>> Thanks & Regards,
>>>>>> Anuj Jain
>>>>>>
>>>>>> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>>>>
>>>> --
>>>> Thanks & Regards,
>>>> Anuj Jain
>>>> Mob.: +91-8588817877
>>>> Skype: anuj.jain07
>>>>
>>>> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>>
>> --
>> Thanks & Regards,
>> Anuj Jain
>> Mob.: +91-8588817877
>> Skype: anuj.jain07
>>
>> <http://www.cse.iitm.ac.in/%7Eanujjain/>

--
Thanks & Regards,
Anuj Jain
Mob.: +91-8588817877
Skype: anuj.jain07

<http://www.cse.iitm.ac.in/%7Eanujjain/>
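For reference on the partitioning question above, a rough sketch of one way this is commonly done in Flink: a StreamingFileSink with Parquet bulk writers and a custom bucket assigner that builds the event_name=/year=/month=/day= path. The schema passed in (e.g. the envelope schema), the assumption that "timestamp" holds epoch millis, and the S3 path are all placeholders/assumptions, not confirmed details from this thread.

import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

public class PartitionedParquetSinkSketch {

    // Builds event_name=<name>/year=/month=/day= from each record; assumes a
    // long "timestamp" field with epoch millis (an assumption, not confirmed).
    public static class EventTimeBucketAssigner implements BucketAssigner<GenericRecord, String> {
        private static final DateTimeFormatter FMT =
                DateTimeFormatter.ofPattern("'year='yyyy'/month='MM'/day='dd").withZone(ZoneOffset.UTC);

        @Override
        public String getBucketId(GenericRecord record, Context context) {
            String eventName = record.get("event_name").toString();
            long epochMillis = (Long) record.get("timestamp");
            return "event_name=" + eventName + "/" + FMT.format(Instant.ofEpochMilli(epochMillis));
        }

        @Override
        public SimpleVersionedSerializer<String> getSerializer() {
            return SimpleVersionedStringSerializer.INSTANCE;
        }
    }

    public static StreamingFileSink<GenericRecord> buildSink(Schema schema) {
        // one schema (e.g. the envelope) is still required by the Parquet writer
        return StreamingFileSink
                .forBulkFormat(new Path("s3://my-bucket/events"),       // placeholder path
                        ParquetAvroWriters.forGenericRecord(schema))
                .withBucketAssigner(new EventTimeBucketAssigner())
                .build();
    }

    public static void attach(DataStream<GenericRecord> input, Schema schema) {
        input.addSink(buildSink(schema));
    }
}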