Hi,

I did not want to send this proposal out before having some initial
benchmarks, but this issue was mentioned on the mailing list (
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Tuple-performance-and-the-curious-JIT-compiler-td10666.html),
and I wanted to make this information available so it can be incorporated
into that discussion. I have written this draft with the help of Gábor
Gévay and Márton Balassi, and I am open to any suggestions.


The proposal draft:
Code Generation in Serializers and Comparators of Apache Flink

I am in the last semester of my MSc studies, and I am a former GSoC student
in the LLVM project. I plan to improve the serialization code in Flink
during this summer. The current implementation of the serializers can be a
performance bottleneck in some scenarios; these performance problems were
also reported on the mailing list recently [1]. I plan to introduce code
generation into the serializers to improve their performance (as Stephan
Ewen also suggested).

TODO: I plan to include some preliminary benchmarks in this section.

Performance problems with the current serializers

   1. PojoSerializer uses reflection for accessing the fields, which is
      slow (e.g. [2]). This is also a serious problem for the comparators.

   2. When deserializing fields of primitive types (e.g. int), the reusing
      overload of the corresponding field serializers cannot really do any
      reuse, because boxed primitive types are immutable in Java. This
      results in lots of object creations. [3][7]

   3. The loop that calls the field serializers makes virtual function
      calls that cannot be speculatively devirtualized by the JVM or
      predicted by the CPU, because different serializer subclasses are
      invoked for the different fields. (And the loop cannot be unrolled,
      because the number of iterations is not a compile-time constant.)
      See also the discussion on the mailing list [1].

   4. A POJO field can have the value null, so the serializer inserts
      1-byte null tags, which wastes space. (Also, the type extractor
      logic does not distinguish between primitive types and their boxed
      versions, so even an int field has a null tag.)

   5. Subclass tags also add a byte at the beginning of every POJO.

   6. getLength() does not know the serialized size in most cases [4].
      Knowing the size of a type when serialized has numerous performance
      benefits throughout Flink:

      1. Sorters can work in-place when the type is small [5].

      2. Chaining hash tables do not need resizes, because they know how
         many buckets to allocate upfront [6].

      3. Different hash table architectures could be used, e.g. open
         addressing with linear probing instead of some form of chaining.

      4. It is possible to deserialize, modify, and then serialize a
         record back to its original place, because it cannot happen that
         the modified version does not fit in the space allocated there
         for the old version (see CompactingHashTable and ReduceHashTable
         for concrete instances of this problem).

Note that 2. and 3. are problems not just with the PojoSerializer, but
also with the TupleSerializer.
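To make 1., 3., and 4. concrete, the hot path of the current approach looks roughly like the following. This is a simplified sketch of the pattern, not Flink's actual code; all class names here are made up:

```java
import java.io.DataOutput;
import java.io.IOException;
import java.lang.reflect.Field;

// Sketch of the pattern described in 1., 3., and 4.: for every field there
// is a reflective read (which boxes primitives), a 1-byte null tag, and a
// virtual call whose target differs per field, so the JIT cannot
// devirtualize the call site.
interface SketchFieldSerializer {
    void serialize(Object value, DataOutput out) throws IOException;
}

class SketchPojoSerializer {
    private final Field[] fields;                     // reflective handles
    private final SketchFieldSerializer[] fieldSerializers;

    SketchPojoSerializer(Field[] fields, SketchFieldSerializer[] serializers) {
        this.fields = fields;
        this.fieldSerializers = serializers;
        for (Field f : fields) f.setAccessible(true);
    }

    void serialize(Object record, DataOutput out) throws Exception {
        for (int i = 0; i < fields.length; i++) {
            Object value = fields[i].get(record); // reflection; boxes an int
            out.writeBoolean(value == null);      // 1-byte null tag per field
            if (value != null) {
                fieldSerializers[i].serialize(value, out); // megamorphic call
            }
        }
    }
}

// A tiny POJO used only to demonstrate the sketch.
class SketchPojo {
    int x;
    SketchPojo(int x) { this.x = x; }
}
```

Serializing a single int field this way costs a reflective read, a boxing allocation, a null tag byte, and a virtual call, where generated code could emit a single direct writeInt.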
Solution approaches

   1. Run-time code generation for every POJO

      - 1. and 3. would be automatically solved if the serializers for
        POJOs were generated on the fly (by, for example, Javassist).

      - 2. also needs code generation, plus some extra effort in the type
        extractor to distinguish between primitive types and their boxed
        versions.

      - This could be used for the PojoComparator as well (which could
        greatly increase the performance of sorting).
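As a rough sketch of what run-time generation could produce (hypothetical; no such generator exists in Flink today), consider a tiny generator that emits the source of a serialize() method for a POJO whose fields are all ints. The emitted string is the kind of method body one could feed to Javassist's CtNewMethod.make; the surrounding class-generation plumbing is omitted:

```java
// Illustration of approach 1: instead of looping over field serializers,
// a generator emits straight-line serialization code with direct field
// access for a concrete POJO -- no reflection, no boxing, no null tags for
// primitives, and every call site is monomorphic.
class SerializerSourceGenerator {
    // Generates the source of a serialize() method for a POJO whose
    // listed fields are all primitive ints.
    static String generateSerializeMethod(String className, String[] intFields) {
        StringBuilder sb = new StringBuilder();
        sb.append("public void serialize(").append(className)
          .append(" record, java.io.DataOutput out) throws java.io.IOException {\n");
        for (String field : intFields) {
            // Primitive int: direct access, no null tag, no virtual call.
            sb.append("    out.writeInt(record.").append(field).append(");\n");
        }
        sb.append("}\n");
        return sb.toString();
    }
}
```

For a POJO Point { int x; int y; } this yields two straight-line writeInt calls that the JIT can inline, in contrast to the megamorphic loop described in 3.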


   2. Annotations on POJOs (by the users)

      - Concretely:

        - annotate fields that will never be null -> no null tag needed
          before every field

        - make a POJO final -> no subclass tag needed

        - annotate a POJO that it will never be null -> no top-level null
          tag needed

      - These would also help with the getLength problem (6.), because the
        length is often unknown only because currently anything can be
        null and a subclass can appear anywhere.

      - These annotations could be implemented without code generation,
        but then they would add some overhead when no annotations are
        present, so this would work better together with the code
        generation.

      - Tuples would become a special case of POJOs, where nothing can be
        null and no subclass can appear, so maybe we could eliminate the
        TupleSerializer.

      - We could annotate some internal types in the Flink libraries
        (Gelly (Vertex, Edge), FlinkML).
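As an illustration of how this could look for users (the annotation names below are hypothetical; none of this exists in Flink today):

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Hypothetical marker annotations the serializer generator could read via
// reflection to decide which tags can be dropped.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@interface NeverNull {}        // no 1-byte null tag before this field

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface AlwaysNonNull {}    // no top-level null tag for the record

// Example POJO: final, so no subclass tag is needed; both fields are
// primitive ints marked @NeverNull, so no per-field null tags are needed
// either. Every instance then serializes to exactly 8 bytes, which would
// also let getLength() return a constant.
@AlwaysNonNull
final class Edge {
    @NeverNull int source;
    @NeverNull int target;
}
```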


TODO: what is the situation with Scala case classes? Run-time code
generation is probably easier in Scala (with quasiquotes)?

About me

I am in the last year of my Computer Science MSc studies at Eötvös Loránd
University in Budapest, and I am planning to start a PhD in the autumn. I
have been working for almost three years at Ericsson on static analysis
tools for C++. In 2014 I participated in GSoC, working on the LLVM
project, and I have been a frequent contributor ever since. The following
summer I interned at Apple.

I learned about the Flink project not long ago, and I like it so far. Over
the last few weeks I have been working on some tickets to familiarize
myself with the codebase:

https://issues.apache.org/jira/browse/FLINK-3422

https://issues.apache.org/jira/browse/FLINK-3322

https://issues.apache.org/jira/browse/FLINK-3457

My CV is available here: http://xazax.web.elte.hu/files/resume.pdf
References

[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Tuple-performance-and-the-curious-JIT-compiler-td10666.html

[2]
https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java#L369

[3]
https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntSerializer.java#L73

[4]
https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java#L98

[5]
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java

[6]
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java#L861

[7]
https://issues.apache.org/jira/browse/FLINK-3277


Best Regards,

Gábor
