+1 for the either type solution :-)

On Sat, Sep 17, 2016 at 10:49 AM, Martin Junghanns <m.jungha...@mailbox.org>
wrote:

> Hi all,
>
> thanks for reporting the issue! I just looked into it. The
> VertexGroupReduce outputs two semantically different tuples: one for each
> vertex without the value (null) and one tuple representing the whole group
> including the value. As Till pointed out, this crashes if the value has no
> serializer for null values.
>
> Since we cannot have two different output types for the same GroupReduce
> function, I propose using a Either<NullValue, VV> here. If there are no
> objections I will take the issue and fix it.
>
> Again, thx for pointing it out.
>
> Best,
>
> Martin
>
>
> On 15.09.2016 20:10, Vasiliki Kalavri wrote:
>
>> Hi,
>>
>> thanks for looking into this Till! I'm not quite sure what the algorithm
>> behavior should be when the vertex value is null (probably skip the
>> record?). Let's wait for Martin's input.
>>
>> Cheers,
>> -V.
>>
>> On 15 September 2016 at 19:19, Olga Golovneva <melcha...@gmail.com>
>> wrote:
>>
>> Hi Till,
>>>
>>> Thanks a lot for your help! I'll try to use another variable type in the
>>> meantime.
>>>
>>> Best regards,
>>> Olga
>>>
>>>
>>> Best regards,
>>> Olga Golovneva
>>>
>>> On Thu, Sep 15, 2016 at 1:03 PM, Till Rohrmann <trohrm...@apache.org>
>>> wrote:
>>>
>>> Hi Olga,
>>>>
>>>> it’s indeed an error in Flink’s Summarization algorithm. The problem is
>>>>
>>> the
>>>
>>>> following: The vertex group value of the VertexGroupItem is null in the
>>>> VertexGroupReducer. This works in the SummarizationIT case because the
>>>> vertex value is of type String and the StringSerializer can deal with
>>>>
>>> null
>>>
>>>> values.
>>>>
>>>> However, in your case where you use longs, it fails, because the
>>>> LongSerializer cannot handle null values. You can verify this behaviour
>>>>
>>> by
>>>
>>>> changing the vertex value type to String. Then everything should work
>>>> without a problem.
>>>>
>>>> I’ve cc’ed Martin who can tell you probably more about the Summarization
>>>> algorithm. I’ve also opened a JIRA ticket [1] to fix this problem.
>>>>
>>>> Thanks for reporting this bug.
>>>>
>>>> [1] https://issues.apache.org/jira/browse/FLINK-4624
>>>>
>>>> Cheers,
>>>> Till
>>>> ​
>>>>
>>>> On Thu, Sep 15, 2016 at 5:05 PM, Olga Golovneva <melcha...@gmail.com>
>>>> wrote:
>>>>
>>>> Hi Till,
>>>>>
>>>>> I've created a simple (Java) example to show you what's going on. The
>>>>>
>>>> code
>>>>
>>>>> is in attachment and shown below. This example creates simple graph
>>>>>
>>>> with
>>>
>>>> Double EV and Long VV. Then it runs Summarization, that should compute
>>>>>
>>>> a
>>>
>>>> condensed version of the input graph by grouping vertices and edges
>>>>>
>>>> based
>>>
>>>> on their values. I run this code with IntelliJ IDEA. The code executes
>>>>>
>>>> fine
>>>>
>>>>> until you want to see what is written in resulted edges (just uncomment
>>>>> line 46, edgesOut.print();). Then it throws the following Exception:
>>>>>
>>>>> _________EXCEPTION START_____________
>>>>> Exception in thread "main" org.apache.flink.runtime.
>>>>>
>>>> client.JobExecutionException:
>>>>
>>>>> Job execution failed.
>>>>> at org.apache.flink.runtime.jobmanager.JobManager$$
>>>>> anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply$
>>>>> mcV$sp(JobManager.scala:830)
>>>>> at org.apache.flink.runtime.jobmanager.JobManager$$
>>>>> anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(
>>>>>
>>>> JobManager.scala:773)
>>>>
>>>>> at org.apache.flink.runtime.jobmanager.JobManager$$
>>>>> anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(
>>>>>
>>>> JobManager.scala:773)
>>>>
>>>>> at scala.concurrent.impl.Future$PromiseCompletingRunnable.
>>>>> liftedTree1$1(Future.scala:24)
>>>>> at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(
>>>>> Future.scala:24)
>>>>> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>>>>> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(
>>>>> AbstractDispatcher.scala:401)
>>>>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(
>>>>>
>>>> ForkJoinTask.java:260)
>>>
>>>> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.
>>>>> pollAndExecAll(ForkJoinPool.java:1253)
>>>>> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.
>>>>> runTask(ForkJoinPool.java:1346)
>>>>> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(
>>>>> ForkJoinPool.java:1979)
>>>>> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(
>>>>> ForkJoinWorkerThread.java:107)
>>>>> Caused by: org.apache.flink.types.NullFieldException: Field 2 is null,
>>>>> but expected to hold a value.
>>>>> at org.apache.flink.api.java.typeutils.runtime.
>>>>>
>>>> TupleSerializer.serialize(
>>>>
>>>>> TupleSerializer.java:126)
>>>>> at org.apache.flink.api.java.typeutils.runtime.
>>>>>
>>>> TupleSerializer.serialize(
>>>>
>>>>> TupleSerializer.java:30)
>>>>> at org.apache.flink.runtime.plugable.SerializationDelegate.write(
>>>>> SerializationDelegate.java:56)
>>>>> at org.apache.flink.runtime.io.network.api.serialization.
>>>>> SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:83)
>>>>> at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(
>>>>> RecordWriter.java:85)
>>>>> at org.apache.flink.runtime.operators.shipping.
>>>>>
>>>> OutputCollector.collect(
>>>
>>>> OutputCollector.java:65)
>>>>> at org.apache.flink.runtime.operators.util.metrics.
>>>>> CountingCollector.collect(CountingCollector.java:35)
>>>>> at org.apache.flink.api.java.operators.translation.PlanFilterOperator$
>>>>> FlatMapFilter.flatMap(PlanFilterOperator.java:51)
>>>>> at org.apache.flink.runtime.operators.chaining.
>>>>> ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:80)
>>>>> at org.apache.flink.runtime.operators.util.metrics.
>>>>> CountingCollector.collect(CountingCollector.java:35)
>>>>> at org.apache.flink.graph.library.Summarization$
>>>>>
>>>> VertexGroupReducer.reduce(
>>>>
>>>>> Summarization.java:323)
>>>>> at org.apache.flink.runtime.operators.GroupReduceDriver.
>>>>> run(GroupReduceDriver.java:131)
>>>>> at org.apache.flink.runtime.operators.BatchTask.run(
>>>>>
>>>> BatchTask.java:486)
>>>
>>>> at org.apache.flink.runtime.operators.BatchTask.invoke(
>>>>>
>>>> BatchTask.java:351)
>>>>
>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:590)
>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>> Caused by: java.lang.NullPointerException
>>>>> at org.apache.flink.api.common.typeutils.base.LongSerializer.
>>>>> serialize(LongSerializer.java:64)
>>>>> at org.apache.flink.api.common.typeutils.base.LongSerializer.
>>>>> serialize(LongSerializer.java:27)
>>>>> at org.apache.flink.api.java.typeutils.runtime.
>>>>>
>>>> TupleSerializer.serialize(
>>>>
>>>>> TupleSerializer.java:124)
>>>>> ... 15 more
>>>>>
>>>>> _____________EXCEPTION END__________________
>>>>>
>>>>> It looks like the problem is in the following lines in Summarization:
>>>>>
>>>>> DataSet<Edge<K, EV>> edgesForGrouping = input.getEdges()
>>>>>        .join(vertexToRepresentativeMap)
>>>>>        .where(0)  // source vertex id
>>>>>        .equalTo(0) // vertex id
>>>>>        .with(new SourceVertexJoinFunction<K, EV>())
>>>>>        .join(vertexToRepresentativeMap)
>>>>>        .where(1)  // target vertex id
>>>>>        .equalTo(0) // vertex id
>>>>>        .with(new TargetVertexJoinFunction<K, EV>());
>>>>>
>>>>>
>>>>> If you try to print edges before this step, it works fine. But after
>>>>>
>>>> this
>>>
>>>> step my IDE gives the same exception.
>>>>>
>>>>> I would really appreciate any help.
>>>>>
>>>>> Thank you,
>>>>> Olga
>>>>>
>>>>> _________EXAMPLE START_____________________
>>>>>
>>>>> package org.apache.flink.graph.examples;
>>>>>
>>>>> import org.apache.flink.api.common.ProgramDescription;
>>>>> import org.apache.flink.api.java.DataSet;
>>>>> import org.apache.flink.api.java.ExecutionEnvironment;
>>>>> import org.apache.flink.graph.Edge;
>>>>> import org.apache.flink.graph.Graph;
>>>>> import org.apache.flink.graph.Vertex;
>>>>> import org.apache.flink.graph.library.Summarization;
>>>>> import java.util.LinkedList;
>>>>> import java.util.List;
>>>>>
>>>>> public class MySummarizationExample implements ProgramDescription {
>>>>>
>>>>>      @SuppressWarnings("serial")
>>>>>      public static void main(String [] args) throws Exception {
>>>>>
>>>>>          ExecutionEnvironment env = ExecutionEnvironment.
>>>>>
>>>> getExecutionEnvironment();
>>>>
>>>>>          //Create graph
>>>>>          DataSet<Edge<Long, Double>> edges = getEdgeDataSet(env);
>>>>>          DataSet<Vertex<Long, Long>> vertices = getVertexDataSet(env);
>>>>>          Graph<Long, Long, Double> graph = Graph.fromDataSet(vertices,
>>>>>
>>>> edges, env);
>>>>
>>>>>          //emit input
>>>>>          System.out.println("Executing example with following
>>>>>
>>>> inputs:\n"+"Vertices:\n");
>>>>
>>>>>          vertices.print();
>>>>>          System.out.println("Edges:\n");
>>>>>          edges.print();
>>>>>
>>>>>          Graph<Long, Summarization.VertexValue<Long>,
>>>>>
>>>> Summarization.EdgeValue<Double>> result = graph
>>>>
>>>>>                  .run(new Summarization<Long, Long, Double>());
>>>>>
>>>>>          //now we want to read the output
>>>>>          DataSet<Edge<Long, Summarization.EdgeValue<Double>>> edgesOut
>>>>>
>>>> =
>>>
>>>> result.getEdges();
>>>>
>>>>>          DataSet<Vertex<Long, Summarization.VertexValue<Long>>>
>>>>>
>>>> verticesOut = result.getVertices();
>>>>
>>>>>          // emit result
>>>>>          System.out.println("Summarized graph:\n"+"Vertices:\n");
>>>>>          verticesOut.print();
>>>>>          System.out.println("Edges:\n");
>>>>>          edgesOut.print();
>>>>>      }
>>>>>
>>>>>      @Override
>>>>>      public String getDescription() {
>>>>>          return "Summarization Example";
>>>>>      }
>>>>>
>>>>>      //Define edges
>>>>>      private static DataSet<Edge<Long, Double>> getEdgeDataSet(
>>>>>
>>>> ExecutionEnvironment
>>>
>>>> env) {
>>>>
>>>>>          Object[][] DEFAULT_EDGES = new Object[][] {
>>>>>                  new Object[]{1L, 2L, 1.0},
>>>>>                  new Object[]{1L, 4L, 3.0},
>>>>>                  new Object[]{2L, 3L, 6.0},
>>>>>                  new Object[]{2L, 4L, 5.0},
>>>>>                  new Object[]{2L, 5L, 1.0},
>>>>>                  new Object[]{3L, 5L, 5.0},
>>>>>                  new Object[]{3L, 6L, 2.0},
>>>>>                  new Object[]{4L, 5L, 1.0},
>>>>>                  new Object[]{5L, 6L, 4.0}
>>>>>          };
>>>>>          List<Edge<Long, Double>> edgeList = new LinkedList<Edge<Long,
>>>>>
>>>> Double>>();
>>>>
>>>>>          for (Object[] edge : DEFAULT_EDGES) {
>>>>>              edgeList.add(new Edge<Long, Double>((Long) edge[0], (Long)
>>>>>
>>>> edge[1], (Double) edge[2]));
>>>>
>>>>>          }
>>>>>          return env.fromCollection(edgeList);
>>>>>      }
>>>>>      //Define vertices
>>>>>      private static DataSet<Vertex<Long, Long>> getVertexDataSet(
>>>>>
>>>> ExecutionEnvironment
>>>
>>>> env) {
>>>>
>>>>>          //We will summarize by <VV> = Long
>>>>>          Object[][] DEFAULT_VERTICES = new Object[][] {
>>>>>                  new Object[]{1L, 1L},
>>>>>                  new Object[]{2L, 1L},
>>>>>                  new Object[]{3L, 5L},
>>>>>                  new Object[]{4L, 5L},
>>>>>                  new Object[]{5L, 5L}
>>>>>          };
>>>>>          List<Vertex<Long, Long>> vertexList = new
>>>>>
>>>> LinkedList<Vertex<Long, Long>>();
>>>>
>>>>>          for (Object[] vertex : DEFAULT_VERTICES) {
>>>>>              vertexList.add(new Vertex<Long, Long>((Long) vertex[0],
>>>>>
>>>> (Long) vertex[1]));
>>>>
>>>>>          }
>>>>>          return env.fromCollection(vertexList);
>>>>>      }
>>>>> }
>>>>>
>>>>> _________EXAMPLE END_____________________
>>>>>
>>>>>
>>>>> Best regards,
>>>>> Olga Golovneva
>>>>>
>>>>> On Thu, Sep 15, 2016 at 9:16 AM, Till Rohrmann <trohrm...@apache.org>
>>>>> wrote:
>>>>>
>>>>> Hi Olga,
>>>>>>
>>>>>> can you provide us with a little bit more details about the problem.
>>>>>>
>>>>> The
>>>
>>>> full stack trace of the exception and the program you're trying to run
>>>>>> would be helpful.
>>>>>>
>>>>>> Cheers,
>>>>>> Till
>>>>>>
>>>>>> On Wed, Sep 14, 2016 at 9:49 PM, Olga Golovneva <melcha...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>> Hi devs,
>>>>>>>
>>>>>>> Do you know if there is an example (besides ITCase) of usage of
>>>>>>> Summarization Library in Gelly? I'm having some problems trying to
>>>>>>>
>>>>>> use
>>>
>>>> it
>>>>>>
>>>>>>> in my code. Particularly, I cannot print output edges ( it throws
>>>>>>>
>>>>>> the
>>>
>>>> following exception: Exception in thread "main"
>>>>>>> org.apache.flink.runtime.client.JobExecutionException: Job
>>>>>>>
>>>>>> execution
>>>
>>>> failed.), while vertices are printed correctly.
>>>>>>>
>>>>>>> Best regards,
>>>>>>> Olga
>>>>>>>
>>>>>>>
>>>>>
>

Reply via email to