
ASF GitHub Bot commented on FLINK-3780:

Github user vasia commented on a diff in the pull request:

    --- Diff: 
    @@ -0,0 +1,462 @@
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.graph.library.similarity;
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.functions.FlatMapFunction;
    +import org.apache.flink.api.common.functions.GroupReduceFunction;
    +import org.apache.flink.api.common.operators.Order;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.functions.FunctionAnnotation;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.api.java.tuple.Tuple4;
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.GraphAlgorithm;
    +import org.apache.flink.graph.library.similarity.JaccardIndex.Result;
    +import org.apache.flink.graph.utils.Murmur3_32;
    +import org.apache.flink.types.CopyableValue;
    +import org.apache.flink.types.IntValue;
    +import org.apache.flink.types.LongValue;
    +import org.apache.flink.util.Collector;
    +import org.apache.flink.util.Preconditions;
    +import java.util.ArrayList;
    +import java.util.List;
    + * The Jaccard Index measures the similarity between vertex neighborhoods.
    + * Scores range from 0.0 (no common neighbors) to 1.0 (all neighbors are common).
    + * <br/>
    + * This implementation produces similarity scores for each pair of vertices
    + * in the graph with at least one common neighbor; equivalently, this is the
    + * set of all non-zero Jaccard Similarity coefficients.
    + * <br/>
    + * The input graph must be a simple, undirected graph containing no duplicate
    + * edges or self-loops.
    + *
    + * @param <K> graph ID type
    + * @param <VV> vertex value type
    + * @param <EV> edge value type
    + */
    +public class JaccardIndex<K extends CopyableValue<K>, VV, EV>
    +implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> {
    +   public static final int DEFAULT_GROUP_SIZE = 64;
    +   // Optional configuration
    +   private int groupSize = DEFAULT_GROUP_SIZE;
    +   private boolean unboundedScores = true;
    +   private int minimumScoreNumerator = 0;
    +   private int minimumScoreDenominator = 1;
    +   private int maximumScoreNumerator = 1;
    +   private int maximumScoreDenominator = 0;
    +   private int littleParallelism = ExecutionConfig.PARALLELISM_UNKNOWN;
    +   /**
    +    * Override the default group size for the quadratic expansion of neighbor
    +    * pairs. Small groups generate more data whereas large groups distribute
    +    * computation less evenly among tasks.
    +    *
    +    * @param groupSize the group size for the quadratic expansion of 
neighbor pairs
    +    * @return this
    +    */
    +   public JaccardIndex<K, VV, EV> setGroupSize(int groupSize) {
    +           Preconditions.checkArgument(groupSize > 0, "Group size must be 
greater than zero");
    +           this.groupSize = groupSize;
    +           return this;
    +   }
    +   /**
    +    * Filter out Jaccard Index scores less than the given minimum fraction.
    +    *
    +    * @param numerator numerator of the minimum score
    +    * @param denominator denominator of the minimum score
    +    * @return this
    +    * @see #setMaximumScore(int, int)
    +    */
    +   public JaccardIndex<K, VV, EV> setMinimumScore(int numerator, int 
denominator) {
    +           Preconditions.checkArgument(numerator >= 0, "Minimum score 
numerator must be non-negative");
    +           Preconditions.checkArgument(denominator > 0, "Minimum score 
denominator must be greater than zero");
    +           Preconditions.checkArgument(numerator <= denominator, "Minimum 
score fraction must be less than or equal to one");
    +           this.unboundedScores = false;
    +           this.minimumScoreNumerator = numerator;
    +           this.minimumScoreDenominator = denominator;
    +           return this;
    +   }
    +   /**
    +    * Filter out Jaccard Index scores greater than or equal to the given 
maximum fraction.
    +    *
    +    * @param numerator numerator of the maximum score
    +    * @param denominator denominator of the maximum score
    +    * @return this
    +    * @see #setMinimumScore(int, int)
    +    */
    +   public JaccardIndex<K, VV, EV> setMaximumScore(int numerator, int 
denominator) {
    +           Preconditions.checkArgument(numerator >= 0, "Maximum score 
numerator must be non-negative");
    +           Preconditions.checkArgument(denominator > 0, "Maximum score 
denominator must be greater than zero");
    +           Preconditions.checkArgument(numerator <= denominator, "Maximum 
score fraction must be less than or equal to one");
    +           this.unboundedScores = false;
    +           this.maximumScoreNumerator = numerator;
    +           this.maximumScoreDenominator = denominator;
    +           return this;
    +   }
    +   /**
    +    * Override the parallelism of operators processing small amounts of data.
    +    *
    +    * @param littleParallelism operator parallelism
    +    * @return this
    +    */
    +   public JaccardIndex<K, VV, EV> setLittleParallelism(int 
littleParallelism) {
    +           this.littleParallelism = littleParallelism;
    +           return this;
    +   }
    +   /*
    +    * Implementation notes:
    +    *
    +    * The requirement that "K extends CopyableValue<K>" can be removed when
    +    *   Flink has a self-join which performs the skew distribution handled by
    +    *   GenerateGroupSpans / GenerateGroups / GenerateGroupPairs.
    +    */
    +   @Override
    +   public DataSet<Result<K>> run(Graph<K, VV, EV> input)
    +                   throws Exception {
    +           // s, t, d(t)
    +           DataSet<Edge<K, Tuple2<EV, LongValue>>> neighborDegree = input
    +                   .run(new EdgeTargetDegree<K, VV, EV>()
    +                           .setParallelism(littleParallelism));
    +           // group span, s, t, d(t)
    +           DataSet<Tuple4<IntValue, K, K, IntValue>> groupSpans = neighborDegree
    +                   .groupBy(0)
    +                   .sortGroup(1, Order.ASCENDING)
    +                   .reduceGroup(new GenerateGroupSpans<K, EV>(groupSize))
    +                           .setParallelism(littleParallelism)
    +                           .name("Generate group spans");
    +           // group, s, t, d(t)
    +           DataSet<Tuple4<IntValue, K, K, IntValue>> groups = groupSpans
    +                   .rebalance()
    +                           .setParallelism(littleParallelism)
    +                           .name("Rebalance")
    +                   .flatMap(new GenerateGroups<K>())
    +                           .setParallelism(littleParallelism)
    +                           .name("Generate groups");
    +           // t, u, d(t)+d(u)
    +           DataSet<Tuple3<K, K, IntValue>> twoPaths = groups
    +                   .groupBy(0, 1)
    +                   .sortGroup(2, Order.ASCENDING)
    +                   .reduceGroup(new GenerateGroupPairs<K>(groupSize))
    +                           .name("Generate group pairs");
    +           // t, u, intersection, union
    +           return twoPaths
    +                   .groupBy(0, 1)
    +                   .reduceGroup(new ComputeScores<K>(unboundedScores,
    +                                   minimumScoreNumerator, minimumScoreDenominator,
    +                                   maximumScoreNumerator, maximumScoreDenominator))
    +                           .name("Compute scores");
    +   }
    +   /**
    +    * This is the first of three operations implementing a self-join to generate
    +    * the full neighbor pairing for each vertex. The number of neighbor pairs
    +    * is (n choose 2) which is quadratic in the vertex degree.
    +    * <br/>
    +    * The third operation, {@link GenerateGroupPairs}, processes groups of size
    +    * {@link #groupSize} and emits {@code O(groupSize * deg(vertex))} pairs.
    +    * <br/>
    +    * This input to the third operation is still quadratic in the vertex degree.
    +    * Two prior operations, {@link GenerateGroupSpans} and {@link GenerateGroups},
    +    * each emit datasets linear in the vertex degree, with a forced rebalance
    +    * in between. {@link GenerateGroupSpans} first annotates each edge 
with the
    +    * number of groups and {@link GenerateGroups} emits each edge into 
each group.
    --- End diff --
    👍 for the detailed comment!

> Jaccard Similarity
> ------------------
>                 Key: FLINK-3780
>                 URL: https://issues.apache.org/jira/browse/FLINK-3780
>             Project: Flink
>          Issue Type: New Feature
>          Components: Gelly
>    Affects Versions: 1.1.0
>            Reporter: Greg Hogan
>            Assignee: Greg Hogan
>             Fix For: 1.1.0
> Implement a Jaccard Similarity algorithm computing all non-zero similarity 
> scores. This algorithm is similar to {{TriangleListing}} but instead of 
> joining two-paths against an edge list we count two-paths.
> {{flink-gelly-examples}} currently has {{JaccardSimilarityMeasure}} which 
> relies on {{Graph.getTriplets()}} so only computes similarity scores for 
> neighbors but not neighbors-of-neighbors.
> This algorithm is easily modified for other similarity scores such as 
> Adamic-Adar similarity where the sum of endpoint degrees is replaced by the 
> degree of the middle vertex.

This message was sent by Atlassian JIRA

Reply via email to