We use Spark to build graphs of events after querying Cassandra. We use
mapPartitions both to aggregate events and to build two graphs per
partition. The graphs are returned as a Tuple2, as follows:

    val nodes = events.mapPartitions(part => {

      var nodeLeft: Node = null
      var nodeRight: Node = null

      part.foreach(seqEvents => {

        val (left, right) = // build graphs from seqEvents

        // Merge with the nodes accumulated so far
        nodeLeft = if (nodeLeft == null) left else nodeLeft.merge(left)
        nodeRight = if (nodeRight == null) right else nodeRight.merge(right)
      })

      if (nodeLeft == null || nodeRight == null)
        throw new IllegalStateException("Left/Right node cannot be null")

      Iterator((nodeLeft, nodeRight))
    })

    val graph = nodes.reduce((t1, t2) => {
      val (left1, right1) = t1
      val (left2, right2) = t2
      if (left1 == null) throw new IllegalStateException("Left1 node cannot be null")
      // This condition is always true: the exception below is always thrown
      if (left2 == null) throw new IllegalStateException("Left2 node cannot be null")
      if (right1 == null) throw new IllegalStateException("Right1 node cannot be null")
      if (right2 == null) throw new IllegalStateException("Right2 node cannot be null")
      (left1.merge(left2), right1.merge(right2))
    })


Tuples of graphs are serialized correctly (we use a custom serializer).
However, in the reduce function we get null instead of one of the graphs.

The problem seems to be caused by Kryo serialization: when we switch to
Java serialization, the tuples are deserialized correctly.

In addition, if we split the job into two mapPartitions/reduce passes, the
first for the left graphs and the second for the right graphs, the objects
are deserialized correctly by Kryo.
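For reference, a serializer like ours is typically wired into Spark through
a KryoRegistrator. A minimal sketch of that wiring follows; the registrator
class and package names are placeholders, not our actual code:

```java
import com.esotericsoftware.kryo.Kryo;
import org.apache.spark.SparkConf;
import org.apache.spark.serializer.KryoRegistrator;

// Placeholder registrator: tells Kryo to use NodeSerializer for Node
// instances and registers the Event class it reads/writes.
public class GraphKryoRegistrator implements KryoRegistrator {
    @Override
    public void registerClasses(Kryo kryo) {
        kryo.register(Node.class, new NodeSerializer());
        kryo.register(Event.class);
    }
}

// Driver-side configuration enabling Kryo and the registrator above.
class KryoConfigSketch {
    static SparkConf configure() {
        return new SparkConf()
            .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            .set("spark.kryo.registrator", "com.example.GraphKryoRegistrator");
    }
}
```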

This is our serializer class:

    public class NodeSerializer extends Serializer<Node> {

        /**
         * {@inheritDoc}
         */
        @Override
        public void write(Kryo kryo, Output output, Node object) {

            // Breadth-first traversal: nQueue holds the nodes still to be
            // written, sQueue the child counts pending emission.
            LinkedList<Node> nQueue = new LinkedList<>();
            nQueue.add(object);

            LinkedList<Integer> sQueue = new LinkedList<>();

            int eol = 1;
            // Loop while nodes remain (replaces the original iterator over
            // nQueue, which was structurally modified during iteration).
            while (!nQueue.isEmpty()) {
                eol--;
                if (eol < 0) {
                    // Current run of siblings done: emit the next child count
                    eol = sQueue.poll();
                    output.writeInt(eol);
                    continue;
                }
                Node o = nQueue.poll();
                nQueue.addAll(o.getChildren());
                sQueue.add(o.getChildren().size());
                kryo.writeObject(output, o.getEvent());
            }
            output.writeInt(0); // EOF
        }

        /**
         * {@inheritDoc}
         */
        @Override
        public Node read(Kryo kryo, Input input, Class<Node> type) {

            Event object = kryo.readObject(input, Event.class);
            Node root = new Node(object);
            Node leaf = root;
            int eol = input.readInt();

            // Nodes whose children have not been attached yet, in BFS order
            LinkedList<Node> queue = new LinkedList<>();

            boolean eof = eol == 0;
            while (!eof) {
                if (eol == 0) {
                    // Current parent exhausted: advance to the next one
                    eol = input.readInt();
                    leaf = queue.poll();
                    eof = eol == 0;
                    continue;
                }
                object = kryo.readObject(input, Event.class);
                queue.add(leaf.add(object));
                eol--;
            }
            return root;
        }
    }
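To check whether a breadth-first, child-count encoding of this kind
round-trips at all, independent of Kryo and Spark, here is a
self-contained sketch against plain DataOutputStream/DataInputStream. The
Node below is a simplified stand-in for our class (a String in place of
Event), and the framing differs slightly from the serializer above: each
node's event is immediately followed by its own child count.

```java
import java.io.*;
import java.util.*;

// Self-contained sketch of a breadth-first, child-count tree codec,
// testable without Kryo or Spark. Node is a simplified stand-in.
public class BfsCodecDemo {

    static final class Node {
        final String event;
        final List<Node> children = new ArrayList<>();
        Node(String event) { this.event = event; }
        Node add(String childEvent) { // mirrors Node.add(Event)
            Node child = new Node(childEvent);
            children.add(child);
            return child;
        }
    }

    // BFS order: for every node, write its event then its child count.
    static byte[] encode(Node root) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        Deque<Node> queue = new ArrayDeque<>();
        queue.add(root);
        while (!queue.isEmpty()) {
            Node n = queue.poll();
            out.writeUTF(n.event);
            out.writeInt(n.children.size());
            queue.addAll(n.children);
        }
        return bytes.toByteArray();
    }

    // Rebuild the tree by replaying the same BFS order.
    static Node decode(byte[] data) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        Node root = new Node(in.readUTF());
        Deque<Node> parents = new ArrayDeque<>();
        Deque<Integer> counts = new ArrayDeque<>();
        parents.add(root);
        counts.add(in.readInt());
        while (!parents.isEmpty()) {
            Node parent = parents.poll();
            int k = counts.poll();
            for (int i = 0; i < k; i++) {
                Node child = parent.add(in.readUTF());
                parents.add(child);
                counts.add(in.readInt());
            }
        }
        return root;
    }

    // Compact structural view, e.g. "A(B(D),C)", used to compare trees.
    static String flatten(Node n) {
        if (n.children.isEmpty()) return n.event;
        StringBuilder sb = new StringBuilder(n.event).append('(');
        for (int i = 0; i < n.children.size(); i++) {
            if (i > 0) sb.append(',');
            sb.append(flatten(n.children.get(i)));
        }
        return sb.append(')').toString();
    }

    // Round-trip a small sample tree and return its structure.
    static String roundTripSample() {
        try {
            Node root = new Node("A");
            Node b = root.add("B");
            root.add("C");
            b.add("D");
            return flatten(decode(encode(root)));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

If the equivalent round-trip test passes for NodeSerializer with a raw Kryo
Output/Input pair, the problem is more likely in how Spark invokes the
serializer for the Tuple2 fields than in the encoding itself.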

Thank you.
--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Kryo-read-method-never-called-before-reducing-tp22786.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
