I'll try and add more details in a bit.

If you have some suggestions on how to make the serialization stack more
extensible, please let us know!

Some hooks exist, like TypeInfoFactories:

But I think that hook does not work for Avro...

On Tue, Mar 7, 2017 at 1:25 PM, Newport, Billy <billy.newp...@gs.com> wrote:

> I need more details. Flink does not appear to be designed for plugging in
> serializers in a ‘nice’ way; as far as I can tell, it’s more or less
> hardcoded for Kryo right now.
> *From:* Stephan Ewen [mailto:se...@apache.org]
> *Sent:* Tuesday, March 07, 2017 6:21 AM
> *To:* user@flink.apache.org
> *Subject:* Re: Serialization performance
> Hi Billy!
> Out of curiosity: Were you able to hack some direct Avro support as I
> described in the brief writeup, or do you need some more details?
> Stephan
> On Fri, Mar 3, 2017 at 2:38 PM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
> Hi Billy,
> on the Beam side, you have probably looked into writing your own Coder
> (the equivalent of a TypeSerializer in Flink). If so, did that not work
> out for you? And if it did not, why?
> Best,
> Aljoscha
> On Thu, Mar 2, 2017, at 22:02, Stephan Ewen wrote:
> Hi!
> I can write some more details later; here is the short answer:
>   - Your serializer would go into something like the AvroSerializer
>   - When you instantiate the AvroSerializer in
> GenericTypeInfo.createSerializer(ExecutionConfig), you pre-register the
> type of the generic type, plus all types in
> "ExecutionConfig.getRegisteredKryoTypes()"
>     (we abuse the Kryo type registration here as an Avro type registration
> initially, would have to polish that later)
>   - The registered types are classes, but since they are Avro types, you
> should be able to get their schema (via ReflectData or so)
> That way, Flink would internally forward all the registrations for you
> (similar to how it forwards Kryo registrations) and you don't have to do
> that manually.
> Stephan
> On Thu, Mar 2, 2017 at 9:31 PM, Newport, Billy <billy.newp...@gs.com>
> wrote:
> This is what we’re using as our serializer:
> Somewhere:
>     env.addDefaultKryoSerializer(Record.class, GRKryoSerializer.class);
> then
> public class GRKryoSerializer extends Serializer<GenericData.Record>
> {
>     static class AvroStuff
>     {
>         Schema schema;
>         String comment;
>         long key;
>         DatumReader<GenericRecord> reader;
>         DatumWriter<GenericRecord> writer;
>     }
>
>     static Map<Long, AvroStuff> schemaMap = new ConcurrentHashMap<>();
>     static Map<Schema, Long> schemaToFingerprintMap = new ConcurrentHashMap<>();
>     static Logger log = Logger.getLogger(GRKryoSerializer.class.getName());
>
>     static public void preregisterSchema(String comment, Schema schema)
>     {
>         if (!schemaToFingerprintMap.containsKey(schema)) {
>             long fingerprint = SchemaNormalization.parsingFingerprint64(schema);
>             AvroStuff stuff = new AvroStuff();
>             stuff.schema = schema;
>             stuff.comment = comment;
>             stuff.key = fingerprint;
>             stuff.reader = new GenericDatumReader<>(schema);
>             stuff.writer = new GenericDatumWriter<>(schema);
>             log.info(String.format(
>                 "Preregistering schema for %s with fingerprint %d",
>                 comment, fingerprint));
>             schemaMap.put(fingerprint, stuff);
>             schemaToFingerprintMap.put(schema, fingerprint);
>         }
>     }
>
>     public GRKryoSerializer() {
>     }
>
>     static public void clearSchemaCache()
>     {
>         schemaToFingerprintMap.clear();
>         schemaMap.clear();
>     }
>
>     static public AvroStuff getStuffFor(GenericRecord o)
>     {
>         return getStuffFor(o.getSchema());
>     }
>
>     static public AvroStuff getStuffFor(Schema schema)
>     {
>         Long fingerprint = schemaToFingerprintMap.get(schema);
>         if (fingerprint == null)
>         {
>             fingerprint = SchemaNormalization.parsingFingerprint64(schema);
>             log.info(String.format(
>                 "No fingerprint. Generated %d for schema %s",
>                 fingerprint, schema.toString(true)));
>             schemaToFingerprintMap.put(schema, fingerprint);
>             throw new RuntimeException("Unknown schema " + schema.toString(true));
>         }
>         return schemaMap.get(fingerprint);
>     }
>
>     @Override
>     public void write(Kryo kryo, Output output, GenericData.Record object)
>     {
>         AvroStuff stuff = getStuffFor(object);
>         BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(output, null);
>         try {
>             // write the schema key not the schema
>             encoder.writeLong(stuff.key);
>             // write the binary version of the fields only
>             stuff.writer.write(object, encoder);
>             encoder.flush();
>         } catch (IOException e)
>         {
>             throw new RuntimeException(e);
>         }
>     }
>
>     @Override
>     public GenericData.Record read(Kryo kryo, Input input,
>                                    Class<GenericData.Record> type)
>     {
>         BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder(input, null);
>         long fingerPrint;
>         try {
>             // read the key
>             fingerPrint = decoder.readLong();
>             // find it
>             AvroStuff stuff = schemaMap.get(fingerPrint);
>             // inflate using correct preregistered inflator
>             return (Record) stuff.reader.read(null, decoder);
>         } catch (IOException e) {
>             throw new RuntimeException(e);
>         }
>     }
> }
> We add an instance of one of these to all our Flink Rich operations:
> public class GRBuilder implements Serializable {
>     public String getComment() {
>         return comment;
>     }
>
>     public void setSchema(Schema schema) {
>         this.schema = schema;
>     }
>
>     private static final long serialVersionUID = -3441080975441473751L;
>
>     String schemaString;
>     String comment;
>     transient GenericRecordBuilder builder = null;
>     transient Schema schema = null;
>
>     public void registerSchema() {
>         GRKryoSerializer.preregisterSchema(comment, getSchema());
>     }
>
>     private void readObject(ObjectInputStream input)
>         throws IOException, ClassNotFoundException
>     {
>         realReadObject(input);
>     }
>
>     private void writeObject(ObjectOutputStream output)
>         throws IOException, ClassNotFoundException
>     {
>         realWriteObject(output);
>     }
>
>     // Ensure on inflation, the schema is registered against
>     // the hashcode locally so we can inflate that type
>     protected void realReadObject(ObjectInputStream input)
>         throws IOException, ClassNotFoundException
>     {
>         schemaString = input.readUTF();
>         comment = input.readUTF();
>         builder = null;
>         schema = null;
>         GRKryoSerializer.preregisterSchema(comment, getSchema());
>     }
>
>     protected void realWriteObject(ObjectOutputStream output)
>         throws IOException, ClassNotFoundException
>     {
>         output.writeUTF(schemaString);
>         output.writeUTF(comment);
>     }
>
>     public GRBuilder()
>     {
>     }
>
>     public GRBuilder(String comment, Schema s) {
>         schemaString = s.toString();
>         builder = null;
>         this.comment = comment;
>         GRKryoSerializer.preregisterSchema(comment, s);
>     }
>
>     public synchronized GenericRecordBuilder getBuilder()
>     {
>         if (builder == null)
>         {
>             builder = new GenericRecordBuilder(getSchema());
>         }
>         return builder;
>     }
>
>     public synchronized Schema getSchema()
>     {
>         if (schema == null)
>         {
>             Schema.Parser p = new Schema.Parser();
>             schema = p.parse(schemaString);
>         }
>         return schema;
>     }
> }
> Our Mappers and such use the GRBuilder (on the FlatMap rich class, for
> example) to get a Builder to create the output records for collection. We
> need to have a GRBuilder for each possible GenericRecord schema as a
> variable on a Map object.
> If we were to refactor this using the GenericTypeInfo or AvroSerializer,
> how would you suggest doing it?
> Thanks
> *From:* Stephan Ewen [mailto:se...@apache.org]
> *Sent:* Thursday, March 02, 2017 3:07 PM
> *To:* user@flink.apache.org; Aljoscha Krettek
> *Subject:* Re: Serialization performance
> Hi!
> Thanks for this writeup, very cool stuff!
> For part (1) - serialization: I think that can be made a bit nicer. Avro
> is a bit of an odd citizen in Flink, because Flink serialization is
> actually schema aware, but does not integrate with Avro. That's why Avro
> types go through Kryo.
> We should try and make Avro a first class citizen.
>   - The first step is to have a proper AvroSerializer. We have implemented
> one already, see "org.apache.flink.api.java.typeutils.runtime.AvroSerializer".
> It works with the ReflectDatumReader/Writer, but would be a good baseline
> for all types of Avro-based serializers in Flink.
>   - Then we need to figure out how the Avro Serializer is instantiated. We
> could just let the "GenericTypeInfo" create an Avro serializer for Avro
> types, and Kryo for all else.
>   - The change would eventually have to be behind a config flag in the
> ExecutionConfig (see "GenericTypeInfo.createSerializer()") to make sure
> we do not break the default serialization format within a major release
> version.
> A side note: If you actually use that through Beam, I am not sure what
> will happen, because as far as I know, Beam uses its own, completely
> separate serialization system and Flink sees only byte coders from Beam.
> Aljoscha can probably add more detail here.
> For part (2) - the filters: If I understand correctly, you "split" the
> data into different result sets that go to different sinks? The DataStream
> API has a "split/select" type of construct which would help there; the
> DataSet API does not have anything like that. If you are looking for peak
> performance, the demux output format seems like a good workaround on the
> DataSet side.
> Greetings,
> Stephan
> On Thu, Mar 2, 2017 at 8:01 PM, Newport, Billy <billy.newp...@gs.com>
> wrote:
> We’ve been working on performance for a while. We’re using Flink
> 1.2 right now. We are writing batch jobs which process Avro and Parquet
> input files and produce Parquet files.
> Flink serialization costs seem to be the most significant part of our
> wall clock time. We have written a custom Kryo serializer for GenericRecord
> which is similar to the Spark one in that it reuses Avro datum readers and
> writers but writes a hash fingerprint of the schema instead of the schema
> itself.
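
To make the framing concrete, here is a minimal JDK-only sketch of the
"fingerprint instead of schema" idea. It is not the GRKryoSerializer from this
thread: the FingerprintRegistry class and its methods are hypothetical, the
schema is treated as an opaque string, and FNV-1a is used only as a stand-in
for Avro's SchemaNormalization.parsingFingerprint64.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical registry mapping a 64-bit schema fingerprint to the schema text. */
final class FingerprintRegistry {
    private static final Map<Long, String> SCHEMAS = new ConcurrentHashMap<>();

    /** FNV-1a 64-bit hash; an assumption standing in for Avro's parsing fingerprint. */
    static long fingerprint(String schemaJson) {
        long hash = 0xcbf29ce484222325L;
        for (byte b : schemaJson.getBytes(StandardCharsets.UTF_8)) {
            hash ^= (b & 0xff);
            hash *= 0x100000001b3L;
        }
        return hash;
    }

    /** Must be called on every JVM that reads frames written with this schema. */
    static long register(String schemaJson) {
        long fp = fingerprint(schemaJson);
        SCHEMAS.put(fp, schemaJson);
        return fp;
    }

    /** Frame layout: 8-byte fingerprint, 4-byte payload length, payload bytes. */
    static byte[] write(String schemaJson, byte[] payload) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(fingerprint(schemaJson));
            out.writeInt(payload.length);
            out.write(payload);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Reads the fingerprint from a frame and resolves it to the registered schema. */
    static String readSchema(byte[] frame) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(frame))) {
            long fp = in.readLong();
            String schema = SCHEMAS.get(fp);
            if (schema == null) {
                throw new IllegalStateException("Unknown schema fingerprint " + fp);
            }
            return schema;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The per-record overhead is a fixed 8 bytes regardless of schema size, at the
cost of requiring every reader to have registered the schema beforehand.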
> We have subclassed most of the Rich* classes in Flink and now also pass to
> the constructors a Builder class which holds the Avro schema. When Flink
> inflates these, the builders are inflated and preregister the Avro schema
> for the hash code in a static map which the inflation/writing code uses
> later.
> This is working pretty well for us; it’s about 10-20x faster than just
> using GenericRecords normally. The question is this: Is this the right way
> to do this? If it is, then we can contribute it. And then, how would we
> modify Beam so that it uses this under the covers? We can’t use Beam at
> all right now, as far as I can tell, because of the performance issues
> with GenericRecord.
> The other performance optimization is basically removing filters, which
> again seem to double wall clock time. We wrote an embedded demux
> OutputFormat which receives a Tuple<Enum,GenericRecord> and writes to a
> different Parquet file depending on the Enum. This was 2x faster than a
> naïve 4 filters going to 4 Parquet OutputFormats.
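
As a rough illustration of the demux idea, the following JDK-only sketch
routes each record to one of several sinks in a single pass, instead of
running one filter per sink over the same data. The Route enum and the
DemuxSink class are hypothetical names, and in-memory lists stand in for the
Parquet writers; this is not a Flink OutputFormat.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

/** Hypothetical routing key; the job described above used an enum with four targets. */
enum Route { A, B, C, D }

/** Single-pass demultiplexer: each record goes to the sink selected by its key. */
final class DemuxSink<T> {
    // One sink per enum value; a real job would hold one file writer per value.
    private final Map<Route, List<T>> sinks = new EnumMap<>(Route.class);

    DemuxSink() {
        for (Route r : Route.values()) {
            sinks.put(r, new ArrayList<>());
        }
    }

    /** Called once per record, so the input is traversed only once. */
    void writeRecord(Route route, T record) {
        sinks.get(route).add(record);
    }

    List<T> sink(Route route) {
        return sinks.get(route);
    }
}
```

With N filters, every record is evaluated N times and forwarded to N
downstream operators; with the demux, each record is looked at once and the
routing decision is a map lookup.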
> Do these optimizations seem unnecessary to some? Is there some trick we’re
> missing?
