Hi Ravi,

With CustomAvroWriters (SNAPPY), running on a cluster does create files,
but the avro-tools jar does not recognize them as Avro files when I try to
deserialize them to JSON.

Flink logs show:
2020-07-29 23:54:23,270 INFO com.hadoop.compression.lzo.LzoCodec -
Successfully loaded & initialized native-lzo library [hadoop-lzo rev
ff8f5709577defb6b78cdc1f98cfe129c4b6fe46]
2020-07-29 23:54:23,277 INFO org.apache.hadoop.io.compress.CodecPool - *Got
brand-new compressor [.snappy]*

2020-07-29 23:54:28,931 INFO
org.apache.flink.fs.s3.common.writer.S3Committer - Committing
la/gold/vdcs_gold/test/bob/raw/year=2020/month=07/day=29/hour=23/ip-10-223-69-238-2020-07-29-23-54-00-121-5e51c2df-1-0.avro
with MPU ID

*Avro tools:*

java -jar avro-tools-1.7.4.jar *tojson*
/tmp/test-s3-flink-new/raw/year\=2020/month\=07/day\=29/hour\=20/ubuntu-2020-07-29-20-35-50-746-87802bc3-2-2.avro

*Exception in thread "main" java.io.IOException: Not an Avro data file*
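
For context on the error above: avro-tools typically reports "Not an Avro data file" when a file does not begin with the Avro object container magic bytes ('O', 'b', 'j', 0x01). Below is a minimal sketch for checking a downloaded file locally. The names `AvroMagicCheck` and `looksLikeAvro` are hypothetical, chosen only for illustration; they are not part of Avro or Flink.

```scala
import java.io.{DataInputStream, EOFException}
import java.nio.file.{Files, Path, Paths}

object AvroMagicCheck {
  // Avro object container files start with these four bytes: 'O', 'b', 'j', 0x01.
  private val AvroMagic: Array[Byte] =
    Array('O'.toByte, 'b'.toByte, 'j'.toByte, 1.toByte)

  /** Returns true if the file starts with the Avro container magic bytes. */
  def looksLikeAvro(path: Path): Boolean = {
    val in = new DataInputStream(Files.newInputStream(path))
    try {
      val header = new Array[Byte](AvroMagic.length)
      in.readFully(header) // throws EOFException if the file is shorter than 4 bytes
      java.util.Arrays.equals(header, AvroMagic)
    } catch {
      case _: EOFException => false
    } finally {
      in.close()
    }
  }

  def main(args: Array[String]): Unit =
    args.foreach(p => println(s"$p -> ${looksLikeAvro(Paths.get(p))}"))
}
```

If the check returns false, the file on disk is not a plain Avro container, for example because the whole byte stream was compressed after Avro wrote it.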


Am I missing something?

Regards,
Vijay
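
One possible explanation, not confirmed anywhere in this thread: the log line from org.apache.hadoop.io.compress.CodecPool suggests a Hadoop codec is compressing the output stream, which would compress the Avro header as well. An alternative is to let Avro compress its own data blocks through `DataFileWriter.setCodec`, which keeps the container header readable. The sketch below assumes Flink's `BulkWriter.Factory` interface, Avro's `CodecFactory.snappyCodec()`, and snappy-java on the classpath. `SnappyAvroWriterFactory` is a hypothetical name and is not the CustomAvroWriters class discussed here.

```scala
import org.apache.avro.Schema
import org.apache.avro.file.{CodecFactory, DataFileWriter}
import org.apache.avro.generic.{GenericDatumWriter, GenericRecord}
import org.apache.flink.api.common.serialization.BulkWriter
import org.apache.flink.core.fs.FSDataOutputStream

// Hypothetical factory name. The schema is passed as a JSON string because
// the factory is serialized and shipped to the task managers.
class SnappyAvroWriterFactory(schemaJson: String)
    extends BulkWriter.Factory[GenericRecord] {

  override def create(out: FSDataOutputStream): BulkWriter[GenericRecord] = {
    val schema = new Schema.Parser().parse(schemaJson)
    val dataFileWriter =
      new DataFileWriter[GenericRecord](new GenericDatumWriter[GenericRecord](schema))

    // Avro-level compression: only the data blocks are compressed,
    // so the file still starts with the Avro container header.
    dataFileWriter.setCodec(CodecFactory.snappyCodec())
    dataFileWriter.create(schema, out)

    new BulkWriter[GenericRecord] {
      override def addElement(element: GenericRecord): Unit =
        dataFileWriter.append(element)

      override def flush(): Unit = dataFileWriter.flush()

      // The sink owns the stream and closes it, so only flush here.
      override def finish(): Unit = dataFileWriter.flush()
    }
  }
}
```

Under these assumptions it could be plugged into the sink as `StreamingFileSink.forBulkFormat(new Path(outPath), new SnappyAvroWriterFactory(schema.toString)).build()`.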



On Wed, Jul 29, 2020 at 2:08 PM Vijayendra Yadav <contact....@gmail.com>
wrote:

> Hi Ravi,
>
> Thanks for the details. CustomAvroWriters is working for now, although it
> fails for the Snappy codec in my local IDE with "java.lang.UnsatisfiedLinkError:
> org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy()Z".
> I think I will have to try running it in an EMR/Hadoop environment to get
> the Snappy library resolved.
>
> *About the other approach, AvroOutputFormat:*
>
> Does it fit into the StreamingFileSink API?
>
> StreamingFileSink.forBulkFormat(new Path(outPath),CustomAvroWriters
>         .forGenericRecord(schema, codecName))
>
> Or is it a different API? Could you send a sample if you have one for
> the other sink API?
>
> Regards,
> Vijay
>
> On Wed, Jul 29, 2020 at 12:45 PM Ravi Bhushan Ratnakar <
> ravibhushanratna...@gmail.com> wrote:
>
>> There is another alternative, which you could try like this:
>>
>> val stream: DataStream[GenericRecord] = ??? // your input stream
>>
>> // The output path is left empty here, as in the original sample.
>> val aof: AvroOutputFormat[GenericRecord] = new AvroOutputFormat(
>>   new org.apache.flink.core.fs.Path(""), classOf[GenericRecord])
>>
>> aof.setSchema(schema)
>>
>> aof.setCodec(AvroOutputFormat.Codec.SNAPPY)
>>
>> stream.writeUsingOutputFormat(aof)
>>
>> Regards,
>>
>> Ravi
>>
>>
>>
>> On Wed, Jul 29, 2020 at 9:12 PM Ravi Bhushan Ratnakar <
>> ravibhushanratna...@gmail.com> wrote:
>>
>>> Hi Vijayendra,
>>>
>>> Currently AvroWriters doesn't support compression. If you want to use
>>> compression, you need a custom implementation of AvroWriters that adds
>>> it. Please find a sample customization of AvroWriters that supports
>>> compression. You can use it as in the example below.
>>>
>>> codeName = org.apache.hadoop.io.compress.SnappyCodec
>>>
>>> CustomAvroWriters.forGenericRecord(schema, codeName)
>>>
>>> Regards,
>>> Ravi
>>>
>>> On Wed, Jul 29, 2020 at 7:36 PM Vijayendra Yadav <contact....@gmail.com>
>>> wrote:
>>>
>>>> Hi Team,
>>>>
>>>> Could you please provide a sample for enabling Snappy compression of
>>>> Avro output:
>>>> DataStream[GenericRecord]
>>>>
>>>> AvroWriters.forGenericRecord(schema)
>>>>
>>>> Regards,
>>>> Vijay
>>>>
>>>
