[ https://issues.apache.org/jira/browse/FLINK-2714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14958705#comment-14958705 ]
ASF GitHub Bot commented on FLINK-2714: --------------------------------------- Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/1250#discussion_r42107077 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java --- @@ -0,0 +1,347 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.library; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.JoinFunction; +import org.apache.flink.api.common.functions.GroupReduceFunction; +import org.apache.flink.api.common.operators.Order; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.functions.FunctionAnnotation; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.GraphAlgorithm; +import org.apache.flink.graph.example.utils.TriangleCountData; +import org.apache.flink.types.NullValue; +import org.apache.flink.util.Collector; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + + +/** + * This function returns number of triangles present in the input graph. + * A triangle consists of three edges that connect three vertices with each other. + * <p> + * <p> + * The basic algorithm works as follows: + * It groups all edges that share a common vertex and builds triads, i.e., triples of vertices + * that are connected by two edges. Finally, all triads are filtered for which no third edge exists + * that closes the triangle. + * <p> + * <p> + * For a group of <i>n</i> edges that share a common vertex, the number of built triads is quadratic <i>((n*(n-1))/2)</i>. + * Therefore, an optimization of the algorithm is to group edges on the vertex with the smaller output degree to + * reduce the number of triads. + * This implementation extends the basic algorithm by computing output degrees of edge vertices and + * grouping on edges on the vertex with the smaller degree. 
+ */ + +public class TriangleEnumerator<K extends Comparable<K>, VV, EV> implements GraphAlgorithm<K, VV, EV, DataSet<Tuple3<K,K,K>>> { + @Override + public DataSet<Tuple3<K,K,K>> run(Graph<K, VV, EV> input) throws Exception { + + DataSet<Edge<K, EV>> edges = input.getEdges(); + + // annotate edges with degrees + DataSet<EdgeWithDegrees<K>> edgesWithDegrees = edges.flatMap(new EdgeDuplicator<K, EV>()) + .groupBy(0).sortGroup(1, Order.ASCENDING).reduceGroup(new DegreeCounter<K, EV>()) + .groupBy(EdgeWithDegrees.V1, EdgeWithDegrees.V2).reduce(new DegreeJoiner<K>()); + + // project edges by degrees + DataSet<Edge<K, NullValue>> edgesByDegree = edgesWithDegrees.map(new EdgeByDegreeProjector<K>()); + // project edges by vertex id + DataSet<Edge<K, NullValue>> edgesById = edgesByDegree.map(new EdgeByIdProjector<K>()); + + DataSet<Tuple3<K,K,K>> triangles = edgesByDegree + // build triads + .groupBy(EdgeWithDegrees.V1).sortGroup(EdgeWithDegrees.V2, Order.ASCENDING) + .reduceGroup(new TriadBuilder()) + // filter triads + .join(edgesById).where(Triad.V2, Triad.V3).equalTo(0, 1).with(new TriadFilter<K>()); + + return triangles; + } + + /** + * Emits for an edge the original edge and its switched version. + */ + private static final class EdgeDuplicator<K, EV> implements FlatMapFunction<Edge<K, EV>, Edge<K, EV>> { + + @Override + public void flatMap(Edge<K, EV> edge, Collector<Edge<K, EV>> out) throws Exception { + out.collect(edge); + Edge<K, EV> reversed = edge.reverse(); + out.collect(reversed); + } + } + + /** + * Counts the number of edges that share a common vertex. + * Emits one edge for each input edge with a degree annotation for the shared vertex. + * For each emitted edge, the first vertex is the vertex with the smaller id. + */ + private static final class DegreeCounter<K extends Comparable<K>, EV> --- End diff -- I believe you can get rid of the EV type argument, since you're not using the edge value anywhere. 
> Port the Flink DataSet Triangle Enumeration example to the Gelly library
> ------------------------------------------------------------------------
>
> Key: FLINK-2714
> URL: https://issues.apache.org/jira/browse/FLINK-2714
> Project: Flink
> Issue Type: Task
> Components: Gelly
> Affects Versions: 0.10
> Environment:
> Reporter: Andra Lungu
> Assignee: Saumitra Shahapure
> Priority: Trivial
> Labels: newbie, starter
>
> Currently, the Gelly library contains one method for counting the number of
> triangles in a graph: a gather-apply-scatter version.
> This issue proposes the addition of a library method based on this Flink
> example:
> https://github.com/apache/flink/blob/master/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesOpt.java

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)