[ https://issues.apache.org/jira/browse/FLINK-1528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14351544#comment-14351544 ]
ASF GitHub Bot commented on FLINK-1528: --------------------------------------- Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/420#discussion_r25995666 --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LocalClusteringCoefficientExample.java --- @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.example; + +import org.apache.flink.api.common.ProgramDescription; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.EdgeDirection; +import org.apache.flink.graph.EdgesFunction; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.NeighborsFunctionWithVertexValue; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.example.utils.LocalClusteringCoefficientData; +import org.apache.flink.types.NullValue; + +import java.util.HashSet; + +public class LocalClusteringCoefficientExample implements ProgramDescription { + + // -------------------------------------------------------------------------------------------- + // Program + // -------------------------------------------------------------------------------------------- + + public static void main (String [] args) throws Exception { + + if(!parseParameters(args)) { + return; + } + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Edge<Long, NullValue>> edges = getEdgeDataSet(env); + Graph<Long, NullValue, NullValue> graph = Graph.fromDataSet(edges, env); + + // Get the neighbors of each vertex in a HashSet + DataSet<Tuple2<Long, HashSet<Long>>> neighborhoods = graph + .reduceOnEdges(new NeighborhoodEdgesFunction(), EdgeDirection.OUT); + + // Construct a new graph where the neighborhood is the vertex value + Graph<Long, HashSet<Long>, NullValue> newGraph = graph + .mapVertices(new EmptyVertexMapFunction()) + .joinWithVertices(neighborhoods, new NeighborhoodVertexMapFunction()); + + // Calculate clustering coefficient + DataSet<Tuple2<Long, Double>> clusteringCoefficients = newGraph + .reduceOnNeighbors(new ClusteringCoefficientNeighborsFunction(), EdgeDirection.OUT); + + // Emit results + 
if(fileOutput) { + clusteringCoefficients.writeAsCsv(outputPath, "\n", ","); + } else { + clusteringCoefficients.print(); + } + + env.execute("Local Clustering Coefficient Example"); + } + + // -------------------------------------------------------------------------------------------- + // Clustering Coefficient Functions + // -------------------------------------------------------------------------------------------- + + private static final class NeighborhoodEdgesFunction + implements EdgesFunction<Long, NullValue, Tuple2<Long, HashSet<Long>>> { + + @Override + public Tuple2<Long, HashSet<Long>> iterateEdges( + Iterable<Tuple2<Long, Edge<Long, NullValue>>> edges) throws Exception { + + Long vertexId = null; + HashSet<Long> neighbors = new HashSet<Long>(); + + for (Tuple2<Long, Edge<Long, NullValue>> edge : edges) { + vertexId = edge.f0; + neighbors.add(edge.f1.getTarget()); + } + + return new Tuple2<Long, HashSet<Long>>(vertexId, neighbors); + } + } + + private static final class EmptyVertexMapFunction + implements MapFunction<Vertex<Long, NullValue>, HashSet<Long>> { + + @Override + public HashSet<Long> map(Vertex<Long, NullValue> arg) throws Exception { + return new HashSet<Long>(); + } + } + + private static final class NeighborhoodVertexMapFunction + implements MapFunction<Tuple2<HashSet<Long>, HashSet<Long>>, HashSet<Long>> { + + @Override + public HashSet<Long> map(Tuple2<HashSet<Long>, HashSet<Long>> arg) throws Exception { + return arg.f1; + } + } + + private static final class ClusteringCoefficientNeighborsFunction + implements NeighborsFunctionWithVertexValue<Long, HashSet<Long>, NullValue, Tuple2<Long, Double>> { + + @Override + public Tuple2<Long, Double> iterateNeighbors(Vertex<Long, HashSet<Long>> vertex, + Iterable<Tuple2<Edge<Long, NullValue>, Vertex<Long, HashSet<Long>>>> neighbors) throws Exception { + + int deg = vertex.getValue().size(); + int e = 0; + + // Calculate common neighbor count (e) + for (Tuple2<Edge<Long, NullValue>, Vertex<Long, 
HashSet<Long>>> neighbor : neighbors) { + // Iterate neighbor's neighbors + for (Long nn : neighbor.f1.getValue()) { + if (vertex.getValue().contains(nn)) { + e++; + } + } + } + + // We assume an undirected graph, so we need to divide e here + e /= 2; --- End diff -- I don't get the reasoning behind this. Do you assume an undirected graph as input, or do you convert the input to undirected? In any case, I think we should add a comment at the beginning of the example to define the behavior. > Add local clustering coefficient library method and example > ----------------------------------------------------------- > > Key: FLINK-1528 > URL: https://issues.apache.org/jira/browse/FLINK-1528 > Project: Flink > Issue Type: Task > Components: Gelly > Reporter: Vasia Kalavri > Assignee: Daniel Bali > > Add a gelly library method and example to compute the local clustering > coefficient. -- This message was sent by Atlassian JIRA (v6.3.4#6332)