[ https://issues.apache.org/jira/browse/FLINK-2361?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14630360#comment-14630360 ]

Andra Lungu commented on FLINK-2361:
------------------------------------

Hi, 

I tweaked the code to look like this:

    DataSet<Edge<String, NullValue>> edges = getEdgesDataSet(env);

    DataSet<Vertex<String, Long>> vertices = edges.flatMap(
            new FlatMapFunction<Edge<String, NullValue>, Vertex<String, Long>>() {
                @Override
                public void flatMap(Edge<String, NullValue> edge,
                        Collector<Vertex<String, Long>> collector) throws Exception {
                    collector.collect(new Vertex<String, Long>(
                            edge.getSource(), Long.parseLong(edge.getSource())));
                    collector.collect(new Vertex<String, Long>(
                            edge.getTarget(), Long.parseLong(edge.getTarget())));
                }
            }).distinct();

    if (fileOutput) {
        vertices.writeAsCsv(vertexInputPath, "\n", ",");
        env.execute();
    }

    DataSet<Vertex<String, Long>> rereadVertices = env.readCsvFile(vertexInputPath)
            .fieldDelimiter(",").lineDelimiter("\n").ignoreComments("#")
            .types(String.class, Long.class)
            .map(new MapFunction<Tuple2<String, Long>, Vertex<String, Long>>() {
                @Override
                public Vertex<String, Long> map(Tuple2<String, Long> tuple2) throws Exception {
                    return new Vertex<String, Long>(tuple2.f0, tuple2.f1);
                }
            });

    Graph<String, Long, NullValue> graph = Graph.fromDataSet(rereadVertices, edges, env);

which is not what I would normally do, BTW... 

and I still get:
Caused by: java.lang.Exception: Target vertex '124518874' does not exist!.
        at org.apache.flink.graph.spargel.VertexCentricIteration$VertexUpdateUdfSimpleVV.coGroup(VertexCentricIteration.java:300)
        at org.apache.flink.runtime.operators.CoGroupWithSolutionSetSecondDriver.run(CoGroupWithSolutionSetSecondDriver.java:220)
        at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
        at org.apache.flink.runtime.iterative.task.AbstractIterativePactTask.run(AbstractIterativePactTask.java:139)
        at org.apache.flink.runtime.iterative.task.IterationTailPactTask.run(IterationTailPactTask.java:107)
        at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
        at java.lang.Thread.run(Thread.java:722)

Lucky day :| At least it's missing a different vertex now...
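
A possible way to narrow this down, sketched in plain Java rather than Flink: derive the vertex ids from the edges, then list every edge endpoint that is absent from the vertex set. The class and method names (EndpointCheck, verticesFromEdges, missingEndpoints) and the String[] edge representation are assumptions made for illustration, not Gelly API. If the flatMap + distinct step behaves correctly, the result has to be empty.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Plain-Java sketch (no Flink involved); all names here are hypothetical.
class EndpointCheck {

    // Mirrors the flatMap + distinct step: collect the source and target
    // id of every edge into a set, which removes duplicates.
    static Set<String> verticesFromEdges(List<String[]> edges) {
        Set<String> vertices = new LinkedHashSet<>();
        for (String[] edge : edges) {
            vertices.add(edge[0]); // source id
            vertices.add(edge[1]); // target id
        }
        return vertices;
    }

    // Returns every edge endpoint that is absent from the given vertex set.
    static Set<String> missingEndpoints(List<String[]> edges, Set<String> vertices) {
        Set<String> missing = new LinkedHashSet<>();
        for (String[] edge : edges) {
            if (!vertices.contains(edge[0])) {
                missing.add(edge[0]);
            }
            if (!vertices.contains(edge[1])) {
                missing.add(edge[1]);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        List<String[]> edges = new ArrayList<>();
        edges.add(new String[] {"1", "2"});
        edges.add(new String[] {"2", "3"});

        Set<String> vertices = verticesFromEdges(edges);
        System.out.println(missingEndpoints(edges, vertices)); // prints []

        vertices.remove("3"); // simulate a vertex lost somewhere in the pipeline
        System.out.println(missingEndpoints(edges, vertices)); // prints [3]
    }
}
```

On the real data the same check would have to be expressed with DataSet operators, but the invariant is the same: a non-empty result means vertices went missing between the edge set and the vertex set.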

> flatMap + distinct gives erroneous results for big data sets
> ------------------------------------------------------------
>
>                 Key: FLINK-2361
>                 URL: https://issues.apache.org/jira/browse/FLINK-2361
>             Project: Flink
>          Issue Type: Bug
>          Components: Gelly
>    Affects Versions: 0.10
>            Reporter: Andra Lungu
>
> When running the simple Connected Components algorithm (currently in Gelly) 
> on the twitter follower graph, with 1, 100 or 10000 iterations, I get the 
> following error:
> Caused by: java.lang.Exception: Target vertex '657282846' does not exist!.
>       at org.apache.flink.graph.spargel.VertexCentricIteration$VertexUpdateUdfSimpleVV.coGroup(VertexCentricIteration.java:300)
>       at org.apache.flink.runtime.operators.CoGroupWithSolutionSetSecondDriver.run(CoGroupWithSolutionSetSecondDriver.java:220)
>       at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
>       at org.apache.flink.runtime.iterative.task.AbstractIterativePactTask.run(AbstractIterativePactTask.java:139)
>       at org.apache.flink.runtime.iterative.task.IterationTailPactTask.run(IterationTailPactTask.java:107)
>       at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>       at java.lang.Thread.run(Thread.java:722)
> Now this is very bizarre, as the DataSet of vertices is produced from the 
> DataSet of edges, which means there cannot be an edge with an invalid 
> target id. The method calls flatMap to isolate the src and trg ids and 
> distinct to ensure their uniqueness.  
> The algorithm works fine for smaller data sets.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
