+1 for the Either type solution :-)

On Sat, Sep 17, 2016 at 10:49 AM, Martin Junghanns <m.jungha...@mailbox.org>
wrote:

> Hi all,
> thanks for reporting the issue! I just looked into it. The
> VertexGroupReducer outputs two semantically different tuples: one for each
> vertex without the value (null) and one tuple representing the whole group
> including the value. As Till pointed out, this crashes if the value type's
> serializer cannot handle null values.
> Since we cannot have two different output types for the same GroupReduce
> function, I propose using an Either<NullValue, VV> here. If there are no
> objections, I will take the issue and fix it.
> Again, thanks for pointing it out.
> Best,
> Martin
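To illustrate why a tagged union removes the null from the reducer output, here is a minimal plain-Java sketch with no Flink dependency. `Either`, `NoValue`, `GroupItem`, `reduceGroup` and `serialize` are hypothetical stand-ins invented for this illustration; they are not Flink's `org.apache.flink.types.Either` / `NullValue` or Gelly's `VertexGroupItem`, and the byte layout is an assumption. The point is only that every field always holds a non-null value, so a fixed-width long encoding never sees null.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the "Either<NullValue, VV>" idea; not Flink/Gelly code.
class EitherSketch {

    // Marker for "no value" (the role NullValue would play); never null itself.
    enum NoValue { INSTANCE }

    // Minimal tagged union: a flag records which side holds the value.
    static final class Either<L, R> {
        private final L leftValue;
        private final R rightValue;
        private final boolean isLeft;

        private Either(L leftValue, R rightValue, boolean isLeft) {
            this.leftValue = leftValue;
            this.rightValue = rightValue;
            this.isLeft = isLeft;
        }

        static <L, R> Either<L, R> left(L value) {
            return new Either<>(value, null, true);
        }

        static <L, R> Either<L, R> right(R value) {
            return new Either<>(null, value, false);
        }

        boolean isLeft() {
            return isLeft;
        }

        R right() {
            if (isLeft) {
                throw new IllegalStateException("not a Right");
            }
            return rightValue;
        }
    }

    // Hypothetical reducer record: (vertex id, group representative, group value).
    static final class GroupItem {
        final long vertexId;
        final long representativeId;
        final Either<NoValue, Long> groupValue;

        GroupItem(long vertexId, long representativeId, Either<NoValue, Long> groupValue) {
            this.vertexId = vertexId;
            this.representativeId = representativeId;
            this.groupValue = groupValue;
        }
    }

    // One item per vertex without a value (Left), plus one item for the whole
    // group with the value (Right). Assumption: the first id is the representative.
    static List<GroupItem> reduceGroup(long[] vertexIds, long groupValue) {
        List<GroupItem> out = new ArrayList<>();
        long representative = vertexIds[0];
        for (long id : vertexIds) {
            out.add(new GroupItem(id, representative, Either.<NoValue, Long>left(NoValue.INSTANCE)));
        }
        out.add(new GroupItem(representative, representative, Either.<NoValue, Long>right(groupValue)));
        return out;
    }

    // Layout: two longs, a tag byte, then the value only for a Right.
    static byte[] serialize(GroupItem item) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(item.vertexId);
            out.writeLong(item.representativeId);
            out.writeBoolean(item.groupValue.isLeft());
            if (!item.groupValue.isLeft()) {
                out.writeLong(item.groupValue.right());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        for (GroupItem item : reduceGroup(new long[] {3L, 4L, 5L}, 5L)) {
            String kind = item.groupValue.isLeft()
                    ? "no value" : "group value " + item.groupValue.right();
            System.out.println("vertex " + item.vertexId + " (" + kind + "): "
                    + serialize(item).length + " bytes");
        }
    }
}
```

In this sketch the per-vertex items encode to 17 bytes and the group item to 25, because only a Right carries a payload; how Flink actually encodes its Either type is not shown here.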
> On 15.09.2016 20:10, Vasiliki Kalavri wrote:
>> Hi,
>> thanks for looking into this Till! I'm not quite sure what the algorithm
>> behavior should be when the vertex value is null (probably skip the
>> record?). Let's wait for Martin's input.
>> Cheers,
>> -V.
>> On 15 September 2016 at 19:19, Olga Golovneva <melcha...@gmail.com>
>> wrote:
>>> Hi Till,
>>> Thanks a lot for your help! I'll try to use another variable type in the
>>> meantime.
>>> Best regards,
>>> Olga
>>> On Thu, Sep 15, 2016 at 1:03 PM, Till Rohrmann <trohrm...@apache.org>
>>> wrote:
>>>> Hi Olga,
>>>> it’s indeed an error in Flink’s Summarization algorithm. The problem is
>>>> the following: the vertex group value of the VertexGroupItem is null in
>>>> the VertexGroupReducer. This works in the SummarizationITCase because the
>>>> vertex value is of type String and the StringSerializer can deal with
>>>> null values.
>>>> However, in your case, where you use longs, it fails because the
>>>> LongSerializer cannot handle null values. You can verify this behaviour
>>>> by changing the vertex value type to String. Then everything should work
>>>> without a problem.
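The difference Till describes can be reproduced with a simplified plain-Java illustration. These are NOT Flink's `LongSerializer` or `StringSerializer`; the method names and the `-1` null marker are hypothetical. A fixed-width 8-byte long encoding has no spare bit pattern for "null", so handing it a null `Long` fails while unboxing, whereas a length-prefixed encoding can reserve a length value to mean null.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration only; not Flink's serializer implementations.
class NullSerializationSketch {

    // Always writes exactly 8 bytes. A null Long throws NullPointerException
    // while being unboxed, before anything is written.
    static void writeLong(Long value, DataOutputStream out) throws IOException {
        out.writeLong(value);
    }

    // Writes a 4-byte length prefix and the UTF-8 bytes. The length -1 is
    // reserved as a null marker, so null needs no payload.
    static void writeNullableString(String value, DataOutputStream out) throws IOException {
        if (value == null) {
            out.writeInt(-1);
            return;
        }
        byte[] utf8 = value.getBytes(StandardCharsets.UTF_8);
        out.writeInt(utf8.length);
        out.write(utf8);
    }

    // True if the long encoding rejects the value.
    static boolean longEncodingFails(Long value) {
        DataOutputStream out = new DataOutputStream(new ByteArrayOutputStream());
        try {
            writeLong(value, out);
            return false;
        } catch (NullPointerException e) {
            return true;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Number of bytes the string encoding produces for the value.
    static int stringEncodingSize(String value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            writeNullableString(value, out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    public static void main(String[] args) {
        System.out.println("long 5 fails?     " + longEncodingFails(5L));
        System.out.println("long null fails?  " + longEncodingFails(null));
        System.out.println("string \"ab\" size: " + stringEncodingSize("ab"));
        System.out.println("string null size: " + stringEncodingSize(null));
    }
}
```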
>>>> I’ve cc’ed Martin, who can probably tell you more about the Summarization
>>>> algorithm. I’ve also opened a JIRA ticket [1] to fix this problem.
>>>> Thanks for reporting this bug.
>>>> [1] https://issues.apache.org/jira/browse/FLINK-4624
>>>> Cheers,
>>>> Till
>>>> On Thu, Sep 15, 2016 at 5:05 PM, Olga Golovneva <melcha...@gmail.com>
>>>> wrote:
>>>>> Hi Till,
>>>>> I've created a simple (Java) example to show you what's going on. The
>>>>> code is in the attachment and shown below. This example creates a simple
>>>>> graph with Double EV and Long VV. Then it runs Summarization, which should
>>>>> compute a condensed version of the input graph by grouping vertices and
>>>>> edges based on their values. I run this code with IntelliJ IDEA. The code
>>>>> executes fine until you want to see what is written in the resulting
>>>>> edges (just uncomment line 46, edgesOut.print();). Then it throws the
>>>>> following exception:
>>>>> _________EXCEPTION START_____________
>>>>> Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>>>>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply$mcV$sp(JobManager.scala:830)
>>>>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:773)
>>>>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:773)
>>>>> at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>>>>> at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>>>>> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>>>>> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>>>>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>>>>> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>>>>> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>> Caused by: org.apache.flink.types.NullFieldException: Field 2 is null, but expected to hold a value.
>>>>> at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:126)
>>>>> at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
>>>>> at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56)
>>>>> at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:83)
>>>>> at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:85)
>>>>> at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
>>>>> at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>>>>> at org.apache.flink.api.java.operators.translation.PlanFilterOperator$FlatMapFilter.flatMap(PlanFilterOperator.java:51)
>>>>> at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:80)
>>>>> at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>>>>> at org.apache.flink.graph.library.Summarization$VertexGroupReducer.reduce(Summarization.java:323)
>>>>> at org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:131)
>>>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
>>>>> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:590)
>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>> Caused by: java.lang.NullPointerException
>>>>> at org.apache.flink.api.common.typeutils.base.LongSerializer.serialize(LongSerializer.java:64)
>>>>> at org.apache.flink.api.common.typeutils.base.LongSerializer.serialize(LongSerializer.java:27)
>>>>> at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:124)
>>>>> ... 15 more
>>>>> _____________EXCEPTION END__________________
>>>>> It looks like the problem is in the following lines in Summarization:
>>>>> DataSet<Edge<K, EV>> edgesForGrouping = input.getEdges()
>>>>>        .join(vertexToRepresentativeMap)
>>>>>        .where(0)  // source vertex id
>>>>>        .equalTo(0) // vertex id
>>>>>        .with(new SourceVertexJoinFunction<K, EV>())
>>>>>        .join(vertexToRepresentativeMap)
>>>>>        .where(1)  // target vertex id
>>>>>        .equalTo(0) // vertex id
>>>>>        .with(new TargetVertexJoinFunction<K, EV>());
>>>>> If you try to print the edges before this step, it works fine. But after
>>>>> this step, my IDE throws the same exception.
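The two chained joins Olga quotes replace each edge's source id, then its target id, with the id of the vertex group representative. Below is an in-memory plain-Java analogue, assuming for illustration that the smallest id in a value group is the representative; `Edge`, `vertexToRepresentative` and `joinWithRepresentatives` are hypothetical names, not Gelly's `SourceVertexJoinFunction` / `TargetVertexJoinFunction`. The data is copied from the example program in this thread.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory analogue of the edge/representative joins; not Gelly code.
class RepresentativeJoinSketch {

    static final class Edge {
        final long source;
        final long target;
        final double value;

        Edge(long source, long target, double value) {
            this.source = source;
            this.target = target;
            this.value = value;
        }

        @Override
        public String toString() {
            return "(" + source + ", " + target + ", " + value + ")";
        }
    }

    // Groups vertex ids by value and maps every id to its group's representative.
    // Assumption: the smallest id in a group is the representative.
    static Map<Long, Long> vertexToRepresentative(Map<Long, Long> vertexValues) {
        Map<Long, Long> representativeByValue = new HashMap<>();
        // TreeMap iterates ids in ascending order, so the first id per value is the smallest.
        for (Map.Entry<Long, Long> v : new TreeMap<>(vertexValues).entrySet()) {
            representativeByValue.putIfAbsent(v.getValue(), v.getKey());
        }
        Map<Long, Long> result = new HashMap<>();
        for (Map.Entry<Long, Long> v : vertexValues.entrySet()) {
            result.put(v.getKey(), representativeByValue.get(v.getValue()));
        }
        return result;
    }

    // "Join" on source id, then on target id, keeping the edge value. Like an
    // inner join, an edge whose endpoint has no entry in the map is dropped.
    static List<Edge> joinWithRepresentatives(List<Edge> edges, Map<Long, Long> representatives) {
        List<Edge> out = new ArrayList<>();
        for (Edge e : edges) {
            Long sourceRep = representatives.get(e.source);
            Long targetRep = representatives.get(e.target);
            if (sourceRep != null && targetRep != null) {
                out.add(new Edge(sourceRep, targetRep, e.value));
            }
        }
        return out;
    }

    // Vertex data (id -> value) from the example program.
    static Map<Long, Long> exampleVertices() {
        Map<Long, Long> vertices = new HashMap<>();
        long[][] data = {{1, 1}, {2, 1}, {3, 5}, {4, 5}, {5, 5}};
        for (long[] v : data) {
            vertices.put(v[0], v[1]);
        }
        return vertices;
    }

    // Edge data (source, target, value) from the example program.
    static List<Edge> exampleEdges() {
        long[][] ends = {{1, 2}, {1, 4}, {2, 3}, {2, 4}, {2, 5}, {3, 5}, {3, 6}, {4, 5}, {5, 6}};
        double[] values = {1.0, 3.0, 6.0, 5.0, 1.0, 5.0, 2.0, 1.0, 4.0};
        List<Edge> edges = new ArrayList<>();
        for (int i = 0; i < ends.length; i++) {
            edges.add(new Edge(ends[i][0], ends[i][1], values[i]));
        }
        return edges;
    }

    public static void main(String[] args) {
        Map<Long, Long> representatives = vertexToRepresentative(exampleVertices());
        for (Edge e : joinWithRepresentatives(exampleEdges(), representatives)) {
            System.out.println(e);
        }
    }
}
```

With this data, vertex 6 appears only in edges, so in this sketch the edges (3, 6) and (5, 6) are dropped and seven edges remain, all between representatives 1 and 3.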
>>>>> I would really appreciate any help.
>>>>> Thank you,
>>>>> Olga
>>>>> _________EXAMPLE START_____________________
>>>>> package org.apache.flink.graph.examples;
>>>>>
>>>>> import org.apache.flink.api.common.ProgramDescription;
>>>>> import org.apache.flink.api.java.DataSet;
>>>>> import org.apache.flink.api.java.ExecutionEnvironment;
>>>>> import org.apache.flink.graph.Edge;
>>>>> import org.apache.flink.graph.Graph;
>>>>> import org.apache.flink.graph.Vertex;
>>>>> import org.apache.flink.graph.library.Summarization;
>>>>>
>>>>> import java.util.LinkedList;
>>>>> import java.util.List;
>>>>>
>>>>> public class MySummarizationExample implements ProgramDescription {
>>>>>
>>>>>     @SuppressWarnings("serial")
>>>>>     public static void main(String[] args) throws Exception {
>>>>>         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>>>
>>>>>         // Create graph
>>>>>         DataSet<Edge<Long, Double>> edges = getEdgeDataSet(env);
>>>>>         DataSet<Vertex<Long, Long>> vertices = getVertexDataSet(env);
>>>>>         Graph<Long, Long, Double> graph = Graph.fromDataSet(vertices, edges, env);
>>>>>
>>>>>         // Emit input
>>>>>         System.out.println("Executing example with following inputs:\n" + "Vertices:\n");
>>>>>         vertices.print();
>>>>>         System.out.println("Edges:\n");
>>>>>         edges.print();
>>>>>
>>>>>         Graph<Long, Summarization.VertexValue<Long>, Summarization.EdgeValue<Double>> result = graph
>>>>>                 .run(new Summarization<Long, Long, Double>());
>>>>>
>>>>>         // Now we want to read the output
>>>>>         DataSet<Edge<Long, Summarization.EdgeValue<Double>>> edgesOut = result.getEdges();
>>>>>         DataSet<Vertex<Long, Summarization.VertexValue<Long>>> verticesOut = result.getVertices();
>>>>>
>>>>>         // Emit result
>>>>>         System.out.println("Summarized graph:\n" + "Vertices:\n");
>>>>>         verticesOut.print();
>>>>>         System.out.println("Edges:\n");
>>>>>         edgesOut.print();
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public String getDescription() {
>>>>>         return "Summarization Example";
>>>>>     }
>>>>>
>>>>>     // Define edges
>>>>>     private static DataSet<Edge<Long, Double>> getEdgeDataSet(ExecutionEnvironment env) {
>>>>>         Object[][] DEFAULT_EDGES = new Object[][] {
>>>>>                 new Object[]{1L, 2L, 1.0},
>>>>>                 new Object[]{1L, 4L, 3.0},
>>>>>                 new Object[]{2L, 3L, 6.0},
>>>>>                 new Object[]{2L, 4L, 5.0},
>>>>>                 new Object[]{2L, 5L, 1.0},
>>>>>                 new Object[]{3L, 5L, 5.0},
>>>>>                 new Object[]{3L, 6L, 2.0},
>>>>>                 new Object[]{4L, 5L, 1.0},
>>>>>                 new Object[]{5L, 6L, 4.0}
>>>>>         };
>>>>>         List<Edge<Long, Double>> edgeList = new LinkedList<Edge<Long, Double>>();
>>>>>         for (Object[] edge : DEFAULT_EDGES) {
>>>>>             edgeList.add(new Edge<Long, Double>((Long) edge[0], (Long) edge[1], (Double) edge[2]));
>>>>>         }
>>>>>         return env.fromCollection(edgeList);
>>>>>     }
>>>>>
>>>>>     // Define vertices
>>>>>     private static DataSet<Vertex<Long, Long>> getVertexDataSet(ExecutionEnvironment env) {
>>>>>         // We will summarize by <VV> = Long
>>>>>         Object[][] DEFAULT_VERTICES = new Object[][] {
>>>>>                 new Object[]{1L, 1L},
>>>>>                 new Object[]{2L, 1L},
>>>>>                 new Object[]{3L, 5L},
>>>>>                 new Object[]{4L, 5L},
>>>>>                 new Object[]{5L, 5L}
>>>>>         };
>>>>>         List<Vertex<Long, Long>> vertexList = new LinkedList<Vertex<Long, Long>>();
>>>>>         for (Object[] vertex : DEFAULT_VERTICES) {
>>>>>             vertexList.add(new Vertex<Long, Long>((Long) vertex[0], (Long) vertex[1]));
>>>>>         }
>>>>>         return env.fromCollection(vertexList);
>>>>>     }
>>>>> }
>>>>> _________EXAMPLE END_____________________
>>>>> Best regards,
>>>>> Olga Golovneva
>>>>> On Thu, Sep 15, 2016 at 9:16 AM, Till Rohrmann <trohrm...@apache.org>
>>>>> wrote:
>>>>>> Hi Olga,
>>>>>> can you provide us with a few more details about the problem? The
>>>>>> full stack trace of the exception and the program you're trying to run
>>>>>> would be helpful.
>>>>>> Cheers,
>>>>>> Till
>>>>>> On Wed, Sep 14, 2016 at 9:49 PM, Olga Golovneva <melcha...@gmail.com>
>>>>>> wrote:
>>>>>>> Hi devs,
>>>>>>> Do you know if there is an example (besides the ITCase) of how to use
>>>>>>> the Summarization library in Gelly? I'm having some problems trying to
>>>>>>> use it in my code. In particular, I cannot print the output edges (it
>>>>>>> throws the following exception: Exception in thread "main"
>>>>>>> org.apache.flink.runtime.client.JobExecutionException: Job execution
>>>>>>> failed.), while the vertices are printed correctly.
>>>>>>> Best regards,
>>>>>>> Olga
