We use Spark to build graphs of events after querying Cassandra. We use mapPartitions both to aggregate events and to build two graphs per partition. The graphs are returned as a Tuple2, as follows:
val nodes = events.mapPartitions(part => {
  var nodeLeft: Node = null
  var nodeRight: Node = null
  part.foreach(seqEvents => {
    val (left, right) = // build graphs from seqEvents
    // Merge with the graphs accumulated so far for this partition
    nodeLeft = if (nodeLeft == null) left else nodeLeft.merge(left)
    nodeRight = if (nodeRight == null) right else nodeRight.merge(right)
  })
  if (nodeLeft == null || nodeRight == null)
    throw new IllegalStateException("Left/Right node cannot be null")
  val l = (nodeLeft, nodeRight) :: Nil
  l.iterator
})

val graph = nodes.reduce((t1, t2) => {
  val (left1, right1) = t1
  val (left2, right2) = t2
  if (left1 == null) throw new IllegalStateException("Left1 node cannot be null")
  if (left2 == null) throw new IllegalStateException("Left2 node cannot be null")
  // This exception is always thrown: right1 is always null here
  if (right1 == null) throw new IllegalStateException("Right1 node cannot be null")
  if (right2 == null) throw new IllegalStateException("Right2 node cannot be null")
  (left1.merge(left2), right1.merge(right2))
})

The tuples of graphs are serialized correctly (we use a custom serializer). However, in the reduce function we get null instead of the right graph. The problem seems to be caused by Kryo serialization: when we switch to Java serialization, the tuples are deserialized correctly. Moreover, if we run two separate mapPartitions/reduce passes, the first for the left graphs and the second for the right graphs, the objects are deserialized correctly by Kryo.

This is our serializer class:

public class NodeSerializer extends Serializer<Node> {

    /**
     * {@inheritDoc}
     */
    @Override
    public void write(Kryo kryo, Output output, Node object) {
        // Breadth-first traversal: nQueue holds the nodes still to be
        // written, sQueue holds the child count of each written node.
        LinkedList<Node> nQueue = new LinkedList<>();
        nQueue.add(object);
        LinkedList<Integer> sQueue = new LinkedList<>();
        // hasNext() on a LinkedList iterator only compares an index against
        // the size, so this effectively loops while nQueue is non-empty.
        Iterator<Node> it = nQueue.iterator();
        Integer eol = 1;
        while (it.hasNext()) {
            eol--;
            if (eol < 0) {
                // Current node's children are exhausted: emit the next count.
                eol = sQueue.poll();
                output.writeInt(eol);
                continue;
            }
            Node o = nQueue.poll();
            nQueue.addAll(o.getChildren());
            sQueue.add(o.getChildren().size());
            kryo.writeObject(output, o.getEvent());
        }
        output.writeInt(0); // EOF
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public Node read(Kryo kryo, Input input, Class<Node> type) {
        // Mirror of write(): read the root event, then rebuild the tree
        // level by level, using the child counts as delimiters.
        Event object = kryo.readObject(input, Event.class);
        Node root = new Node(object);
        Node leaf = root;
        int eol = input.readInt();
        LinkedList<Node> queue = new LinkedList<>();
        boolean eof = eol == 0;
        while (!eof) {
            if (eol == 0) {
                // Done with this node's children: move to the next parent.
                eol = input.readInt();
                leaf = queue.poll();
                eof = eol == 0;
                continue;
            }
            object = kryo.readObject(input, Event.class);
            queue.add(leaf.add(object));
            eol--;
        }
        return root;
    }
}

Thank you.
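P.S. This is, roughly, the two-pass variant that does work with Kryo. It is only a sketch: buildGraphs is a hypothetical helper standing in for the elided "// build graphs from seqEvents" code above.

// Hypothetical helper standing in for the elided graph-building code.
def buildGraphs(seqEvents: Seq[Event]): (Node, Node) = ???

val leftGraph = events.mapPartitions { part =>
  var acc: Node = null
  part.foreach { seqEvents =>
    val (left, _) = buildGraphs(seqEvents)
    acc = if (acc == null) left else acc.merge(left)
  }
  if (acc == null) throw new IllegalStateException("Left node cannot be null")
  Iterator.single(acc) // one Node per record instead of a Tuple2
}.reduce(_.merge(_))

// ...and the same shape again for the right-hand graphs. With a single
// Node per record, Kryo deserializes everything correctly.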
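For reference, the Node API that NodeSerializer relies on looks roughly like this. It is only a shape reconstructed from the call sites above (Event is our event class); the real class carries the actual merge logic.

import java.util.{ArrayList, List => JList}

// Sketch of the Node shape implied by NodeSerializer's call sites;
// merge() is real application logic and is omitted here.
class Node(private val event: Event) {
  private val children = new ArrayList[Node]()

  def getEvent: Event = event
  def getChildren: JList[Node] = children

  // Attach a child for the given event and return the new child;
  // this is how NodeSerializer.read() rebuilds each level.
  def add(e: Event): Node = {
    val child = new Node(e)
    children.add(child)
    child
  }

  def merge(other: Node): Node = ??? // actual merge logic elided
}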
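And this is roughly how the serializer is registered (the class and package names here are placeholders; the config keys are the standard Spark ones):

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoRegistrator

// Binds Node to the custom NodeSerializer; registering Event as well
// lets Kryo write a compact class ID instead of the full class name.
class GraphKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[Node], new NodeSerializer())
    kryo.register(classOf[Event])
  }
}

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "com.example.GraphKryoRegistrator")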