Hi Max,

thanks for the detailed answer-
That was exactly what I have been looking for.
I switched the serialization from kryo to use the Value interface instead, 
keeping everything constant basically halved the execution time - nice.

One note - iterating over the array in serial fashion works, but smells 
inefficient.
Would it make sense to do bulk reading using some byte buffer - or is that not 
needed as the DataInputView is backed by the memory manager and that would 
cause issues?

Johannes
________________________________________
Von: Max Michels <m...@apache.org>
Gesendet: Dienstag, 24. Februar 2015 18:11
An: dev@flink.apache.org
Betreff: Re: Operating on Serialized Data

Apparently, the mailing list doesn't allow attachments.

Here the example with syntax highlighting:
https://gist.github.com/mxm/d1929b4b69dda87d5c37


public class CustomSerializer {

   public static class Vector implements Value {

      private transient double[] doubleValues;

      public Vector() {
      }

      public Vector(double[] doubleValues) {
         this.doubleValues = doubleValues;
      }

      public double getElement(int position) {
         return doubleValues[position];
      }

      public void setElement(double value, int position) {
         doubleValues[position] = value;
      }

      public void multiply(int factor) {
         for (int i = 0; i < doubleValues.length; i++) {
            doubleValues[i] *= factor;
         }
      }

      @Override
      public void write(DataOutputView out) throws IOException {
         out.writeInt(doubleValues.length);
         for (double value : doubleValues) {
            out.writeDouble(value);
         }
      }

      @Override
      public void read(DataInputView in) throws IOException {
         int length = in.readInt();
         double[] array = new double[length];
         for (int i = 0; i < length; i++) {
            array[i] = in.readDouble();
         }
         this.doubleValues = array;
      }

      @Override
      public String toString() {
         return "Vector{" +
               "doubleValues=" + Arrays.toString(doubleValues) +
               '}';
      }
   }

   public static void main(String[] args) throws Exception {
      ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

      Vector[] vectorList = new Vector[1024];

      // create some sample data
      for (int v = 0; v < vectorList.length; v++) {
         double[] arr = new double[128];
         for (int i = 0; i < arr.length; i++) {
            arr[i] = i * 1.23 * v;
         }
         vectorList[v] = new Vector(arr);
      }

      // create data set
      DataSet<Vector> source = env.fromElements(vectorList);

      // multiply all vectors by 2
      DataSet<Vector> ds = source.map(new MapFunction<Vector, Vector>() {
         private static final long serialVersionUID = -1511665386949403921L;

         @Override
         public Vector map(Vector value) throws Exception {
            value.multiply(2);
            return value;
         }
      });

      ds.print();

      env.execute();

   }
}


On Tue, Feb 24, 2015 at 5:43 PM, Max Michels <m...@apache.org> wrote:
> Hi Johannes,
>
> Thanks for your question. You can try to implement the Value interface
> for your Vector PoJo. It has to have an empty constructor and
> implement the write and read methods of the interface for
> serialization.
>
> Based on your description, I've implemented an example to demonstrate
> the use of the Value interface. It would be interesting to hear from
> you whether you could decrease the serialization time using this
> serialization method.
>
> Best regards,
> Max
>
>
> On Tue, Feb 24, 2015 at 11:13 AM, Kirschnick, Johannes
> <johannes.kirschn...@tu-berlin.de> wrote:
>> Hi list,
>>
>>
>> I have a general question on as to whether it's possible to significantly 
>> speed up the processing by cutting down on the serialization costs during 
>> iterations.
>>
>>
>> The basic setup that I have are a couple of vectors that are repeatedly 
>> mutated (added & multiplied) as part of an iterative run within a reducer.
>>
>> A vector is basically "just" an array of doubles - all of the same size.
>>
>>
>> I noticed during simple profiling that roughly 50% of the execution time is 
>> spent on serializing the data in using the 
>> com.esotericsoftware.kryo.serializers.DefaultArraySerializers in Kryo.
>>
>>
>> I know that any custom operation would would varant custom processing, but 
>> given the serialization contributes such a large amount of processing time 
>> to the overall runtime it might very well be worthwhile
>>
>>
>> Is that currently exposed in any fashion to the user code, or are there some 
>> hooks I could look into?
>>
>>
>> Thanks
>>
>> Johannes

Reply via email to