Re: [Compression] Flink DataStream[GenericRecord] Avrowriters

2020-07-30 Thread Vijayendra Yadav
Hi Ravi, Perfect, this is looking good. Thanks for your help. Regards, Vijay. On Thu, Jul 30, 2020 at 5:39 AM Ravi Bhushan Ratnakar <ravibhushanratna...@gmail.com> wrote: > Hi Vijayendra, > There is an issue with the CustomeAvroWriters.java which I shared with you earlier; I am sending you …

Re: [Compression] Flink DataStream[GenericRecord] Avrowriters

2020-07-30 Thread Ravi Bhushan Ratnakar
Hi Vijayendra, there is an issue with the CustomeAvroWriters.java which I shared with you earlier; I am sending you the fixed version, which I hope will resolve the issue of reading it with the avro tool. Please use the supported string values below for codecName: null - for nullCodec, deflate - …
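The codec list is cut off by the archive. Assuming the fixed writer resolves codecName through Avro's CodecFactory.fromString (an assumption about the attachment, which is not preserved), the accepted strings would be the standard Avro codec names:

    import org.apache.avro.file.CodecFactory

    // Standard Avro codec names accepted by CodecFactory.fromString:
    //   "null", "deflate", "snappy", "bzip2", "xz"
    val codec = CodecFactory.fromString("snappy") // unknown names raise AvroRuntimeException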

Re: [Compression] Flink DataStream[GenericRecord] Avrowriters

2020-07-29 Thread Vijayendra Yadav
Hi Ravi, with CustomAvroWriters (SNAPPY), when I run on a cluster it does create files, but the files are not recognized as Avro files by the avro-tools jar when I try to deserialize them to JSON. The Flink logs show: 2020-07-29 23:54:23,270 INFO com.hadoop.compression.lzo.LzoCodec - Successfully loaded & in…
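For reference, the avro-tools check described here is typically run as below (the jar version and file name are placeholders):

    java -jar avro-tools-1.9.2.jar tojson part-0-0.avro

avro-tools only understands the Avro Object Container File format; a file produced by wrapping the whole output stream in a Hadoop compression codec is no longer a valid container, which would explain why the files are not recognized.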

Re: [Compression] Flink DataStream[GenericRecord] Avrowriters

2020-07-29 Thread Vijayendra Yadav
Hi Ravi, thanks for the details. CustomAvroWriter is working for now, although it is failing for the Snappy codec in my local IDE with "java.lang.UnsatisfiedLinkError: org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy()Z". I think I will have to try running it in an EMR/Hadoop environment to get the …
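The UnsatisfiedLinkError comes from Hadoop's native code loader, which suggests the Snappy path in this writer goes through Hadoop's native Snappy bindings. If the writer instead used Avro's built-in codec support, Snappy would also work in a local IDE, since Avro's codec is backed by the snappy-java library, which bundles its own natives:

    import org.apache.avro.file.CodecFactory

    // Avro's own Snappy codec: backed by snappy-java (bundled natives),
    // so no Hadoop native library installation is required.
    val snappy = CodecFactory.snappyCodec()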

Re: [Compression] Flink DataStream[GenericRecord] Avrowriters

2020-07-29 Thread Ravi Bhushan Ratnakar
There is another alternative which you could try, like this:

    val stream: DataStream[GenericRecord] = _
    val aof: AvroOutputFormat[GenericRecord] =
      new AvroOutputFormat(new org.apache.flink.core.fs.Path(""), classOf[GenericRecord])
    aof.setSchema(schema)
    aof.setCodec(AvroOutputFormat.Codec.SNAPPY)
    str…
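The snippet above is cut off by the archive; the truncated last line presumably attaches the format to the stream. A complete, self-contained version of this alternative might look as follows (schema string and output path are placeholders):

    import org.apache.avro.Schema
    import org.apache.avro.generic.GenericRecord
    import org.apache.flink.core.fs.Path
    import org.apache.flink.formats.avro.AvroOutputFormat
    import org.apache.flink.streaming.api.scala._

    val schemaString: String = ???                     // placeholder: your Avro schema JSON
    val schema: Schema = new Schema.Parser().parse(schemaString)

    val stream: DataStream[GenericRecord] = ???        // your source here

    val aof = new AvroOutputFormat(new Path("s3://bucket/output"), classOf[GenericRecord])
    aof.setSchema(schema)
    aof.setCodec(AvroOutputFormat.Codec.SNAPPY)

    // Presumed completion of the truncated line: write with the configured format.
    stream.writeUsingOutputFormat(aof)

Note that writeUsingOutputFormat does not take part in Flink's checkpointing, so unlike StreamingFileSink it gives no exactly-once guarantees on the written files.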

Re: [Compression] Flink DataStream[GenericRecord] Avrowriters

2020-07-29 Thread Ravi Bhushan Ratnakar
Hi Vijayendra, currently AvroWriters doesn't support compression. If you want to use compression then you need a custom implementation of AvroWriter where you can add compression support. Please find a sample customization of AvroWriters where you could use compression. You can use th…
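The sample attachment is not preserved in the archive. A minimal sketch of such a customization, assuming it builds on flink-avro's AvroWriterFactory/AvroBuilder hooks and Avro's CodecFactory (the object and method names here are invented for illustration):

    import java.io.OutputStream

    import org.apache.avro.Schema
    import org.apache.avro.file.{CodecFactory, DataFileWriter}
    import org.apache.avro.generic.{GenericDatumWriter, GenericRecord}
    import org.apache.flink.formats.avro.{AvroBuilder, AvroWriterFactory}

    object CompressedAvroWriters {

      // Hypothetical counterpart of AvroWriters.forGenericRecord with a codec knob.
      // codecName is an Avro codec string such as "null", "deflate" or "snappy".
      def forGenericRecord(schemaString: String, codecName: String): AvroWriterFactory[GenericRecord] =
        new AvroWriterFactory[GenericRecord](new AvroBuilder[GenericRecord] {
          override def createWriter(out: OutputStream): DataFileWriter[GenericRecord] = {
            // Parse inside the builder: Avro Schema objects are not serializable,
            // so only the schema string is shipped with the job graph.
            val schema = new Schema.Parser().parse(schemaString)
            val writer = new DataFileWriter[GenericRecord](new GenericDatumWriter[GenericRecord](schema))
            writer.setCodec(CodecFactory.fromString(codecName))
            // create() writes the container header; data blocks are then compressed
            // with the chosen codec, keeping the file a valid Avro container.
            writer.create(schema, out)
          }
        })
    }

The important detail is that the codec is set on the DataFileWriter, so compression happens inside the Avro container format rather than around it, and the files stay readable by avro-tools.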

[Compression] Flink DataStream[GenericRecord] Avrowriters

2020-07-29 Thread Vijayendra Yadav
Hi Team, could you please provide a sample for enabling compression (Snappy) of Avro for a DataStream[GenericRecord] written with AvroWriters.forGenericRecord(schema)? Regards, Vijay
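For context, AvroWriters.forGenericRecord(schema) plugs into StreamingFileSink.forBulkFormat, and a compression-capable factory (such as the hypothetical CompressedAvroWriters sketched under Ravi's reply above) would slot in the same way. A sketch with placeholder path and schema:

    import org.apache.avro.generic.GenericRecord
    import org.apache.flink.core.fs.Path
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
    import org.apache.flink.streaming.api.scala._

    val stream: DataStream[GenericRecord] = ??? // your source
    val schemaString: String = ???              // your Avro schema JSON

    // Stock, uncompressed: StreamingFileSink.forBulkFormat(path, AvroWriters.forGenericRecord(schema))
    // With the hypothetical compression-aware factory:
    val sink = StreamingFileSink
      .forBulkFormat(new Path("s3://bucket/avro-out"),
        CompressedAvroWriters.forGenericRecord(schemaString, "snappy"))
      .build()

    stream.addSink(sink)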