Hi Till,

I've created a simple Java example to show you what's going on. The code
is attached and also shown below. The example creates a simple graph with
Long vertex values (VV) and Double edge values (EV). Then it runs
Summarization, which should compute a condensed version of the input graph
by grouping vertices and edges based on their values. I run this code from
IntelliJ IDEA. The code executes fine until you try to print the resulting
edges (just uncomment line 46, edgesOut.print();). Then it throws the
following exception:

_________EXCEPTION START_____________
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply$mcV$sp(JobManager.scala:830)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:773)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:773)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.types.NullFieldException: Field 2 is null, but expected to hold a value.
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:126)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
    at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56)
    at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:83)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:85)
    at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
    at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
    at org.apache.flink.api.java.operators.translation.PlanFilterOperator$FlatMapFilter.flatMap(PlanFilterOperator.java:51)
    at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:80)
    at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
    at org.apache.flink.graph.library.Summarization$VertexGroupReducer.reduce(Summarization.java:323)
    at org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:131)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:590)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
    at org.apache.flink.api.common.typeutils.base.LongSerializer.serialize(LongSerializer.java:64)
    at org.apache.flink.api.common.typeutils.base.LongSerializer.serialize(LongSerializer.java:27)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:124)
    ... 15 more

_____________EXCEPTION END__________________
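
To illustrate how I read the innermost cause (NullPointerException in
LongSerializer, reported as "Field 2 is null"), here is a plain-Java sketch
with no Flink classes. It assumes the serializer passes the boxed Long to a
writeLong(long) call, so a null value fails while being unboxed. The class
and method names are made up for illustration, and this is only my guess at
the mechanism, not the actual Flink code.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for a Long field serializer; not a Flink class.
final class NullLongSketch {

    // Writes a boxed Long through writeLong(long). The value is auto-unboxed
    // at the call, which throws NullPointerException when value is null.
    static void writeBoxedLong(Long value, DataOutputStream out) throws IOException {
        out.writeLong(value);
    }

    // Returns true if writing the value fails with a NullPointerException.
    static boolean failsWithNpe(Long value) {
        DataOutputStream out = new DataOutputStream(new ByteArrayOutputStream());
        try {
            writeBoxedLong(value, out);
            return false;
        } catch (NullPointerException e) {
            return true;
        } catch (IOException e) {
            // Not expected for an in-memory stream.
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("5L fails:   " + failsWithNpe(5L));
        System.out.println("null fails: " + failsWithNpe(null));
    }
}
```

If this is what happens, some tuple emitted inside Summarization carries a
null in a Long-typed field, which would explain why the job fails only with
Long vertex values.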

It looks like the problem is in the following lines in Summarization:

DataSet<Edge<K, EV>> edgesForGrouping = input.getEdges()
      .join(vertexToRepresentativeMap)
      .where(0)  // source vertex id
      .equalTo(0) // vertex id
      .with(new SourceVertexJoinFunction<K, EV>())
      .join(vertexToRepresentativeMap)
      .where(1)  // target vertex id
      .equalTo(0) // vertex id
      .with(new TargetVertexJoinFunction<K, EV>());


If I print the edges before this step, it works fine. If I print them after
this step, my IDE shows the same exception.

I would really appreciate any help.

Thank you,
Olga

_________EXAMPLE START_____________________

package org.apache.flink.graph.examples;

import org.apache.flink.api.common.ProgramDescription;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.library.Summarization;
import java.util.LinkedList;
import java.util.List;

public class MySummarizationExample implements ProgramDescription {

    @SuppressWarnings("serial")
    public static void main(String [] args) throws Exception {

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        //Create graph
        DataSet<Edge<Long, Double>> edges = getEdgeDataSet(env);
        DataSet<Vertex<Long, Long>> vertices = getVertexDataSet(env);
        Graph<Long, Long, Double> graph = Graph.fromDataSet(vertices, edges, env);

        //emit input
        System.out.println("Executing example with following inputs:\n"+"Vertices:\n");
        vertices.print();
        System.out.println("Edges:\n");
        edges.print();

        Graph<Long, Summarization.VertexValue<Long>, Summarization.EdgeValue<Double>> result = graph
                .run(new Summarization<Long, Long, Double>());

        //now we want to read the output
        DataSet<Edge<Long, Summarization.EdgeValue<Double>>> edgesOut = result.getEdges();
        DataSet<Vertex<Long, Summarization.VertexValue<Long>>> verticesOut = result.getVertices();

        // emit result
        System.out.println("Summarized graph:\n"+"Vertices:\n");
        verticesOut.print();
        System.out.println("Edges:\n");
        edgesOut.print();
    }

    @Override
    public String getDescription() {
        return "Summarization Example";
    }

    //Define edges
    private static DataSet<Edge<Long, Double>> getEdgeDataSet(ExecutionEnvironment env) {
        Object[][] DEFAULT_EDGES = new Object[][] {
                new Object[]{1L, 2L, 1.0},
                new Object[]{1L, 4L, 3.0},
                new Object[]{2L, 3L, 6.0},
                new Object[]{2L, 4L, 5.0},
                new Object[]{2L, 5L, 1.0},
                new Object[]{3L, 5L, 5.0},
                new Object[]{3L, 6L, 2.0},
                new Object[]{4L, 5L, 1.0},
                new Object[]{5L, 6L, 4.0}
        };
        List<Edge<Long, Double>> edgeList = new LinkedList<Edge<Long, Double>>();
        for (Object[] edge : DEFAULT_EDGES) {
            edgeList.add(new Edge<Long, Double>((Long) edge[0], (Long) edge[1], (Double) edge[2]));
        }
        return env.fromCollection(edgeList);
    }
    //Define vertices
    private static DataSet<Vertex<Long, Long>> getVertexDataSet(ExecutionEnvironment env) {
        //We will summarize by <VV> = Long
        Object[][] DEFAULT_VERTICES = new Object[][] {
                new Object[]{1L, 1L},
                new Object[]{2L, 1L},
                new Object[]{3L, 5L},
                new Object[]{4L, 5L},
                new Object[]{5L, 5L}
        };
        List<Vertex<Long, Long>> vertexList = new LinkedList<Vertex<Long, Long>>();
        for (Object[] vertex : DEFAULT_VERTICES) {
            vertexList.add(new Vertex<Long, Long>((Long) vertex[0], (Long) vertex[1]));
        }
        return env.fromCollection(vertexList);
    }
}

_________EXAMPLE END_____________________
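
For reference, this is roughly what I expect Summarization to compute for
the sample graph, written as a plain-Java sketch without Flink. Vertices
are grouped by their value, and edges are grouped by source group, target
group and edge value. The class name, the choice of the smallest id as the
group representative, and the dropping of edges that touch vertex 6 (which
is not in my vertex list) are my own assumptions, not necessarily what the
library does.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper for illustration only; not part of Gelly.
final class SummarizationSketch {

    // Same sample edges as in the example: {source, target, edge value}.
    static final Object[][] SAMPLE_EDGES = {
        {1L, 2L, 1.0}, {1L, 4L, 3.0}, {2L, 3L, 6.0}, {2L, 4L, 5.0}, {2L, 5L, 1.0},
        {3L, 5L, 5.0}, {3L, 6L, 2.0}, {4L, 5L, 1.0}, {5L, 6L, 4.0}
    };

    // Same sample vertices as in the example: vertex id -> vertex value.
    static Map<Long, Long> sampleVertices() {
        Map<Long, Long> values = new TreeMap<>();
        values.put(1L, 1L);
        values.put(2L, 1L);
        values.put(3L, 5L);
        values.put(4L, 5L);
        values.put(5L, 5L);
        return values;
    }

    // Maps every vertex id to the representative of its group.
    // Assumption: the representative is the smallest id with the same value.
    static Map<Long, Long> representatives(Map<Long, Long> vertexValues) {
        Map<Long, Long> repByValue = new TreeMap<>();
        for (Map.Entry<Long, Long> v : vertexValues.entrySet()) {
            repByValue.merge(v.getValue(), v.getKey(), Long::min);
        }
        Map<Long, Long> repByVertex = new TreeMap<>();
        for (Map.Entry<Long, Long> v : vertexValues.entrySet()) {
            repByVertex.put(v.getKey(), repByValue.get(v.getValue()));
        }
        return repByVertex;
    }

    // Counts edges per (source representative, target representative, edge value).
    static Map<List<Object>, Integer> summarizeEdges(Map<Long, Long> repByVertex, Object[][] edges) {
        Map<List<Object>, Integer> counts = new LinkedHashMap<>();
        for (Object[] e : edges) {
            Long src = repByVertex.get((Long) e[0]);
            Long tgt = repByVertex.get((Long) e[1]);
            if (src == null || tgt == null) {
                continue; // endpoint is not in the vertex list; dropped like in an inner join
            }
            List<Object> key = Arrays.asList(src, tgt, e[2]);
            counts.merge(key, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<Long, Long> reps = representatives(sampleVertices());
        System.out.println("vertex -> representative: " + reps);
        summarizeEdges(reps, SAMPLE_EDGES)
                .forEach((edge, count) -> System.out.println(edge + " x" + count));
    }
}
```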


Best regards,
Olga Golovneva

On Thu, Sep 15, 2016 at 9:16 AM, Till Rohrmann <trohrm...@apache.org> wrote:

> Hi Olga,
>
> can you provide us with a little bit more details about the problem. The
> full stack trace of the exception and the program you're trying to run
> would be helpful.
>
> Cheers,
> Till
>
> On Wed, Sep 14, 2016 at 9:49 PM, Olga Golovneva <melcha...@gmail.com>
> wrote:
>
> > Hi devs,
> >
> > Do you know if there is an example (besides ITCase) of usage of
> > Summarization Library in Gelly? I'm having some problems trying to use it
> > in my code. Particularly, I cannot print output edges ( it throws the
> > following exception: Exception in thread "main"
> > org.apache.flink.runtime.client.JobExecutionException: Job execution
> > failed.), while vertices are printed correctly.
> >
> > Best regards,
> > Olga
> >
>
