Re: Flink ParquetAvroWriters Sink

2020-01-28 Thread aj
I was able to resolve this issue by setting classloader.resolve-order to parent-first.
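For reference, the fix described above is a one-line change in flink-conf.yaml (this is the standard Flink option being referred to):

    classloader.resolve-order: parent-first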

Re: Flink ParquetAvroWriters Sink

2020-01-23 Thread aj
Hi Arvid, I am not clear on this: "Note that I still recommend to just bundle the schema with your Flink application and not reinvent the wheel." Can you please help with some sample code showing how it should be written? Or can we connect in some way so that I can understand it with you?
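A minimal sketch of what bundling the schema can look like: ship the .avsc file in src/main/resources and parse it once at job startup (the file name and job class below are illustrative, not from the thread):

    import org.apache.avro.Schema;

    // Parse the envelope schema packaged inside the application jar
    // (Schema.Parser.parse throws IOException, so call this from a
    // main() that declares it).
    Schema envelopeSchema = new Schema.Parser()
        .parse(MyJob.class.getResourceAsStream("/envelope.avsc"));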

Re: Flink ParquetAvroWriters Sink

2020-01-23 Thread Arvid Heise
The issue is that you are not providing any meaningful type information, so Flink has to resort to Kryo. You need to extract the schema during query compilation (in your main) and pass it to your deserialization schema. public TypeInformation getProducedType() { return (TypeInformation…
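A hedged sketch of what that can look like end to end. The class below is illustrative: it assumes plain Avro-encoded bytes and passes the schema around as a JSON string, since org.apache.avro.Schema is not Serializable.

    import java.io.IOException;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.DecoderFactory;
    import org.apache.flink.api.common.serialization.DeserializationSchema;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;

    public class GenericAvroDeserializationSchema
            implements DeserializationSchema<GenericRecord> {

        private final String schemaString; // Schema is not Serializable, so keep the JSON
        private transient Schema schema;
        private transient GenericDatumReader<GenericRecord> reader;

        public GenericAvroDeserializationSchema(Schema schema) {
            this.schemaString = schema.toString();
        }

        private Schema schema() {
            if (schema == null) {
                schema = new Schema.Parser().parse(schemaString);
            }
            return schema;
        }

        @Override
        public GenericRecord deserialize(byte[] message) throws IOException {
            if (reader == null) {
                reader = new GenericDatumReader<>(schema());
            }
            // Assumes plain Avro binary; for Confluent's wire format you would
            // strip the 5-byte header or delegate to KafkaAvroDeserializer instead.
            return reader.read(null, DecoderFactory.get().binaryDecoder(message, null));
        }

        @Override
        public boolean isEndOfStream(GenericRecord nextElement) {
            return false;
        }

        @Override
        public TypeInformation<GenericRecord> getProducedType() {
            // The key point from the reply: returning Avro-aware type information
            // keeps Flink off the Kryo fallback for GenericRecord.
            return new GenericRecordAvroTypeInfo(schema());
        }
    }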

Re: Flink ParquetAvroWriters Sink

2020-01-22 Thread aj
Hi Arvid, I want to keep generic records only, and I do not want to keep the schema definition on the consumer side; it should be resolved from the schema registry only. I am following the post below: https://stackoverflow.com/questions/58849635/is-it-possible-to-deserialize-avro-messageconsuming-m
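One hedged way to do that: resolve the schema from the registry once in main() and hand it to Flink's Confluent deserializer (the registry URL and subject name below are assumptions):

    import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
    import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.api.common.serialization.DeserializationSchema;
    import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;

    String registryUrl = "http://localhost:8081";                 // assumption
    CachedSchemaRegistryClient client =
        new CachedSchemaRegistryClient(registryUrl, 100);

    // Fetch the latest schema registered under the topic's value subject.
    SchemaMetadata latest =
        client.getLatestSchemaMetadata("my-topic-value");         // subject is an assumption
    Schema readerSchema = new Schema.Parser().parse(latest.getSchema());

    DeserializationSchema<GenericRecord> deserializer =
        ConfluentRegistryAvroDeserializationSchema.forGeneric(readerSchema, registryUrl);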

Re: Flink ParquetAvroWriters Sink

2020-01-22 Thread Arvid Heise
Hi Anuj, I recommend using the ConfluentRegistryAvroDeserializationSchema [1] with a specific record that has been generated with the Avro Maven Plugin [2] or Avro Gradle Plugin [3]. That should result in almost no code and maximal maintainability. [1] https://ci.apache.org/projects/flink/flink
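A minimal sketch of that recommendation, assuming a SpecificRecord class (here hypothetically named UserEvent) generated by the Avro Maven Plugin:

    import java.util.Properties;
    import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

    Properties kafkaProps = new Properties();
    kafkaProps.setProperty("bootstrap.servers", "localhost:9092");  // assumption
    kafkaProps.setProperty("group.id", "my-consumer-group");        // assumption

    FlinkKafkaConsumer010<UserEvent> consumer = new FlinkKafkaConsumer010<>(
        "events",                                                   // topic is an assumption
        ConfluentRegistryAvroDeserializationSchema.forSpecific(
            UserEvent.class, "http://localhost:8081"),              // registry URL assumption
        kafkaProps);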

Re: Flink ParquetAvroWriters Sink

2020-01-22 Thread aj
Hi Arvid, I have implemented the code with the envelope schema as you suggested, but now I am facing issues with the consumer. I have written code like this: FlinkKafkaConsumer010 kafkaConsumer010 = new FlinkKafkaConsumer010(KAFKA_TOPICS, new KafkaGenericAvroDeserializationSchema(sche…
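For context, the truncated snippet above might look like this when complete; KafkaGenericAvroDeserializationSchema is the user's own class, and everything else here is an illustrative sketch:

    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    FlinkKafkaConsumer010<GenericRecord> kafkaConsumer010 = new FlinkKafkaConsumer010<>(
        KAFKA_TOPICS,                                       // topic list from the thread
        new KafkaGenericAvroDeserializationSchema(schema),  // user's custom schema class
        kafkaProperties);

    DataStream<GenericRecord> stream = env.addSource(kafkaConsumer010);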

Re: Flink ParquetAvroWriters Sink

2020-01-20 Thread aj
Thanks, Arvid, for all the clarification. I will work on the approach you suggested. Thanks, Anuj

Re: Flink ParquetAvroWriters Sink

2020-01-18 Thread Arvid Heise
Hi Anuj, I think that there may be a fundamental misunderstanding about the role of a schema registry in Kafka. So let me first clarify that. In each Avro/Parquet file, all records have the same schema. The schema is stored within the file, such that we can always retrieve the writer schema for th…
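The point that the writer schema travels with the data can be seen directly with plain Avro files (Parquet similarly keeps the schema in its file footer); a small illustrative sketch:

    import java.io.File;
    import org.apache.avro.Schema;
    import org.apache.avro.file.DataFileReader;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericRecord;

    DataFileReader<GenericRecord> fileReader =
        new DataFileReader<>(new File("events.avro"), new GenericDatumReader<>());
    // The writer schema is recovered from the file itself; no registry is
    // needed to read the records back.
    Schema writerSchema = fileReader.getSchema();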

Re: Flink ParquetAvroWriters Sink

2020-01-18 Thread aj
Thanks, Arvid. I do not fully understand the above approach, so currently I am thinking of going with the envelope approach that you suggested. One more question: I do not want to keep the schema in my consumer project, even though it is a single envelope schema. I want it to be fetched from the schema…

Re: Flink ParquetAvroWriters Sink

2020-01-18 Thread Arvid Heise
(Re-added user mailing list) Hi Anuj, since I'd still recommend going with distinct sources/sinks, let me try to solve your issues in this mail. If that doesn't work out, I'd address your concerns about the envelope format later. In Flink, you can have several subtopologies in the same applicatio…
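A sketch of what several subtopologies in one application can look like: the same envelope stream is split per event type, and each split gets its own Parquet sink. The "payload" field, the schema list, and the paths are assumptions for illustration.

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
    import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

    DataStream<GenericRecord> envelopes = env.addSource(kafkaConsumer);

    for (Schema eventSchema : eventSchemas) {  // one schema per event type (assumption)
        String eventName = eventSchema.getFullName();
        envelopes
            // keep only envelopes whose payload record matches this event type
            .filter(e -> ((GenericRecord) e.get("payload"))
                .getSchema().getFullName().equals(eventName))
            .map(e -> (GenericRecord) e.get("payload"))
            .returns(new GenericRecordAvroTypeInfo(eventSchema))
            .addSink(StreamingFileSink
                .forBulkFormat(
                    new Path("s3://bucket/" + eventSchema.getName()),
                    ParquetAvroWriters.forGenericRecord(eventSchema))
                .build());
    }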

Re: Flink ParquetAvroWriters Sink

2020-01-17 Thread Arvid Heise
Hi Anuj, I originally understood that you would like to store data in the same Kafka topic and also store it in the same Parquet file. In the past, I mostly used the schema registry with Kafka Streams, where you could only store a schema for the key and value, respectively. To use different recor…
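The preview cuts off here, but one established way to put different record types in the same topic while keeping the registry's compatibility checks is Confluent's subject-name strategy on the producer side (a sketch, not necessarily what this reply goes on to describe):

    import java.util.Properties;

    Properties producerProps = new Properties();
    producerProps.put("bootstrap.servers", "localhost:9092");          // assumption
    producerProps.put("key.serializer",
        "org.apache.kafka.common.serialization.StringSerializer");
    producerProps.put("value.serializer",
        "io.confluent.kafka.serializers.KafkaAvroSerializer");
    producerProps.put("schema.registry.url", "http://localhost:8081"); // assumption
    // Register one subject per topic + record name instead of one per topic,
    // so several event schemas can share a topic without breaking compatibility checks.
    producerProps.put("value.subject.name.strategy",
        "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy");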

Re: Flink ParquetAvroWriters Sink

2020-01-16 Thread aj
Hi Arvid, Thanks for the quick response. I am new to this Avro design, so can you please help me understand and design for my use case? My use case is like this: 1. We have an app where a lot of actions happen on the user side. 2. For each action, we collect some set of information that define…

Re: Flink ParquetAvroWriters Sink

2020-01-16 Thread Arvid Heise
Hi Anuj, you should always avoid having records with different schemas in the same topic/dataset. You will break the compatibility features of the schema registry, and your consumer/producer code becomes hard to maintain. A common and scalable way to avoid this is to use some kind of envelope form…
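A minimal sketch of what such an envelope schema could look like in Avro; the record and field names are invented for illustration, and SearchEvent and ClickEvent are assumed to be defined alongside it:

    {
      "type": "record",
      "name": "EventEnvelope",
      "namespace": "com.example.events",
      "fields": [
        {"name": "event_name", "type": "string"},
        {"name": "event_time",
         "type": {"type": "long", "logicalType": "timestamp-millis"}},
        {"name": "payload", "type": ["com.example.events.SearchEvent",
                                     "com.example.events.ClickEvent"]}
      ]
    }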

Flink ParquetAvroWriters Sink

2020-01-16 Thread aj
Hi All, I have a use case where I am getting different sets of Avro records in Kafka. I am using the schema registry to store the Avro schemas. One topic can also have different types of records. Now I have created a GenericRecord stream using the KafkaAvroDeserializer by defining a custom Deserializer clas…
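Since the thread subject is the ParquetAvroWriters sink, here is a minimal sketch of the sink side this question builds toward, assuming a schema resolved up front (the path and stream names are illustrative):

    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

    StreamingFileSink<GenericRecord> parquetSink = StreamingFileSink
        .forBulkFormat(
            new Path("s3://bucket/events"),               // output path is an assumption
            ParquetAvroWriters.forGenericRecord(schema))  // schema resolved beforehand
        .build();
    stream.addSink(parquetSink);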