Hi Max, thanks for the detailed answer. That was exactly what I had been looking for. I switched the serialization from Kryo to the Value interface and, keeping everything else constant, it basically halved the execution time. Nice.
One note: iterating over the array element by element works, but it smells inefficient. Would it make sense to do bulk reading using some byte buffer, or is that not needed (or would it even cause issues) because the DataInputView is backed by the memory manager?

Johannes

________________________________________
From: Max Michels <m...@apache.org>
Sent: Tuesday, 24 February 2015 18:11
To: dev@flink.apache.org
Subject: Re: Operating on Serialized Data

Apparently, the mailing list doesn't allow attachments. Here is the example with syntax highlighting: https://gist.github.com/mxm/d1929b4b69dda87d5c37

import java.io.IOException;
import java.util.Arrays;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.types.Value;

public class CustomSerializer {

    // A Value type: Flink calls write()/read() directly instead of going through Kryo.
    public static class Vector implements Value {

        private transient double[] doubleValues;

        // Value types need a public no-argument constructor.
        public Vector() {
        }

        public Vector(double[] doubleValues) {
            this.doubleValues = doubleValues;
        }

        public double getElement(int position) {
            return doubleValues[position];
        }

        public void setElement(double value, int position) {
            doubleValues[position] = value;
        }

        public void multiply(int factor) {
            for (int i = 0; i < doubleValues.length; i++) {
                doubleValues[i] *= factor;
            }
        }

        // Serialized form: the array length followed by each double.
        @Override
        public void write(DataOutputView out) throws IOException {
            out.writeInt(doubleValues.length);
            for (double value : doubleValues) {
                out.writeDouble(value);
            }
        }

        @Override
        public void read(DataInputView in) throws IOException {
            int length = in.readInt();
            double[] array = new double[length];
            for (int i = 0; i < length; i++) {
                array[i] = in.readDouble();
            }
            this.doubleValues = array;
        }

        @Override
        public String toString() {
            return "Vector{" + "doubleValues=" + Arrays.toString(doubleValues) + '}';
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        Vector[] vectorList = new Vector[1024];

        // create some sample data
        for (int v = 0; v < vectorList.length; v++) {
            double[] arr = new double[128];
            for (int i = 0; i < arr.length; i++) {
                arr[i] = i * 1.23 * v;
            }
            vectorList[v] = new Vector(arr);
        }

        // create data set
        DataSet<Vector> source = env.fromElements(vectorList);

        // multiply all vectors by 2
        DataSet<Vector> ds = source.map(new MapFunction<Vector, Vector>() {
            private static final long serialVersionUID = -1511665386949403921L;

            @Override
            public Vector map(Vector value) throws Exception {
                value.multiply(2);
                return value;
            }
        });

        ds.print();
        env.execute();
    }
}

On Tue, Feb 24, 2015 at 5:43 PM, Max Michels <m...@apache.org> wrote:
> Hi Johannes,
>
> Thanks for your question. You can try to implement the Value interface
> for your Vector POJO. It has to have an empty constructor and
> implement the write and read methods of the interface for
> serialization.
>
> Based on your description, I've implemented an example to demonstrate
> the use of the Value interface. It would be interesting to hear from
> you whether you could decrease the serialization time using this
> serialization method.
>
> Best regards,
> Max
>
>
> On Tue, Feb 24, 2015 at 11:13 AM, Kirschnick, Johannes
> <johannes.kirschn...@tu-berlin.de> wrote:
>> Hi list,
>>
>> I have a general question as to whether it's possible to significantly
>> speed up processing by cutting down on serialization costs during
>> iterations.
>>
>> The basic setup I have is a couple of vectors that are repeatedly
>> mutated (added & multiplied) as part of an iterative run within a reducer.
>> A vector is basically "just" an array of doubles, all of the same size.
>>
>> I noticed during simple profiling that roughly 50% of the execution time is
>> spent on serializing the data using
>> com.esotericsoftware.kryo.serializers.DefaultArraySerializers in Kryo.
>>
>> I know that any custom operation would warrant custom processing, but
>> given that serialization contributes such a large share of the overall
>> runtime, it might very well be worthwhile.
>>
>> Is that currently exposed in any fashion to user code, or are there some
>> hooks I could look into?
>>
>> Thanks
>>
>> Johannes
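On the bulk-reading question from the top of the thread: a minimal sketch of one way to do it, assuming only that Flink's DataInputView and DataOutputView extend java.io.DataInput and java.io.DataOutput, so readFully(byte[]) and write(byte[]) are available. The helpers writeDoubles and readDoubles (and the class BulkDoubleCodec) are hypothetical names, not a Flink API. They move the whole array as one byte[] and convert it with java.nio.ByteBuffer, which is big-endian by default just like DataOutput.writeDouble, so data written this way can still be read with the per-element readDouble loop and vice versa. Plain java.io streams stand in for Flink's views so the sketch runs on its own. Whether it is actually faster is not guaranteed: the bulk path adds a copy through a temporary array, so it is worth profiling against the loop.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical helper class; DataInput/DataOutput stand in for Flink's
// DataInputView/DataOutputView (assumed to extend these interfaces).
public class BulkDoubleCodec {

    // Writes the length, then all doubles as a single byte[] chunk.
    // ByteBuffer is big-endian by default, the same order writeDouble uses.
    public static void writeDoubles(DataOutput out, double[] values) throws IOException {
        out.writeInt(values.length);
        ByteBuffer buffer = ByteBuffer.allocate(values.length * Double.BYTES);
        buffer.asDoubleBuffer().put(values); // fills the shared backing array
        out.write(buffer.array());
    }

    // Reads the length, pulls all bytes in one readFully call, then decodes them.
    public static double[] readDoubles(DataInput in) throws IOException {
        int length = in.readInt();
        byte[] bytes = new byte[length * Double.BYTES];
        in.readFully(bytes);
        double[] values = new double[length];
        ByteBuffer.wrap(bytes).asDoubleBuffer().get(values);
        return values;
    }

    public static void main(String[] args) throws IOException {
        double[] original = {1.5, -2.25, 3.0e10};

        // Bulk write, then bulk read back.
        ByteArrayOutputStream bulkBytes = new ByteArrayOutputStream();
        writeDoubles(new DataOutputStream(bulkBytes), original);
        double[] roundTrip = readDoubles(
                new DataInputStream(new ByteArrayInputStream(bulkBytes.toByteArray())));
        System.out.println(Arrays.equals(original, roundTrip)); // prints true

        // The element-by-element format used in Vector.write, for comparison.
        ByteArrayOutputStream loopBytes = new ByteArrayOutputStream();
        DataOutputStream loopOut = new DataOutputStream(loopBytes);
        loopOut.writeInt(original.length);
        for (double value : original) {
            loopOut.writeDouble(value);
        }
        System.out.println(Arrays.equals(bulkBytes.toByteArray(), loopBytes.toByteArray())); // prints true
    }
}
```

Inside Vector, read(DataInputView in) could then set this.doubleValues = readDoubles(in), and write(DataOutputView out) could call writeDoubles(out, doubleValues), leaving the serialized format unchanged.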