Hi Anuj, I recommend using the ConfluentRegistryAvroDeserializationSchema [1] with a specific record that has been generated with the Avro Maven Plugin [2] or the Avro Gradle Plugin [3]. That should result in almost no code and maximum maintainability.
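A minimal sketch of how that wiring could look (assumes the flink-avro-confluent-registry dependency is on the classpath; UserEvent, the topic name, the bootstrap servers, and the registry URL are placeholders for whatever the Avro plugin generates and your setup uses):

import java.util.Properties;

import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

public class SpecificRecordJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "event-consumer");

        // UserEvent is a class generated by the Avro Maven/Gradle plugin. Because it
        // extends SpecificRecordBase, Flink serializes it with its Avro serializer
        // instead of falling back to Kryo.
        DataStream<UserEvent> events = env.addSource(
                new FlinkKafkaConsumer010<>(
                        "user-events",
                        ConfluentRegistryAvroDeserializationSchema.forSpecific(
                                UserEvent.class, "http://schema-registry:8081"),
                        properties));

        events.print();
        env.execute("specific-record-job");
    }
}

The reader schema comes from the generated class, and the writer schema is looked up in the registry via the id embedded in each record, so schema evolution is handled for you.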
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/kafka.html#the-deserializationschema [2] https://avro.apache.org/docs/1.8.2/gettingstartedjava.html [3] https://github.com/davidmc24/gradle-avro-plugin On Wed, Jan 22, 2020 at 6:43 PM aj <ajainje...@gmail.com> wrote: > Hi Arvid, > > I have implemented the code with envelope schema as you suggested but now > I am facing issues with the consumer . I have written code like this: > > FlinkKafkaConsumer010 kafkaConsumer010 = new > FlinkKafkaConsumer010(KAFKA_TOPICS, > new > KafkaGenericAvroDeserializationSchema(schemaRegistryUrl), > properties); > > And the Deserialization class looks like this : > > pblic class KafkaGenericAvroDeserializationSchema implements > KeyedDeserializationSchema<GenericRecord> { > > private final String registryUrl; > private transient KafkaAvroDeserializer inner; > > public KafkaGenericAvroDeserializationSchema(String registryUrl) { > this.registryUrl = registryUrl; > } > > @Override > public GenericRecord deserialize(byte[] messageKey, byte[] message, > String topic, int partition, long offset) { > checkInitialized(); > return (GenericRecord) inner.deserialize(topic, message); > } > > @Override > public boolean isEndOfStream(GenericRecord nextElement) { > return false; > } > > @Override > public TypeInformation<GenericRecord> getProducedType() { > return TypeExtractor.getForClass(GenericRecord.class); > } > > private void checkInitialized() { > if (inner == null) { > Map<String, Object> props = new HashMap<>(); > > props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, > registryUrl); > > props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false); > SchemaRegistryClient client = > new CachedSchemaRegistryClient( > registryUrl, > AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT); > inner = new KafkaAvroDeserializer(client, props); > } > } > } > > > It's working locally on my machine but when I deployed it on yarn cluster > I am getting below exception: > > > java.lang.Exception: org.apache.flink.streaming.runtime.tasks. > ExceptionInChainedOperatorException: Could not forward element to next > operator > at org.apache.flink.streaming.runtime.tasks. > SourceStreamTask$LegacySourceFunctionThread > .checkThrowSourceExecutionException(SourceStreamTask.java:212) > at org.apache.flink.streaming.runtime.tasks.SourceStreamTask > .performDefaultAction(SourceStreamTask.java:132) > at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask > .java:298) > at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke( > StreamTask.java:403) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530) > at java.lang.Thread.run(Thread.java:748) > Caused by: org.apache.flink.streaming.runtime.tasks. > ExceptionInChainedOperatorException: Could not forward element to next > operator > at org.apache.flink.streaming.runtime.tasks. > OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654) > at org.apache.flink.streaming.runtime.tasks. > OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612) > at org.apache.flink.streaming.runtime.tasks. > OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592) > at org.apache.flink.streaming.api.operators. > AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java: > 727) > at org.apache.flink.streaming.api.operators. 
> AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java: > 705) > at org.apache.flink.streaming.api.operators. > StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts > .java:104) > at org.apache.flink.streaming.api.operators. > StreamSourceContexts$NonTimestampContext.collectWithTimestamp( > StreamSourceContexts.java:111) > at org.apache.flink.streaming.connectors.kafka.internals. > AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398) > at org.apache.flink.streaming.connectors.kafka.internal. > Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:91) > at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher > .runFetchLoop(Kafka09Fetcher.java:156) > at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase > .run(FlinkKafkaConsumerBase.java:715) > at org.apache.flink.streaming.api.operators.StreamSource.run( > StreamSource.java:100) > at org.apache.flink.streaming.api.operators.StreamSource.run( > StreamSource.java:63) > at org.apache.flink.streaming.runtime.tasks. > SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:202) > Caused by: com.esotericsoftware.kryo.KryoException: Error constructing > instance of class: org.apache.avro.Schema$LockableArrayList > Serialization trace: > types (org.apache.avro.Schema$UnionSchema) > schema (org.apache.avro.Schema$Field) > fieldMap (org.apache.avro.Schema$RecordSchema) > schema (org.apache.avro.generic.GenericData$Record) > at com.twitter.chill.Instantiators$$anon$1.newInstance(KryoBase.scala: > 136) > at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1061) > at com.esotericsoftware.kryo.serializers.CollectionSerializer.create( > CollectionSerializer.java:89) > at com.esotericsoftware.kryo.serializers.CollectionSerializer.read( > CollectionSerializer.java:93) > at com.esotericsoftware.kryo.serializers.CollectionSerializer.read( > CollectionSerializer.java:22) > at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) > at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField > .java:106) > at com.esotericsoftware.kryo.serializers.FieldSerializer.read( > FieldSerializer.java:528) > at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) > at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField > .java:106) > at com.esotericsoftware.kryo.serializers.FieldSerializer.read( > FieldSerializer.java:528) > at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) > at com.esotericsoftware.kryo.serializers.MapSerializer.read( > MapSerializer.java:143) > at com.esotericsoftware.kryo.serializers.MapSerializer.read( > MapSerializer.java:21) > at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) > at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField > .java:106) > at com.esotericsoftware.kryo.serializers.FieldSerializer.read( > FieldSerializer.java:528) > at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) > at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField > .java:106) > at com.esotericsoftware.kryo.serializers.FieldSerializer.read( > FieldSerializer.java:528) > at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657) > at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer > .copy(KryoSerializer.java:262) > at org.apache.flink.streaming.runtime.tasks. > OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:635) > ... 13 more > Caused by: java.lang.IllegalAccessException: Class com.twitter.chill. 
> Instantiators$$anonfun$normalJava$1 can not access a member of class > org.apache.avro.Schema$LockableArrayList with modifiers "public" > > Please help me to resolve this issue. > > Thanks, > Anuj > > > > > > On Mon, Jan 20, 2020 at 9:42 PM aj <ajainje...@gmail.com> wrote: > >> Thanks, Arvid for all the clarification. I will work on the approach you >> suggested. >> >> Thanks, >> Anuj >> >> On Sat, Jan 18, 2020 at 10:17 PM Arvid Heise <ar...@ververica.com> wrote: >> >>> Hi Anuj, >>> >>> I think that there may be a fundamental misunderstanding about the role >>> of a schema registry in Kafka. So let me first clarify that. >>> In each Avro/Parquet file, all records have the same schema. The schema >>> is stored within the file, such that we can always retrieve the writer >>> schema for the records. >>> When Avro was first applied to Kafka, there was the basic question on >>> how the writer schema for any record is known to the consumer. Storing the >>> complete schema on each record would mean that each record would be much >>> larger than needed. Hence, they added the schema registry that assigns a >>> unique id to schema, which is then embedded into the records. >>> Now, whenever I update a schema in my producer, I would have old records >>> with the old schema id and new records with the new schema id. >>> In my consumer, I'd use a fixed reader schema, such that my application >>> would not need to worry if the record is written with old or new schema; my >>> consumer would only see records with the reader schema. >>> >>> Given that background information, you see that in general, it's >>> impossible with a generic approach to write the parquet with the same >>> schema as it has been written in Kafka: the parquet schema needs to be >>> supplied statically during query compilation while the actual used Avro >>> schema in Kafka is only known when actually consuming data. >>> >>> But looking further down the road: >>> * since you need one schema to write the parquet files, you'd need to >>> decide: do you want to write with the new or the old schema in case of a >>> schema update? That should also be the reader schema of your application >>> for a given event type. >>> * this decision has further implications: your application need to >>> extract exactly one specific version of the schema from the schema registry >>> at query compilation. That could be either a specific schema id or the >>> latest schema for the event type. >>> * that means that the output schema is locked until you restart your >>> application and fetch a new latest schema in case of an update. >>> * at that point, it might just be easier to use the approach that I >>> outlined previously by bundling a specific schema with your application. >>> >>> If you want to extract the latest schema for a subject: >>> >>> var registryClient = new CachedSchemaRegistryClient(<schemaRepoUrls>, 1000); >>> var versions = registryClient.getAllVersions(<subject>); >>> var schema = registryClient.getSchemaById(versions.get(versions.size() - >>> 1)); >>> >>> >>> On Sat, Jan 18, 2020 at 5:22 PM aj <ajainje...@gmail.com> wrote: >>> >>>> Thanks, Arvid. >>>> >>>> I do not fully understand the above approach, >>>> so currently, I am thinking to go with the envelope approach that you >>>> suggested. >>>> >>>> One more question I have if I do not want to keep schema in my consumer >>>> project even its a single envelope schema. 
I want it to be fetched from the >>>> schema registry and pass to my parquet-sink so that I always use the same >>>> schema that is used by the producer. Can you provide a sample code how can >>>> i infer the schema from the generic record or get it from schema registry? >>>> >>>> >>>> Regards, >>>> Anuj >>>> >>>> >>>> >>>> On Sat, Jan 18, 2020 at 6:55 PM Arvid Heise <ar...@ververica.com> >>>> wrote: >>>> >>>>> (Readded user mailing list) >>>>> >>>>> Hi Anuj, >>>>> >>>>> since I'd still recommend going with distinct sources/sinks, let me >>>>> try to solve your issues in this mail. If that doesn't work out, I'd >>>>> address your concerns about the envelope format later. >>>>> >>>>> In Flink, you can have several subtopologies in the same application. >>>>> >>>>> Thus, for each event type, you can add >>>>> AvroSource(eventType) -> generic transformation/validation -> >>>>> AvroSink(eventType) >>>>> for each event. >>>>> >>>>> I'd put all avro schema in one project and use an avro plugin to >>>>> generate the respective Java Classes. Then I'd simply create a map of Avro >>>>> Schema (GeneratedClassA::SCHEMA, GeneratedClassB::SCHEMA...) to topic name >>>>> (event-a, event-b, ...). >>>>> Next, I'd iterate over the list to add the respective subtopologies to >>>>> env. >>>>> Finally, execute everything. >>>>> >>>>> You have one project where all validations reside. But you'd have >>>>> almost no overhead to process a given source of eventType. The downside of >>>>> that approach is of course, that each new event type would require a >>>>> redeployment, but that seems like what you'd want to do anyhow. >>>>> >>>>> Best, >>>>> >>>>> Arvid >>>>> >>>>> On Sat, Jan 18, 2020 at 2:08 PM aj <ajainje...@gmail.com> wrote: >>>>> >>>>>> Thanks, Arvid. >>>>>> >>>>>> 1. I like your approach as I can write a single consumer and put the >>>>>> data in S3 in parquet format. The only challenge is there are extra >>>>>> columns >>>>>> that always going to be null as at a time I will get one type of event. >>>>>> >>>>>> 2. if I go with a separate schema I am not sure how I can solve it >>>>>> using a single generalize consumer. Till now what my understanding is I >>>>>> have to write a consumer for each type of event. Each consumer will read >>>>>> the whole data then filter the respective events from this and then I can >>>>>> pass this stream to sink. But this does not look scalable solution as the >>>>>> new events keep growing then I have to write a consumer for each new >>>>>> type. 
>>>>>> >>>>>> >>>>>> DataStreamSource<GenericRecord> input = env >>>>>> .addSource( >>>>>> new FlinkKafkaConsumer010<GenericRecord>(topics, >>>>>> new >>>>>> KafkaGenericAvroDeserializationSchema(schemaRegistryUrl), >>>>>> config).setStartFromEarliest()); >>>>>> >>>>>> Example : >>>>>> >>>>>> * 1st Consumer:* >>>>>> DataStreamSource<GenericRecord> input = >>>>>> env.addSource( >>>>>> new FlinkKafkaConsumer010<GenericRecord>(topics, >>>>>> new >>>>>> KafkaGenericAvroDeserializationSchema(schemaRegistryUrl), >>>>>> config).setStartFromEarliest()); >>>>>> * DataStream<GenericRecord> aInput = >>>>>> input.filter("event_name"= "a")* >>>>>> >>>>>> * 2nd Consumer:* >>>>>> DataStreamSource<GenericRecord> input = env.addSource( >>>>>> new FlinkKafkaConsumer010<GenericRecord>(topics, >>>>>> new >>>>>> KafkaGenericAvroDeserializationSchema(schemaRegistryUrl), >>>>>> config).setStartFromEarliest()); >>>>>> * DataStream<GenericRecord> bInput = >>>>>> input.filter("event_name"= "b")* >>>>>> >>>>>> >>>>>> Can you help me How I solve this using a single consumer as I do not >>>>>> want to write a separate consumer for each type of schema? >>>>>> >>>>>> For example, this is my consumer that contains different types of >>>>>> records. >>>>>> >>>>>> DataStreamSource<GenericRecord> input = env >>>>>> .addSource( >>>>>> new FlinkKafkaConsumer010<GenericRecord>(topics, >>>>>> new >>>>>> KafkaGenericAvroDeserializationSchema(schemaRegistryUrl), >>>>>> config).setStartFromEarliest()); >>>>>> >>>>>> Now I can not write this stream directly as there is no common schema >>>>>> of records in this stream. So possible way I am thinking is >>>>>> >>>>>> 1. Can I create multiple streams from this stream using the key by on >>>>>> *"event_name" >>>>>> *and then write each stream separately. >>>>>> >>>>>> Just wanna know is this possible ?? >>>>>> >>>>>> >>>>>> Thanks, >>>>>> Anuj >>>>>> >>>>>> On Fri, Jan 17, 2020 at 3:20 PM Arvid Heise <ar...@ververica.com> >>>>>> wrote: >>>>>> >>>>>>> Hi Anuj, >>>>>>> >>>>>>> I originally understood that you would like to store data in the >>>>>>> same Kafka topic and also want to store it in the same parquet file. In >>>>>>> the >>>>>>> past, I mostly used schema registry with Kafka Streams, where you could >>>>>>> only store a schema for a key and value respectively. To use different >>>>>>> record types in the same kafka topic, you had to disable schema >>>>>>> compatibility checks and just stored the schemas as different versions >>>>>>> under the same subject. >>>>>>> >>>>>>> Your approach is much better. You can ensure full schema >>>>>>> compatibility. Nevertheless, it still shares the same drawback that >>>>>>> consumption is much harder (using GenericRecord is proof of that) if you >>>>>>> want to read/write everything into the same place. Also you will never >>>>>>> be >>>>>>> able to write one consistent file, as they can only have one schema >>>>>>> (both >>>>>>> on Avro and Parquet). >>>>>>> So you only have two options: >>>>>>> * keep schemas separated, but then you also need to write separate >>>>>>> files per record type. >>>>>>> * have a common schema (either my outlined approach or any other >>>>>>> wrapper schema). >>>>>>> The approach with a common schema makes only sense if you want to >>>>>>> write it into one table/kafka topic. >>>>>>> >>>>>>> However, in the last mail you pointed out that you actually want to >>>>>>> store the record types separately. Then, you should keep everything >>>>>>> separated. 
Then you should have a sink for each type each getting the >>>>>>> respective schema. Note that you'd need to fetch the schema manually >>>>>>> from >>>>>>> the schema registry when creating the query as you would need to pass >>>>>>> it to >>>>>>> the sink. >>>>>>> >>>>>>> Btw, do you actually have a need to write all events into one Kafka >>>>>>> topic? The only real use case is to preserve the time order per key. >>>>>>> Everything else is much more complicated then storing events >>>>>>> individually. >>>>>>> >>>>>>> Best, >>>>>>> >>>>>>> Arvid >>>>>>> >>>>>>> >>>>>>> On Thu, Jan 16, 2020 at 3:39 PM aj <ajainje...@gmail.com> wrote: >>>>>>> >>>>>>>> Hi Arvid, >>>>>>>> Thanks for the quick response. I am new to this Avro design so can >>>>>>>> you please help me understand and design for my use case. >>>>>>>> >>>>>>>> I have use case like this : >>>>>>>> >>>>>>>> 1. we have an app where a lot of action happened from the user side. >>>>>>>> 2. for each action we collect some set of information that >>>>>>>> defined using some key-value pairs. This information we want to define >>>>>>>> as >>>>>>>> proper schemas so that we maintain the proper format and not push >>>>>>>> random >>>>>>>> data. >>>>>>>> 3. So we are defining for each action a schema and register in the >>>>>>>> schema registry using topic+record.name as the subject . >>>>>>>> 4. So I do not think the producer side has any issue as whenever we >>>>>>>> push the event to Kafka we register a new schema with the above >>>>>>>> subject. >>>>>>>> >>>>>>>> Example : >>>>>>>> >>>>>>>> { >>>>>>>> event_name : "a" >>>>>>>> "timestamp": >>>>>>>> "properties" :[ >>>>>>>> "key-1 : "val-1" >>>>>>>> "key-2 : "val-2" >>>>>>>> ] >>>>>>>> } >>>>>>>> >>>>>>>> { >>>>>>>> event_name : "b" >>>>>>>> "timestamp": >>>>>>>> "properties" :[ >>>>>>>> "key-3 : "val-3" >>>>>>>> "key-4 : "val-4" >>>>>>>> ] >>>>>>>> } >>>>>>>> >>>>>>>> Now I have a consumer that will parse the data by fetching the >>>>>>>> schema from schema registry and deserialize in the generic record >>>>>>>> streams. >>>>>>>> >>>>>>>> Why you think it will break as I am always deserializing with >>>>>>>> writer schema only. >>>>>>>> >>>>>>>> As you suggested to keep an envelope Avro schema and not separate >>>>>>>> schema for each type of event that I am generating. I have some doubts >>>>>>>> about that: >>>>>>>> >>>>>>>> 1. How I enforce a schema on each event as it subtypes in the main >>>>>>>> schema. so when I am getting a JSON event of type "a" how I enforce and >>>>>>>> convert it to subschema type of "a" and push to Kafka. >>>>>>>> 2. I want to create a separate hive table for each of the events so >>>>>>>> when I write this data and lets says I have 20 events than for 19 >>>>>>>> columns I >>>>>>>> am getting null values always in data. >>>>>>>> >>>>>>>> Please help me in doing this right way. It will be a great help and >>>>>>>> learning for me. >>>>>>>> >>>>>>>> Thanks, >>>>>>>> Anuj >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> On Thu, Jan 16, 2020 at 7:30 PM Arvid Heise <ar...@ververica.com> >>>>>>>> wrote: >>>>>>>> >>>>>>>>> Hi Anuj, >>>>>>>>> >>>>>>>>> you should always avoid having records with different schemas in >>>>>>>>> the same topic/dataset. You will break the compatibility features of >>>>>>>>> the >>>>>>>>> schema registry and your consumer/producer code is always hard to >>>>>>>>> maintain. >>>>>>>>> >>>>>>>>> A common and scalable way to avoid it is to use some kind of >>>>>>>>> envelope format. 
>>>>>>>>> >>>>>>>>> { >>>>>>>>> "namespace": "example", >>>>>>>>> "name": "Envelope", >>>>>>>>> "type": "record", >>>>>>>>> "fields": [ >>>>>>>>> { >>>>>>>>> "name": "type1", >>>>>>>>> "type": ["null", { >>>>>>>>> "type": "record", >>>>>>>>> "fields": [ ... ] >>>>>>>>> }], >>>>>>>>> "default": null >>>>>>>>> }, >>>>>>>>> { >>>>>>>>> "name": "type2", >>>>>>>>> "type": ["null", { >>>>>>>>> "type": "record", >>>>>>>>> "fields": [ ... ] >>>>>>>>> }], >>>>>>>>> "default": null >>>>>>>>> } >>>>>>>>> ] >>>>>>>>> } >>>>>>>>> >>>>>>>>> This envelope is evolvable (arbitrary addition/removal of wrapped >>>>>>>>> types, which by themselves can be evolved), and adds only a little >>>>>>>>> overhead >>>>>>>>> (1 byte per subtype). The downside is that you cannot enforce that >>>>>>>>> exactly >>>>>>>>> one of the subtypes is set. >>>>>>>>> >>>>>>>>> This schema is fully compatible with the schema registry, so no >>>>>>>>> need to parse anything manually. >>>>>>>>> >>>>>>>>> This schema can easily be used with Parquet. If you can't change >>>>>>>>> the input format anymore, you can at least use that approach on your >>>>>>>>> output. >>>>>>>>> >>>>>>>>> Best, >>>>>>>>> >>>>>>>>> Arvid >>>>>>>>> >>>>>>>>> On Thu, Jan 16, 2020 at 2:53 PM aj <ajainje...@gmail.com> wrote: >>>>>>>>> >>>>>>>>>> Hi All, >>>>>>>>>> >>>>>>>>>> I have a use case where I am getting a different set of Avro >>>>>>>>>> records in Kafka. I am using the schema registry to store Avro >>>>>>>>>> schema. One >>>>>>>>>> topic can also have different types of records. >>>>>>>>>> >>>>>>>>>> Now I have created a GenericRecord Stream using >>>>>>>>>> kafkaAvroDeseralizer by defining custom >>>>>>>>>> Deserializer class like this >>>>>>>>>> >>>>>>>>>> @Override >>>>>>>>>> public GenericRecord deserialize( >>>>>>>>>> byte[] messageKey, byte[] message, String topic, int partition, >>>>>>>>>> long offset) { >>>>>>>>>> checkInitialized(); >>>>>>>>>> return (GenericRecord) inner.deserialize(topic, message); >>>>>>>>>> } >>>>>>>>>> >>>>>>>>>> private void checkInitialized() { >>>>>>>>>> if (inner == null) { >>>>>>>>>> Map<String, Object> props = new HashMap<>(); >>>>>>>>>> props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, >>>>>>>>>> registryUrl); >>>>>>>>>> props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, >>>>>>>>>> false); >>>>>>>>>> SchemaRegistryClient client = >>>>>>>>>> new CachedSchemaRegistryClient( >>>>>>>>>> registryUrl, >>>>>>>>>> AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT); >>>>>>>>>> inner = new KafkaAvroDeserializer(client, props); >>>>>>>>>> } >>>>>>>>>> } >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> And this is my consumer code : >>>>>>>>>> >>>>>>>>>> DataStreamSource<GenericRecord> input = env >>>>>>>>>> .addSource( >>>>>>>>>> new FlinkKafkaConsumer010<GenericRecord>(topics, >>>>>>>>>> new >>>>>>>>>> KafkaGenericAvroDeserializationSchema(schemaRegistryUrl), >>>>>>>>>> config).setStartFromEarliest()); >>>>>>>>>> >>>>>>>>>> Now I want to write this stream partition on >>>>>>>>>> *event_name="a"/year=/month=/day=* in parquet format so that I >>>>>>>>>> can expose hive tables directly on top of this data. >>>>>>>>>> event_name is common field for all types of records that I am >>>>>>>>>> getting in Kafka. >>>>>>>>>> I am stuck as parquet writer needs a schema to write but my >>>>>>>>>> different records have different schemas So how do I write this >>>>>>>>>> stream in >>>>>>>>>> s3 in above partition format. 
>>>>>>>>>> >>>>>>>>>> >>>>>>>>>> Thanks & Regards, >>>>>>>>>> Anuj Jain >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> <http://www.cse.iitm.ac.in/%7Eanujjain/> >>>>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>> -- >>>>>>>> Thanks & Regards, >>>>>>>> Anuj Jain >>>>>>>> Mob. : +91- 8588817877 >>>>>>>> Skype : anuj.jain07 >>>>>>>> <http://www.oracle.com/> >>>>>>>> >>>>>>>> >>>>>>>> <http://www.cse.iitm.ac.in/%7Eanujjain/> >>>>>>>> >>>>>>> >>>>>> >>>>>> -- >>>>>> Thanks & Regards, >>>>>> Anuj Jain >>>>>> Mob. : +91- 8588817877 >>>>>> Skype : anuj.jain07 >>>>>> <http://www.oracle.com/> >>>>>> >>>>>> >>>>>> <http://www.cse.iitm.ac.in/%7Eanujjain/> >>>>>> >>>>> >>>> >>>> -- >>>> Thanks & Regards, >>>> Anuj Jain >>>> Mob. : +91- 8588817877 >>>> Skype : anuj.jain07 >>>> <http://www.oracle.com/> >>>> >>>> >>>> <http://www.cse.iitm.ac.in/%7Eanujjain/> >>>> >>> >> >> -- >> Thanks & Regards, >> Anuj Jain >> Mob. : +91- 8588817877 >> Skype : anuj.jain07 >> <http://www.oracle.com/> >> >> >> <http://www.cse.iitm.ac.in/%7Eanujjain/> >> > > > -- > Thanks & Regards, > Anuj Jain > Mob. : +91- 8588817877 > Skype : anuj.jain07 > <http://www.oracle.com/> > > > <http://www.cse.iitm.ac.in/%7Eanujjain/> >