I am able to resolve this issue by setting classloader.resolve-order as
parent-first.

On Wed, Jan 22, 2020, 23:13 aj <ajainje...@gmail.com> wrote:

> Hi Arvid,
>
> I have implemented the code with envelope schema as you suggested but now
> I am facing issues with the consumer . I have written code like this:
>
> FlinkKafkaConsumer010 kafkaConsumer010 = new
> FlinkKafkaConsumer010(KAFKA_TOPICS,
>                 new
> KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>                 properties);
>
> And the Deserialization class looks like this :
>
> pblic class KafkaGenericAvroDeserializationSchema implements
> KeyedDeserializationSchema<GenericRecord> {
>
>     private final String registryUrl;
>     private transient KafkaAvroDeserializer inner;
>
>     public KafkaGenericAvroDeserializationSchema(String registryUrl) {
>         this.registryUrl = registryUrl;
>     }
>
>     @Override
>     public GenericRecord deserialize(byte[] messageKey, byte[] message,
> String topic, int partition, long offset) {
>         checkInitialized();
>         return (GenericRecord) inner.deserialize(topic, message);
>     }
>
>     @Override
>     public boolean isEndOfStream(GenericRecord nextElement) {
>         return false;
>     }
>
>     @Override
>     public TypeInformation<GenericRecord> getProducedType() {
>         return TypeExtractor.getForClass(GenericRecord.class);
>     }
>
>     private void checkInitialized() {
>         if (inner == null) {
>             Map<String, Object> props = new HashMap<>();
>
> props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
> registryUrl);
>
> props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
>             SchemaRegistryClient client =
>                     new CachedSchemaRegistryClient(
>                             registryUrl,
> AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
>             inner = new KafkaAvroDeserializer(client, props);
>         }
>     }
> }
>
>
> It's working locally on my machine but when I deployed it on yarn cluster
> I am getting below exception:
>
>
> java.lang.Exception: org.apache.flink.streaming.runtime.tasks.
> ExceptionInChainedOperatorException: Could not forward element to next
> operator
>     at org.apache.flink.streaming.runtime.tasks.
> SourceStreamTask$LegacySourceFunctionThread
> .checkThrowSourceExecutionException(SourceStreamTask.java:212)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask
> .performDefaultAction(SourceStreamTask.java:132)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask
> .java:298)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
> StreamTask.java:403)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.streaming.runtime.tasks.
> ExceptionInChainedOperatorException: Could not forward element to next
> operator
>     at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
>     at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>     at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>     at org.apache.flink.streaming.api.operators.
> AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:
> 727)
>     at org.apache.flink.streaming.api.operators.
> AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:
> 705)
>     at org.apache.flink.streaming.api.operators.
> StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts
> .java:104)
>     at org.apache.flink.streaming.api.operators.
> StreamSourceContexts$NonTimestampContext.collectWithTimestamp(
> StreamSourceContexts.java:111)
>     at org.apache.flink.streaming.connectors.kafka.internals.
> AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
>     at org.apache.flink.streaming.connectors.kafka.internal.
> Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:91)
>     at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher
> .runFetchLoop(Kafka09Fetcher.java:156)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase
> .run(FlinkKafkaConsumerBase.java:715)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(
> StreamSource.java:100)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(
> StreamSource.java:63)
>     at org.apache.flink.streaming.runtime.tasks.
> SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:202)
> Caused by: com.esotericsoftware.kryo.KryoException: Error constructing
> instance of class: org.apache.avro.Schema$LockableArrayList
> Serialization trace:
> types (org.apache.avro.Schema$UnionSchema)
> schema (org.apache.avro.Schema$Field)
> fieldMap (org.apache.avro.Schema$RecordSchema)
> schema (org.apache.avro.generic.GenericData$Record)
>     at com.twitter.chill.Instantiators$$anon$1.newInstance(KryoBase.scala:
> 136)
>     at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1061)
>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.create(
> CollectionSerializer.java:89)
>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(
> CollectionSerializer.java:93)
>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(
> CollectionSerializer.java:22)
>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField
> .java:106)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(
> FieldSerializer.java:528)
>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField
> .java:106)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(
> FieldSerializer.java:528)
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(
> MapSerializer.java:143)
>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(
> MapSerializer.java:21)
>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField
> .java:106)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(
> FieldSerializer.java:528)
>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField
> .java:106)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(
> FieldSerializer.java:528)
>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
> .copy(KryoSerializer.java:262)
>     at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:635)
>     ... 13 more
> Caused by: java.lang.IllegalAccessException: Class com.twitter.chill.
> Instantiators$$anonfun$normalJava$1 can not access a member of class
> org.apache.avro.Schema$LockableArrayList with modifiers "public"
>
> Please help me to resolve this issue.
>
> Thanks,
> Anuj
>
>
>
>
>
> On Mon, Jan 20, 2020 at 9:42 PM aj <ajainje...@gmail.com> wrote:
>
>> Thanks, Arvid for all the clarification. I will work on the approach you
>> suggested.
>>
>> Thanks,
>> Anuj
>>
>> On Sat, Jan 18, 2020 at 10:17 PM Arvid Heise <ar...@ververica.com> wrote:
>>
>>> Hi Anuj,
>>>
>>> I think that there may be a fundamental misunderstanding about the role
>>> of a schema registry in Kafka. So let me first clarify that.
>>> In each Avro/Parquet file, all records have the same schema. The schema
>>> is stored within the file, such that we can always retrieve the writer
>>> schema for the records.
>>> When Avro was first applied to Kafka, there was the basic question on
>>> how the writer schema for any record is known to the consumer. Storing the
>>> complete schema on each record would mean that each record would be much
>>> larger than needed. Hence, they added the schema registry that assigns a
>>> unique id to schema, which is then embedded into the records.
>>> Now, whenever I update a schema in my producer, I would have old records
>>> with the old schema id and new records with the new schema id.
>>> In my consumer, I'd use a fixed reader schema, such that my application
>>> would not need to worry if the record is written with old or new schema; my
>>> consumer would only see records with the reader schema.
>>>
>>> Given that background information, you see that in general, it's
>>> impossible with a generic approach to write the parquet with the same
>>> schema as it has been written in Kafka: the parquet schema needs to be
>>> supplied statically during query compilation while the actual used Avro
>>> schema in Kafka is only known when actually consuming data.
>>>
>>> But looking further down the road:
>>> * since you need one schema to write the parquet files, you'd need to
>>> decide: do you want to write with the new or the old schema in case of a
>>> schema update? That should also be the reader schema of your application
>>> for a given event type.
>>> * this decision has further implications: your application need to
>>> extract exactly one specific version of the schema from the schema registry
>>> at query compilation. That could be either a specific schema id or the
>>> latest schema for the event type.
>>> * that means that the output schema is locked until you restart your
>>> application and fetch a new latest schema in case of an update.
>>> * at that point, it might just be easier to use the approach that I
>>> outlined previously by bundling a specific schema with your application.
>>>
>>> If you want to extract the latest schema for a subject:
>>>
>>> var registryClient = new CachedSchemaRegistryClient(<schemaRepoUrls>, 1000);
>>> var versions = registryClient.getAllVersions(<subject>);
>>> var schema = registryClient.getSchemaById(versions.get(versions.size() - 
>>> 1));
>>>
>>>
>>> On Sat, Jan 18, 2020 at 5:22 PM aj <ajainje...@gmail.com> wrote:
>>>
>>>> Thanks, Arvid.
>>>>
>>>> I do not fully understand the above approach,
>>>> so currently, I am thinking to go with the envelope approach that you
>>>> suggested.
>>>>
>>>> One more question I have if I do not want to keep schema in my consumer
>>>> project even its a single envelope schema. I want it to be fetched from the
>>>> schema registry and pass to my parquet-sink so that I always use the same
>>>> schema that is used by the producer.  Can you provide a sample code how can
>>>> i infer the schema from the generic record or get it from schema registry?
>>>>
>>>>
>>>> Regards,
>>>> Anuj
>>>>
>>>>
>>>>
>>>> On Sat, Jan 18, 2020 at 6:55 PM Arvid Heise <ar...@ververica.com>
>>>> wrote:
>>>>
>>>>> (Readded user mailing list)
>>>>>
>>>>> Hi Anuj,
>>>>>
>>>>> since I'd still recommend going with distinct sources/sinks, let me
>>>>> try to solve your issues in this mail. If that doesn't work out, I'd
>>>>> address your concerns about the envelope format later.
>>>>>
>>>>> In Flink, you can have several subtopologies in the same application.
>>>>>
>>>>> Thus, for each event type, you can add
>>>>> AvroSource(eventType) -> generic transformation/validation ->
>>>>> AvroSink(eventType)
>>>>> for each event.
>>>>>
>>>>> I'd put all avro schema in one project and use an avro plugin to
>>>>> generate the respective Java Classes. Then I'd simply create a map of Avro
>>>>> Schema (GeneratedClassA::SCHEMA, GeneratedClassB::SCHEMA...) to topic name
>>>>> (event-a, event-b, ...).
>>>>> Next, I'd iterate over the list to add the respective subtopologies to
>>>>> env.
>>>>> Finally, execute everything.
>>>>>
>>>>> You have one project where all validations reside. But you'd have
>>>>> almost no overhead to process a given source of eventType. The downside of
>>>>> that approach is of course, that each new event type would require a
>>>>> redeployment, but that seems like what you'd want to do anyhow.
>>>>>
>>>>> Best,
>>>>>
>>>>> Arvid
>>>>>
>>>>> On Sat, Jan 18, 2020 at 2:08 PM aj <ajainje...@gmail.com> wrote:
>>>>>
>>>>>> Thanks, Arvid.
>>>>>>
>>>>>> 1. I like your approach as I can write a single consumer and put the
>>>>>> data in S3 in parquet format. The only challenge is there are extra 
>>>>>> columns
>>>>>> that always going to be null as at a time I will get one type of event.
>>>>>>
>>>>>> 2. if I go with a separate schema I am not sure how I can solve it
>>>>>> using a single generalize consumer. Till now what my understanding is I
>>>>>> have to write a consumer for each type of event. Each consumer will read
>>>>>> the whole data then filter the respective events from this and then I can
>>>>>> pass this stream to sink. But this does not look scalable solution as the
>>>>>> new events keep growing then I have to write a consumer for each new 
>>>>>> type.
>>>>>>
>>>>>>
>>>>>> DataStreamSource<GenericRecord> input = env
>>>>>>         .addSource(
>>>>>>                 new FlinkKafkaConsumer010<GenericRecord>(topics,
>>>>>>                         new
>>>>>> KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>>>>>                         config).setStartFromEarliest());
>>>>>>
>>>>>> Example :
>>>>>>
>>>>>> * 1st Consumer:*
>>>>>>                   DataStreamSource<GenericRecord> input =
>>>>>> env.addSource(
>>>>>>                 new FlinkKafkaConsumer010<GenericRecord>(topics,
>>>>>>                         new
>>>>>> KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>>>>>                         config).setStartFromEarliest());
>>>>>> *                 DataStream<GenericRecord> aInput =
>>>>>> input.filter("event_name"= "a")*
>>>>>>
>>>>>> * 2nd Consumer:*
>>>>>>   DataStreamSource<GenericRecord> input = env.addSource(
>>>>>>                 new FlinkKafkaConsumer010<GenericRecord>(topics,
>>>>>>                         new
>>>>>> KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>>>>>                         config).setStartFromEarliest());
>>>>>> *                 DataStream<GenericRecord> bInput =
>>>>>> input.filter("event_name"= "b")*
>>>>>>
>>>>>>
>>>>>> Can you help me How I solve this using a single consumer as I do not
>>>>>> want to write a separate consumer for each type of schema?
>>>>>>
>>>>>> For example, this is my consumer that contains different types of
>>>>>> records.
>>>>>>
>>>>>> DataStreamSource<GenericRecord> input = env
>>>>>>         .addSource(
>>>>>>                 new FlinkKafkaConsumer010<GenericRecord>(topics,
>>>>>>                         new
>>>>>> KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>>>>>                         config).setStartFromEarliest());
>>>>>>
>>>>>> Now I can not write this stream directly as there is no common schema
>>>>>> of records in this stream. So possible way I am thinking is
>>>>>>
>>>>>> 1. Can I create multiple streams from this stream using the key by on 
>>>>>> *"event_name"
>>>>>> *and then write each stream separately.
>>>>>>
>>>>>> Just wanna know is this possible ??
>>>>>>
>>>>>>
>>>>>> Thanks,
>>>>>> Anuj
>>>>>>
>>>>>> On Fri, Jan 17, 2020 at 3:20 PM Arvid Heise <ar...@ververica.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Anuj,
>>>>>>>
>>>>>>> I originally understood that you would like to store data in the
>>>>>>> same Kafka topic and also want to store it in the same parquet file. In 
>>>>>>> the
>>>>>>> past, I mostly used schema registry with Kafka Streams, where you could
>>>>>>> only store a schema for a key and value respectively. To use different
>>>>>>> record types in the same kafka topic, you had to disable schema
>>>>>>> compatibility checks and just stored the schemas as different versions
>>>>>>> under the same subject.
>>>>>>>
>>>>>>> Your approach is much better. You can ensure full schema
>>>>>>> compatibility. Nevertheless, it still shares the same drawback that
>>>>>>> consumption is much harder (using GenericRecord is proof of that) if you
>>>>>>> want to read/write everything into the same place. Also you will never 
>>>>>>> be
>>>>>>> able to write one consistent file, as they can only have one schema 
>>>>>>> (both
>>>>>>> on Avro and Parquet).
>>>>>>> So you only have two options:
>>>>>>> * keep schemas separated, but then you also need to write separate
>>>>>>> files per record type.
>>>>>>> * have a common schema (either my outlined approach or any other
>>>>>>> wrapper schema).
>>>>>>> The approach with a common schema makes only sense if you want to
>>>>>>> write it into one table/kafka topic.
>>>>>>>
>>>>>>> However, in the last mail you pointed out that you actually want to
>>>>>>> store the record types separately. Then, you should keep everything
>>>>>>> separated. Then you should have a sink for each type each getting the
>>>>>>> respective schema. Note that you'd need to fetch the schema manually 
>>>>>>> from
>>>>>>> the schema registry when creating the query as you would need to pass 
>>>>>>> it to
>>>>>>> the sink.
>>>>>>>
>>>>>>> Btw, do you actually have a need to write all events into one Kafka
>>>>>>> topic? The only real use case is to preserve the time order per key.
>>>>>>> Everything else is much more complicated then storing events 
>>>>>>> individually.
>>>>>>>
>>>>>>> Best,
>>>>>>>
>>>>>>> Arvid
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Jan 16, 2020 at 3:39 PM aj <ajainje...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Arvid,
>>>>>>>> Thanks for the quick response. I am new to this Avro design so can
>>>>>>>> you please help me understand and design for my use case.
>>>>>>>>
>>>>>>>> I have use case like this :
>>>>>>>>
>>>>>>>> 1. we have an app where a lot of action happened from the user side.
>>>>>>>> 2. for each action we collect some set of information that
>>>>>>>> defined using some key-value pairs. This information we want to define 
>>>>>>>> as
>>>>>>>> proper schemas so that we maintain the proper format and not push 
>>>>>>>> random
>>>>>>>> data.
>>>>>>>> 3. So we are defining for each action a schema and register in the
>>>>>>>> schema registry using  topic+record.name as the subject .
>>>>>>>> 4. So I do not think the producer side has any issue as whenever we
>>>>>>>> push the event to Kafka we register a new schema with the above 
>>>>>>>> subject.
>>>>>>>>
>>>>>>>> Example :
>>>>>>>>
>>>>>>>> {
>>>>>>>> event_name : "a"
>>>>>>>> "timestamp":
>>>>>>>> "properties"  :[
>>>>>>>>   "key-1 : "val-1"
>>>>>>>>    "key-2 : "val-2"
>>>>>>>> ]
>>>>>>>> }
>>>>>>>>
>>>>>>>> {
>>>>>>>> event_name : "b"
>>>>>>>> "timestamp":
>>>>>>>> "properties"  :[
>>>>>>>>   "key-3 : "val-3"
>>>>>>>>    "key-4 : "val-4"
>>>>>>>> ]
>>>>>>>> }
>>>>>>>>
>>>>>>>> Now I  have a consumer that will parse the data by fetching the
>>>>>>>> schema from schema registry and deserialize in the generic record 
>>>>>>>> streams.
>>>>>>>>
>>>>>>>> Why you think it will break as I am always deserializing with
>>>>>>>> writer schema only.
>>>>>>>>
>>>>>>>> As you suggested to keep an envelope Avro schema and not separate
>>>>>>>> schema for each type of event that I am generating. I have some doubts
>>>>>>>> about that:
>>>>>>>>
>>>>>>>> 1. How I enforce a schema on each event as it subtypes in the main
>>>>>>>> schema. so when I am getting a JSON event of type "a" how I enforce and
>>>>>>>> convert it to subschema type of "a" and push to Kafka.
>>>>>>>> 2. I want to create a separate hive table for each of the events so
>>>>>>>> when I write this data and lets says I have 20 events than for 19 
>>>>>>>> columns I
>>>>>>>> am getting null values always in data.
>>>>>>>>
>>>>>>>> Please help me in doing this right way. It will be a great help and
>>>>>>>> learning for me.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Anuj
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Jan 16, 2020 at 7:30 PM Arvid Heise <ar...@ververica.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Anuj,
>>>>>>>>>
>>>>>>>>> you should always avoid having records with different schemas in
>>>>>>>>> the same topic/dataset. You will break the compatibility features of 
>>>>>>>>> the
>>>>>>>>> schema registry and your consumer/producer code is always hard to 
>>>>>>>>> maintain.
>>>>>>>>>
>>>>>>>>> A common and scalable way to avoid it is to use some kind of
>>>>>>>>> envelope format.
>>>>>>>>>
>>>>>>>>> {
>>>>>>>>>   "namespace": "example",
>>>>>>>>>   "name": "Envelope",
>>>>>>>>>   "type": "record",
>>>>>>>>>   "fields": [
>>>>>>>>>     {
>>>>>>>>>       "name": "type1",
>>>>>>>>>       "type": ["null", {
>>>>>>>>>         "type": "record",
>>>>>>>>>         "fields": [ ... ]
>>>>>>>>>       }],
>>>>>>>>>       "default": null
>>>>>>>>>     },
>>>>>>>>>     {
>>>>>>>>>       "name": "type2",
>>>>>>>>>       "type": ["null", {
>>>>>>>>>         "type": "record",
>>>>>>>>>         "fields": [ ... ]
>>>>>>>>>       }],
>>>>>>>>>       "default": null
>>>>>>>>>     }
>>>>>>>>>   ]
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> This envelope is evolvable (arbitrary addition/removal of wrapped
>>>>>>>>> types, which by themselves can be evolved), and adds only a little 
>>>>>>>>> overhead
>>>>>>>>> (1 byte per subtype). The downside is that you cannot enforce that 
>>>>>>>>> exactly
>>>>>>>>> one of the subtypes is set.
>>>>>>>>>
>>>>>>>>> This schema is fully compatible with the schema registry, so no
>>>>>>>>> need to parse anything manually.
>>>>>>>>>
>>>>>>>>> This schema can easily be used with Parquet. If you can't change
>>>>>>>>> the input format anymore, you can at least use that approach on your 
>>>>>>>>> output.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>>
>>>>>>>>> Arvid
>>>>>>>>>
>>>>>>>>> On Thu, Jan 16, 2020 at 2:53 PM aj <ajainje...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi All,
>>>>>>>>>>
>>>>>>>>>> I have a use case where I am getting a different set of Avro
>>>>>>>>>> records in Kafka. I am using the schema registry to store Avro 
>>>>>>>>>> schema. One
>>>>>>>>>> topic can also have different types of records.
>>>>>>>>>>
>>>>>>>>>> Now I have created a GenericRecord Stream using
>>>>>>>>>> kafkaAvroDeseralizer by defining custom
>>>>>>>>>> Deserializer class like this
>>>>>>>>>>
>>>>>>>>>> @Override
>>>>>>>>>> public GenericRecord deserialize(
>>>>>>>>>> byte[] messageKey, byte[] message, String topic, int partition,
>>>>>>>>>> long offset) {
>>>>>>>>>> checkInitialized();
>>>>>>>>>> return (GenericRecord) inner.deserialize(topic, message);
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> private void checkInitialized() {
>>>>>>>>>> if (inner == null) {
>>>>>>>>>> Map<String, Object> props = new HashMap<>();
>>>>>>>>>> props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
>>>>>>>>>> registryUrl);
>>>>>>>>>> props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG,
>>>>>>>>>> false);
>>>>>>>>>> SchemaRegistryClient client =
>>>>>>>>>> new CachedSchemaRegistryClient(
>>>>>>>>>> registryUrl,
>>>>>>>>>> AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
>>>>>>>>>> inner = new KafkaAvroDeserializer(client, props);
>>>>>>>>>> }
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> And this is my consumer code :
>>>>>>>>>>
>>>>>>>>>> DataStreamSource<GenericRecord> input = env
>>>>>>>>>>         .addSource(
>>>>>>>>>>                 new FlinkKafkaConsumer010<GenericRecord>(topics,
>>>>>>>>>>                         new
>>>>>>>>>> KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>>>>>>>>>                         config).setStartFromEarliest());
>>>>>>>>>>
>>>>>>>>>> Now I want to write this stream partition on
>>>>>>>>>> *event_name="a"/year=/month=/day=* in parquet format so that I
>>>>>>>>>> can expose hive tables directly on top of this data.
>>>>>>>>>> event_name is common field for all types of records that I am
>>>>>>>>>> getting in Kafka.
>>>>>>>>>> I am stuck as parquet writer needs a schema to write but my
>>>>>>>>>> different records have different schemas  So how do I write this 
>>>>>>>>>> stream in
>>>>>>>>>> s3 in above partition format.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Thanks & Regards,
>>>>>>>>>> Anuj Jain
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Thanks & Regards,
>>>>>>>> Anuj Jain
>>>>>>>> Mob. : +91- 8588817877
>>>>>>>> Skype : anuj.jain07
>>>>>>>> <http://www.oracle.com/>
>>>>>>>>
>>>>>>>>
>>>>>>>> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>> --
>>>>>> Thanks & Regards,
>>>>>> Anuj Jain
>>>>>> Mob. : +91- 8588817877
>>>>>> Skype : anuj.jain07
>>>>>> <http://www.oracle.com/>
>>>>>>
>>>>>>
>>>>>> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>>>>>>
>>>>>
>>>>
>>>> --
>>>> Thanks & Regards,
>>>> Anuj Jain
>>>> Mob. : +91- 8588817877
>>>> Skype : anuj.jain07
>>>> <http://www.oracle.com/>
>>>>
>>>>
>>>> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>>>>
>>>
>>
>> --
>> Thanks & Regards,
>> Anuj Jain
>> Mob. : +91- 8588817877
>> Skype : anuj.jain07
>> <http://www.oracle.com/>
>>
>>
>> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>>
>
>
> --
> Thanks & Regards,
> Anuj Jain
> Mob. : +91- 8588817877
> Skype : anuj.jain07
> <http://www.oracle.com/>
>
>
> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>

Reply via email to