Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/1980#discussion_r63894348 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java --- @@ -0,0 +1,462 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.library.similarity; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.GroupReduceFunction; +import org.apache.flink.api.common.operators.Order; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.functions.FunctionAnnotation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.GraphAlgorithm; +import org.apache.flink.graph.asm.degree.annotate.undirected.EdgeTargetDegree; +import org.apache.flink.graph.library.similarity.JaccardIndex.Result; +import org.apache.flink.graph.utils.Murmur3_32; +import org.apache.flink.types.CopyableValue; +import org.apache.flink.types.IntValue; +import org.apache.flink.types.LongValue; +import org.apache.flink.util.Collector; +import org.apache.flink.util.Preconditions; + +import java.util.ArrayList; +import java.util.List; + +/** + * The Jaccard Index measures the similarity between vertex neighborhoods. + * Scores range from 0.0 (no common neighbors) to 1.0 (all neighbors are common). + * <br/> + * This implementation produces similarity scores for each pair of vertices + * in the graph with at least one common neighbor; equivalently, this is the + * set of all non-zero Jaccard Similarity coefficients. + * <br/> + * The input graph must be a simple, undirected graph containing no duplicate + * edges or self-loops. + * + * @param <K> graph ID type + * @param <VV> vertex value type + * @param <EV> edge value type + */ +public class JaccardIndex<K extends CopyableValue<K>, VV, EV> +implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> { + + public static final int DEFAULT_GROUP_SIZE = 64; + + // Optional configuration + private int groupSize = DEFAULT_GROUP_SIZE; + + private boolean unboundedScores = true; + + private int minimumScoreNumerator = 0; + + private int minimumScoreDenominator = 1; + + private int maximumScoreNumerator = 1; + + private int maximumScoreDenominator = 0; + + private int littleParallelism = ExecutionConfig.PARALLELISM_UNKNOWN; + + /** + * Override the default group size for the quadratic expansion of neighbor + * pairs. Small groups generate more data whereas large groups distribute + * computation less evenly among tasks. + * + * @param groupSize the group size for the quadratic expansion of neighbor pairs + * @return this + */ + public JaccardIndex<K, VV, EV> setGroupSize(int groupSize) { + Preconditions.checkArgument(groupSize > 0, "Group size must be greater than zero"); + + this.groupSize = groupSize; + + return this; + } + + /** + * Filter out Jaccard Index scores less than the given minimum fraction. + * + * @param numerator numerator of the minimum score + * @param denominator denominator of the minimum score + * @return this + * @see #setMaximumScore(int, int) + */ + public JaccardIndex<K, VV, EV> setMinimumScore(int numerator, int denominator) { + Preconditions.checkArgument(numerator >= 0, "Minimum score numerator must be non-negative"); + Preconditions.checkArgument(denominator > 0, "Minimum score denominator must be greater than zero"); + Preconditions.checkArgument(numerator <= denominator, "Minimum score fraction must be less than or equal to one"); + + this.unboundedScores = false; + this.minimumScoreNumerator = numerator; + this.minimumScoreDenominator = denominator; + + return this; + } + + /** + * Filter out Jaccard Index scores greater than or equal to the given maximum fraction. + * + * @param numerator numerator of the maximum score + * @param denominator denominator of the maximum score + * @return this + * @see #setMinimumScore(int, int) + */ + public JaccardIndex<K, VV, EV> setMaximumScore(int numerator, int denominator) { --- End diff -- This one also and all other configuration methods.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---