Hi!

I can write some more details later, here the short answer:

  - Your serializer would do into something like the AvroSerializer

  - When you instantiate the AvroSerializer in
GenericTypeInfo.createSerializer(ExecutionConfig), you pre-register the
type of the generic type, plus all types in
"ExecutionConfig.getRegisteredKryoTypes()"
    (we abuse the Kryo type registration here as an Avro type registration
initially, would have to polish that later)

  - The registered types are classes, but since they are Avro types, you
should be able to get their schema (for Reflect Data or so)

That way, Flink would internally forward all the registrations for you
(similar as it forwards Kryo registrations) and you don't have to manually
do that.

Stephan


On Thu, Mar 2, 2017 at 9:31 PM, Newport, Billy <billy.newp...@gs.com> wrote:

> This is what we’re using as our serializer:
>
>
>
> Somewhere:
>
>
>
>            env.addDefaultKryoSerializer(Record.*class*, GRKryoSerializer.
> *class*);
>
>
>
> then
>
>
>
> *public* *class* GRKryoSerializer *extends* Serializer<GenericData.Record>
>
> {
>
>      *static* *class* AvroStuff
>
>      {
>
>            Schema schema;
>
>            String comment;
>
>            *long* key;
>
>            DatumReader<GenericRecord> reader;
>
>            DatumWriter<GenericRecord> writer;
>
>      }
>
>      *static* Map<Long, AvroStuff> *schemaMap* = *new*
> ConcurrentHashMap<>();
>
>      *static* Map<Schema, Long> *schemaToFingerprintMap* = *new*
> ConcurrentHashMap<>();
>
>      *static* Logger *log* = Logger.*getLogger*(GRKryoSerializer.*class*.
> getName());
>
>
>
>
>
>      *static* *public* *void* preregisterSchema(String comment, Schema
> schema)
>
>      {
>
>            *if*(!*schemaToFingerprintMap*.containsKey(schema)){
>
>                 *long* fingerprint = SchemaNormalization.
> *parsingFingerprint64*(schema);
>
>                 AvroStuff stuff = *new* AvroStuff();
>
>                 stuff.schema = schema;
>
>                 stuff.comment = comment;
>
>                 stuff.key = fingerprint;
>
>                 stuff.reader = *new* GenericDatumReader<>(schema);
>
>                 stuff.writer = *new* GenericDatumWriter<>(schema);
>
>                 *log*.info(String.*format*("Preregistering schema for %s
> with fingerprint %d", comment, fingerprint));
>
>                 *schemaMap*.put(fingerprint, stuff);
>
>                 *schemaToFingerprintMap*.put(schema, fingerprint);
>
>            }
>
>      }
>
>
>
>      *public* GRKryoSerializer() {
>
>      }
>
>
>
>      *static* *public* *void* clearSchemaCache()
>
>      {
>
>            *schemaToFingerprintMap*.clear();
>
>            *schemaMap*.clear();
>
>      }
>
>
>
>      *static* *public* AvroStuff getStuffFor(GenericRecord o)
>
>      {
>
>            *return* *getStuffFor*(o.getSchema());
>
>      }
>
>
>
>      *static* *public* AvroStuff getStuffFor(Schema schema)
>
>      {
>
>            Long fingerprint = *schemaToFingerprintMap*.get(schema);
>
>            *if*(fingerprint == *null*)
>
>            {
>
>
>
>                 fingerprint = SchemaNormalization.*parsingFingerprint64*(
> schema);
>
>                 *log*.info(String.*format*("No fingerprint. Generated %d
> for schema %s", fingerprint, schema.toString(*true*)));
>
>                 *schemaToFingerprintMap*.put(schema, fingerprint);
>
>
>
>                 *throw* *new* RuntimeException("Unknown schema " + schema
> .toString(*true*));
>
>
>
>            }
>
>            *return* *schemaMap*.get(fingerprint);
>
>      }
>
>
>
>      @Override
>
>      *public* *void* write(Kryo kryo, Output output, GenericData.Record
> object)
>
>      {
>
>            AvroStuff stuff = *getStuffFor*(object);
>
>
>
>            BinaryEncoder encoder = EncoderFactory.*get*().binaryEncoder(
> output, *null*);
>
>            *try* {
>
>                 // write the schema key not the schema
>
>                 encoder.writeLong(stuff.key);
>
>                 // write the binary version of the fields only
>
>                 stuff.writer.write(object, encoder);
>
>                 encoder.flush();
>
>            } *catch* (IOException e)
>
>            {
>
>                 *throw* *new* RuntimeException(e);
>
>            }
>
>      }
>
>
>
>      @Override
>
>      *public* GenericData.Record read(Kryo kryo, Input input,
> Class<GenericData.Record> type)
>
>      {
>
>            BinaryDecoder decoder = DecoderFactory.*get*().
> directBinaryDecoder(input, *null*);
>
>            *long* fingerPrint;
>
>            *try* {
>
>                 // read the key
>
>                 fingerPrint = decoder.readLong();
>
>                 // find it
>
>                 AvroStuff stuff = *schemaMap*.get(fingerPrint);
>
>                 // inflate using correct preregistered inflator
>
>                 *return* (Record) stuff.reader.read(*null*, decoder);
>
>            } *catch* (IOException e) {
>
>                 *throw* *new* RuntimeException(e);
>
>            }
>
>      }
>
>
>
>
>
> }
>
>
>
> We add an instance of one of these to all our Flink Rich operations:
>
>
>
>
>
> *public* *class* GRBuilder *implements* Serializable {
>
>      *public* String getComment() {
>
>            *return* comment;
>
>      }
>
>
>
>      *public* *void* setSchema(Schema schema) {
>
>            *this*.schema = schema;
>
>      }
>
>
>
>      /**
>
>      *
>
>       */
>
>      *private* *static* *final* *long* *serialVersionUID* =
> -3441080975441473751L;
>
>      String schemaString;
>
>      String comment;
>
>
>
>      *transient* GenericRecordBuilder builder = *null*;
>
>      *transient* Schema schema = *null*;
>
>
>
>      *public* *void* registerSchema(){
>
>            GRKryoSerializer.*preregisterSchema*(comment, getSchema());
>
>      }
>
>
>
>      *private* *void* readObject(ObjectInputStream input)
>
>             *throws* IOException, ClassNotFoundException
>
>      {
>
>            realReadObject(input);
>
>      }
>
>
>
>      *private* *void* writeObject(ObjectOutputStream output)
>
>             *throws* IOException, ClassNotFoundException
>
>      {
>
>            realWriteObject(output);
>
>      }
>
>
>
>      // Ensure on inflation, the schema is registered against
>
>      // the hashcode locally so we can inflate that type
>
>
>
>      *protected* *void* realReadObject(ObjectInputStream input)
>
>             *throws* IOException, ClassNotFoundException
>
>      {
>
>            schemaString = input.readUTF();
>
>            comment = input.readUTF();
>
>            builder = *null*;
>
>            schema = *null*;
>
>            GRKryoSerializer.*preregisterSchema*(comment, getSchema());
>
>      }
>
>
>
>      *protected* *void* realWriteObject(ObjectOutputStream output)
>
>             *throws* IOException, ClassNotFoundException
>
>      {
>
>            output.writeUTF(schemaString);
>
>            output.writeUTF(comment);
>
>      }
>
>
>
>      *public* GRBuilder()
>
>      {
>
>      }
>
>
>
>      *public* GRBuilder(String comment , Schema s){
>
>            schemaString = s.toString();
>
>            builder = *null*;
>
>            *this*.comment = comment;
>
>
>
>            GRKryoSerializer.*preregisterSchema*(comment, s);
>
>      }
>
>
>
>      *public* *synchronized* GenericRecordBuilder getBuilder()
>
>      {
>
>            *if*(builder == *null*)
>
>            {
>
>                 builder = *new* GenericRecordBuilder(getSchema());
>
>            }
>
>            *return* builder;
>
>      }
>
>
>
>      *public* *synchronized* Schema getSchema()
>
>      {
>
>            *if*(schema == *null*)
>
>            {
>
>                 Schema.Parser p = *new* Schema.Parser();
>
>                 schema = p.parse(schemaString);
>
>            }
>
>            *return* schema;
>
>      }
>
> }
>
>
>
> Our Mappers and such use the GRBuilder on the FlatMap rich class for
> example to get a Builder to create the output records for collection. We
> need to have A GRBUilder for each possible genericrecord schema as a
> variable on a Map object.
>
>
>
> If we were torefactor this using the GenericTypeInfo or AvroSerializer,
> how would you suggest doing it?
>
>
>
> Thanks
>
>
>
>
>
> *From:* Stephan Ewen [mailto:se...@apache.org]
> *Sent:* Thursday, March 02, 2017 3:07 PM
> *To:* user@flink.apache.org; Aljoscha Krettek
> *Subject:* Re: Serialization performance
>
>
>
> Hi!
>
>
>
> Thanks for this writeup, very cool stuff !
>
>
>
> For part (1) - serialization: I think that can be made a bit nicer. Avro
> is a bit of an odd citizen in Flink, because Flink serialization is
> actually schema aware, but does not integrate with Avro. That's why Avro
> types go through Kryo.
>
>
>
> We should try and make Avro a first class citizen.
>
>   - The first step is to have a proper AvroSerializer. We have implemented
> one already, see "org.apache.flink.api.java.typeutils.runtime.AvroSerializer".
> It works with the ReflectDatumReader/Writer, but would be a good base line
> for all types of avro-based serializers in Flink..
>
>
>
>   - Then we need to figure out how the Avro Serializer is instantiated. We
> could just let the "GenericTypeInfo" create an Avro serializer for Avro
> types, and Kryo for all else.
>
>   - The change would eventually have to be behind a config flag in the
> ExecutionConfig (see "GenericTypeInfo.createSerializer()") to make sure
> we do not break the default serialization format within a major release
> version.
>
>
>
>
>
> A side note: If you actually use that through Beam, I am actually not sure
> what will happen, because as far as I know, Beam  uses its completely own
> serialization system and Flink sees only byte coders from Beam. Aljoscha
> can probably add more detail here.
>
>
>
>
>
> For part (2) - the filters: If I understand correctly, you "split" the
> data into different result sets that go to different sinks? The DataStream
> API has a "split/select" type of construct which would help there, the
> DataSet API does not have something like that. If you look for peak
> performance, the demux output format seems like a good workaround on the
> DataSet side.
>
>
>
>
>
> Greetings,
>
> Stephan
>
>
>
>
>
>
>
> On Thu, Mar 2, 2017 at 8:01 PM, Newport, Billy <billy.newp...@gs.com>
> wrote:
>
> We’ve been working on performance for the last while. We’re using flink
> 1.2 right now. We are writing batch jobs which process avro and parquet
> input files and produce parquet files.
>
>
>
> Flink serialization costs seem to be the most significant aspect of our
> wall clock time. We have written a custom kryo serializer for GenericRecord
> which is similar to the Spark one in that it reuses avro Datum reader and
> writers but writes a hash fingerprint for the schema instead of the schema
> itself.
>
>
>
> We have subclassed most of the Rich* classes in flink and now also pass to
> the constructors a Builder class which has the avro schema. When flink
> inflates these, the builders are inflated and preregister the avro schema
> for the hash code in a static map which the inflation/writing code uses
> later.
>
>
>
> This is working pretty well for us, it’s like 10-20x faster than just
> using GenericRecords normally. The question is this: Is this the right way
> to do this? If it is then we can contribute it and then how to modify beam
> so that it uses this stuff under the covers, we can’t use beam at all right
> now as far as I can tell because of the performance issues with
> GenericRecord.
>
>
>
> The other performance optimization is basically removing filters which
> again seem to double wall clock time. We wrote an embedded demux
> outputformat which receives a Tuple<Enum,GenericRecord> and writes to a
> different parquet file depending on Enum. This was 2x faster than a naïve 4
> filters going to 4 parquet outputformats.
>
>
>
> Do these optimizations seem unnecessary to some? Is there some trick we’re
> missing?
>
>
>
>
>

Reply via email to