Hi Billy,

on the Beam side, you have probably looked into writing your own Coder (the equivalent of a TypeSerializer in Flink). If so, did that not work out for you? And if it didn't, why not?

Best,
Aljoscha
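For illustration, a minimal sketch of what such a fingerprint-based Coder could look like (assuming a recent Beam where CustomCoder needs only encode/decode, and assuming the AvroStuff registry from Billy's GRKryoSerializer below is accessible and pre-populated on every worker; the class name is made up):

    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;

    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.BinaryDecoder;
    import org.apache.avro.io.BinaryEncoder;
    import org.apache.avro.io.DecoderFactory;
    import org.apache.avro.io.EncoderFactory;
    import org.apache.beam.sdk.coders.CustomCoder;

    // Sketch only: the Beam-side twin of the Kryo serializer below, writing a
    // 64-bit schema fingerprint followed by the binary Avro body.
    public class FingerprintGenericRecordCoder extends CustomCoder<GenericRecord> {

        @Override
        public void encode(GenericRecord value, OutputStream outStream) throws IOException {
            GRKryoSerializer.AvroStuff stuff = GRKryoSerializer.getStuffFor(value);
            BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(outStream, null);
            encoder.writeLong(stuff.key);        // fingerprint instead of the full schema
            stuff.writer.write(value, encoder);
            encoder.flush();
        }

        @Override
        public GenericRecord decode(InputStream inStream) throws IOException {
            BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder(inStream, null);
            long fingerprint = decoder.readLong();
            // look up the pre-registered reader for this fingerprint
            return GRKryoSerializer.schemaMap.get(fingerprint).reader.read(null, decoder);
        }
    }

A real implementation would also have to answer Beam's determinism and equality questions (verifyDeterministic and friends); this only shows the wire format.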
On Thu, Mar 2, 2017, at 22:02, Stephan Ewen wrote:
> Hi!
>
> I can write some more details later; here is the short answer:
>
> - Your serializer would go into something like the AvroSerializer.
>
> - When you instantiate the AvroSerializer in
>   GenericTypeInfo.createSerializer(ExecutionConfig), you pre-register
>   the generic type itself, plus all types in
>   "ExecutionConfig.getRegisteredKryoTypes()" (we abuse the Kryo type
>   registration here as an Avro type registration initially; that would
>   have to be polished later).
>
> - The registered types are classes, but since they are Avro types, you
>   should be able to get their schema (via ReflectData or so).
>
> That way, Flink would internally forward all the registrations for you
> (similar to how it forwards Kryo registrations) and you don't have to
> do it manually.
>
> Stephan
>
> On Thu, Mar 2, 2017 at 9:31 PM, Newport, Billy <billy.newp...@gs.com> wrote:
>> This is what we're using as our serializer:
>>
>> Somewhere:
>>
>>     env.addDefaultKryoSerializer(Record.class, GRKryoSerializer.class);
>>
>> then
>>
>> import java.io.IOException;
>> import java.util.Map;
>> import java.util.concurrent.ConcurrentHashMap;
>> import java.util.logging.Logger;
>>
>> import org.apache.avro.Schema;
>> import org.apache.avro.SchemaNormalization;
>> import org.apache.avro.generic.GenericData;
>> import org.apache.avro.generic.GenericData.Record;
>> import org.apache.avro.generic.GenericDatumReader;
>> import org.apache.avro.generic.GenericDatumWriter;
>> import org.apache.avro.generic.GenericRecord;
>> import org.apache.avro.io.BinaryDecoder;
>> import org.apache.avro.io.BinaryEncoder;
>> import org.apache.avro.io.DatumReader;
>> import org.apache.avro.io.DatumWriter;
>> import org.apache.avro.io.DecoderFactory;
>> import org.apache.avro.io.EncoderFactory;
>>
>> import com.esotericsoftware.kryo.Kryo;
>> import com.esotericsoftware.kryo.Serializer;
>> import com.esotericsoftware.kryo.io.Input;
>> import com.esotericsoftware.kryo.io.Output;
>>
>> public class GRKryoSerializer extends Serializer<GenericData.Record> {
>>
>>     // Everything needed to read/write one schema, keyed by its fingerprint.
>>     static class AvroStuff {
>>         Schema schema;
>>         String comment;
>>         long key;
>>         DatumReader<GenericRecord> reader;
>>         DatumWriter<GenericRecord> writer;
>>     }
>>
>>     static Map<Long, AvroStuff> schemaMap = new ConcurrentHashMap<>();
>>     static Map<Schema, Long> schemaToFingerprintMap = new ConcurrentHashMap<>();
>>     static Logger log = Logger.getLogger(GRKryoSerializer.class.getName());
>>
>>     public static void preregisterSchema(String comment, Schema schema) {
>>         if (!schemaToFingerprintMap.containsKey(schema)) {
>>             long fingerprint = SchemaNormalization.parsingFingerprint64(schema);
>>             AvroStuff stuff = new AvroStuff();
>>             stuff.schema = schema;
>>             stuff.comment = comment;
>>             stuff.key = fingerprint;
>>             stuff.reader = new GenericDatumReader<>(schema);
>>             stuff.writer = new GenericDatumWriter<>(schema);
>>             log.info(String.format("Preregistering schema for %s with fingerprint %d",
>>                     comment, fingerprint));
>>             schemaMap.put(fingerprint, stuff);
>>             schemaToFingerprintMap.put(schema, fingerprint);
>>         }
>>     }
>>
>>     public GRKryoSerializer() {
>>     }
>>
>>     public static void clearSchemaCache() {
>>         schemaToFingerprintMap.clear();
>>         schemaMap.clear();
>>     }
>>
>>     public static AvroStuff getStuffFor(GenericRecord o) {
>>         return getStuffFor(o.getSchema());
>>     }
>>
>>     public static AvroStuff getStuffFor(Schema schema) {
>>         Long fingerprint = schemaToFingerprintMap.get(schema);
>>         if (fingerprint == null) {
>>             // Remember the fingerprint, but an unregistered schema is an error.
>>             fingerprint = SchemaNormalization.parsingFingerprint64(schema);
>>             log.info(String.format("No fingerprint. Generated %d for schema %s",
>>                     fingerprint, schema.toString(true)));
>>             schemaToFingerprintMap.put(schema, fingerprint);
>>             throw new RuntimeException("Unknown schema " + schema.toString(true));
>>         }
>>         return schemaMap.get(fingerprint);
>>     }
>>
>>     @Override
>>     public void write(Kryo kryo, Output output, GenericData.Record object) {
>>         AvroStuff stuff = getStuffFor(object);
>>         BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(output, null);
>>         try {
>>             // write the schema key, not the schema
>>             encoder.writeLong(stuff.key);
>>             // write the binary version of the fields only
>>             stuff.writer.write(object, encoder);
>>             encoder.flush();
>>         } catch (IOException e) {
>>             throw new RuntimeException(e);
>>         }
>>     }
>>
>>     @Override
>>     public GenericData.Record read(Kryo kryo, Input input, Class<GenericData.Record> type) {
>>         BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder(input, null);
>>         try {
>>             // read the key
>>             long fingerPrint = decoder.readLong();
>>             // find it
>>             AvroStuff stuff = schemaMap.get(fingerPrint);
>>             // inflate using the correct pre-registered inflator
>>             return (Record) stuff.reader.read(null, decoder);
>>         } catch (IOException e) {
>>             throw new RuntimeException(e);
>>         }
>>     }
>> }
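Stephan's suggestion above (forwarding registrations through the ExecutionConfig) would make this pre-registration automatic rather than manual, as the GRBuilder below does today. A rough sketch of the idea; this is not Flink's actual code, preregisterFromConfig is a hypothetical helper, and it assumes every registered class is an Avro reflect type whose schema ReflectData can derive:

    import org.apache.avro.Schema;
    import org.apache.avro.reflect.ReflectData;
    import org.apache.flink.api.common.ExecutionConfig;

    public class AvroRegistrationForwarder {
        // Hypothetical helper: derive a schema for every class registered on
        // the ExecutionConfig (abusing the Kryo registration, as Stephan
        // notes) and pre-register it, so no worker ever sees an unknown
        // fingerprint.
        public static void preregisterFromConfig(ExecutionConfig config) {
            for (Class<?> registered : config.getRegisteredKryoTypes()) {
                Schema schema = ReflectData.get().getSchema(registered);
                GRKryoSerializer.preregisterSchema(registered.getName(), schema);
            }
        }
    }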
>>
>> We add an instance of one of these to all our Flink Rich operations:
>>
>> import java.io.IOException;
>> import java.io.ObjectInputStream;
>> import java.io.ObjectOutputStream;
>> import java.io.Serializable;
>>
>> import org.apache.avro.Schema;
>> import org.apache.avro.generic.GenericRecordBuilder;
>>
>> public class GRBuilder implements Serializable {
>>
>>     private static final long serialVersionUID = -3441080975441473751L;
>>
>>     String schemaString;
>>     String comment;
>>
>>     transient GenericRecordBuilder builder = null;
>>     transient Schema schema = null;
>>
>>     public GRBuilder() {
>>     }
>>
>>     public GRBuilder(String comment, Schema s) {
>>         schemaString = s.toString();
>>         builder = null;
>>         this.comment = comment;
>>         GRKryoSerializer.preregisterSchema(comment, s);
>>     }
>>
>>     public String getComment() {
>>         return comment;
>>     }
>>
>>     public void setSchema(Schema schema) {
>>         this.schema = schema;
>>     }
>>
>>     public void registerSchema() {
>>         GRKryoSerializer.preregisterSchema(comment, getSchema());
>>     }
>>
>>     private void readObject(ObjectInputStream input)
>>             throws IOException, ClassNotFoundException {
>>         realReadObject(input);
>>     }
>>
>>     private void writeObject(ObjectOutputStream output)
>>             throws IOException, ClassNotFoundException {
>>         realWriteObject(output);
>>     }
>>
>>     // Ensure that on inflation the schema is registered against
>>     // the hash code locally, so we can inflate that type.
>>     protected void realReadObject(ObjectInputStream input)
>>             throws IOException, ClassNotFoundException {
>>         schemaString = input.readUTF();
>>         comment = input.readUTF();
>>         builder = null;
>>         schema = null;
>>         GRKryoSerializer.preregisterSchema(comment, getSchema());
>>     }
>>
>>     protected void realWriteObject(ObjectOutputStream output)
>>             throws IOException, ClassNotFoundException {
>>         output.writeUTF(schemaString);
>>         output.writeUTF(comment);
>>     }
>>
>>     public synchronized GenericRecordBuilder getBuilder() {
>>         if (builder == null) {
>>             builder = new GenericRecordBuilder(getSchema());
>>         }
>>         return builder;
>>     }
>>
>>     public synchronized Schema getSchema() {
>>         if (schema == null) {
>>             Schema.Parser p = new Schema.Parser();
>>             schema = p.parse(schemaString);
>>         }
>>         return schema;
>>     }
>> }
>>
>> Our Mappers and such use the GRBuilder on the FlatMap rich class, for
>> example, to get a builder to create the output records for collection.
>> We need to have a GRBuilder for each possible GenericRecord schema as
>> a variable on a Map object.
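For illustration, that usage pattern looks roughly like this (EnrichFlatMap and the field name are made up for the sketch):

    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.util.Collector;

    // Illustrative sketch: the GRBuilder travels with the rich function, so
    // deserializing the function on a worker pre-registers the output schema
    // before any record is serialized or inflated there.
    public class EnrichFlatMap extends RichFlatMapFunction<GenericRecord, GenericData.Record> {

        private final GRBuilder outputBuilder; // one builder per output schema

        public EnrichFlatMap(GRBuilder outputBuilder) {
            this.outputBuilder = outputBuilder;
        }

        @Override
        public void flatMap(GenericRecord in, Collector<GenericData.Record> out) {
            // build the output record against the pre-registered schema
            GenericData.Record result = outputBuilder.getBuilder()
                    .set("someField", in.get("someField")) // field name is illustrative
                    .build();
            out.collect(result);
        }
    }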
>>
>> If we were to refactor this using the GenericTypeInfo or
>> AvroSerializer, how would you suggest doing it?
>>
>> Thanks
>>
>> From: Stephan Ewen [mailto:se...@apache.org]
>> Sent: Thursday, March 02, 2017 3:07 PM
>> To: user@flink.apache.org; Aljoscha Krettek
>> Subject: Re: Serialization performance
>>
>> Hi!
>>
>> Thanks for this writeup, very cool stuff!
>>
>> For part (1) - serialization: I think that can be made a bit nicer.
>> Avro is a bit of an odd citizen in Flink, because Flink serialization
>> is actually schema-aware but does not integrate with Avro. That's why
>> Avro types go through Kryo.
>>
>> We should try to make Avro a first-class citizen.
>>
>> - The first step is to have a proper AvroSerializer. We have
>>   implemented one already, see
>>   "org.apache.flink.api.java.typeutils.runtime.AvroSerializer". It
>>   works with the ReflectDatumReader/Writer, but it would be a good
>>   baseline for all types of Avro-based serializers in Flink.
>>
>> - Then we need to figure out how the AvroSerializer is instantiated.
>>   We could just let the "GenericTypeInfo" create an Avro serializer
>>   for Avro types, and Kryo for everything else.
>>
>> - The change would eventually have to be behind a config flag in the
>>   ExecutionConfig (see "GenericTypeInfo.createSerializer()") to make
>>   sure we do not break the default serialization format within a
>>   major release version.
>>
>> A side note: if you actually use this through Beam, I am not sure
>> what will happen, because as far as I know, Beam uses its own
>> serialization system and Flink sees only byte coders from Beam.
>> Aljoscha can probably add more detail here.
>>
>> For part (2) - the filters: if I understand correctly, you "split"
>> the data into different result sets that go to different sinks? The
>> DataStream API has a "split/select" type of construct which would
>> help there; the DataSet API does not have anything like that. If you
>> are looking for peak performance, the demux output format seems like
>> a good workaround on the DataSet side.
>>
>> Greetings,
>> Stephan
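For reference, the DataStream-side construct Stephan mentions looks roughly like this in Flink 1.2 (a sketch; RouteTag is a hypothetical enum standing in for the Enum in the Tuple2 Billy describes below):

    import java.util.Collections;

    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.SplitStream;

    public class SplitSelectExample {

        // Hypothetical tag enum for routing records.
        enum RouteTag { INSERT, UPDATE, DELETE, NOOP }

        static void route(DataStream<Tuple2<RouteTag, GenericRecord>> tagged) {
            // Each record is assigned to the output named after its tag...
            SplitStream<Tuple2<RouteTag, GenericRecord>> routed =
                    tagged.split(value -> Collections.singletonList(value.f0.name()));

            // ...and each named side can then flow to its own sink.
            DataStream<Tuple2<RouteTag, GenericRecord>> inserts =
                    routed.select(RouteTag.INSERT.name());
            DataStream<Tuple2<RouteTag, GenericRecord>> updates =
                    routed.select(RouteTag.UPDATE.name());
        }
    }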
>>
>> On Thu, Mar 2, 2017 at 8:01 PM, Newport, Billy <billy.newp...@gs.com> wrote:
>>
>> We've been working on performance for the last while. We're using
>> Flink 1.2 right now. We are writing batch jobs which process Avro and
>> Parquet input files and produce Parquet files.
>>
>> Flink serialization costs seem to be the most significant component
>> of our wall-clock time. We have written a custom Kryo serializer for
>> GenericRecord which is similar to the Spark one in that it reuses
>> Avro datum readers and writers but writes a hash fingerprint for the
>> schema instead of the schema itself.
>>
>> We have subclassed most of the Rich* classes in Flink and now also
>> pass a Builder class holding the Avro schema to the constructors.
>> When Flink inflates these, the builders are inflated and preregister
>> the Avro schema for the hash code in a static map, which the
>> inflation/writing code uses later.
>>
>> This is working pretty well for us; it's 10-20x faster than just
>> using GenericRecords normally. The question is: is this the right way
>> to do it? If it is, we can contribute it, and then figure out how to
>> modify Beam so that it uses this under the covers. We can't use Beam
>> at all right now, as far as I can tell, because of the performance
>> issues with GenericRecord.
>>
>> The other performance optimization is basically removing filters,
>> which again seem to double wall-clock time. We wrote an embedded
>> demux OutputFormat which receives a Tuple2<Enum, GenericRecord> and
>> writes to a different Parquet file depending on the Enum. This was 2x
>> faster than a naïve four filters going to four Parquet OutputFormats.
>>
>> Do these optimizations seem unnecessary to some? Is there some trick
>> we're missing?
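A minimal sketch of the demux idea Billy describes, under the same assumptions as the earlier sketches (names are illustrative, RouteTag is the same hypothetical enum as above, and the delegates would be the actual Parquet output formats):

    import java.io.IOException;
    import java.util.Map;

    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.api.common.io.OutputFormat;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;

    // One OutputFormat that owns a delegate per enum value and routes each
    // record to it, so a single pass over the data replaces N filter+sink
    // chains.
    public class DemuxOutputFormat implements OutputFormat<Tuple2<RouteTag, GenericRecord>> {

        private final Map<RouteTag, OutputFormat<GenericRecord>> delegates;

        public DemuxOutputFormat(Map<RouteTag, OutputFormat<GenericRecord>> delegates) {
            this.delegates = delegates; // e.g. one Parquet output format per tag
        }

        @Override
        public void configure(Configuration parameters) {
            for (OutputFormat<GenericRecord> d : delegates.values()) {
                d.configure(parameters);
            }
        }

        @Override
        public void open(int taskNumber, int numTasks) throws IOException {
            for (OutputFormat<GenericRecord> d : delegates.values()) {
                d.open(taskNumber, numTasks);
            }
        }

        @Override
        public void writeRecord(Tuple2<RouteTag, GenericRecord> record) throws IOException {
            delegates.get(record.f0).writeRecord(record.f1); // route by tag
        }

        @Override
        public void close() throws IOException {
            for (OutputFormat<GenericRecord> d : delegates.values()) {
                d.close();
            }
        }
    }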