+1 for the Either type solution :-)

A few quick sketches below for anyone who wants to poke at this in the meantime.
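First, a rough sketch of how I read Martin's proposal. The class and names here are mine (hypothetical), not from an actual patch; it only illustrates how the group reducer could emit a single serializable type for both kinds of tuples:

import org.apache.flink.types.Either;
import org.apache.flink.types.NullValue;

public class EitherSketch {

    // The reducer currently emits two shapes: per-vertex tuples with a null
    // group value, and one representative tuple carrying the value. With
    // Either, the "no value" case becomes an explicit, serializable Left.
    static Either<NullValue, Long> groupValue(boolean isRepresentative, Long value) {
        return isRepresentative
                ? Either.<NullValue, Long>Right(value)
                : Either.<NullValue, Long>Left(NullValue.getInstance());
    }

    public static void main(String[] args) {
        Either<NullValue, Long> perVertex = groupValue(false, null);
        Either<NullValue, Long> representative = groupValue(true, 42L);
        System.out.println(perVertex.isLeft());      // true: no Long to serialize, no NPE
        System.out.println(representative.right());  // 42
    }
}

The Left case then goes through NullValue's serializer instead of the vertex value serializer, which is exactly the path the current null field cannot take.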
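Second, to make Till's diagnosis concrete: the String vs. Long difference comes from the serializers themselves. A minimal sketch of that behaviour (my assumption on the exact encoding; DataOutputSerializer is just a convenient DataOutputView to write into):

import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.core.memory.DataOutputSerializer;

public class NullSerializationSketch {
    public static void main(String[] args) throws Exception {
        DataOutputSerializer out = new DataOutputSerializer(64);

        // StringSerializer encodes null as a zero-length marker, so this succeeds.
        StringSerializer.INSTANCE.serialize(null, out);

        // LongSerializer unboxes the Long, so null fails with the NPE from Olga's trace.
        try {
            LongSerializer.INSTANCE.serialize(null, out);
        } catch (NullPointerException e) {
            System.out.println("LongSerializer cannot handle null: " + e);
        }
    }
}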
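And until the fix lands, Till's workaround can be bolted onto Olga's example from further down the thread roughly like this (again just a sketch: vertices, edges and env are the variables from her example, and it additionally needs the org.apache.flink.api.common.functions.MapFunction import):

// Map the Long vertex values to String before running Summarization,
// since StringSerializer tolerates the null group value.
DataSet<Vertex<Long, String>> stringVertices = vertices
        .map(new MapFunction<Vertex<Long, Long>, Vertex<Long, String>>() {
            @Override
            public Vertex<Long, String> map(Vertex<Long, Long> v) {
                return new Vertex<Long, String>(v.getId(), String.valueOf(v.getValue()));
            }
        });

Graph<Long, String, Double> stringGraph = Graph.fromDataSet(stringVertices, edges, env);
Graph<Long, Summarization.VertexValue<String>, Summarization.EdgeValue<Double>> result =
        stringGraph.run(new Summarization<Long, String, Double>());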
On Sat, Sep 17, 2016 at 10:49 AM, Martin Junghanns <m.jungha...@mailbox.org> wrote:

> Hi all,
>
> thanks for reporting the issue! I just looked into it. The
> VertexGroupReducer outputs two semantically different tuples: one for each
> vertex without the value (null) and one tuple representing the whole group
> including the value. As Till pointed out, this crashes if the value has no
> serializer for null values.
>
> Since we cannot have two different output types for the same GroupReduce
> function, I propose using an Either<NullValue, VV> here. If there are no
> objections, I will take the issue and fix it.
>
> Again, thanks for pointing it out.
>
> Best,
>
> Martin
>
>
> On 15.09.2016 20:10, Vasiliki Kalavri wrote:
>
>> Hi,
>>
>> thanks for looking into this Till! I'm not quite sure what the algorithm
>> behavior should be when the vertex value is null (probably skip the
>> record?). Let's wait for Martin's input.
>>
>> Cheers,
>> -V.
>>
>> On 15 September 2016 at 19:19, Olga Golovneva <melcha...@gmail.com> wrote:
>>
>>> Hi Till,
>>>
>>> Thanks a lot for your help! I'll try to use another variable type in
>>> the meantime.
>>>
>>> Best regards,
>>> Olga Golovneva
>>>
>>> On Thu, Sep 15, 2016 at 1:03 PM, Till Rohrmann <trohrm...@apache.org> wrote:
>>>
>>>> Hi Olga,
>>>>
>>>> it’s indeed an error in Flink’s Summarization algorithm. The problem is
>>>> the following: the vertex group value of the VertexGroupItem is null in
>>>> the VertexGroupReducer. This works in the SummarizationITCase because
>>>> the vertex value is of type String and the StringSerializer can deal
>>>> with null values.
>>>>
>>>> However, in your case, where you use longs, it fails because the
>>>> LongSerializer cannot handle null values. You can verify this behaviour
>>>> by changing the vertex value type to String. Then everything should
>>>> work without a problem.
>>>>
>>>> I’ve cc’ed Martin, who can probably tell you more about the
>>>> Summarization algorithm. I’ve also opened a JIRA ticket [1] to fix this
>>>> problem.
>>>>
>>>> Thanks for reporting this bug.
>>>>
>>>> [1] https://issues.apache.org/jira/browse/FLINK-4624
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Thu, Sep 15, 2016 at 5:05 PM, Olga Golovneva <melcha...@gmail.com> wrote:
>>>>
>>>>> Hi Till,
>>>>>
>>>>> I've created a simple (Java) example to show you what's going on. The
>>>>> code is attached and shown below. This example creates a simple graph
>>>>> with Double EV and Long VV, then runs Summarization, which should
>>>>> compute a condensed version of the input graph by grouping vertices
>>>>> and edges based on their values. I run this code with IntelliJ IDEA.
>>>>> The code executes fine until you want to see what is written in the
>>>>> resulting edges (just uncomment line 46, edgesOut.print();). Then it
>>>>> throws the following exception:
>>>>>
>>>>> _________EXCEPTION START_____________
>>>>> Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException:
>>>>> Job execution failed.
>>>>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply$mcV$sp(JobManager.scala:830)
>>>>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:773)
>>>>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:773)
>>>>> at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>>>>> at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>>>>> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>>>>> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>>>>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>>>>> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>>>>> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>> Caused by: org.apache.flink.types.NullFieldException: Field 2 is null, but expected to hold a value.
>>>>> at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:126)
>>>>> at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
>>>>> at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56)
>>>>> at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:83)
>>>>> at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:85)
>>>>> at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
>>>>> at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>>>>> at org.apache.flink.api.java.operators.translation.PlanFilterOperator$FlatMapFilter.flatMap(PlanFilterOperator.java:51)
>>>>> at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:80)
>>>>> at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>>>>> at org.apache.flink.graph.library.Summarization$VertexGroupReducer.reduce(Summarization.java:323)
>>>>> at org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:131)
>>>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
>>>>> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:590)
>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>> Caused by: java.lang.NullPointerException
>>>>> at org.apache.flink.api.common.typeutils.base.LongSerializer.serialize(LongSerializer.java:64)
>>>>> at org.apache.flink.api.common.typeutils.base.LongSerializer.serialize(LongSerializer.java:27)
>>>>> at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:124)
>>>>> ... 15 more
>>>>>
>>>>> _____________EXCEPTION END__________________
>>>>>
>>>>> It looks like the problem is in the following lines in Summarization:
>>>>>
>>>>> DataSet<Edge<K, EV>> edgesForGrouping = input.getEdges()
>>>>>         .join(vertexToRepresentativeMap)
>>>>>         .where(0)   // source vertex id
>>>>>         .equalTo(0) // vertex id
>>>>>         .with(new SourceVertexJoinFunction<K, EV>())
>>>>>         .join(vertexToRepresentativeMap)
>>>>>         .where(1)   // target vertex id
>>>>>         .equalTo(0) // vertex id
>>>>>         .with(new TargetVertexJoinFunction<K, EV>());
>>>>>
>>>>> If you try to print the edges before this step, it works fine. But
>>>>> after this step my IDE gives the same exception.
>>>>>
>>>>> I would really appreciate any help.
>>>>>
>>>>> Thank you,
>>>>> Olga
>>>>>
>>>>> _________EXAMPLE START_____________________
>>>>>
>>>>> package org.apache.flink.graph.examples;
>>>>>
>>>>> import org.apache.flink.api.common.ProgramDescription;
>>>>> import org.apache.flink.api.java.DataSet;
>>>>> import org.apache.flink.api.java.ExecutionEnvironment;
>>>>> import org.apache.flink.graph.Edge;
>>>>> import org.apache.flink.graph.Graph;
>>>>> import org.apache.flink.graph.Vertex;
>>>>> import org.apache.flink.graph.library.Summarization;
>>>>>
>>>>> import java.util.LinkedList;
>>>>> import java.util.List;
>>>>>
>>>>> public class MySummarizationExample implements ProgramDescription {
>>>>>
>>>>>     @SuppressWarnings("serial")
>>>>>     public static void main(String[] args) throws Exception {
>>>>>
>>>>>         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>>>
>>>>>         // Create the graph
>>>>>         DataSet<Edge<Long, Double>> edges = getEdgeDataSet(env);
>>>>>         DataSet<Vertex<Long, Long>> vertices = getVertexDataSet(env);
>>>>>         Graph<Long, Long, Double> graph = Graph.fromDataSet(vertices, edges, env);
>>>>>
>>>>>         // Emit the input
>>>>>         System.out.println("Executing example with following inputs:\n" + "Vertices:\n");
>>>>>         vertices.print();
>>>>>         System.out.println("Edges:\n");
>>>>>         edges.print();
>>>>>
>>>>>         Graph<Long, Summarization.VertexValue<Long>, Summarization.EdgeValue<Double>> result = graph
>>>>>                 .run(new Summarization<Long, Long, Double>());
>>>>>
>>>>>         // Now we want to read the output
>>>>>         DataSet<Edge<Long, Summarization.EdgeValue<Double>>> edgesOut = result.getEdges();
>>>>>         DataSet<Vertex<Long, Summarization.VertexValue<Long>>> verticesOut = result.getVertices();
>>>>>
>>>>>         // Emit the result
>>>>>         System.out.println("Summarized graph:\n" + "Vertices:\n");
>>>>>         verticesOut.print();
>>>>>         System.out.println("Edges:\n");
>>>>>         edgesOut.print();
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public String getDescription() {
>>>>>         return "Summarization Example";
>>>>>     }
>>>>>
>>>>>     // Define the edges
>>>>>     private static DataSet<Edge<Long, Double>> getEdgeDataSet(ExecutionEnvironment env) {
>>>>>         Object[][] DEFAULT_EDGES = new Object[][] {
>>>>>                 new Object[]{1L, 2L, 1.0},
>>>>>                 new Object[]{1L, 4L, 3.0},
>>>>>                 new Object[]{2L, 3L, 6.0},
>>>>>                 new Object[]{2L, 4L, 5.0},
>>>>>                 new Object[]{2L, 5L, 1.0},
>>>>>                 new Object[]{3L, 5L, 5.0},
>>>>>                 new Object[]{3L, 6L, 2.0},
>>>>>                 new Object[]{4L, 5L, 1.0},
>>>>>                 new Object[]{5L, 6L, 4.0}
>>>>>         };
>>>>>         List<Edge<Long, Double>> edgeList = new LinkedList<Edge<Long, Double>>();
>>>>>         for (Object[] edge : DEFAULT_EDGES) {
>>>>>             edgeList.add(new Edge<Long, Double>((Long) edge[0], (Long) edge[1], (Double) edge[2]));
>>>>>         }
>>>>>         return env.fromCollection(edgeList);
>>>>>     }
>>>>>
>>>>>     // Define the vertices
>>>>>     private static DataSet<Vertex<Long, Long>> getVertexDataSet(ExecutionEnvironment env) {
>>>>>         // We will summarize by <VV> = Long
>>>>>         Object[][] DEFAULT_VERTICES = new Object[][] {
>>>>>                 new Object[]{1L, 1L},
>>>>>                 new Object[]{2L, 1L},
>>>>>                 new Object[]{3L, 5L},
>>>>>                 new Object[]{4L, 5L},
>>>>>                 new Object[]{5L, 5L}
>>>>>         };
>>>>>         List<Vertex<Long, Long>> vertexList = new LinkedList<Vertex<Long, Long>>();
>>>>>         for (Object[] vertex : DEFAULT_VERTICES) {
>>>>>             vertexList.add(new Vertex<Long, Long>((Long) vertex[0], (Long) vertex[1]));
>>>>>         }
>>>>>         return env.fromCollection(vertexList);
>>>>>     }
>>>>> }
>>>>>
>>>>> _________EXAMPLE END_____________________
>>>>>
>>>>> Best regards,
>>>>> Olga Golovneva
>>>>>
>>>>> On Thu, Sep 15, 2016 at 9:16 AM, Till Rohrmann <trohrm...@apache.org> wrote:
>>>>>
>>>>>> Hi Olga,
>>>>>>
>>>>>> can you provide us with a bit more detail about the problem? The full
>>>>>> stack trace of the exception and the program you're trying to run
>>>>>> would be helpful.
>>>>>>
>>>>>> Cheers,
>>>>>> Till
>>>>>>
>>>>>> On Wed, Sep 14, 2016 at 9:49 PM, Olga Golovneva <melcha...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi devs,
>>>>>>>
>>>>>>> Do you know if there is an example (besides the ITCase) of how to use
>>>>>>> the Summarization library in Gelly? I'm having some problems trying
>>>>>>> to use it in my code. In particular, I cannot print the output edges
>>>>>>> (it throws the following exception: Exception in thread "main"
>>>>>>> org.apache.flink.runtime.client.JobExecutionException: Job execution
>>>>>>> failed.), while the vertices are printed correctly.
>>>>>>>
>>>>>>> Best regards,
>>>>>>> Olga