Hi Arvid,

I want to work with generic records only. I do not want to keep the schema
definition on the consumer side; it should be resolved from the schema
registry only. I am following the post below:

https://stackoverflow.com/questions/58849635/is-it-possible-to-deserialize-avro-messageconsuming-message-from-kafka-without/59865360#59865360

So please help me find what is wrong with my code.



On Thu, Jan 23, 2020, 00:38 Arvid Heise <ar...@ververica.com> wrote:

> Hi Anuj,
>
> I recommend using the ConfluentRegistryAvroDeserializationSchema [1] with
> a specific record that has been generated with the Avro Maven Plugin [2] or
> Avro Gradle Plugin [3]. That should result in almost no code and maximal
> maintainability.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/kafka.html#the-deserializationschema
> [2] https://avro.apache.org/docs/1.8.2/gettingstartedjava.html
> [3] https://github.com/davidmc24/gradle-avro-plugin
>
> On Wed, Jan 22, 2020 at 6:43 PM aj <ajainje...@gmail.com> wrote:
>
>> Hi Arvid,
>>
>> I have implemented the code with the envelope schema as you suggested, but
>> now I am facing issues with the consumer. I have written code like this:
>>
>> FlinkKafkaConsumer010<GenericRecord> kafkaConsumer010 =
>>         new FlinkKafkaConsumer010<>(
>>                 KAFKA_TOPICS,
>>                 new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>                 properties);
>>
>> And the deserialization class looks like this:
>>
>> public class KafkaGenericAvroDeserializationSchema
>>         implements KeyedDeserializationSchema<GenericRecord> {
>>
>>     private final String registryUrl;
>>     private transient KafkaAvroDeserializer inner;
>>
>>     public KafkaGenericAvroDeserializationSchema(String registryUrl) {
>>         this.registryUrl = registryUrl;
>>     }
>>
>>     @Override
>>     public GenericRecord deserialize(byte[] messageKey, byte[] message,
>>             String topic, int partition, long offset) {
>>         checkInitialized();
>>         return (GenericRecord) inner.deserialize(topic, message);
>>     }
>>
>>     @Override
>>     public boolean isEndOfStream(GenericRecord nextElement) {
>>         return false;
>>     }
>>
>>     @Override
>>     public TypeInformation<GenericRecord> getProducedType() {
>>         return TypeExtractor.getForClass(GenericRecord.class);
>>     }
>>
>>     private void checkInitialized() {
>>         if (inner == null) {
>>             Map<String, Object> props = new HashMap<>();
>>             props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
>>                     registryUrl);
>>             props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG,
>>                     false);
>>             SchemaRegistryClient client =
>>                     new CachedSchemaRegistryClient(
>>                             registryUrl,
>>                             AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
>>             inner = new KafkaAvroDeserializer(client, props);
>>         }
>>     }
>> }
>>
>>
>> It works locally on my machine, but when I deploy it on a YARN cluster
>> I get the exception below:
>>
>>
>> java.lang.Exception: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:212)
>>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.performDefaultAction(SourceStreamTask.java:132)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>     at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
>>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
>>     at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
>>     at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:91)
>>     at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:156)
>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:202)
>> Caused by: com.esotericsoftware.kryo.KryoException: Error constructing instance of class: org.apache.avro.Schema$LockableArrayList
>> Serialization trace:
>> types (org.apache.avro.Schema$UnionSchema)
>> schema (org.apache.avro.Schema$Field)
>> fieldMap (org.apache.avro.Schema$RecordSchema)
>> schema (org.apache.avro.generic.GenericData$Record)
>>     at com.twitter.chill.Instantiators$$anon$1.newInstance(KryoBase.scala:136)
>>     at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1061)
>>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.create(CollectionSerializer.java:89)
>>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:93)
>>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
>>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
>>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:262)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:635)
>>     ... 13 more
>> Caused by: java.lang.IllegalAccessException: Class com.twitter.chill.Instantiators$$anonfun$normalJava$1 can not access a member of class org.apache.avro.Schema$LockableArrayList with modifiers "public"
>>
>> Please help me to resolve this issue.
>>
>> Thanks,
>> Anuj
>>
>>
>>
>>
>>
>> On Mon, Jan 20, 2020 at 9:42 PM aj <ajainje...@gmail.com> wrote:
>>
>>> Thanks, Arvid, for all the clarification. I will work on the approach you
>>> suggested.
>>>
>>> Thanks,
>>> Anuj
>>>
>>> On Sat, Jan 18, 2020 at 10:17 PM Arvid Heise <ar...@ververica.com>
>>> wrote:
>>>
>>>> Hi Anuj,
>>>>
>>>> I think that there may be a fundamental misunderstanding about the role
>>>> of a schema registry in Kafka. So let me first clarify that.
>>>> In each Avro/Parquet file, all records have the same schema. The schema
>>>> is stored within the file, such that we can always retrieve the writer
>>>> schema for the records.
>>>> When Avro was first applied to Kafka, there was the basic question of
>>>> how the writer schema for any record is known to the consumer. Storing the
>>>> complete schema on each record would mean that each record would be much
>>>> larger than needed. Hence, they added the schema registry that assigns a
>>>> unique id to each schema, which is then embedded into the records.
>>>> Now, whenever I update a schema in my producer, I would have old
>>>> records with the old schema id and new records with the new schema id.
>>>> In my consumer, I'd use a fixed reader schema, such that my application
>>>> would not need to worry if the record is written with old or new schema; my
>>>> consumer would only see records with the reader schema.
>>>>
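To illustrate the "id embedded into the records" part: a minimal sketch, assuming the Confluent wire format (one magic byte 0, then a 4-byte big-endian schema id, then the Avro binary payload). The class and method names are hypothetical and not part of any library; this only shows where the id sits in the message bytes.

```java
import java.nio.ByteBuffer;

// Hypothetical helper, for illustration only.
final class ConfluentWireFormat {
    static final byte MAGIC_BYTE = 0x0;

    /** Returns the schema id stored in bytes 1..4 of a registry-framed message. */
    static int schemaId(byte[] message) {
        if (message == null || message.length < 5) {
            throw new IllegalArgumentException("Message too short for a schema id header");
        }
        ByteBuffer buffer = ByteBuffer.wrap(message); // ByteBuffer is big-endian by default
        if (buffer.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown magic byte");
        }
        return buffer.getInt();
    }

    public static void main(String[] args) {
        // Header (magic byte + id 42) followed by two payload bytes.
        byte[] framed = {0, 0, 0, 0, 42, 2, 'a'};
        System.out.println(schemaId(framed)); // prints 42
    }
}
```

A deserializer such as KafkaAvroDeserializer reads this id and asks the registry for the matching writer schema, which is why old and new records can share one topic.
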
>>>> Given that background information, you see that in general, it's
>>>> impossible with a generic approach to write the parquet with the same
>>>> schema as it has been written in Kafka: the parquet schema needs to be
>>>> supplied statically during query compilation while the actual used Avro
>>>> schema in Kafka is only known when actually consuming data.
>>>>
>>>> But looking further down the road:
>>>> * since you need one schema to write the parquet files, you'd need to
>>>> decide: do you want to write with the new or the old schema in case of a
>>>> schema update? That should also be the reader schema of your application
>>>> for a given event type.
>>>> * this decision has further implications: your application needs to
>>>> extract exactly one specific version of the schema from the schema registry
>>>> at query compilation. That could be either a specific schema id or the
>>>> latest schema for the event type.
>>>> * that means that the output schema is locked until you restart your
>>>> application and fetch a new latest schema in case of an update.
>>>> * at that point, it might just be easier to use the approach that I
>>>> outlined previously by bundling a specific schema with your application.
>>>>
>>>> If you want to extract the latest schema for a subject:
>>>>
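>>>> var registryClient = new CachedSchemaRegistryClient(<schemaRepoUrls>, 1000);
>>>> // getAllVersions returns version numbers, not schema ids, so ask the
>>>> // registry for the metadata of the latest version instead.
>>>> var metadata = registryClient.getLatestSchemaMetadata(<subject>);
>>>> var schema = new Schema.Parser().parse(metadata.getSchema());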
>>>>
>>>>
>>>> On Sat, Jan 18, 2020 at 5:22 PM aj <ajainje...@gmail.com> wrote:
>>>>
>>>>> Thanks, Arvid.
>>>>>
>>>>> I do not fully understand the above approach, so for now I am planning
>>>>> to go with the envelope approach that you suggested.
>>>>>
>>>>> One more question: I do not want to keep the schema in my consumer
>>>>> project, even if it is a single envelope schema. I want it to be fetched
>>>>> from the schema registry and passed to my Parquet sink so that I always
>>>>> use the same schema that the producer uses. Can you provide sample code
>>>>> showing how I can infer the schema from the generic record or get it
>>>>> from the schema registry?
>>>>>
>>>>>
>>>>> Regards,
>>>>> Anuj
>>>>>
>>>>>
>>>>>
>>>>> On Sat, Jan 18, 2020 at 6:55 PM Arvid Heise <ar...@ververica.com>
>>>>> wrote:
>>>>>
>>>>>> (Readded user mailing list)
>>>>>>
>>>>>> Hi Anuj,
>>>>>>
>>>>>> since I'd still recommend going with distinct sources/sinks, let me
>>>>>> try to solve your issues in this mail. If that doesn't work out, I'd
>>>>>> address your concerns about the envelope format later.
>>>>>>
>>>>>> In Flink, you can have several subtopologies in the same application.
>>>>>>
>>>>>> Thus, for each event type, you can add
>>>>>> AvroSource(eventType) -> generic transformation/validation ->
>>>>>> AvroSink(eventType)
>>>>>> for each event.
>>>>>>
>>>>>> I'd put all Avro schemas in one project and use an Avro plugin to
>>>>>> generate the respective Java classes. Then I'd simply create a map of
>>>>>> Avro schema (GeneratedClassA::SCHEMA, GeneratedClassB::SCHEMA, ...) to
>>>>>> topic name (event-a, event-b, ...).
>>>>>> Next, I'd iterate over the list to add the respective subtopologies
>>>>>> to env.
>>>>>> Finally, execute everything.
>>>>>>
>>>>>> You have one project where all validations reside. But you'd have
>>>>>> almost no overhead to process a given source of eventType. The downside
>>>>>> of that approach is, of course, that each new event type would require
>>>>>> a redeployment, but that seems like what you'd want to do anyhow.
>>>>>>
>>>>>> Best,
>>>>>>
>>>>>> Arvid
>>>>>>
>>>>>> On Sat, Jan 18, 2020 at 2:08 PM aj <ajainje...@gmail.com> wrote:
>>>>>>
>>>>>>> Thanks, Arvid.
>>>>>>>
>>>>>>> 1. I like your approach, as I can write a single consumer and put the
>>>>>>> data in S3 in Parquet format. The only challenge is that there are extra
>>>>>>> columns that will always be null, since at any one time I get only one
>>>>>>> type of event.
>>>>>>>
>>>>>>> 2. If I go with separate schemas, I am not sure how I can solve it
>>>>>>> using a single generalized consumer. My understanding so far is that I
>>>>>>> have to write a consumer for each type of event. Each consumer will read
>>>>>>> all the data, filter out its own events, and then pass that stream to
>>>>>>> the sink. But this does not look like a scalable solution: as new event
>>>>>>> types keep being added, I have to write a consumer for each new type.
>>>>>>>
>>>>>>>
>>>>>>> DataStreamSource<GenericRecord> input = env
>>>>>>>         .addSource(
>>>>>>>                 new FlinkKafkaConsumer010<GenericRecord>(topics,
>>>>>>>                         new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>>>>>>                         config).setStartFromEarliest());
>>>>>>>
>>>>>>> Example:
>>>>>>>
>>>>>>> 1st consumer:
>>>>>>>
>>>>>>> DataStreamSource<GenericRecord> input = env
>>>>>>>         .addSource(
>>>>>>>                 new FlinkKafkaConsumer010<GenericRecord>(topics,
>>>>>>>                         new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>>>>>>                         config).setStartFromEarliest());
>>>>>>> // Avro string values may be Utf8, so compare via String.valueOf.
>>>>>>> DataStream<GenericRecord> aInput = input
>>>>>>>         .filter(r -> "a".equals(String.valueOf(r.get("event_name"))));
>>>>>>>
>>>>>>> 2nd consumer:
>>>>>>>
>>>>>>> DataStreamSource<GenericRecord> input = env
>>>>>>>         .addSource(
>>>>>>>                 new FlinkKafkaConsumer010<GenericRecord>(topics,
>>>>>>>                         new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>>>>>>                         config).setStartFromEarliest());
>>>>>>> DataStream<GenericRecord> bInput = input
>>>>>>>         .filter(r -> "b".equals(String.valueOf(r.get("event_name"))));
>>>>>>>
>>>>>>>
>>>>>>> Can you help me solve this using a single consumer? I do not want to
>>>>>>> write a separate consumer for each type of schema.
>>>>>>>
>>>>>>> For example, this is my consumer that contains different types of
>>>>>>> records.
>>>>>>>
>>>>>>> DataStreamSource<GenericRecord> input = env
>>>>>>>         .addSource(
>>>>>>>                 new FlinkKafkaConsumer010<GenericRecord>(topics,
>>>>>>>                         new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>>>>>>                         config).setStartFromEarliest());
>>>>>>>
>>>>>>> Now I cannot write this stream directly, as there is no common
>>>>>>> schema for the records in this stream. So one possible way I am
>>>>>>> thinking of is:
>>>>>>>
>>>>>>> 1. Can I create multiple streams from this stream by keying on
>>>>>>> "event_name" and then write each stream separately?
>>>>>>>
>>>>>>> I just want to know whether this is possible.
>>>>>>>
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Anuj
>>>>>>>
>>>>>>> On Fri, Jan 17, 2020 at 3:20 PM Arvid Heise <ar...@ververica.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Anuj,
>>>>>>>>
>>>>>>>> I originally understood that you would like to store data in the
>>>>>>>> same Kafka topic and also want to store it in the same parquet file.
>>>>>>>> In the past, I mostly used schema registry with Kafka Streams, where
>>>>>>>> you could only store a schema for a key and value respectively. To use
>>>>>>>> different record types in the same Kafka topic, you had to disable
>>>>>>>> schema compatibility checks and just store the schemas as different
>>>>>>>> versions under the same subject.
>>>>>>>>
>>>>>>>> Your approach is much better. You can ensure full schema
>>>>>>>> compatibility. Nevertheless, it still shares the same drawback that
>>>>>>>> consumption is much harder (using GenericRecord is proof of that) if
>>>>>>>> you want to read/write everything into the same place. Also, you will
>>>>>>>> never be able to write one consistent file, as a file can only have one
>>>>>>>> schema (in both Avro and Parquet).
>>>>>>>> So you only have two options:
>>>>>>>> * keep schemas separated, but then you also need to write separate
>>>>>>>> files per record type.
>>>>>>>> * have a common schema (either my outlined approach or any other
>>>>>>>> wrapper schema).
>>>>>>>> The approach with a common schema only makes sense if you want to
>>>>>>>> write it into one table/Kafka topic.
>>>>>>>>
>>>>>>>> However, in the last mail you pointed out that you actually want to
>>>>>>>> store the record types separately. In that case, you should keep
>>>>>>>> everything separated and have one sink per type, each getting the
>>>>>>>> respective schema. Note that you'd need to fetch the schema manually
>>>>>>>> from the schema registry when creating the query, as you would need to
>>>>>>>> pass it to the sink.
>>>>>>>>
>>>>>>>> Btw, do you actually have a need to write all events into one Kafka
>>>>>>>> topic? The only real use case is to preserve the time order per key.
>>>>>>>> Everything else is much more complicated than storing events
>>>>>>>> individually.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>>
>>>>>>>> Arvid
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Jan 16, 2020 at 3:39 PM aj <ajainje...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Arvid,
>>>>>>>>> Thanks for the quick response. I am new to this Avro design, so can
>>>>>>>>> you please help me understand it and design for my use case?
>>>>>>>>>
>>>>>>>>> My use case is like this:
>>>>>>>>>
>>>>>>>>> 1. We have an app where a lot of actions happen on the user side.
>>>>>>>>> 2. For each action we collect a set of information that is defined
>>>>>>>>> as key-value pairs. We want to define this information with proper
>>>>>>>>> schemas so that we maintain the proper format and do not push random
>>>>>>>>> data.
>>>>>>>>> 3. So we define a schema for each action and register it in the
>>>>>>>>> schema registry using topic+record.name as the subject.
>>>>>>>>> 4. So I do not think the producer side has any issue, as whenever
>>>>>>>>> we push an event to Kafka we register a new schema under the above
>>>>>>>>> subject.
>>>>>>>>>
>>>>>>>>> Example :
>>>>>>>>>
>>>>>>>>> {
>>>>>>>>>   "event_name": "a",
>>>>>>>>>   "timestamp": ...,
>>>>>>>>>   "properties": {
>>>>>>>>>     "key-1": "val-1",
>>>>>>>>>     "key-2": "val-2"
>>>>>>>>>   }
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> {
>>>>>>>>>   "event_name": "b",
>>>>>>>>>   "timestamp": ...,
>>>>>>>>>   "properties": {
>>>>>>>>>     "key-3": "val-3",
>>>>>>>>>     "key-4": "val-4"
>>>>>>>>>   }
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> Now I have a consumer that parses the data by fetching the schema
>>>>>>>>> from the schema registry and deserializes it into a stream of generic
>>>>>>>>> records.
>>>>>>>>>
>>>>>>>>> Why do you think it will break, given that I always deserialize with
>>>>>>>>> the writer schema only?
>>>>>>>>>
>>>>>>>>> You suggested keeping one envelope Avro schema instead of a separate
>>>>>>>>> schema for each type of event that I generate. I have some doubts
>>>>>>>>> about that:
>>>>>>>>>
>>>>>>>>> 1. How do I enforce a schema on each event when it is a subtype in the
>>>>>>>>> main schema? When I get a JSON event of type "a", how do I enforce and
>>>>>>>>> convert it to the subschema type "a" and push it to Kafka?
>>>>>>>>> 2. I want to create a separate Hive table for each of the events, so
>>>>>>>>> when I write this data and, let's say, I have 20 events, then 19
>>>>>>>>> columns will always contain null values.
>>>>>>>>>
>>>>>>>>> Please help me do this the right way. It would be a great help
>>>>>>>>> and a learning experience for me.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Anuj
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Jan 16, 2020 at 7:30 PM Arvid Heise <ar...@ververica.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Anuj,
>>>>>>>>>>
>>>>>>>>>> you should always avoid having records with different schemas in
>>>>>>>>>> the same topic/dataset. You will break the compatibility features of
>>>>>>>>>> the schema registry, and your consumer/producer code will always be
>>>>>>>>>> hard to maintain.
>>>>>>>>>>
>>>>>>>>>> A common and scalable way to avoid it is to use some kind of
>>>>>>>>>> envelope format.
>>>>>>>>>>
>>>>>>>>>> {
>>>>>>>>>>   "namespace": "example",
>>>>>>>>>>   "name": "Envelope",
>>>>>>>>>>   "type": "record",
>>>>>>>>>>   "fields": [
>>>>>>>>>>     {
>>>>>>>>>>       "name": "type1",
>>>>>>>>>>       "type": ["null", {
>>>>>>>>>>         "type": "record",
>>>>>>>>>>         "fields": [ ... ]
>>>>>>>>>>       }],
>>>>>>>>>>       "default": null
>>>>>>>>>>     },
>>>>>>>>>>     {
>>>>>>>>>>       "name": "type2",
>>>>>>>>>>       "type": ["null", {
>>>>>>>>>>         "type": "record",
>>>>>>>>>>         "fields": [ ... ]
>>>>>>>>>>       }],
>>>>>>>>>>       "default": null
>>>>>>>>>>     }
>>>>>>>>>>   ]
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> This envelope is evolvable (arbitrary addition/removal of wrapped
>>>>>>>>>> types, which by themselves can be evolved), and adds only a little
>>>>>>>>>> overhead (1 byte per subtype). The downside is that you cannot enforce
>>>>>>>>>> that exactly one of the subtypes is set.
>>>>>>>>>>
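To make the "1 byte per subtype" figure concrete: in Avro's binary encoding, a union value is written as its branch index (a zig-zag varint long) followed by the branch data, and null itself takes zero bytes. The sketch below re-implements only that integer encoding with the standard library; it is not the Avro library's encoder, and the class and method names are hypothetical.

```java
import java.io.ByteArrayOutputStream;

// Hypothetical helper, for illustration only.
final class UnionIndexOverhead {

    /** Encodes a long the way Avro does: zig-zag, then variable-length base-128. */
    static byte[] encodeLong(long n) {
        long z = (n << 1) ^ (n >> 63); // zig-zag keeps small magnitudes small
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        while ((z & ~0x7FL) != 0) {
            out.write((int) ((z & 0x7F) | 0x80)); // low 7 bits plus continuation flag
            z >>>= 7;
        }
        out.write((int) z);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        // Branch 0 is "null" and branch 1 is the wrapped record in ["null", {...}].
        System.out.println(encodeLong(0).length); // prints 1
        System.out.println(encodeLong(1).length); // prints 1
    }
}
```

So an envelope with N optional subtypes costs N index bytes per record, plus the payload of whichever subtype is actually set.
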
>>>>>>>>>> This schema is fully compatible with the schema registry, so no
>>>>>>>>>> need to parse anything manually.
>>>>>>>>>>
>>>>>>>>>> This schema can easily be used with Parquet. If you can't change
>>>>>>>>>> the input format anymore, you can at least use that approach on your
>>>>>>>>>> output.
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>>
>>>>>>>>>> Arvid
>>>>>>>>>>
>>>>>>>>>> On Thu, Jan 16, 2020 at 2:53 PM aj <ajainje...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi All,
>>>>>>>>>>>
>>>>>>>>>>> I have a use case where I am getting different kinds of Avro
>>>>>>>>>>> records in Kafka. I am using the schema registry to store the Avro
>>>>>>>>>>> schemas. One topic can also contain different types of records.
>>>>>>>>>>>
>>>>>>>>>>> I have created a GenericRecord stream using KafkaAvroDeserializer
>>>>>>>>>>> by defining a custom deserializer class like this:
>>>>>>>>>>>
>>>>>>>>>>> @Override
>>>>>>>>>>> public GenericRecord deserialize(
>>>>>>>>>>>         byte[] messageKey, byte[] message, String topic, int partition,
>>>>>>>>>>>         long offset) {
>>>>>>>>>>>     checkInitialized();
>>>>>>>>>>>     return (GenericRecord) inner.deserialize(topic, message);
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>> private void checkInitialized() {
>>>>>>>>>>>     if (inner == null) {
>>>>>>>>>>>         Map<String, Object> props = new HashMap<>();
>>>>>>>>>>>         props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
>>>>>>>>>>>                 registryUrl);
>>>>>>>>>>>         props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG,
>>>>>>>>>>>                 false);
>>>>>>>>>>>         SchemaRegistryClient client =
>>>>>>>>>>>                 new CachedSchemaRegistryClient(
>>>>>>>>>>>                         registryUrl,
>>>>>>>>>>>                         AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
>>>>>>>>>>>         inner = new KafkaAvroDeserializer(client, props);
>>>>>>>>>>>     }
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> And this is my consumer code :
>>>>>>>>>>>
>>>>>>>>>>> DataStreamSource<GenericRecord> input = env
>>>>>>>>>>>         .addSource(
>>>>>>>>>>>                 new FlinkKafkaConsumer010<GenericRecord>(topics,
>>>>>>>>>>>                         new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>>>>>>>>>>                         config).setStartFromEarliest());
>>>>>>>>>>>
>>>>>>>>>>> Now I want to write this stream partitioned on
>>>>>>>>>>> event_name="a"/year=/month=/day= in Parquet format so that I
>>>>>>>>>>> can expose Hive tables directly on top of this data.
>>>>>>>>>>> event_name is a common field for all types of records that I am
>>>>>>>>>>> getting in Kafka.
>>>>>>>>>>> I am stuck, as the Parquet writer needs a schema to write, but my
>>>>>>>>>>> different records have different schemas. So how do I write this
>>>>>>>>>>> stream to S3 in the above partition format?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Thanks & Regards,
>>>>>>>>>>> Anuj Jain
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Thanks & Regards,
>>>>>>>>> Anuj Jain
>>>>>>>>> Mob. : +91- 8588817877
>>>>>>>>> Skype : anuj.jain07
>>>>>>>>> <http://www.oracle.com/>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
