
ASF GitHub Bot commented on FLINK-1528:

Github user vasia commented on a diff in the pull request:

    --- Diff: 
    @@ -0,0 +1,205 @@
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.graph.example;
    +import org.apache.flink.api.common.ProgramDescription;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.EdgeDirection;
    +import org.apache.flink.graph.EdgesFunction;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.NeighborsFunctionWithVertexValue;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.graph.example.utils.LocalClusteringCoefficientData;
    +import org.apache.flink.types.NullValue;
    +import java.util.HashSet;
    +public class LocalClusteringCoefficientExample implements 
ProgramDescription {
    +   // 
    +   //  Program
    +   // 
    +   public static void main (String [] args) throws Exception {
    +           if(!parseParameters(args)) {
    +                   return;
    +           }
    +           ExecutionEnvironment env = 
    +           DataSet<Edge<Long, NullValue>> edges = getEdgeDataSet(env);
    +           Graph<Long, NullValue, NullValue> graph = 
Graph.fromDataSet(edges, env);
    +           // Get the neighbors of each vertex in a HashSet
    +           DataSet<Tuple2<Long, HashSet<Long>>> neighborhoods = graph
    +                           .reduceOnEdges(new NeighborhoodEdgesFunction(), 
    +           // Construct a new graph where the neighborhood is the vertex 
    +           Graph<Long, HashSet<Long>, NullValue> newGraph = graph
    +                           .mapVertices(new EmptyVertexMapFunction())
    +                           .joinWithVertices(neighborhoods, new 
    +           // Calculate clustering coefficient
    +           DataSet<Tuple2<Long, Double>> clusteringCoefficients = newGraph
    +                           .reduceOnNeighbors(new 
ClusteringCoefficientNeighborsFunction(), EdgeDirection.OUT);
    +           // Emit results
    +           if(fileOutput) {
    +                   clusteringCoefficients.writeAsCsv(outputPath, "\n", 
    +           } else {
    +                   clusteringCoefficients.print();
    +           }
    +           env.execute("Local Clustering Coefficient Example");
    +   }
    +   // 
    +   //  Clustering Coefficient Functions
    +   // 
    +   private static final class NeighborhoodEdgesFunction
    +                   implements EdgesFunction<Long, NullValue, Tuple2<Long, 
HashSet<Long>>> {
    +           @Override
    +           public Tuple2<Long, HashSet<Long>> iterateEdges(
    +                           Iterable<Tuple2<Long, Edge<Long, NullValue>>> 
edges) throws Exception {
    +                   Long vertexId = null;
    +                   HashSet<Long> neighbors = new HashSet<Long>();
    +                   for (Tuple2<Long, Edge<Long, NullValue>> edge : edges) {
    +                           vertexId = edge.f0;
    +                           neighbors.add(edge.f1.getTarget());
    +                   }
    +                   return new Tuple2<Long, HashSet<Long>>(vertexId, 
    +           }
    +   }
    +   private static final class EmptyVertexMapFunction
    +                   implements MapFunction<Vertex<Long, NullValue>, 
HashSet<Long>> {
    +           @Override
    +           public HashSet<Long> map(Vertex<Long, NullValue> arg) throws 
Exception {
    +                   return new HashSet<Long>();
    +           }
    +   }
    +   private static final class NeighborhoodVertexMapFunction
    +                   implements MapFunction<Tuple2<HashSet<Long>, 
HashSet<Long>>, HashSet<Long>> {
    +           @Override
    +           public HashSet<Long> map(Tuple2<HashSet<Long>, HashSet<Long>> 
arg) throws Exception {
    +                   return arg.f1;
    +           }
    +   }
    +   private static final class ClusteringCoefficientNeighborsFunction
    +                   implements NeighborsFunctionWithVertexValue<Long, 
HashSet<Long>, NullValue, Tuple2<Long, Double>> {
    +           @Override
    +           public Tuple2<Long, Double> iterateNeighbors(Vertex<Long, 
HashSet<Long>> vertex,
    +                           Iterable<Tuple2<Edge<Long, NullValue>, 
Vertex<Long, HashSet<Long>>>> neighbors) throws Exception {
    +                   int deg = vertex.getValue().size();
    +                   int e = 0;
    +                   // Calculate common neighbor count (e)
    +                   for (Tuple2<Edge<Long, NullValue>, Vertex<Long, 
HashSet<Long>>> neighbor : neighbors) {
    +                           // Iterate neighbor's neighbors
    +                           for (Long nn : neighbor.f1.getValue()) {
    +                                   if (vertex.getValue().contains(nn)) {
    +                                           e++;
    +                                   }
    +                           }
    +                   }
    +                   // We assume an undirected graph, so we need to divide 
e here
    +                   e /= 2;
    --- End diff --
    I don't understand the reasoning behind this. Do you assume an undirected 
graph as input, or do you convert the input to an undirected graph? In any case, 
I think we should add a comment at the beginning of the example to define the behavior.

> Add local clustering coefficient library method and example
> -----------------------------------------------------------
>                 Key: FLINK-1528
>                 URL: https://issues.apache.org/jira/browse/FLINK-1528
>             Project: Flink
>          Issue Type: Task
>          Components: Gelly
>            Reporter: Vasia Kalavri
>            Assignee: Daniel Bali
> Add a Gelly library method and example to compute the local clustering 
> coefficient.

This message was sent by Atlassian JIRA

Reply via email to