Hi Arvid,

I have implemented the code with the envelope schema as you suggested, but now I
am facing issues with the consumer. I have written the code like this:

FlinkKafkaConsumer010<GenericRecord> kafkaConsumer010 =
        new FlinkKafkaConsumer010<>(
                KAFKA_TOPICS,
                new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
                properties);

And the deserialization class looks like this:

public class KafkaGenericAvroDeserializationSchema
        implements KeyedDeserializationSchema<GenericRecord> {

    private final String registryUrl;
    // Not serializable, so it is created lazily on first use.
    private transient KafkaAvroDeserializer inner;

    public KafkaGenericAvroDeserializationSchema(String registryUrl) {
        this.registryUrl = registryUrl;
    }

    @Override
    public GenericRecord deserialize(
            byte[] messageKey, byte[] message, String topic, int partition, long offset) {
        checkInitialized();
        return (GenericRecord) inner.deserialize(topic, message);
    }

    @Override
    public boolean isEndOfStream(GenericRecord nextElement) {
        return false;
    }

    @Override
    public TypeInformation<GenericRecord> getProducedType() {
        return TypeExtractor.getForClass(GenericRecord.class);
    }

    private void checkInitialized() {
        if (inner == null) {
            Map<String, Object> props = new HashMap<>();
            props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
            props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
            SchemaRegistryClient client =
                    new CachedSchemaRegistryClient(
                            registryUrl,
                            AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
            inner = new KafkaAvroDeserializer(client, props);
        }
    }
}
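
The transient field together with checkInitialized() is a lazy-initialization pattern: the schema object is shipped to the workers via Java serialization, transient fields are skipped on the way, and the helper is rebuilt on first use. Below is a minimal plain-Java sketch of just that pattern. LazyInitSchema and Client are hypothetical stand-ins (not Flink or Confluent classes), and the round trip through ObjectOutputStream only approximates what happens on deployment.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical stand-in for a serializable schema that wraps a non-serializable helper. */
final class LazyInitSchema implements Serializable {
    private static final long serialVersionUID = 1L;

    /** Hypothetical non-serializable helper (plays the role of the Kafka deserializer). */
    static final class Client {
        final String url;
        Client(String url) { this.url = url; }
    }

    private final String registryUrl;   // serialized with the object
    private transient Client inner;     // skipped by Java serialization

    LazyInitSchema(String registryUrl) {
        this.registryUrl = registryUrl;
    }

    boolean isInitialized() {
        return inner != null;
    }

    String describe() {
        checkInitialized();
        return "client for " + inner.url;
    }

    // Rebuilds the helper on first use, e.g. after the object was deserialized.
    private void checkInitialized() {
        if (inner == null) {
            inner = new Client(registryUrl);
        }
    }

    /** Serializes and deserializes the schema, roughly like shipping it to a worker. */
    static LazyInitSchema roundTrip(LazyInitSchema schema) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(schema);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (LazyInitSchema) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }

    public static void main(String[] args) {
        LazyInitSchema original = new LazyInitSchema("http://localhost:8081");
        original.describe();                        // initializes the helper locally
        LazyInitSchema copy = roundTrip(original);
        System.out.println(copy.isInitialized());   // the transient helper did not travel
        System.out.println(copy.describe());        // rebuilt lazily from registryUrl
    }
}
```
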


It works locally on my machine, but when I deploy it on a YARN cluster I
get the exception below:


java.lang.Exception: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:212)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.performDefaultAction(SourceStreamTask.java:132)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
    at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:91)
    at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:156)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:202)
Caused by: com.esotericsoftware.kryo.KryoException: Error constructing instance of class: org.apache.avro.Schema$LockableArrayList
Serialization trace:
types (org.apache.avro.Schema$UnionSchema)
schema (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
schema (org.apache.avro.generic.GenericData$Record)
    at com.twitter.chill.Instantiators$$anon$1.newInstance(KryoBase.scala:136)
    at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1061)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.create(CollectionSerializer.java:89)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:93)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:262)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:635)
    ... 13 more
Caused by: java.lang.IllegalAccessException: Class com.twitter.chill.Instantiators$$anonfun$normalJava$1 can not access a member of class org.apache.avro.Schema$LockableArrayList with modifiers "public"

Please help me to resolve this issue.

Thanks,
Anuj





On Mon, Jan 20, 2020 at 9:42 PM aj <ajainje...@gmail.com> wrote:

> Thanks, Arvid, for all the clarification. I will work on the approach you
> suggested.
>
> Thanks,
> Anuj
>
> On Sat, Jan 18, 2020 at 10:17 PM Arvid Heise <ar...@ververica.com> wrote:
>
>> Hi Anuj,
>>
>> I think that there may be a fundamental misunderstanding about the role
>> of a schema registry in Kafka. So let me first clarify that.
>> In each Avro/Parquet file, all records have the same schema. The schema
>> is stored within the file, such that we can always retrieve the writer
>> schema for the records.
>> When Avro was first applied to Kafka, there was the basic question of how
>> the writer schema for any record is known to the consumer. Storing the
>> complete schema on each record would mean that each record would be much
>> larger than needed. Hence, they added the schema registry, which assigns a
>> unique id to each schema; that id is then embedded into the records.
>> Now, whenever I update a schema in my producer, I would have old records
>> with the old schema id and new records with the new schema id.
>> In my consumer, I'd use a fixed reader schema, such that my application
>> would not need to worry if the record is written with old or new schema; my
>> consumer would only see records with the reader schema.
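
To make the "id embedded into the records" part concrete: with the Confluent serializers, each Kafka message value starts with a magic byte 0, followed by the 4-byte big-endian schema id, and the Avro payload comes after that. Below is a small plain-Java sketch of reading that header. The class and method names are made up for illustration; in a real job KafkaAvroDeserializer does this internally.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Hypothetical helper illustrating the Confluent wire format header. */
final class ConfluentWireFormat {
    static final byte MAGIC_BYTE = 0x0;
    static final int HEADER_LENGTH = 5; // 1 magic byte + 4-byte schema id

    /** Prepends the header to an already Avro-encoded payload. */
    static byte[] frame(int schemaId, byte[] avroPayload) {
        return ByteBuffer.allocate(HEADER_LENGTH + avroPayload.length)
                .put(MAGIC_BYTE)
                .putInt(schemaId)   // ByteBuffer is big-endian by default
                .put(avroPayload)
                .array();
    }

    /** Extracts the schema id that the producer embedded into the record. */
    static int schemaId(byte[] message) {
        if (message == null || message.length < HEADER_LENGTH) {
            throw new IllegalArgumentException("message too short for a schema id header");
        }
        ByteBuffer buffer = ByteBuffer.wrap(message);
        if (buffer.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("unknown magic byte");
        }
        return buffer.getInt();
    }

    /** Returns the Avro-encoded bytes that follow the header. */
    static byte[] payload(byte[] message) {
        schemaId(message); // validates the header
        return Arrays.copyOfRange(message, HEADER_LENGTH, message.length);
    }

    public static void main(String[] args) {
        byte[] framed = frame(42, new byte[] {1, 2, 3});
        System.out.println("schema id: " + schemaId(framed));
        System.out.println("payload bytes: " + payload(framed).length);
    }
}
```

The consumer uses the extracted id to look up the writer schema in the registry and then resolves it against its own reader schema.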
>>
>> Given that background information, you see that in general, it's
>> impossible with a generic approach to write the parquet with the same
>> schema as it has been written in Kafka: the parquet schema needs to be
>> supplied statically during query compilation while the actual used Avro
>> schema in Kafka is only known when actually consuming data.
>>
>> But looking further down the road:
>> * since you need one schema to write the parquet files, you'd need to
>> decide: do you want to write with the new or the old schema in case of a
>> schema update? That should also be the reader schema of your application
>> for a given event type.
>> * this decision has further implications: your application needs to
>> extract exactly one specific version of the schema from the schema registry
>> at query compilation. That could be either a specific schema id or the
>> latest schema for the event type.
>> * that means that the output schema is locked until you restart your
>> application and fetch a new latest schema in case of an update.
>> * at that point, it might just be easier to use the approach that I
>> outlined previously by bundling a specific schema with your application.
>>
>> If you want to extract the latest schema for a subject:
>>
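>> var registryClient = new CachedSchemaRegistryClient(<schemaRepoUrls>, 1000);
>> // getAllVersions() returns version numbers, not schema ids, so ask the
>> // registry for the latest schema registered under the subject directly.
>> var metadata = registryClient.getLatestSchemaMetadata(<subject>);
>> var schema = new Schema.Parser().parse(metadata.getSchema());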
>>
>>
>> On Sat, Jan 18, 2020 at 5:22 PM aj <ajainje...@gmail.com> wrote:
>>
>>> Thanks, Arvid.
>>>
>>> I do not fully understand the above approach,
>>> so for now I am planning to go with the envelope approach that you
>>> suggested.
>>>
>>> One more question: I do not want to keep the schema in my consumer
>>> project, even if it is a single envelope schema. I want to fetch it from the
>>> schema registry and pass it to my parquet sink so that I always use the same
>>> schema that the producer uses. Can you provide sample code showing how I can
>>> infer the schema from the generic record or get it from the schema registry?
>>>
>>>
>>> Regards,
>>> Anuj
>>>
>>>
>>>
>>> On Sat, Jan 18, 2020 at 6:55 PM Arvid Heise <ar...@ververica.com> wrote:
>>>
>>>> (Re-added user mailing list)
>>>>
>>>> Hi Anuj,
>>>>
>>>> since I'd still recommend going with distinct sources/sinks, let me try
>>>> to solve your issues in this mail. If that doesn't work out, I'd address
>>>> your concerns about the envelope format later.
>>>>
>>>> In Flink, you can have several subtopologies in the same application.
>>>>
>>>> Thus, for each event type, you can add
>>>> AvroSource(eventType) -> generic transformation/validation ->
>>>> AvroSink(eventType)
>>>>
>>>> I'd put all Avro schemas in one project and use an Avro plugin to
>>>> generate the respective Java classes. Then I'd simply create a map of Avro
>>>> schema (GeneratedClassA::SCHEMA, GeneratedClassB::SCHEMA...) to topic name
>>>> (event-a, event-b, ...).
>>>> Next, I'd iterate over the map to add the respective subtopologies to
>>>> env.
>>>> Finally, execute everything.
>>>>
>>>> You have one project where all validations reside. But you'd have
>>>> almost no overhead to process a given source of eventType. The downside of
>>>> that approach is of course, that each new event type would require a
>>>> redeployment, but that seems like what you'd want to do anyhow.
>>>>
>>>> Best,
>>>>
>>>> Arvid
>>>>
>>>> On Sat, Jan 18, 2020 at 2:08 PM aj <ajainje...@gmail.com> wrote:
>>>>
>>>>> Thanks, Arvid.
>>>>>
>>>>> 1. I like your approach, as I can write a single consumer and put the
>>>>> data in S3 in parquet format. The only challenge is that there are extra
>>>>> columns that are always going to be null, as I will get only one type of
>>>>> event at a time.
>>>>>
>>>>> 2. If I go with separate schemas, I am not sure how I can solve it
>>>>> using a single generalized consumer. My understanding so far is that I
>>>>> have to write a consumer for each type of event. Each consumer will read
>>>>> all the data, filter out its respective events, and then pass that
>>>>> stream to the sink. But this does not look like a scalable solution: as
>>>>> new events keep being added, I have to write a consumer for each new type.
>>>>>
>>>>>
>>>>> DataStreamSource<GenericRecord> input = env
>>>>>         .addSource(
>>>>>                 new FlinkKafkaConsumer010<GenericRecord>(topics,
>>>>>                         new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>>>>                         config).setStartFromEarliest());
>>>>>
>>>>> Example:
>>>>>
>>>>> 1st consumer:
>>>>>
>>>>> DataStreamSource<GenericRecord> input = env.addSource(
>>>>>         new FlinkKafkaConsumer010<GenericRecord>(topics,
>>>>>                 new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>>>>                 config).setStartFromEarliest());
>>>>> // keep only events of type "a"
>>>>> DataStream<GenericRecord> aInput =
>>>>>         input.filter(r -> "a".equals(String.valueOf(r.get("event_name"))));
>>>>>
>>>>> 2nd consumer:
>>>>>
>>>>> DataStreamSource<GenericRecord> input = env.addSource(
>>>>>         new FlinkKafkaConsumer010<GenericRecord>(topics,
>>>>>                 new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>>>>                 config).setStartFromEarliest());
>>>>> // keep only events of type "b"
>>>>> DataStream<GenericRecord> bInput =
>>>>>         input.filter(r -> "b".equals(String.valueOf(r.get("event_name"))));
>>>>>
>>>>>
>>>>> Can you help me solve this using a single consumer? I do not
>>>>> want to write a separate consumer for each type of schema.
>>>>>
>>>>> For example, this is my consumer, whose stream contains different types
>>>>> of records.
>>>>>
>>>>> DataStreamSource<GenericRecord> input = env
>>>>>         .addSource(
>>>>>                 new FlinkKafkaConsumer010<GenericRecord>(topics,
>>>>>                         new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>>>>                         config).setStartFromEarliest());
>>>>>
>>>>> Now I cannot write this stream directly, as there is no common schema
>>>>> for the records in this stream. So one possible way I am thinking of is:
>>>>>
>>>>> 1. Can I create multiple streams from this stream using keyBy on
>>>>> "event_name" and then write each stream separately?
>>>>>
>>>>> I just want to know whether this is possible.
>>>>>
>>>>>
>>>>> Thanks,
>>>>> Anuj
>>>>>
>>>>> On Fri, Jan 17, 2020 at 3:20 PM Arvid Heise <ar...@ververica.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Anuj,
>>>>>>
>>>>>> I originally understood that you would like to store data in the same
>>>>>> Kafka topic and also want to store it in the same parquet file. In the
>>>>>> past, I mostly used schema registry with Kafka Streams, where you could
>>>>>> only store a schema for a key and value respectively. To use different
>>>>>> record types in the same kafka topic, you had to disable schema
>>>>>> compatibility checks and just stored the schemas as different versions
>>>>>> under the same subject.
>>>>>>
>>>>>> Your approach is much better. You can ensure full schema
>>>>>> compatibility. Nevertheless, it still shares the same drawback that
>>>>>> consumption is much harder (using GenericRecord is proof of that) if you
>>>>>> want to read/write everything into the same place. Also you will never be
>>>>>> able to write one consistent file, as they can only have one schema (both
>>>>>> on Avro and Parquet).
>>>>>> So you only have two options:
>>>>>> * keep schemas separated, but then you also need to write separate
>>>>>> files per record type.
>>>>>> * have a common schema (either my outlined approach or any other
>>>>>> wrapper schema).
>>>>>> The approach with a common schema only makes sense if you want to
>>>>>> write it into one table/Kafka topic.
>>>>>>
>>>>>> However, in the last mail you pointed out that you actually want to
>>>>>> store the record types separately. Then you should keep everything
>>>>>> separated: have a sink for each type, each getting the
>>>>>> respective schema. Note that you'd need to fetch the schema manually from
>>>>>> the schema registry when creating the query, as you would need to pass it
>>>>>> to the sink.
>>>>>>
>>>>>> Btw, do you actually have a need to write all events into one Kafka
>>>>>> topic? The only real use case is to preserve the time order per key.
>>>>>> Everything else is much more complicated than storing events
>>>>>> individually.
>>>>>>
>>>>>> Best,
>>>>>>
>>>>>> Arvid
>>>>>>
>>>>>>
>>>>>> On Thu, Jan 16, 2020 at 3:39 PM aj <ajainje...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Arvid,
>>>>>>> Thanks for the quick response. I am new to this Avro design, so can
>>>>>>> you please help me understand it and design for my use case?
>>>>>>>
>>>>>>> My use case is like this:
>>>>>>>
>>>>>>> 1. We have an app where a lot of actions happen on the user side.
>>>>>>> 2. For each action we collect a set of information that is
>>>>>>> defined using key-value pairs. We want to define this information as
>>>>>>> proper schemas so that we maintain a proper format and do not push
>>>>>>> random data.
>>>>>>> 3. So we define a schema for each action and register it in the
>>>>>>> schema registry using topic+record.name as the subject.
>>>>>>> 4. So I do not think the producer side has any issue, as whenever we
>>>>>>> push an event to Kafka we register a new schema under the above subject.
>>>>>>>
>>>>>>> Example:
>>>>>>>
>>>>>>> {
>>>>>>>   "event_name": "a",
>>>>>>>   "timestamp": ...,
>>>>>>>   "properties": {
>>>>>>>     "key-1": "val-1",
>>>>>>>     "key-2": "val-2"
>>>>>>>   }
>>>>>>> }
>>>>>>>
>>>>>>> {
>>>>>>>   "event_name": "b",
>>>>>>>   "timestamp": ...,
>>>>>>>   "properties": {
>>>>>>>     "key-3": "val-3",
>>>>>>>     "key-4": "val-4"
>>>>>>>   }
>>>>>>> }
>>>>>>>
>>>>>>> Now I have a consumer that parses the data by fetching the
>>>>>>> schema from the schema registry and deserializes it into a stream of
>>>>>>> generic records.
>>>>>>>
>>>>>>> Why do you think it will break, given that I always deserialize with
>>>>>>> the writer schema only?
>>>>>>>
>>>>>>> You suggested keeping an envelope Avro schema instead of a separate
>>>>>>> schema for each type of event that I am generating. I have some doubts
>>>>>>> about that:
>>>>>>>
>>>>>>> 1. How do I enforce a schema on each event when it is a subtype in the
>>>>>>> main schema? When I get a JSON event of type "a", how do I enforce and
>>>>>>> convert it to the subschema for "a" and push it to Kafka?
>>>>>>> 2. I want to create a separate Hive table for each of the events. So
>>>>>>> when I write this data and, let's say, I have 20 events, then 19
>>>>>>> columns will always be null in the data.
>>>>>>>
>>>>>>> Please help me do this the right way. It will be a great help and
>>>>>>> learning experience for me.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Anuj
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Jan 16, 2020 at 7:30 PM Arvid Heise <ar...@ververica.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Anuj,
>>>>>>>>
>>>>>>>> you should always avoid having records with different schemas in
>>>>>>>> the same topic/dataset. You will break the compatibility features of
>>>>>>>> the schema registry, and your consumer/producer code will always be
>>>>>>>> hard to maintain.
>>>>>>>>
>>>>>>>> A common and scalable way to avoid it is to use some kind of
>>>>>>>> envelope format.
>>>>>>>>
>>>>>>>> {
>>>>>>>>   "namespace": "example",
>>>>>>>>   "name": "Envelope",
>>>>>>>>   "type": "record",
>>>>>>>>   "fields": [
>>>>>>>>     {
>>>>>>>>       "name": "type1",
>>>>>>>>       "type": ["null", {
>>>>>>>>         "type": "record",
>>>>>>>>         "name": "Type1",
>>>>>>>>         "fields": [ ... ]
>>>>>>>>       }],
>>>>>>>>       "default": null
>>>>>>>>     },
>>>>>>>>     {
>>>>>>>>       "name": "type2",
>>>>>>>>       "type": ["null", {
>>>>>>>>         "type": "record",
>>>>>>>>         "name": "Type2",
>>>>>>>>         "fields": [ ... ]
>>>>>>>>       }],
>>>>>>>>       "default": null
>>>>>>>>     }
>>>>>>>>   ]
>>>>>>>> }
>>>>>>>>
>>>>>>>> This envelope is evolvable (arbitrary addition/removal of wrapped
>>>>>>>> types, which by themselves can be evolved), and adds only a little
>>>>>>>> overhead (1 byte per subtype). The downside is that you cannot enforce
>>>>>>>> that exactly one of the subtypes is set.
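
The "1 byte per subtype" figure follows from how Avro encodes a union: the branch index is written as a zigzag varint, and a null branch has no body, so an unset subtype costs exactly one byte. Below is a rough plain-Java sketch of that arithmetic. AvroUnionOverhead is a hypothetical helper that re-implements the long encoding described in the Avro specification instead of calling the Avro library.

```java
import java.io.ByteArrayOutputStream;

/** Hypothetical helper: estimates the per-record overhead of an envelope schema. */
final class AvroUnionOverhead {

    /** Encodes a long the way Avro does: zigzag, then base-128 varint. */
    static byte[] encodeLong(long value) {
        long zigzag = (value << 1) ^ (value >> 63);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        while ((zigzag & ~0x7FL) != 0) {
            out.write((int) ((zigzag & 0x7F) | 0x80)); // 7 payload bits + continuation bit
            zigzag >>>= 7;
        }
        out.write((int) zigzag);
        return out.toByteArray();
    }

    /**
     * Bytes spent on union branch indexes for an envelope with the given number
     * of ["null", record] fields. Index 0 (null) and index 1 (record) both fit
     * into a single byte, so every field contributes one byte.
     */
    static int envelopeOverheadBytes(int subtypes) {
        return subtypes * encodeLong(1).length;
    }

    public static void main(String[] args) {
        System.out.println("index 0 bytes: " + encodeLong(0).length);
        System.out.println("index 1 bytes: " + encodeLong(1).length);
        System.out.println("overhead for 20 subtypes: " + envelopeOverheadBytes(20));
    }
}
```
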
>>>>>>>>
>>>>>>>> This schema is fully compatible with the schema registry, so no
>>>>>>>> need to parse anything manually.
>>>>>>>>
>>>>>>>> This schema can easily be used with Parquet. If you can't change
>>>>>>>> the input format anymore, you can at least use that approach on your
>>>>>>>> output.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>>
>>>>>>>> Arvid
>>>>>>>>
>>>>>>>> On Thu, Jan 16, 2020 at 2:53 PM aj <ajainje...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi All,
>>>>>>>>>
>>>>>>>>> I have a use case where I am getting different sets of Avro
>>>>>>>>> records in Kafka. I am using the schema registry to store the Avro
>>>>>>>>> schemas. One topic can also have different types of records.
>>>>>>>>>
>>>>>>>>> I have created a GenericRecord stream using
>>>>>>>>> KafkaAvroDeserializer by defining a custom
>>>>>>>>> deserializer class like this:
>>>>>>>>>
>>>>>>>>> @Override
>>>>>>>>> public GenericRecord deserialize(
>>>>>>>>>         byte[] messageKey, byte[] message, String topic, int partition,
>>>>>>>>>         long offset) {
>>>>>>>>>     checkInitialized();
>>>>>>>>>     return (GenericRecord) inner.deserialize(topic, message);
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> private void checkInitialized() {
>>>>>>>>>     if (inner == null) {
>>>>>>>>>         Map<String, Object> props = new HashMap<>();
>>>>>>>>>         props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
>>>>>>>>>                 registryUrl);
>>>>>>>>>         props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG,
>>>>>>>>>                 false);
>>>>>>>>>         SchemaRegistryClient client =
>>>>>>>>>                 new CachedSchemaRegistryClient(
>>>>>>>>>                         registryUrl,
>>>>>>>>>                         AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
>>>>>>>>>         inner = new KafkaAvroDeserializer(client, props);
>>>>>>>>>     }
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> And this is my consumer code :
>>>>>>>>>
>>>>>>>>> DataStreamSource<GenericRecord> input = env
>>>>>>>>>         .addSource(
>>>>>>>>>                 new FlinkKafkaConsumer010<GenericRecord>(topics,
>>>>>>>>>                         new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>>>>>>>>                         config).setStartFromEarliest());
>>>>>>>>>
>>>>>>>>> Now I want to write this stream partitioned by
>>>>>>>>> event_name="a"/year=/month=/day= in parquet format so that I
>>>>>>>>> can expose Hive tables directly on top of this data.
>>>>>>>>> event_name is a common field for all types of records that I am
>>>>>>>>> getting in Kafka.
>>>>>>>>> I am stuck because the parquet writer needs a schema to write, but my
>>>>>>>>> different records have different schemas. So how do I write this
>>>>>>>>> stream to S3 in the above partition format?
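
For the directory layout itself, independent of the schema question, here is a rough sketch of building a Hive-style partition path from the event name and an epoch-millisecond timestamp. PartitionPaths is a hypothetical helper; using UTC, zero-padded month and day, and an unquoted event name in the path are all assumptions. In a Flink job the same string could be returned from the bucket assigner of the file sink.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.Locale;

/** Hypothetical helper that derives a Hive-style partition path for one event. */
final class PartitionPaths {

    /**
     * Builds event_name=.../year=.../month=.../day=... from the event name and
     * the event time in epoch milliseconds (interpreted in UTC, an assumption).
     */
    static String partitionPath(String eventName, long epochMillis) {
        ZonedDateTime time = Instant.ofEpochMilli(epochMillis).atZone(ZoneOffset.UTC);
        return String.format(
                Locale.ROOT,
                "event_name=%s/year=%04d/month=%02d/day=%02d",
                eventName,
                time.getYear(),
                time.getMonthValue(),
                time.getDayOfMonth());
    }

    public static void main(String[] args) {
        // 1579132800000L is 2020-01-16T00:00:00Z
        System.out.println(partitionPath("a", 1579132800000L));
    }
}
```
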
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Thanks & Regards,
>>>>>>>>> Anuj Jain
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Thanks & Regards,
>>>>>>> Anuj Jain
>>>>>>> Mob. : +91- 8588817877
>>>>>>> Skype : anuj.jain07
>>>>>>> <http://www.oracle.com/>
>>>>>>>
>>>>>>>
>>>>>>> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>>
>>
>
>


-- 
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877
Skype : anuj.jain07
<http://www.oracle.com/>


<http://www.cse.iitm.ac.in/%7Eanujjain/>
