Hi Arvid,

I want to keep generic records only, and I do not want to keep the schema definition on the consumer side; it should be resolved from the schema registry only. I am following this post:
https://stackoverflow.com/questions/58849635/is-it-possible-to-deserialize-avro-messageconsuming-message-from-kafka-without/59865360#59865360

Please help me figure out what is wrong with my code.

On Thu, Jan 23, 2020, 00:38 Arvid Heise <ar...@ververica.com> wrote:

> Hi Anuj,
>
> I recommend using the ConfluentRegistryAvroDeserializationSchema [1] with a specific record that has been generated with the Avro Maven Plugin [2] or the Avro Gradle Plugin [3]. That should result in almost no code and maximal maintainability.
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/kafka.html#the-deserializationschema
> [2] https://avro.apache.org/docs/1.8.2/gettingstartedjava.html
> [3] https://github.com/davidmc24/gradle-avro-plugin
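A minimal sketch of the specific-record setup recommended above; UserEvent stands in for a class generated by the Avro Maven/Gradle plugin, and the topic, bootstrap servers, and registry URL are placeholders rather than values from this thread:

    import java.util.Properties;

    import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

    public class SpecificRecordJob {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            Properties kafkaProps = new Properties();
            kafkaProps.setProperty("bootstrap.servers", "kafka:9092"); // placeholder
            kafkaProps.setProperty("group.id", "event-consumer");      // placeholder

            // UserEvent is a placeholder for an Avro-generated SpecificRecord class. The writer
            // schema is looked up in the registry per record; the generated class supplies the
            // reader schema, so no schema file needs to be shipped separately.
            FlinkKafkaConsumer010<UserEvent> source = new FlinkKafkaConsumer010<>(
                    "events",
                    ConfluentRegistryAvroDeserializationSchema.forSpecific(UserEvent.class, "http://registry:8081"),
                    kafkaProps);

            env.addSource(source).print();
            env.execute("specific-record-job");
        }
    }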
>
> On Wed, Jan 22, 2020 at 6:43 PM aj <ajainje...@gmail.com> wrote:
>
>> Hi Arvid,
>>
>> I have implemented the code with the envelope schema as you suggested, but now I am facing issues with the consumer. I have written code like this:
>>
>> FlinkKafkaConsumer010 kafkaConsumer010 = new FlinkKafkaConsumer010(KAFKA_TOPICS,
>>         new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>         properties);
>>
>> And the deserialization class looks like this:
>>
>> public class KafkaGenericAvroDeserializationSchema implements KeyedDeserializationSchema<GenericRecord> {
>>
>>     private final String registryUrl;
>>     private transient KafkaAvroDeserializer inner;
>>
>>     public KafkaGenericAvroDeserializationSchema(String registryUrl) {
>>         this.registryUrl = registryUrl;
>>     }
>>
>>     @Override
>>     public GenericRecord deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) {
>>         checkInitialized();
>>         return (GenericRecord) inner.deserialize(topic, message);
>>     }
>>
>>     @Override
>>     public boolean isEndOfStream(GenericRecord nextElement) {
>>         return false;
>>     }
>>
>>     @Override
>>     public TypeInformation<GenericRecord> getProducedType() {
>>         return TypeExtractor.getForClass(GenericRecord.class);
>>     }
>>
>>     private void checkInitialized() {
>>         if (inner == null) {
>>             Map<String, Object> props = new HashMap<>();
>>             props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
>>             props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
>>             SchemaRegistryClient client =
>>                     new CachedSchemaRegistryClient(registryUrl, AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
>>             inner = new KafkaAvroDeserializer(client, props);
>>         }
>>     }
>> }
>>
>> It works locally on my machine, but when I deploy it on a YARN cluster I get the exception below:
>>
>> java.lang.Exception: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:212)
>>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.performDefaultAction(SourceStreamTask.java:132)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>     at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
>>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
>>     at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
>>     at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:91)
>>     at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:156)
>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:202)
>> Caused by: com.esotericsoftware.kryo.KryoException: Error constructing instance of class: org.apache.avro.Schema$LockableArrayList
>> Serialization trace:
>> types (org.apache.avro.Schema$UnionSchema)
>> schema (org.apache.avro.Schema$Field)
>> fieldMap (org.apache.avro.Schema$RecordSchema)
>> schema (org.apache.avro.generic.GenericData$Record)
>>     at com.twitter.chill.Instantiators$$anon$1.newInstance(KryoBase.scala:136)
>>     at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1061)
>>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.create(CollectionSerializer.java:89)
>>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:93)
>>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
>>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
>>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:262)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:635)
>>     ... 13 more
>> Caused by: java.lang.IllegalAccessException: Class com.twitter.chill.Instantiators$$anonfun$normalJava$1 can not access a member of class org.apache.avro.Schema$LockableArrayList with modifiers "public"
>>
>> Please help me resolve this issue.
>>
>> Thanks,
>> Anuj
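The failing copy in the trace above goes through Flink's KryoSerializer, i.e. the GenericRecord is treated as a generic type and copied with Kryo between chained operators, and Kryo cannot construct Avro's internal Schema classes. One commonly used way to keep GenericRecord off the Kryo path (a sketch only, not from this thread, and it assumes a reader schema is available when the job is built) is to return an Avro-aware type information from getProducedType instead of TypeExtractor.getForClass(GenericRecord.class):

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;

    // Sketch: keeps the schema as a JSON string because org.apache.avro.Schema itself is not
    // Serializable, while deserialization schemas are shipped to the cluster.
    public class GenericRecordTypeInfoProvider implements java.io.Serializable {

        private final String readerSchemaJson;

        public GenericRecordTypeInfoProvider(Schema readerSchema) {
            this.readerSchemaJson = readerSchema.toString();
        }

        public TypeInformation<GenericRecord> producedType() {
            // Avro-aware type info: records are serialized with Avro between operators
            // instead of falling back to Kryo.
            return new GenericRecordAvroTypeInfo(new Schema.Parser().parse(readerSchemaJson));
        }
    }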
>>
>> On Mon, Jan 20, 2020 at 9:42 PM aj <ajainje...@gmail.com> wrote:
>>
>>> Thanks, Arvid, for all the clarification. I will work on the approach you suggested.
>>>
>>> Thanks,
>>> Anuj
>>>
>>> On Sat, Jan 18, 2020 at 10:17 PM Arvid Heise <ar...@ververica.com> wrote:
>>>
>>>> Hi Anuj,
>>>>
>>>> I think there may be a fundamental misunderstanding about the role of a schema registry in Kafka, so let me first clarify that.
>>>> In each Avro/Parquet file, all records have the same schema. The schema is stored within the file, such that we can always retrieve the writer schema for the records.
>>>> When Avro was first applied to Kafka, there was the basic question of how the writer schema for any record is known to the consumer. Storing the complete schema on each record would mean that each record would be much larger than needed. Hence, they added the schema registry, which assigns a unique id to each schema; that id is then embedded into the records.
>>>> Now, whenever I update a schema in my producer, I would have old records with the old schema id and new records with the new schema id.
>>>> In my consumer, I'd use a fixed reader schema, such that my application would not need to worry whether a record was written with the old or the new schema; my consumer would only see records with the reader schema.
>>>>
>>>> Given that background information, you see that in general it's impossible with a generic approach to write the parquet with the same schema the data was written with in Kafka: the parquet schema needs to be supplied statically during query compilation, while the Avro schema actually used in Kafka is only known when actually consuming data.
>>>>
>>>> But looking further down the road:
>>>> * since you need one schema to write the parquet files, you'd need to decide: do you want to write with the new or the old schema in case of a schema update? That should also be the reader schema of your application for a given event type.
>>>> * this decision has further implications: your application needs to extract exactly one specific version of the schema from the schema registry at query compilation. That could be either a specific schema id or the latest schema for the event type.
>>>> * that means that the output schema is locked until you restart your application and fetch a new latest schema in case of an update.
>>>> * at that point, it might just be easier to use the approach that I outlined previously by bundling a specific schema with your application.
>>>>
>>>> If you want to extract the latest schema for a subject:
>>>>
>>>> var registryClient = new CachedSchemaRegistryClient(<schemaRepoUrls>, 1000);
>>>> var versions = registryClient.getAllVersions(<subject>);
>>>> var schema = registryClient.getSchemaById(versions.get(versions.size() - 1));
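A closely related way to fetch the latest schema registered under a subject is getLatestSchemaMetadata, which resolves it by subject and version; a sketch with a placeholder registry URL and subject name:

    import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
    import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
    import org.apache.avro.Schema;

    public class LatestSchemaFetcher {

        /** Fetches the newest schema version registered under the given subject. */
        public static Schema fetchLatest(String registryUrl, String subject) throws Exception {
            CachedSchemaRegistryClient client = new CachedSchemaRegistryClient(registryUrl, 1000);
            SchemaMetadata metadata = client.getLatestSchemaMetadata(subject);
            return new Schema.Parser().parse(metadata.getSchema());
        }

        public static void main(String[] args) throws Exception {
            // Placeholder values, not from this thread.
            Schema schema = fetchLatest("http://registry:8081", "event-a-value");
            System.out.println(schema.toString(true));
        }
    }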
>>>>
>>>> On Sat, Jan 18, 2020 at 5:22 PM aj <ajainje...@gmail.com> wrote:
>>>>
>>>>> Thanks, Arvid.
>>>>>
>>>>> I do not fully understand the above approach, so currently I am thinking of going with the envelope approach that you suggested.
>>>>>
>>>>> One more question: even if it is a single envelope schema, I do not want to keep the schema in my consumer project. I want it to be fetched from the schema registry and passed to my parquet sink so that I always use the same schema that the producer used. Can you provide sample code for how I can infer the schema from the generic record, or get it from the schema registry?
>>>>>
>>>>> Regards,
>>>>> Anuj
>>>>>
>>>>> On Sat, Jan 18, 2020 at 6:55 PM Arvid Heise <ar...@ververica.com> wrote:
>>>>>
>>>>>> (Re-added user mailing list)
>>>>>>
>>>>>> Hi Anuj,
>>>>>>
>>>>>> since I'd still recommend going with distinct sources/sinks, let me try to solve your issues in this mail. If that doesn't work out, I'd address your concerns about the envelope format later.
>>>>>>
>>>>>> In Flink, you can have several subtopologies in the same application.
>>>>>>
>>>>>> Thus, for each event type, you can add
>>>>>> AvroSource(eventType) -> generic transformation/validation -> AvroSink(eventType)
>>>>>>
>>>>>> I'd put all Avro schemas in one project and use an Avro plugin to generate the respective Java classes. Then I'd simply create a map of Avro schema (GeneratedClassA::SCHEMA, GeneratedClassB::SCHEMA, ...) to topic name (event-a, event-b, ...).
>>>>>> Next, I'd iterate over that map to add the respective subtopologies to env.
>>>>>> Finally, execute everything.
>>>>>>
>>>>>> You have one project where all validations reside, but you'd have almost no overhead to process a given source of eventType. The downside of that approach is, of course, that each new event type requires a redeployment, but that seems to be what you'd want to do anyhow.
>>>>>>
>>>>>> Best,
>>>>>>
>>>>>> Arvid
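A rough sketch of that layout: one small pipeline per event type inside a single job, driven by a map from topic name to schema. EventA and EventB stand in for classes generated by the Avro plugin, and the topics, paths, Kafka properties, and registry URL are placeholders:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
    import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

    public class PerEventTypeJob {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            String registryUrl = "http://registry:8081";               // placeholder
            Properties kafkaProps = new Properties();
            kafkaProps.setProperty("bootstrap.servers", "kafka:9092"); // placeholder
            kafkaProps.setProperty("group.id", "event-archiver");      // placeholder

            // One reader schema per event type/topic. EventA and EventB stand in for
            // classes generated by the Avro Maven/Gradle plugin.
            Map<String, Schema> topicToSchema = new HashMap<>();
            topicToSchema.put("event-a", EventA.getClassSchema());
            topicToSchema.put("event-b", EventB.getClassSchema());

            // One small subtopology per event type: Kafka source -> (validation) -> Parquet sink.
            for (Map.Entry<String, Schema> entry : topicToSchema.entrySet()) {
                String topic = entry.getKey();
                Schema schema = entry.getValue();

                FlinkKafkaConsumer010<GenericRecord> source = new FlinkKafkaConsumer010<>(
                        topic,
                        ConfluentRegistryAvroDeserializationSchema.forGeneric(schema, registryUrl),
                        kafkaProps);

                StreamingFileSink<GenericRecord> sink = StreamingFileSink
                        .forBulkFormat(new Path("s3://bucket/events/" + topic),
                                ParquetAvroWriters.forGenericRecord(schema))
                        .build();

                env.addSource(source).addSink(sink);
            }

            env.execute("per-event-type-archiver");
        }
    }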
>>>>>>
>>>>>> On Sat, Jan 18, 2020 at 2:08 PM aj <ajainje...@gmail.com> wrote:
>>>>>>
>>>>>>> Thanks, Arvid.
>>>>>>>
>>>>>>> 1. I like your approach, as I can write a single consumer and put the data in S3 in parquet format. The only challenge is that there are extra columns that are always going to be null, since at any given time I get only one type of event.
>>>>>>>
>>>>>>> 2. If I go with a separate schema per event, I am not sure how I can solve it using a single generalized consumer. My understanding so far is that I have to write a consumer for each type of event. Each consumer would read the whole data, filter out the respective events, and then pass this stream to a sink. But this does not look like a scalable solution: as new event types keep growing, I would have to write a consumer for each new type.
>>>>>>>
>>>>>>> DataStreamSource<GenericRecord> input = env
>>>>>>>     .addSource(
>>>>>>>         new FlinkKafkaConsumer010<GenericRecord>(topics,
>>>>>>>             new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>>>>>>             config).setStartFromEarliest());
>>>>>>>
>>>>>>> Example:
>>>>>>>
>>>>>>> 1st consumer:
>>>>>>> DataStreamSource<GenericRecord> input = env.addSource(
>>>>>>>     new FlinkKafkaConsumer010<GenericRecord>(topics,
>>>>>>>         new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>>>>>>         config).setStartFromEarliest());
>>>>>>> DataStream<GenericRecord> aInput = input.filter(record -> "a".equals(String.valueOf(record.get("event_name"))));
>>>>>>>
>>>>>>> 2nd consumer:
>>>>>>> DataStreamSource<GenericRecord> input = env.addSource(
>>>>>>>     new FlinkKafkaConsumer010<GenericRecord>(topics,
>>>>>>>         new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>>>>>>         config).setStartFromEarliest());
>>>>>>> DataStream<GenericRecord> bInput = input.filter(record -> "b".equals(String.valueOf(record.get("event_name"))));
>>>>>>>
>>>>>>> Can you help me with how to solve this using a single consumer? I do not want to write a separate consumer for each type of schema.
>>>>>>>
>>>>>>> For example, this is my consumer that contains different types of records:
>>>>>>>
>>>>>>> DataStreamSource<GenericRecord> input = env
>>>>>>>     .addSource(
>>>>>>>         new FlinkKafkaConsumer010<GenericRecord>(topics,
>>>>>>>             new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>>>>>>             config).setStartFromEarliest());
>>>>>>>
>>>>>>> Now I cannot write this stream directly, as there is no common schema for the records in this stream. So a possible way I am thinking of is:
>>>>>>>
>>>>>>> 1. Can I create multiple streams from this stream, keyed by "event_name", and then write each stream separately?
>>>>>>>
>>>>>>> I just want to know whether this is possible.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Anuj
>>>>>>>
>>>>>>> On Fri, Jan 17, 2020 at 3:20 PM Arvid Heise <ar...@ververica.com> wrote:
>>>>>>>
>>>>>>>> Hi Anuj,
>>>>>>>>
>>>>>>>> I originally understood that you would like to store data in the same Kafka topic and also want to store it in the same parquet file. In the past, I mostly used the schema registry with Kafka Streams, where you could only store a schema for a key and a value, respectively. To use different record types in the same Kafka topic, you had to disable schema compatibility checks and just store the schemas as different versions under the same subject.
>>>>>>>>
>>>>>>>> Your approach is much better: you can ensure full schema compatibility. Nevertheless, it still shares the same drawback that consumption is much harder (using GenericRecord is proof of that) if you want to read/write everything into the same place. Also, you will never be able to write one consistent file, as files can only have one schema (both in Avro and in Parquet).
>>>>>>>> So you only have two options:
>>>>>>>> * keep schemas separated, but then you also need to write separate files per record type.
>>>>>>>> * have a common schema (either my outlined approach or any other wrapper schema).
>>>>>>>> The approach with a common schema only makes sense if you want to write everything into one table/Kafka topic.
>>>>>>>>
>>>>>>>> However, in the last mail you pointed out that you actually want to store the record types separately. In that case, you should keep everything separated and have a sink for each type, each getting the respective schema. Note that you'd need to fetch the schema manually from the schema registry when creating the query, as you need to pass it to the sink.
>>>>>>>>
>>>>>>>> Btw, do you actually have a need to write all events into one Kafka topic? The only real use case is to preserve the time order per key. Everything else is much more complicated than storing events individually.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>>
>>>>>>>> Arvid
>>>>>>>>
>>>>>>>> On Thu, Jan 16, 2020 at 3:39 PM aj <ajainje...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Arvid,
>>>>>>>>> Thanks for the quick response. I am new to this Avro design, so can you please help me understand and design this for my use case?
>>>>>>>>>
>>>>>>>>> My use case is like this:
>>>>>>>>>
>>>>>>>>> 1. We have an app where a lot of actions happen on the user side.
>>>>>>>>> 2. For each action we collect a set of information defined using key-value pairs. We want to define this information as proper schemas so that we maintain a proper format and do not push random data.
>>>>>>>>> 3. So we define a schema for each action and register it in the schema registry using topic+record.name as the subject.
>>>>>>>>> 4. So I do not think the producer side has any issue, as whenever we push an event to Kafka we register a new schema under the above subject.
>>>>>>>>>
>>>>>>>>> Example:
>>>>>>>>>
>>>>>>>>> {
>>>>>>>>>   "event_name": "a",
>>>>>>>>>   "timestamp":
>>>>>>>>>   "properties": [
>>>>>>>>>     "key-1": "val-1",
>>>>>>>>>     "key-2": "val-2"
>>>>>>>>>   ]
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> {
>>>>>>>>>   "event_name": "b",
>>>>>>>>>   "timestamp":
>>>>>>>>>   "properties": [
>>>>>>>>>     "key-3": "val-3",
>>>>>>>>>     "key-4": "val-4"
>>>>>>>>>   ]
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> Now I have a consumer that parses the data by fetching the schema from the schema registry and deserializes it into a generic record stream.
>>>>>>>>>
>>>>>>>>> Why do you think it will break, since I am always deserializing with the writer schema only?
>>>>>>>>>
>>>>>>>>> You suggested keeping one envelope Avro schema rather than a separate schema for each type of event that I am generating. I have some doubts about that:
>>>>>>>>>
>>>>>>>>> 1. How do I enforce a schema on each event when it is a subtype in the main schema? When I get a JSON event of type "a", how do I enforce and convert it to the subschema for "a" and push it to Kafka?
>>>>>>>>> 2. I want to create a separate hive table for each of the events, so when I write this data, let's say I have 20 events, then 19 columns always get null values in the data.
>>>>>>>>>
>>>>>>>>> Please help me do this the right way. It will be a great help and learning for me.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Anuj
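To make points 3 and 4 above concrete, a hedged sketch of such a producer: a plain Kafka producer with the Confluent Avro serializer and the TopicRecordNameStrategy subject-name strategy, so each record type is registered under <topic>-<record name>. All names, URLs, and the inline schema are placeholders:

    import java.util.Properties;

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class EventProducerSketch {

        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "kafka:9092"); // placeholder
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
            props.put("schema.registry.url", "http://registry:8081"); // placeholder
            // Register/look up schemas under <topic>-<record name> instead of <topic>-value.
            props.put("value.subject.name.strategy",
                    "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy");

            // Hypothetical schema for event "a"; the real schemas come from your own definitions.
            Schema schemaA = new Schema.Parser().parse(
                    "{\"type\":\"record\",\"name\":\"a\",\"namespace\":\"events\",\"fields\":["
                            + "{\"name\":\"event_name\",\"type\":\"string\"},"
                            + "{\"name\":\"timestamp\",\"type\":\"long\"}]}");

            GenericRecord record = new GenericData.Record(schemaA);
            record.put("event_name", "a");
            record.put("timestamp", System.currentTimeMillis());

            try (KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(props)) {
                producer.send(new ProducerRecord<>("events", "some-key", record));
            }
        }
    }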
>>>>>>>>>
>>>>>>>>> On Thu, Jan 16, 2020 at 7:30 PM Arvid Heise <ar...@ververica.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Anuj,
>>>>>>>>>>
>>>>>>>>>> you should always avoid having records with different schemas in the same topic/dataset. You will break the compatibility features of the schema registry, and your consumer/producer code is always hard to maintain.
>>>>>>>>>>
>>>>>>>>>> A common and scalable way to avoid it is to use some kind of envelope format.
>>>>>>>>>>
>>>>>>>>>> {
>>>>>>>>>>   "namespace": "example",
>>>>>>>>>>   "name": "Envelope",
>>>>>>>>>>   "type": "record",
>>>>>>>>>>   "fields": [
>>>>>>>>>>     {
>>>>>>>>>>       "name": "type1",
>>>>>>>>>>       "type": ["null", {
>>>>>>>>>>         "type": "record",
>>>>>>>>>>         "fields": [ ... ]
>>>>>>>>>>       }],
>>>>>>>>>>       "default": null
>>>>>>>>>>     },
>>>>>>>>>>     {
>>>>>>>>>>       "name": "type2",
>>>>>>>>>>       "type": ["null", {
>>>>>>>>>>         "type": "record",
>>>>>>>>>>         "fields": [ ... ]
>>>>>>>>>>       }],
>>>>>>>>>>       "default": null
>>>>>>>>>>     }
>>>>>>>>>>   ]
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> This envelope is evolvable (arbitrary addition/removal of wrapped types, which by themselves can be evolved), and it adds only a little overhead (1 byte per subtype). The downside is that you cannot enforce that exactly one of the subtypes is set.
>>>>>>>>>>
>>>>>>>>>> This schema is fully compatible with the schema registry, so there is no need to parse anything manually.
>>>>>>>>>>
>>>>>>>>>> This schema can easily be used with Parquet. If you can't change the input format anymore, you can at least use that approach on your output.
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>>
>>>>>>>>>> Arvid
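To make the envelope idea concrete, a sketch of building such an envelope record with exactly one branch populated; the schema literal below is a hypothetical, filled-in version of the elided one above:

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;

    public class EnvelopeSketch {

        // A filled-in variant of the envelope above, with hypothetical wrapped record types.
        private static final Schema ENVELOPE = new Schema.Parser().parse(
                "{\"namespace\":\"example\",\"name\":\"Envelope\",\"type\":\"record\",\"fields\":["
                        + "{\"name\":\"type1\",\"type\":[\"null\","
                        + "{\"type\":\"record\",\"name\":\"Type1\",\"fields\":["
                        + "{\"name\":\"event_name\",\"type\":\"string\"}]}],\"default\":null},"
                        + "{\"name\":\"type2\",\"type\":[\"null\","
                        + "{\"type\":\"record\",\"name\":\"Type2\",\"fields\":["
                        + "{\"name\":\"event_name\",\"type\":\"string\"}]}],\"default\":null}]}");

        public static GenericRecord wrapType1(String eventName) {
            // Resolve the non-null branch of the union for field "type1".
            Schema type1Schema = ENVELOPE.getField("type1").schema().getTypes().get(1);

            GenericRecord type1 = new GenericData.Record(type1Schema);
            type1.put("event_name", eventName);

            GenericRecord envelope = new GenericData.Record(ENVELOPE);
            envelope.put("type1", type1); // type2 stays null, as the envelope allows
            return envelope;
        }

        public static void main(String[] args) {
            System.out.println(wrapType1("a"));
        }
    }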
>>>>>>>>>>
>>>>>>>>>> On Thu, Jan 16, 2020 at 2:53 PM aj <ajainje...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi All,
>>>>>>>>>>>
>>>>>>>>>>> I have a use case where I am getting different sets of Avro records in Kafka. I am using the schema registry to store the Avro schemas. One topic can also have different types of records.
>>>>>>>>>>>
>>>>>>>>>>> Now I have created a GenericRecord stream using the KafkaAvroDeserializer by defining a custom deserializer class like this:
>>>>>>>>>>>
>>>>>>>>>>> @Override
>>>>>>>>>>> public GenericRecord deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) {
>>>>>>>>>>>     checkInitialized();
>>>>>>>>>>>     return (GenericRecord) inner.deserialize(topic, message);
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>> private void checkInitialized() {
>>>>>>>>>>>     if (inner == null) {
>>>>>>>>>>>         Map<String, Object> props = new HashMap<>();
>>>>>>>>>>>         props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
>>>>>>>>>>>         props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
>>>>>>>>>>>         SchemaRegistryClient client =
>>>>>>>>>>>                 new CachedSchemaRegistryClient(registryUrl, AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
>>>>>>>>>>>         inner = new KafkaAvroDeserializer(client, props);
>>>>>>>>>>>     }
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>> And this is my consumer code:
>>>>>>>>>>>
>>>>>>>>>>> DataStreamSource<GenericRecord> input = env
>>>>>>>>>>>     .addSource(
>>>>>>>>>>>         new FlinkKafkaConsumer010<GenericRecord>(topics,
>>>>>>>>>>>             new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>>>>>>>>>>             config).setStartFromEarliest());
>>>>>>>>>>>
>>>>>>>>>>> Now I want to write this stream partitioned on event_name="a"/year=/month=/day= in parquet format so that I can expose hive tables directly on top of this data.
>>>>>>>>>>> event_name is a common field for all types of records that I am getting in Kafka.
>>>>>>>>>>> I am stuck because the parquet writer needs a schema to write, but my different records have different schemas. So how do I write this stream to S3 in the above partition format?
>>>>>>>>>>>
>>>>>>>>>>> Thanks & Regards,
>>>>>>>>>>> Anuj Jain
>>>>>>>>>>>
>>>>>>>>>>> <http://www.cse.iitm.ac.in/%7Eanujjain/>
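For the event_name=/year=/month=/day= layout asked about above, a sketch of a custom bucket assigner for a Parquet StreamingFileSink. It still assumes one Avro schema per sink, in line with the discussion earlier in the thread, and the field names and timestamp handling are assumptions:

    import java.time.Instant;
    import java.time.ZoneOffset;
    import java.time.ZonedDateTime;

    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.core.io.SimpleVersionedSerializer;
    import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
    import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

    /** Buckets records into event_name=<name>/year=<y>/month=<m>/day=<d> directories. */
    public class EventTimeBucketAssigner implements BucketAssigner<GenericRecord, String> {

        @Override
        public String getBucketId(GenericRecord element, Context context) {
            String eventName = String.valueOf(element.get("event_name"));
            // Assumes an epoch-millis "timestamp" field; adjust to the real field and type.
            long ts = (long) element.get("timestamp");
            ZonedDateTime time = Instant.ofEpochMilli(ts).atZone(ZoneOffset.UTC);
            return String.format("event_name=%s/year=%d/month=%02d/day=%02d",
                    eventName, time.getYear(), time.getMonthValue(), time.getDayOfMonth());
        }

        @Override
        public SimpleVersionedSerializer<String> getSerializer() {
            return SimpleVersionedStringSerializer.INSTANCE;
        }
    }

Wiring it in would then look roughly like StreamingFileSink.forBulkFormat(new Path("s3://bucket/events"), ParquetAvroWriters.forGenericRecord(schema)).withBucketAssigner(new EventTimeBucketAssigner()).build(), with the schema obtained as discussed earlier in the thread.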