Hi Ravi,

With CustomAvroWriters (SNAPPY), when I run on a cluster it does create files, but the files are not recognized as Avro files by the avro-tools jar when I try to deserialize them to JSON.
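
One quick way to see what avro-tools is complaining about is to check the file's first four bytes: a valid Avro object container starts with the magic 'O' 'b' 'j' 0x01. A minimal sketch (the AvroMagicCheck object and the path argument are just for illustration):

    import java.io.{DataInputStream, FileInputStream}

    object AvroMagicCheck {
      def main(args: Array[String]): Unit = {
        val in = new DataInputStream(new FileInputStream(args(0)))
        val magic = new Array[Byte](4)
        try in.readFully(magic) finally in.close()
        // Avro object container files begin with 'O' 'b' 'j' 0x01.
        val expected = Array[Byte]('O'.toByte, 'b'.toByte, 'j'.toByte, 1)
        if (magic.sameElements(expected))
          println("looks like an Avro container file")
        else
          println("not an Avro container file, header: " +
            magic.map(b => "%02x".format(b)).mkString(" "))
      }
    }
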
The Flink logs show:

2020-07-29 23:54:23,270 INFO com.hadoop.compression.lzo.LzoCodec - Successfully loaded & initialized native-lzo library [hadoop-lzo rev ff8f5709577defb6b78cdc1f98cfe129c4b6fe46]
2020-07-29 23:54:23,277 INFO org.apache.hadoop.io.compress.CodecPool - Got brand-new compressor [.snappy]
2020-07-29 23:54:28,931 INFO org.apache.flink.fs.s3.common.writer.S3Committer - Committing la/gold/vdcs_gold/test/bob/raw/year=2020/month=07/day=29/hour=23/ip-10-223-69-238-2020-07-29-23-54-00-121-5e51c2df-1-0.avro with MPU ID

Avro tools:

java -jar avro-tools-1.7.4.jar tojson /tmp/test-s3-flink-new/raw/year\=2020/month\=07/day\=29/hour\=20/ubuntu-2020-07-29-20-35-50-746-87802bc3-2-2.avro
Exception in thread "main" java.io.IOException: Not an Avro data file

Am I missing something?

Regards,
Vijay

On Wed, Jul 29, 2020 at 2:08 PM Vijayendra Yadav <contact....@gmail.com> wrote:

> Hi Ravi,
>
> Thanks for the details. CustomAvroWriter is working for now, although it
> fails for the Snappy codec in my local IDE with
> "java.lang.UnsatisfiedLinkError:
> org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy()Z".
> I think I will have to run it in an EMR/Hadoop environment to get the
> SNAPPY native library resolved.
>
> About the other approach with AvroOutputFormat:
>
> Does it fit the StreamingFileSink API?
>
> StreamingFileSink.forBulkFormat(new Path(outPath),
>     CustomAvroWriters.forGenericRecord(schema, codecName))
>
> Or is it a different API? Could you send a sample if you have one for the
> other sink API?
>
> Regards,
> Vijay
>
> On Wed, Jul 29, 2020 at 12:45 PM Ravi Bhushan Ratnakar <
> ravibhushanratna...@gmail.com> wrote:
>
>> There is another alternative which you could try, like this:
>>
>> val stream: DataStream[GenericRecord] = _
>>
>> val aof: AvroOutputFormat[GenericRecord] = new AvroOutputFormat(
>>   new org.apache.flink.core.fs.Path(""), classOf[GenericRecord])
>>
>> aof.setSchema(schema)
>> aof.setCodec(AvroOutputFormat.Codec.SNAPPY)
>>
>> stream.writeUsingOutputFormat(aof)
>>
>> Regards,
>>
>> Ravi
>>
>> On Wed, Jul 29, 2020 at 9:12 PM Ravi Bhushan Ratnakar <
>> ravibhushanratna...@gmail.com> wrote:
>>
>>> Hi Vijayendra,
>>>
>>> Currently AvroWriters doesn't support compression. If you want to use
>>> compression, you need a custom implementation of AvroWriter where you
>>> can add compression support. Below is a sample of such a customization
>>> of AvroWriters:
>>>
>>> codecName = org.apache.hadoop.io.compress.SnappyCodec
>>>
>>> CustomAvroWriters.forGenericRecord(schema, codecName)
>>>
>>> Regards,
>>> Ravi
>>>
>>> On Wed, Jul 29, 2020 at 7:36 PM Vijayendra Yadav <contact....@gmail.com>
>>> wrote:
>>>
>>>> Hi Team,
>>>>
>>>> Could you please provide a sample for enabling compression (Snappy) on
>>>> an Avro DataStream[GenericRecord]:
>>>>
>>>> AvroWriters.forGenericRecord(schema)
>>>>
>>>> Regards,
>>>> Vijay
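
P.S. Here is a minimal sketch, in Scala to match the snippets above, of the kind of compression-capable writer discussed in this thread. Since the thread's CustomAvroWriters isn't shown, everything below is an assumption: a hypothetical SnappyAvroWriterFactory built on Flink's BulkWriter interface and Avro's DataFileWriter. Avro's CodecFactory.snappyCodec() compresses each block while keeping the standard container header, so the files stay readable by avro-tools; it also uses snappy-java rather than the Hadoop native codec, which may sidestep the UnsatisfiedLinkError mentioned above.

    import org.apache.avro.Schema
    import org.apache.avro.file.{CodecFactory, DataFileWriter}
    import org.apache.avro.generic.{GenericDatumWriter, GenericRecord}
    import org.apache.flink.api.common.serialization.BulkWriter
    import org.apache.flink.core.fs.FSDataOutputStream

    // Hypothetical factory; the schema is passed as a string because the
    // factory is serialized and shipped to the task managers.
    class SnappyAvroWriterFactory(schemaString: String)
        extends BulkWriter.Factory[GenericRecord] {

      override def create(out: FSDataOutputStream): BulkWriter[GenericRecord] = {
        val schema = new Schema.Parser().parse(schemaString)
        val dataFileWriter =
          new DataFileWriter[GenericRecord](new GenericDatumWriter[GenericRecord](schema))
        // Block-level compression; the output remains a valid Avro container.
        dataFileWriter.setCodec(CodecFactory.snappyCodec())
        // Writes the Avro magic bytes and metadata header to the stream.
        dataFileWriter.create(schema, out)

        new BulkWriter[GenericRecord] {
          override def addElement(record: GenericRecord): Unit = dataFileWriter.append(record)
          override def flush(): Unit = dataFileWriter.flush()
          // Flush only; StreamingFileSink closes the underlying stream itself.
          override def finish(): Unit = dataFileWriter.flush()
        }
      }
    }

It would plug into the same call shown above, e.g. StreamingFileSink.forBulkFormat(new Path(outPath), new SnappyAvroWriterFactory(schemaString)).build().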