This is what we’re using as our serializer:

Somewhere:

           env.addDefaultKryoSerializer(Record.class, GRKryoSerializer.class);

then

public class GRKryoSerializer extends Serializer<GenericData.Record>
{
     static class AvroStuff
     {
           Schema schema;
           String comment;
           long key;
           DatumReader<GenericRecord> reader;
           DatumWriter<GenericRecord> writer;
     }
     static Map<Long, AvroStuff> schemaMap = new ConcurrentHashMap<>();
     static Map<Schema, Long> schemaToFingerprintMap = new 
ConcurrentHashMap<>();
     static Logger log = Logger.getLogger(GRKryoSerializer.class.getName());


     static public void preregisterSchema(String comment, Schema schema)
     {
           if(!schemaToFingerprintMap.containsKey(schema)){
                long fingerprint = 
SchemaNormalization.parsingFingerprint64(schema);
                AvroStuff stuff = new AvroStuff();
                stuff.schema = schema;
                stuff.comment = comment;
                stuff.key = fingerprint;
                stuff.reader = new GenericDatumReader<>(schema);
                stuff.writer = new GenericDatumWriter<>(schema);
                log.info(String.format("Preregistering schema for %s with 
fingerprint %d", comment, fingerprint));
                schemaMap.put(fingerprint, stuff);
                schemaToFingerprintMap.put(schema, fingerprint);
           }
     }

     public GRKryoSerializer() {
     }

     static public void clearSchemaCache()
     {
           schemaToFingerprintMap.clear();
           schemaMap.clear();
     }

     static public AvroStuff getStuffFor(GenericRecord o)
     {
           return getStuffFor(o.getSchema());
     }

     static public AvroStuff getStuffFor(Schema schema)
     {
           Long fingerprint = schemaToFingerprintMap.get(schema);
           if(fingerprint == null)
           {

                fingerprint = SchemaNormalization.parsingFingerprint64(schema);
                log.info(String.format("No fingerprint. Generated %d for schema 
%s", fingerprint, schema.toString(true)));
                schemaToFingerprintMap.put(schema, fingerprint);

                throw new RuntimeException("Unknown schema " + 
schema.toString(true));

           }
           return schemaMap.get(fingerprint);
     }

     @Override
     public void write(Kryo kryo, Output output, GenericData.Record object)
     {
           AvroStuff stuff = getStuffFor(object);

           BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(output, 
null);
           try {
                // write the schema key not the schema
                encoder.writeLong(stuff.key);
                // write the binary version of the fields only
                stuff.writer.write(object, encoder);
                encoder.flush();
           } catch (IOException e)
           {
                throw new RuntimeException(e);
           }
     }

     @Override
     public GenericData.Record read(Kryo kryo, Input input, 
Class<GenericData.Record> type)
     {
           BinaryDecoder decoder = 
DecoderFactory.get().directBinaryDecoder(input, null);
           long fingerPrint;
           try {
                // read the key
                fingerPrint = decoder.readLong();
                // find it
                AvroStuff stuff = schemaMap.get(fingerPrint);
                // inflate using correct preregistered inflator
                return (Record) stuff.reader.read(null, decoder);
           } catch (IOException e) {
                throw new RuntimeException(e);
           }
     }


}

We add an instance of one of these to all our Flink Rich operations:


public class GRBuilder implements Serializable {
     public String getComment() {
           return comment;
     }

     public void setSchema(Schema schema) {
           this.schema = schema;
     }

     /**
     *
      */
     private static final long serialVersionUID = -3441080975441473751L;
     String schemaString;
     String comment;

     transient GenericRecordBuilder builder = null;
     transient Schema schema = null;

     public void registerSchema(){
           GRKryoSerializer.preregisterSchema(comment, getSchema());
     }

     private void readObject(ObjectInputStream input)
            throws IOException, ClassNotFoundException
     {
           realReadObject(input);
     }

     private void writeObject(ObjectOutputStream output)
            throws IOException, ClassNotFoundException
     {
           realWriteObject(output);
     }

     // Ensure on inflation, the schema is registered against
     // the hashcode locally so we can inflate that type

     protected void realReadObject(ObjectInputStream input)
            throws IOException, ClassNotFoundException
     {
           schemaString = input.readUTF();
           comment = input.readUTF();
           builder = null;
           schema = null;
           GRKryoSerializer.preregisterSchema(comment, getSchema());
     }

     protected void realWriteObject(ObjectOutputStream output)
            throws IOException, ClassNotFoundException
     {
           output.writeUTF(schemaString);
           output.writeUTF(comment);
     }

     public GRBuilder()
     {
     }

     public GRBuilder(String comment , Schema s){
           schemaString = s.toString();
           builder = null;
           this.comment = comment;

           GRKryoSerializer.preregisterSchema(comment, s);
     }

     public synchronized GenericRecordBuilder getBuilder()
     {
           if(builder == null)
           {
                builder = new GenericRecordBuilder(getSchema());
           }
           return builder;
     }

     public synchronized Schema getSchema()
     {
           if(schema == null)
           {
                Schema.Parser p = new Schema.Parser();
                schema = p.parse(schemaString);
           }
           return schema;
     }
}

Our Mappers and such use the GRBuilder on the FlatMap rich class for example to 
get a Builder to create the output records for collection. We need to have A 
GRBUilder for each possible genericrecord schema as a variable on a Map object.

If we were torefactor this using the GenericTypeInfo or AvroSerializer, how 
would you suggest doing it?

Thanks


From: Stephan Ewen [mailto:se...@apache.org]
Sent: Thursday, March 02, 2017 3:07 PM
To: user@flink.apache.org; Aljoscha Krettek
Subject: Re: Serialization performance

Hi!

Thanks for this writeup, very cool stuff !

For part (1) - serialization: I think that can be made a bit nicer. Avro is a 
bit of an odd citizen in Flink, because Flink serialization is actually schema 
aware, but does not integrate with Avro. That's why Avro types go through Kryo.

We should try and make Avro a first class citizen.
  - The first step is to have a proper AvroSerializer. We have implemented one 
already, see "org.apache.flink.api.java.typeutils.runtime.AvroSerializer". It 
works with the ReflectDatumReader/Writer, but would be a good base line for all 
types of avro-based serializers in Flink..

  - Then we need to figure out how the Avro Serializer is instantiated. We 
could just let the "GenericTypeInfo" create an Avro serializer for Avro types, 
and Kryo for all else.
  - The change would eventually have to be behind a config flag in the 
ExecutionConfig (see "GenericTypeInfo.createSerializer()") to make sure we do 
not break the default serialization format within a major release version.


A side note: If you actually use that through Beam, I am actually not sure what 
will happen, because as far as I know, Beam  uses its completely own 
serialization system and Flink sees only byte coders from Beam. Aljoscha can 
probably add more detail here.


For part (2) - the filters: If I understand correctly, you "split" the data 
into different result sets that go to different sinks? The DataStream API has a 
"split/select" type of construct which would help there, the DataSet API does 
not have something like that. If you look for peak performance, the demux 
output format seems like a good workaround on the DataSet side.


Greetings,
Stephan



On Thu, Mar 2, 2017 at 8:01 PM, Newport, Billy 
<billy.newp...@gs.com<mailto:billy.newp...@gs.com>> wrote:
We’ve been working on performance for the last while. We’re using flink 1.2 
right now. We are writing batch jobs which process avro and parquet input files 
and produce parquet files.

Flink serialization costs seem to be the most significant aspect of our wall 
clock time. We have written a custom kryo serializer for GenericRecord which is 
similar to the Spark one in that it reuses avro Datum reader and writers but 
writes a hash fingerprint for the schema instead of the schema itself.

We have subclassed most of the Rich* classes in flink and now also pass to the 
constructors a Builder class which has the avro schema. When flink inflates 
these, the builders are inflated and preregister the avro schema for the hash 
code in a static map which the inflation/writing code uses later.

This is working pretty well for us, it’s like 10-20x faster than just using 
GenericRecords normally. The question is this: Is this the right way to do 
this? If it is then we can contribute it and then how to modify beam so that it 
uses this stuff under the covers, we can’t use beam at all right now as far as 
I can tell because of the performance issues with GenericRecord.

The other performance optimization is basically removing filters which again 
seem to double wall clock time. We wrote an embedded demux outputformat which 
receives a Tuple<Enum,GenericRecord> and writes to a different parquet file 
depending on Enum. This was 2x faster than a naïve 4 filters going to 4 parquet 
outputformats.

Do these optimizations seem unnecessary to some? Is there some trick we’re 
missing?


Reply via email to