Hi Ravi,

Perfect, this is looking good. Thanks for your help.
Regards,
Vijay

On Thu, Jul 30, 2020 at 5:39 AM Ravi Bhushan Ratnakar <ravibhushanratna...@gmail.com> wrote:

> Hi Vijayendra,
>
> There is an issue with the CustomAvroWriters.java which I shared with you
> earlier. I am sending you the fixed version; hope this will resolve the
> issue of reading the files with avro-tools.
>
> Please use one of the supported string values below for codecName:
>
> null - for nullCodec
> deflate - for deflateCodec
> snappy - for snappyCodec
> bzip2 - for bzip2Codec
> xz - for xzCodec
>
> Regards,
> Ravi
>
> On Thu, Jul 30, 2020 at 8:21 AM Ravi Bhushan Ratnakar <ravibhushanratna...@gmail.com> wrote:
>
>> If it is possible, please share the sample output file.
>>
>> Regards,
>> Ravi
>>
>> On Thu, Jul 30, 2020 at 3:17 AM Vijayendra Yadav <contact....@gmail.com> wrote:
>>
>>> Hi Ravi,
>>>
>>> With CustomAvroWriters (SNAPPY), when I run on a cluster it does create
>>> files, but the files are not recognized as Avro files by the avro-tools
>>> jar when I try to deserialize them to JSON.
>>>
>>> The Flink logs show:
>>>
>>> 2020-07-29 23:54:23,270 INFO com.hadoop.compression.lzo.LzoCodec -
>>> Successfully loaded & initialized native-lzo library [hadoop-lzo rev
>>> ff8f5709577defb6b78cdc1f98cfe129c4b6fe46]
>>> 2020-07-29 23:54:23,277 INFO org.apache.hadoop.io.compress.CodecPool -
>>> Got brand-new compressor [.snappy]
>>>
>>> 2020-07-29 23:54:28,931 INFO org.apache.flink.fs.s3.common.writer.S3Committer -
>>> Committing la/gold/vdcs_gold/test/bob/raw/year=2020/month=07/day=29/hour=23/ip-10-223-69-238-2020-07-29-23-54-00-121-5e51c2df-1-0.avro
>>> with MPU ID
>>>
>>> Avro tools:
>>>
>>> java -jar avro-tools-1.7.4.jar tojson /tmp/test-s3-flink-new/raw/year\=2020/month\=07/day\=29/hour\=20/ubuntu-2020-07-29-20-35-50-746-87802bc3-2-2.avro
>>>
>>> Exception in thread "main" java.io.IOException: Not an Avro data file
>>>
>>> Am I missing something?
>>> Regards,
>>> Vijay
>>>
>>> On Wed, Jul 29, 2020 at 2:08 PM Vijayendra Yadav <contact....@gmail.com> wrote:
>>>
>>>> Hi Ravi,
>>>>
>>>> Thanks for the details. CustomAvroWriters is working for now, although
>>>> it fails for the Snappy codec in the local IDE with
>>>> "java.lang.UnsatisfiedLinkError:
>>>> org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy()Z".
>>>> I think I will have to try running it in an EMR/Hadoop environment to
>>>> get the Snappy native library resolved.
>>>>
>>>> About the other approach with AvroOutputFormat:
>>>>
>>>> Does it fit in the StreamingFileSink API?
>>>>
>>>> StreamingFileSink.forBulkFormat(new Path(outPath),
>>>>     CustomAvroWriters.forGenericRecord(schema, codecName))
>>>>
>>>> Or is it a different API? Could you send a sample if you have one for
>>>> the other sink API?
>>>>
>>>> Regards,
>>>> Vijay
>>>>
>>>> On Wed, Jul 29, 2020 at 12:45 PM Ravi Bhushan Ratnakar <ravibhushanratna...@gmail.com> wrote:
>>>>
>>>>> There is another alternative which you could try, like this:
>>>>>
>>>>> val stream: DataStream[GenericRecord] = ??? // your source stream
>>>>>
>>>>> val aof: AvroOutputFormat[GenericRecord] =
>>>>>   new AvroOutputFormat(new org.apache.flink.core.fs.Path(""), classOf[GenericRecord])
>>>>>
>>>>> aof.setSchema(schema)
>>>>>
>>>>> aof.setCodec(AvroOutputFormat.Codec.SNAPPY)
>>>>>
>>>>> stream.writeUsingOutputFormat(aof)
>>>>>
>>>>> Regards,
>>>>> Ravi
>>>>>
>>>>> On Wed, Jul 29, 2020 at 9:12 PM Ravi Bhushan Ratnakar <ravibhushanratna...@gmail.com> wrote:
>>>>>
>>>>>> Hi Vijayendra,
>>>>>>
>>>>>> Currently AvroWriters doesn't support compression. If you want to use
>>>>>> compression then you need a custom implementation of AvroWriters
>>>>>> where you can add compression. Please find a sample customization of
>>>>>> AvroWriters where you could use compression. You can use the example
>>>>>> below.
>>>>>> codecName = org.apache.hadoop.io.compress.SnappyCodec
>>>>>>
>>>>>> CustomAvroWriters.forGenericRecord(schema, codecName)
>>>>>>
>>>>>> Regards,
>>>>>> Ravi
>>>>>>
>>>>>> On Wed, Jul 29, 2020 at 7:36 PM Vijayendra Yadav <contact....@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Team,
>>>>>>>
>>>>>>> Could you please provide a sample for enabling compression (Snappy)
>>>>>>> on a DataStream[GenericRecord] written with
>>>>>>> AvroWriters.forGenericRecord(schema)?
>>>>>>>
>>>>>>> Regards,
>>>>>>> Vijay
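[Editor's sketch] The codecName strings Ravi lists (null, deflate, snappy, bzip2, xz) map one-to-one onto Avro codec names. The attached CustomAvroWriters.java is not in the archive, so as a minimal, dependency-free illustration of that mapping (the class and method names here are hypothetical, not from the attachment), a writer factory could validate the string up front; in a real implementation the validated name would typically be handed to Avro's CodecFactory.fromString(...) before writing:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class CodecNameLookup {

    // The supported codecName strings from the thread, mapped to the Avro
    // codec each one selects.
    static final Map<String, String> CODECS = new LinkedHashMap<>();
    static {
        CODECS.put("null", "nullCodec");
        CODECS.put("deflate", "deflateCodec");
        CODECS.put("snappy", "snappyCodec");
        CODECS.put("bzip2", "bzip2Codec");
        CODECS.put("xz", "xzCodec");
    }

    // Hypothetical validation helper: fail fast on an unsupported name
    // instead of silently producing a file that avro-tools cannot read
    // ("Not an Avro data file", as seen earlier in the thread).
    static String resolve(String codecName) {
        String codec = CODECS.get(codecName);
        if (codec == null) {
            throw new IllegalArgumentException("Unsupported codecName: " + codecName);
        }
        return codec;
    }

    public static void main(String[] args) {
        System.out.println(resolve("snappy")); // snappyCodec
    }
}
```

Note that an invalid codecName rejected here at job-submission time is much cheaper to debug than the runtime symptom in the thread, where the Hadoop CodecPool happily compressed blocks that avro-tools could not recognize as an Avro container file.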