Hi Vijayendra, Currently `AvroWriters` doesn't support compression. To use compression, you need a custom implementation of `AvroWriters` that adds it. Below is a sample customization, `CustomAvroWriters`, whose `forGenericRecord` method accepts a compression codec name; you can use it as shown in the example below.
codecName = "org.apache.hadoop.io.compress.SnappyCodec"
CustomAvroWriters.forGenericRecord(schema, codecName)

Regards,
Ravi

On Wed, Jul 29, 2020 at 7:36 PM Vijayendra Yadav <contact....@gmail.com> wrote:
> Hi Team,
>
> Could you please provide a sample for Enabling Compression (Snappy) of
> Avro:
> DataStream[GenericRecord]
>
> AvroWriters.forGenericRecord(schema)
>
> Regards,
> Vijay
>
package org.apache.flink.formats.avro; import java.io.IOException; import java.io.OutputStream; import java.util.function.Function; import org.apache.avro.Schema; import org.apache.avro.Schema.Parser; import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.DatumWriter; import org.apache.avro.reflect.ReflectData; import org.apache.avro.reflect.ReflectDatumWriter; import org.apache.avro.specific.SpecificData; import org.apache.avro.specific.SpecificDatumWriter; import org.apache.avro.specific.SpecificRecordBase; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionCodecFactory; import org.apache.hadoop.io.compress.CompressionOutputStream; public class CustomAvroWriters { public static <T extends SpecificRecordBase> AvroWriterFactory<T> forSpecificRecord(Class<T> type) { String schemaString = SpecificData.get().getSchema(type).toString(); AvroBuilder<T> builder = (out) -> { return createAvroDataFileWriter(schemaString, SpecificDatumWriter::new, out); }; return new AvroWriterFactory(builder); } public static AvroWriterFactory<GenericRecord> forGenericRecord(Schema schema) { String schemaString = schema.toString(); AvroBuilder<GenericRecord> builder = (out) -> { return createAvroDataFileWriter(schemaString, GenericDatumWriter::new, out); }; return new AvroWriterFactory(builder); } public static AvroWriterFactory<GenericRecord> forGenericRecord(Schema schema, String codecName) { String schemaString = schema.toString(); AvroBuilder<GenericRecord> builder = (out) -> { return createAvroDataFileWriter(schemaString, GenericDatumWriter::new, out, codecName); }; return new AvroWriterFactory(builder); } public static <T> AvroWriterFactory<T> forReflectRecord(Class<T> type) { String schemaString = ReflectData.get().getSchema(type).toString(); AvroBuilder<T> builder = (out) -> { 
return createAvroDataFileWriter(schemaString, ReflectDatumWriter::new, out); }; return new AvroWriterFactory(builder); } private static <T> DataFileWriter<T> createAvroDataFileWriter(String schemaString, Function<Schema, DatumWriter<T>> datumWriterFactory, OutputStream out) throws IOException { Schema schema = (new Parser()).parse(schemaString); DatumWriter<T> datumWriter = (DatumWriter)datumWriterFactory.apply(schema); DataFileWriter<T> dataFileWriter = new DataFileWriter(datumWriter); dataFileWriter.create(schema, out); return dataFileWriter; } private static <T> DataFileWriter<T> createAvroDataFileWriter(String schemaString, Function<Schema, DatumWriter<T>> datumWriterFactory, OutputStream out, String codecName) throws IOException { Schema schema = (new Parser()).parse(schemaString); DatumWriter<T> datumWriter = (DatumWriter)datumWriterFactory.apply(schema); DataFileWriter<T> dataFileWriter = new DataFileWriter(datumWriter); CompressionCodec codec = getCompressionCodec(codecName); CompressionOutputStream cos = codec.createOutputStream(out); dataFileWriter.create(schema, cos); return dataFileWriter; } private CustomAvroWriters() { } private static CompressionCodec getCompressionCodec(String codecName) { CompressionCodecFactory codecFactory = new CompressionCodecFactory(new Configuration()); CompressionCodec codec = codecFactory.getCodecByName(codecName); if (codec == null) { try { codec = (CompressionCodec) Class.forName(codecName).newInstance(); }catch(Exception ex) { throw new RuntimeException("Codec " + codecName + " not found.",ex); } } return codec; } }