Hi Ravi,

Perfect, this is looking good.
Thanks for your help.

Regards,
Vijay

On Thu, Jul 30, 2020 at 5:39 AM Ravi Bhushan Ratnakar <
ravibhushanratna...@gmail.com> wrote:

> Hi Vijayendra,
>
> There is an issue with the CustomAvroWriters.java that I shared with you
> earlier. I am sending you the fixed version; hopefully this resolves the
> issue of reading the output with the avro-tools jar.
>
> Please use one of the following supported string values for codecName:
>
> null - for nullCodec
> deflate - for deflateCodec
> snappy - for snappyCodec
> bzip2 - for bzip2Codec
> xz - for xzCodec
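A minimal sketch of how a custom writer might resolve these codecName strings internally (an assumption for illustration, not the actual code attached to this mail), mapping each name to Avro's CodecFactory:

```java
import org.apache.avro.file.CodecFactory;

// Hypothetical helper: resolve the codecName strings listed above to Avro codecs.
public class CodecNames {
    public static CodecFactory resolve(String codecName) {
        switch (codecName) {
            case "null":    return CodecFactory.nullCodec();
            case "deflate": return CodecFactory.deflateCodec(CodecFactory.DEFAULT_DEFLATE_LEVEL);
            case "snappy":  return CodecFactory.snappyCodec();
            case "bzip2":   return CodecFactory.bzip2Codec();
            case "xz":      return CodecFactory.xzCodec(CodecFactory.DEFAULT_XZ_LEVEL);
            default:
                throw new IllegalArgumentException("Unsupported codecName: " + codecName);
        }
    }
}
```

Avro itself also offers CodecFactory.fromString(name) for the same names, so an explicit switch like this mainly buys a clearer error message for unsupported values.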
>
>
> Regards,
> Ravi
>
> On Thu, Jul 30, 2020 at 8:21 AM Ravi Bhushan Ratnakar <
> ravibhushanratna...@gmail.com> wrote:
>
>> If it is possible, please share the sample output file.
>> Regards,
>> Ravi
>>
>> On Thu, Jul 30, 2020 at 3:17 AM Vijayendra Yadav <contact....@gmail.com>
>> wrote:
>>
>>> Hi Ravi,
>>>
>>> With CustomAvroWriters (SNAPPY) on a cluster, files do get created, but
>>> they are not recognized as Avro files by the avro-tools jar when I try to
>>> deserialize them to JSON.
>>>
>>> Flink Logs shows:
>>> 2020-07-29 23:54:23,270 INFO com.hadoop.compression.lzo.LzoCodec -
>>> Successfully loaded & initialized native-lzo library [hadoop-lzo rev
>>> ff8f5709577defb6b78cdc1f98cfe129c4b6fe46]
>>> 2020-07-29 23:54:23,277 INFO org.apache.hadoop.io.compress.CodecPool - *Got
>>> brand-new compressor [.snappy]*
>>>
>>> 2020-07-29 23:54:28,931 INFO
>>> org.apache.flink.fs.s3.common.writer.S3Committer - Committing
>>> la/gold/vdcs_gold/test/bob/raw/year=2020/month=07/day=29/hour=23/ip-10-223-69-238-2020-07-29-23-54-00-121-5e51c2df-1-0.avro
>>> with MPU ID
>>>
>>> *Avro tools:*
>>>
>>> java -jar avro-tools-1.7.4.jar *tojson*
>>> /tmp/test-s3-flink-new/raw/year\=2020/month\=07/day\=29/hour\=20/ubuntu-2020-07-29-20-35-50-746-87802bc3-2-2.avro
>>>
>>> *Exception in thread "main" java.io.IOException: Not an Avro data file*
>>>
>>>
>>> Am I missing something?
>>>
>>> Regards,
>>> Vijay
>>>
>>>
>>>
>>> On Wed, Jul 29, 2020 at 2:08 PM Vijayendra Yadav <contact....@gmail.com>
>>> wrote:
>>>
>>>> Hi Ravi,
>>>>
>>>> Thanks for the details. CustomAvroWriter is working for now, although it
>>>> is failing for the Snappy codec in my local IDE with "java.lang.UnsatisfiedLinkError:
>>>> org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy()Z".
>>>> I think I will have to run it in an EMR/Hadoop environment to get the
>>>> native Snappy library resolved.
>>>>
>>>> *About the other approach using AvroOutputFormat:*
>>>>
>>>> Does it fit into the StreamingFileSink API?
>>>>
>>>> StreamingFileSink.forBulkFormat(new Path(outPath), CustomAvroWriters
>>>>         .forGenericRecord(schema, codecName))
>>>>
>>>> Or is it a different API? Could you send a sample if you have one for the
>>>> other sink API?
>>>>
>>>> Regards,
>>>> Vijay
>>>>
>>>> On Wed, Jul 29, 2020 at 12:45 PM Ravi Bhushan Ratnakar <
>>>> ravibhushanratna...@gmail.com> wrote:
>>>>
>>>>> There is another alternative which you could try, like this:
>>>>>
>>>>> val stream: DataStream[GenericRecord] = _
>>>>>
>>>>> val aof: AvroOutputFormat[GenericRecord] = new AvroOutputFormat(
>>>>>   new org.apache.flink.core.fs.Path(""), classOf[GenericRecord])
>>>>>
>>>>> aof.setSchema(schema)
>>>>>
>>>>> aof.setCodec(AvroOutputFormat.Codec.SNAPPY)
>>>>>
>>>>> stream.writeUsingOutputFormat(aof)
>>>>>
>>>>> Regards,
>>>>>
>>>>> Ravi
>>>>>
>>>>>
>>>>>
>>>>> On Wed, Jul 29, 2020 at 9:12 PM Ravi Bhushan Ratnakar <
>>>>> ravibhushanratna...@gmail.com> wrote:
>>>>>
>>>>>> Hi Vijayendra,
>>>>>>
>>>>>> Currently, AvroWriters doesn't support compression. If you want
>>>>>> compression, you need a custom implementation of AvroWriters where you
>>>>>> can add it. Please find a sample customization of AvroWriters that
>>>>>> supports compression; you can use it like the example below.
>>>>>>
>>>>>> codecName = org.apache.hadoop.io.compress.SnappyCodec
>>>>>>
>>>>>> CustomAvroWriters.forGenericRecord(schema, codecName)
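For context, a plausible shape for the core of such a custom writer (a hedged sketch, not the attached CustomAvroWriters.java): the key is to compress *inside* the Avro container via DataFileWriter.setCodec, rather than wrapping the output stream in a Hadoop CompressionCodec, so the file keeps its Avro container header and stays readable by avro-tools.

```java
import java.io.IOException;
import java.io.OutputStream;

import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

// Hedged sketch: open a compressed Avro container file on an arbitrary stream.
public class CompressedAvro {
    public static DataFileWriter<GenericRecord> open(
            Schema schema, CodecFactory codec, OutputStream out) throws IOException {
        DataFileWriter<GenericRecord> writer =
                new DataFileWriter<>(new GenericDatumWriter<GenericRecord>(schema));
        writer.setCodec(codec);     // block-level compression, recorded in the file metadata
        writer.create(schema, out); // writes the Avro container header ("Obj" magic)
        return writer;
    }
}
```

A Flink BulkWriter built on top of this produces files that avro-tools can read, because the container header and codec metadata are written by Avro itself.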
>>>>>>
>>>>>> Regards,
>>>>>> Ravi
>>>>>>
>>>>>> On Wed, Jul 29, 2020 at 7:36 PM Vijayendra Yadav <
>>>>>> contact....@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Team,
>>>>>>>
>>>>>>> Could you please provide a sample for enabling compression (Snappy) on
>>>>>>> an Avro DataStream[GenericRecord] written with:
>>>>>>>
>>>>>>> AvroWriters.forGenericRecord(schema)
>>>>>>>
>>>>>>> Regards,
>>>>>>> Vijay
>>>>>>>
>>>>>>
