Hi Ravi,

Thanks for the details. CustomAvroWriters is working for now, although it
fails for the Snappy codec in my local IDE with "java.lang.UnsatisfiedLinkError:
org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy()Z".
I think I will have to try running it in an EMR/Hadoop environment to get
the Snappy library resolved.

*About the other approach, using AvroOutputFormat:*

Does it fit into the StreamingFileSink API?

StreamingFileSink.forBulkFormat(new Path(outPath),CustomAvroWriters
        .forGenericRecord(schema, codecName))

Or is it a different API? Could you send a sample, if you have one, for the
other sink API?

Regards,
Vijay

On Wed, Jul 29, 2020 at 12:45 PM Ravi Bhushan Ratnakar <
ravibhushanratna...@gmail.com> wrote:

> There is another alternative, which you could try like this:
>
> val stream: DataStream[GenericRecord] = ??? // placeholder for your existing stream
>
> val aof:AvroOutputFormat[GenericRecord] = new AvroOutputFormat(new 
> org.apache.flink.core.fs.Path(""),classOf[GenericRecord])
>
> aof.setSchema(schema)
>
> aof.setCodec(AvroOutputFormat.Codec.SNAPPY)
>
> stream.writeUsingOutputFormat(aof)
>
> Regards,
>
> Ravi
>
>
>
> On Wed, Jul 29, 2020 at 9:12 PM Ravi Bhushan Ratnakar <
> ravibhushanratna...@gmail.com> wrote:
>
>> Hi Vijayendra,
>>
>> Currently, AvroWriters doesn't support compression. If you want to use
>> compression, you need a custom implementation of AvroWriters that adds
>> compression support. Please find a sample customization of AvroWriters
>> that supports compression; you can use it as in the example below.
>>
>> codecName = org.apache.hadoop.io.compress.SnappyCodec
>>
>> CustomAvroWriters.forGenericRecord(schema, codecName)
>>
>> Regards,
>> Ravi
>>
>> On Wed, Jul 29, 2020 at 7:36 PM Vijayendra Yadav <contact....@gmail.com>
>> wrote:
>>
>>> Hi Team,
>>>
>>> Could you please provide a sample for enabling compression (Snappy) of
>>> Avro output for:
>>> DataStream[GenericRecord]
>>>
>>> AvroWriters.forGenericRecord(schema)
>>>
>>> Regards,
>>> Vijay
>>>
>>
