I'll try and add more details in a bit. If you have some suggestions on how to
make the serialization stack more extensible, please let us know!

Some hooks exist, like TypeInfoFactories:
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/types_serialization.html#defining-type-information-using-a-factory

But I think that hook does not work for Avro...
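To make that concrete, this is roughly what the factory hook looks like for a type
you own -- the MyEvent / MyEventTypeInfoFactory names below are just placeholders,
not anything that exists. The factory is wired up by annotating the class itself,
which (as far as I can tell) is exactly what you cannot do for Avro's GenericRecord:

import java.lang.reflect.Type;
import java.util.Map;

import org.apache.flink.api.common.typeinfo.TypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;

// The factory is discovered through the @TypeInfo annotation on the type itself.
@TypeInfo(MyEventTypeInfoFactory.class)
class MyEvent {
    public String name;
}

class MyEventTypeInfoFactory extends TypeInfoFactory<MyEvent> {
    @Override
    public TypeInformation<MyEvent> createTypeInfo(
            Type t, Map<String, TypeInformation<?>> genericParameters) {
        // Return whatever TypeInformation (and therefore serializer) you want for
        // MyEvent; GenericTypeInfo is only a stand-in here.
        return new GenericTypeInfo<>(MyEvent.class);
    }
}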
On Tue, Mar 7, 2017 at 1:25 PM, Newport, Billy <billy.newp...@gs.com> wrote:

> I need more details, Flink does not appear to be really designed for adding
> serializers in a ‘nice’ way as far as I can tell; it's kind of hardcoded for
> Kryo right now.
>
> From: Stephan Ewen [mailto:se...@apache.org]
> Sent: Tuesday, March 07, 2017 6:21 AM
> To: user@flink.apache.org
> Subject: Re: Serialization performance
>
> Hi Billy!
>
> Out of curiosity: Were you able to hack some direct Avro support as I
> described in the brief writeup, or do you need some more details?
>
> Stephan
>
> On Fri, Mar 3, 2017 at 2:38 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
>
> Hi Billy,
>
> on the Beam side, you have probably looked into writing your own Coder
> (the equivalent of a TypeSerializer in Flink). If yes, did that not work
> out for you? And if yes, why?
>
> Best,
> Aljoscha
>
> On Thu, Mar 2, 2017, at 22:02, Stephan Ewen wrote:
>
> Hi!
>
> I can write some more details later, here is the short answer:
>
> - Your serializer would go into something like the AvroSerializer.
>
> - When you instantiate the AvroSerializer in
> GenericTypeInfo.createSerializer(ExecutionConfig), you pre-register the type
> of the generic type, plus all types in "ExecutionConfig.getRegisteredKryoTypes()"
> (we abuse the Kryo type registration here as an Avro type registration
> initially, we would have to polish that later).
>
> - The registered types are classes, but since they are Avro types, you
> should be able to get their schema (via ReflectData or so).
>
> That way, Flink would internally forward all the registrations for you
> (similar to how it forwards Kryo registrations) and you don't have to do it
> manually.
>
> Stephan
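To flesh out that last mail a bit: none of the following exists in Flink today and
the class name is made up, but the pre-registration step could look roughly like
this, feeding the schemas into a fingerprint map like the one in Billy's
GRKryoSerializer further down:

import org.apache.avro.Schema;
import org.apache.avro.reflect.ReflectData;
import org.apache.flink.api.common.ExecutionConfig;

public class AvroPreRegistration {

    // Would be called when GenericTypeInfo.createSerializer(ExecutionConfig)
    // decides to build an Avro-based serializer: pre-register the generic type
    // itself plus every class the user registered through the Kryo hooks.
    public static void preRegister(Class<?> genericType, ExecutionConfig config) {
        registerSchemaFor(genericType);
        for (Class<?> registered : config.getRegisteredKryoTypes()) {
            registerSchemaFor(registered);
        }
    }

    private static void registerSchemaFor(Class<?> clazz) {
        // ReflectData can derive a schema for Avro reflect / plain Java types.
        Schema schema = ReflectData.get().getSchema(clazz);
        // Hand the schema to the serializer keyed by its fingerprint, e.g. the
        // way GRKryoSerializer.preregisterSchema() does in the code below.
        GRKryoSerializer.preregisterSchema(clazz.getName(), schema);
    }
}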
> On Thu, Mar 2, 2017 at 9:31 PM, Newport, Billy <billy.newp...@gs.com> wrote:
>
> This is what we're using as our serializer:
>
> Somewhere:
>
> env.addDefaultKryoSerializer(Record.class, GRKryoSerializer.class);
>
> then
>
> public class GRKryoSerializer extends Serializer<GenericData.Record>
> {
>     static class AvroStuff
>     {
>         Schema schema;
>         String comment;
>         long key;
>         DatumReader<GenericRecord> reader;
>         DatumWriter<GenericRecord> writer;
>     }
>
>     static Map<Long, AvroStuff> schemaMap = new ConcurrentHashMap<>();
>     static Map<Schema, Long> schemaToFingerprintMap = new ConcurrentHashMap<>();
>     static Logger log = Logger.getLogger(GRKryoSerializer.class.getName());
>
>     static public void preregisterSchema(String comment, Schema schema)
>     {
>         if (!schemaToFingerprintMap.containsKey(schema)) {
>             long fingerprint = SchemaNormalization.parsingFingerprint64(schema);
>             AvroStuff stuff = new AvroStuff();
>             stuff.schema = schema;
>             stuff.comment = comment;
>             stuff.key = fingerprint;
>             stuff.reader = new GenericDatumReader<>(schema);
>             stuff.writer = new GenericDatumWriter<>(schema);
>             log.info(String.format("Preregistering schema for %s with fingerprint %d", comment, fingerprint));
>             schemaMap.put(fingerprint, stuff);
>             schemaToFingerprintMap.put(schema, fingerprint);
>         }
>     }
>
>     public GRKryoSerializer() {
>     }
>
>     static public void clearSchemaCache()
>     {
>         schemaToFingerprintMap.clear();
>         schemaMap.clear();
>     }
>
>     static public AvroStuff getStuffFor(GenericRecord o)
>     {
>         return getStuffFor(o.getSchema());
>     }
>
>     static public AvroStuff getStuffFor(Schema schema)
>     {
>         Long fingerprint = schemaToFingerprintMap.get(schema);
>         if (fingerprint == null)
>         {
>             fingerprint = SchemaNormalization.parsingFingerprint64(schema);
>             log.info(String.format("No fingerprint. Generated %d for schema %s", fingerprint, schema.toString(true)));
>             schemaToFingerprintMap.put(schema, fingerprint);
>
>             throw new RuntimeException("Unknown schema " + schema.toString(true));
>         }
>         return schemaMap.get(fingerprint);
>     }
>
>     @Override
>     public void write(Kryo kryo, Output output, GenericData.Record object)
>     {
>         AvroStuff stuff = getStuffFor(object);
>
>         BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(output, null);
>         try {
>             // write the schema key not the schema
>             encoder.writeLong(stuff.key);
>             // write the binary version of the fields only
>             stuff.writer.write(object, encoder);
>             encoder.flush();
>         } catch (IOException e)
>         {
>             throw new RuntimeException(e);
>         }
>     }
>
>     @Override
>     public GenericData.Record read(Kryo kryo, Input input, Class<GenericData.Record> type)
>     {
>         BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder(input, null);
>         long fingerPrint;
>         try {
>             // read the key
>             fingerPrint = decoder.readLong();
>             // find it
>             AvroStuff stuff = schemaMap.get(fingerPrint);
>             // inflate using correct preregistered inflator
>             return (Record) stuff.reader.read(null, decoder);
>         } catch (IOException e) {
>             throw new RuntimeException(e);
>         }
>     }
> }
>
> We add an instance of one of these to all our Flink Rich operations:
>
> public class GRBuilder implements Serializable {
>
>     public String getComment() {
>         return comment;
>     }
>
>     public void setSchema(Schema schema) {
>         this.schema = schema;
>     }
>
>     private static final long serialVersionUID = -3441080975441473751L;
>
>     String schemaString;
>     String comment;
>
>     transient GenericRecordBuilder builder = null;
>     transient Schema schema = null;
>
>     public void registerSchema() {
>         GRKryoSerializer.preregisterSchema(comment, getSchema());
>     }
>
>     private void readObject(ObjectInputStream input)
>         throws IOException, ClassNotFoundException
>     {
>         realReadObject(input);
>     }
>
>     private void writeObject(ObjectOutputStream output)
>         throws IOException, ClassNotFoundException
>     {
>         realWriteObject(output);
>     }
>
>     // Ensure on inflation, the schema is registered against
>     // the hashcode locally so we can inflate that type
>     protected void realReadObject(ObjectInputStream input)
>         throws IOException, ClassNotFoundException
>     {
>         schemaString = input.readUTF();
>         comment = input.readUTF();
>         builder = null;
>         schema = null;
>         GRKryoSerializer.preregisterSchema(comment, getSchema());
>     }
>
>     protected void realWriteObject(ObjectOutputStream output)
>         throws IOException, ClassNotFoundException
>     {
>         output.writeUTF(schemaString);
>         output.writeUTF(comment);
>     }
>
>     public GRBuilder()
>     {
>     }
>
>     public GRBuilder(String comment, Schema s) {
>         schemaString = s.toString();
>         builder = null;
>         this.comment = comment;
>
>         GRKryoSerializer.preregisterSchema(comment, s);
>     }
>
>     public synchronized GenericRecordBuilder getBuilder()
>     {
>         if (builder == null)
>         {
>             builder = new GenericRecordBuilder(getSchema());
>         }
>         return builder;
>     }
>
>     public synchronized Schema getSchema()
>     {
>         if (schema == null)
>         {
>             Schema.Parser p = new Schema.Parser();
>             schema = p.parse(schemaString);
>         }
>         return schema;
>     }
> }
>
> Our Mappers and such use the GRBuilder on the FlatMap rich class, for
> example, to get a Builder to create the output records for collection. We
> need to have a GRBuilder for each possible GenericRecord schema as a
> variable on a Map object.
>
> If we were to refactor this using the GenericTypeInfo or AvroSerializer,
> how would you suggest doing it?
>
> Thanks
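If I read the pattern right, a rich function then carries one of those builders
along roughly as in the sketch below. This is my reconstruction, not the actual
code; the field name is invented, and the interesting part is that deserializing
the GRBuilder with the operator preregisters the schema on every task manager
before GRKryoSerializer needs it:

import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.util.Collector;

public class EnrichFlatMap extends RichFlatMapFunction<GenericData.Record, GenericData.Record> {

    private final GRBuilder outBuilder;

    public EnrichFlatMap(GRBuilder outBuilder) {
        this.outBuilder = outBuilder;
    }

    @Override
    public void flatMap(GenericData.Record in, Collector<GenericData.Record> out) {
        // getBuilder() lazily parses the schema; GRBuilder.readObject() has already
        // preregistered it with GRKryoSerializer when the operator was deserialized.
        GenericRecordBuilder b = outBuilder.getBuilder();
        b.set("some_field", in.get("some_field")); // field name invented for the sketch
        out.collect(b.build());
    }
}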
> From: Stephan Ewen [mailto:se...@apache.org]
> Sent: Thursday, March 02, 2017 3:07 PM
> To: user@flink.apache.org; Aljoscha Krettek
> Subject: Re: Serialization performance
>
> Hi!
>
> Thanks for this writeup, very cool stuff!
>
> For part (1) - serialization: I think that can be made a bit nicer. Avro is
> a bit of an odd citizen in Flink, because Flink serialization is actually
> schema aware, but does not integrate with Avro. That's why Avro types go
> through Kryo.
>
> We should try and make Avro a first class citizen.
>
> - The first step is to have a proper AvroSerializer. We have implemented one
> already, see "org.apache.flink.api.java.typeutils.runtime.AvroSerializer".
> It works with the ReflectDatumReader/Writer, but it would be a good baseline
> for all types of Avro-based serializers in Flink.
>
> - Then we need to figure out how the AvroSerializer is instantiated. We
> could just let the "GenericTypeInfo" create an Avro serializer for Avro
> types, and Kryo for all else.
>
> - The change would eventually have to be behind a config flag in the
> ExecutionConfig (see "GenericTypeInfo.createSerializer()") to make sure we
> do not break the default serialization format within a major release
> version.
>
> A side note: If you actually use that through Beam, I am not sure what will
> happen, because as far as I know, Beam uses its own serialization system
> entirely and Flink sees only byte coders from Beam. Aljoscha can probably
> add more detail here.
>
> For part (2) - the filters: If I understand correctly, you "split" the data
> into different result sets that go to different sinks? The DataStream API
> has a "split/select" type of construct which would help there; the DataSet
> API does not have something like that. If you are looking for peak
> performance, the demux output format seems like a good workaround on the
> DataSet side.
>
> Greetings,
> Stephan
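The split/select construct mentioned in part (2) looks roughly like this on the
DataStream side. This is only a sketch: the "type" field and the "A"/"B" tags are
invented, and as said there is no DataSet equivalent:

import java.util.Collections;

import org.apache.avro.generic.GenericData;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SplitStream;

public class SplitExample {

    // Tag each record once, then select the per-sink substreams, instead of
    // running N independent filters over the same data.
    public static void wire(DataStream<GenericData.Record> records) {
        SplitStream<GenericData.Record> split =
                records.split(new OutputSelector<GenericData.Record>() {
                    @Override
                    public Iterable<String> select(GenericData.Record value) {
                        // Route on whatever discriminates the record; "type" is invented here.
                        return Collections.singletonList(String.valueOf(value.get("type")));
                    }
                });

        DataStream<GenericData.Record> typeA = split.select("A");
        DataStream<GenericData.Record> typeB = split.select("B");
        // typeA.writeUsingOutputFormat(...); typeB.writeUsingOutputFormat(...);
    }
}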
> On Thu, Mar 2, 2017 at 8:01 PM, Newport, Billy <billy.newp...@gs.com> wrote:
>
> We've been working on performance for the last while. We're using Flink 1.2
> right now. We are writing batch jobs which process Avro and Parquet input
> files and produce Parquet files.
>
> Flink serialization costs seem to be the most significant contributor to our
> wall clock time. We have written a custom Kryo serializer for GenericRecord
> which is similar to the Spark one in that it reuses Avro datum readers and
> writers but writes a hash fingerprint for the schema instead of the schema
> itself.
>
> We have subclassed most of the Rich* classes in Flink and now also pass to
> the constructors a Builder class which has the Avro schema. When Flink
> inflates these, the builders are inflated and preregister the Avro schema
> for the hash code in a static map which the inflation/writing code uses
> later.
>
> This is working pretty well for us; it's like 10-20x faster than just using
> GenericRecords normally. The question is this: Is this the right way to do
> it? If it is, then we can contribute it, and the next question is how to
> modify Beam so that it uses this under the covers; we can't use Beam at all
> right now, as far as I can tell, because of the performance issues with
> GenericRecord.
>
> The other performance optimization is basically removing filters, which
> again seem to double wall clock time. We wrote an embedded demux
> OutputFormat which receives a Tuple<Enum, GenericRecord> and writes to a
> different Parquet file depending on the Enum. This was 2x faster than a
> naïve 4 filters going to 4 Parquet output formats.
>
> Do these optimizations seem unnecessary to some? Is there some trick we're
> missing?
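For anyone who wants to try the same demux trick, a bare-bones sketch of such an
output format might look like the following. The Side enum and the wrapped formats
are placeholders; a real version would wrap the actual Parquet output formats:

import java.io.IOException;
import java.util.EnumMap;
import java.util.Map;

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;

enum Side { A, B, C, D }

// One format instance receives (tag, record) tuples and forwards each record to
// the wrapped format for that tag, so the data is scanned once instead of once
// per filter.
public class DemuxOutputFormat implements OutputFormat<Tuple2<Side, GenericRecord>> {

    private final Map<Side, OutputFormat<GenericRecord>> delegates =
            new EnumMap<Side, OutputFormat<GenericRecord>>(Side.class);

    public DemuxOutputFormat(Map<Side, OutputFormat<GenericRecord>> delegates) {
        this.delegates.putAll(delegates);
    }

    @Override
    public void configure(Configuration parameters) {
        for (OutputFormat<GenericRecord> f : delegates.values()) {
            f.configure(parameters);
        }
    }

    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        for (OutputFormat<GenericRecord> f : delegates.values()) {
            f.open(taskNumber, numTasks);
        }
    }

    @Override
    public void writeRecord(Tuple2<Side, GenericRecord> record) throws IOException {
        delegates.get(record.f0).writeRecord(record.f1);
    }

    @Override
    public void close() throws IOException {
        for (OutputFormat<GenericRecord> f : delegates.values()) {
            f.close();
        }
    }
}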