[ https://issues.apache.org/jira/browse/FLINK-5423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15824451#comment-15824451 ]
ASF GitHub Bot commented on FLINK-5423: --------------------------------------- Github user Fokko commented on a diff in the pull request: https://github.com/apache/flink/pull/3077#discussion_r96290365 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/outlier/StochasticOutlierSelection.scala --- @@ -0,0 +1,367 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.outlier + +/** An implementation of the Stochastic Outlier Selection algorithm by Jeroen Janssens + * + * For more information about SOS, see https://github.com/jeroenjanssens/sos + * J.H.M. Janssens, F. Huszar, E.O. Postma, and H.J. van den Herik. Stochastic + * Outlier Selection. Technical Report TiCC TR 2012-001, Tilburg University, + * Tilburg, the Netherlands, 2012. + * + * @example + * {{{ + * val inputDS = env.fromCollection(List( + * LabeledVector(0.0, DenseVector(1.0, 1.0)), + * LabeledVector(1.0, DenseVector(2.0, 1.0)), + * LabeledVector(2.0, DenseVector(1.0, 2.0)), + * LabeledVector(3.0, DenseVector(2.0, 2.0)), + * LabeledVector(4.0, DenseVector(5.0, 8.0)) // The outlier! 
+ * )) + * + * val sos = StochasticOutlierSelection() + * .setPerplexity(3) + * + * val outputDS = sos.transform(inputDS) + * + * val expectedOutputDS = Array( + * 0.2790094479202896, + * 0.25775014551682535, + * 0.22136130977995766, + * 0.12707053787018444, + * 0.9922779902453757 // The outlier! + * ) + * + * assert(outputDS == expectedOutputDS) + * }}} + * + * =Parameters= + * + * - [[org.apache.flink.ml.outlier.StochasticOutlierSelection.Perplexity]]: + * Perplexity can be interpreted as the k in k-nearest neighbor algorithms. The difference is that + * in SOS being a neighbor is not a binary property, but a probabilistic one. Should be between + * 1 and n-1, where n is the number of observations. + * (Default value: '''30''') + * + * - [[org.apache.flink.ml.outlier.StochasticOutlierSelection.ErrorTolerance]]: + * The accepted error tolerance. When increasing this number, it will sacrifice accuracy in + * return for reduced computational time. + * (Default value: '''1e-20''') + * + * - [[org.apache.flink.ml.outlier.StochasticOutlierSelection.MaxIterations]]: + * The maximum number of iterations to perform. 
(Default value: '''5000''') + */ + +import breeze.linalg.functions.euclideanDistance +import breeze.linalg.{sum, DenseVector => BreezeDenseVector, Vector => BreezeVector} +import org.apache.flink.api.common.operators.Order +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.utils._ +import org.apache.flink.ml.common.{LabeledVector, Parameter, ParameterMap, WithParameters} +import org.apache.flink.ml.math.Breeze._ +import org.apache.flink.ml.math.{BreezeVectorConverter, Vector} +import org.apache.flink.ml.pipeline.{TransformDataSetOperation, Transformer} + +import scala.language.implicitConversions +import scala.reflect.ClassTag + +class StochasticOutlierSelection extends Transformer[StochasticOutlierSelection] { + + import StochasticOutlierSelection._ + + + /** Sets the perplexity of the outlier selection algorithm, can be seen as the k of kNN + * For more information, please read the Stochastic Outlier Selection algorithm paper + * + * @param perplexity the perplexity of the affinity fit + * @return + */ + def setPerplexity(perplexity: Double): StochasticOutlierSelection = { + require(perplexity >= 1, "Perplexity must be at least one.") + parameters.add(Perplexity, perplexity) + this + } + + /** The accepted error tolerance to save computational time when computing the affinity + * + * @param errorTolerance the accepted error tolerance with respect to the affinity + * @return + */ + def setErrorTolerance(errorTolerance: Double): StochasticOutlierSelection = { + require(errorTolerance >= 0, "Error tolerance cannot be negative.") + parameters.add(ErrorTolerance, errorTolerance) + this + } + + /** The maximum number of iterations to approximate the affinity + * + * @param maxIterations the maximum number of iterations + * @return + */ + def setMaxIterations(maxIterations: Int): StochasticOutlierSelection = { + require(maxIterations > 0, "Maximum iterations must be positive.") + 
parameters.add(MaxIterations, maxIterations) + this + } + +} + +object StochasticOutlierSelection extends WithParameters { + + // ========================================= Parameters ========================================== + case object Perplexity extends Parameter[Double] { + val defaultValue: Option[Double] = Some(30) + } + + case object ErrorTolerance extends Parameter[Double] { + val defaultValue: Option[Double] = Some(1e-20) + } + + case object MaxIterations extends Parameter[Int] { + val defaultValue: Option[Int] = Some(5000) + } + + // ==================================== Factory methods ========================================== + + def apply(): StochasticOutlierSelection = { + new StochasticOutlierSelection() + } + + // ===================================== Operations ============================================== + case class BreezeLabeledVector(idx: Int, data: BreezeVector[Double]) + + implicit val transformLabeledVectors = { + new TransformDataSetOperation[StochasticOutlierSelection, LabeledVector, (Int, Double)] { + override def transformDataSet(instance: StochasticOutlierSelection, + transformParameters: ParameterMap, + input: DataSet[LabeledVector]): DataSet[(Int, Double)] = { + + val resultingParameters = instance.parameters ++ transformParameters + + val vectorsWithIndex = input.map(labeledVector => { + BreezeLabeledVector(labeledVector.label.toInt, labeledVector.vector.asBreeze) + }) + + // Don't map back to a labeled-vector since the output of the algorithm is + // a single double instead of vector + outlierSelection(vectorsWithIndex, resultingParameters) + } + } + } + + /** [[TransformDataSetOperation]] applies the stochastic outlier selection algorithm on a + * [[Vector]] which will transform the high-dimensionaly input to a single Double output. 
+ * + * @tparam T Type of the input and output data which has to be a subtype of [[Vector]] + * @return [[TransformDataSetOperation]] a single double which represents the oulierness of + * the input vectors, where the output is in [0, 1] + */ + implicit def transformVectors[T <: Vector : BreezeVectorConverter : TypeInformation : ClassTag] + = { + new TransformDataSetOperation[StochasticOutlierSelection, T, Double] { + override def transformDataSet(instance: StochasticOutlierSelection, + transformParameters: ParameterMap, + input: DataSet[T]): DataSet[Double] = { + + val resultingParameters = instance.parameters ++ transformParameters + + // Map to the right format + val vectorsWithIndex = input.zipWithIndex.map(vector => { + BreezeLabeledVector(vector._1.toInt, vector._2.asBreeze) + }) + + outlierSelection(vectorsWithIndex, resultingParameters).map(_._2) + } + } + } + + /** Internal entry point which will execute the different stages of the algorithm using a single + * interface + * + * @param inputVectors Input vectors on which the stochastic outlier selection algorithm + * will be applied + * @param transformParameters The user defined parameters of the algorithm + * @return The outlierness of the vectors compared to each other + */ + private def outlierSelection(inputVectors: DataSet[BreezeLabeledVector], + transformParameters: ParameterMap): DataSet[(Int, Double)] = { + val dissimilarityVectors = computeDissimilarityVectors(inputVectors) + val affinityVectors = computeAffinity(dissimilarityVectors, transformParameters) + val bindingProbabilityVectors = computeBindingProbabilities(affinityVectors) + val outlierProbability = computeOutlierProbability(bindingProbabilityVectors) + + outlierProbability + } + + /** Compute pair-wise distance from each vector, to all other vectors. + * + * @param inputVectors The input vectors, will compare the vector to all other vectors based + * on an distance method. 
+ * @return Returns new set of [[BreezeLabeledVector]] with dissimilarity vector + */ + def computeDissimilarityVectors(inputVectors: DataSet[BreezeLabeledVector]): + DataSet[BreezeLabeledVector] = + inputVectors.cross(inputVectors) { + (a, b) => (a.idx, b.idx, euclideanDistance(a.data, b.data)) + }.filter(dist => dist._1 != dist._2) // Filter out the diagonal, this contains no information. + .groupBy(0) + .sortGroup(1, Order.ASCENDING) + .reduceGroup { + distancesIterator => { + val distances = distancesIterator.toList + val distanceVector = distances.map(_._3).toArray + + BreezeLabeledVector(distances.head._1, BreezeDenseVector(distanceVector)) + } + } + + /** Approximate the affinity by fitting a Gaussian-like function + * + * @param dissimilarityVectors The dissimilarity vectors which represents the distance to the + * other vectors in the data set. + * @param resultingParameters The user defined parameters of the algorithm + * @return Returns new set of [[BreezeLabeledVector]] with dissimilarity vector + */ + def computeAffinity(dissimilarityVectors: DataSet[BreezeLabeledVector], + resultingParameters: ParameterMap): DataSet[BreezeLabeledVector] = { + val logPerplexity = Math.log(resultingParameters(Perplexity)) + val maxIterations = resultingParameters(MaxIterations) + val errorTolerance = resultingParameters(ErrorTolerance) + + dissimilarityVectors.map(vec => { + val breezeVec = binarySearch(vec.data, logPerplexity, maxIterations, errorTolerance) + BreezeLabeledVector(vec.idx, breezeVec) + }) + } + + /** Normalizes the input vectors so each row sums up to one. + * + * @param affinityVectors The affinity vectors which is the quantification of the relationship + * between the original vectors. + * @return Returns new set of [[BreezeLabeledVector]] with represents the binding + * probabilities, which is in fact the affinity where each row sums up to one. 
+ */ + def computeBindingProbabilities(affinityVectors: DataSet[BreezeLabeledVector]): + DataSet[BreezeLabeledVector] = + affinityVectors.map(vec => BreezeLabeledVector(vec.idx, vec.data :/ sum(vec.data))) + + /** Compute the final outlier probability by taking the product of the column. + * + * @param bindingProbabilityVectors The binding probability vectors where the binding + * probability is based on the affinity and represents the + * probability of a vector binding with another vector. + * @return Returns a single double which represents the final outlierness of the input vector. + */ + def computeOutlierProbability(bindingProbabilityVectors: DataSet[BreezeLabeledVector]): + DataSet[(Int, Double)] = bindingProbabilityVectors + .flatMap(vec => vec.data.toArray.zipWithIndex.map(pair => { + + // The DistanceMatrix removed the diagonal, but we need to compute the product + // of the column, so we need to correct the offset. + val columnIndex = if (pair._2 >= vec.idx) { + 1 + } else { + 0 + } + + (columnIndex + pair._2, pair._1) + })).groupBy(0).reduceGroup { + probabilities => { + var rowNumber = -1 + var outlierProbability = 1.0 + for (probability <- probabilities) { + rowNumber = probability._1 + outlierProbability = outlierProbability * (1.0 - probability._2) + } + + (rowNumber, outlierProbability) + } + } + + /** Performs a binary search to get affinities in such a way that each conditional Gaussian has + * the same perplexity. + * + * @param dissimilarityVector The input dissimilarity vector which represents the current + * vector distance to the other vectors in the data set + * @param logPerplexity The log of the perplexity, which represents the probability of having + * affinity with another vector. + * @param maxIterations The maximum iterations to limit the computational time. + * @param tolerance The allowed tolerance to sacrifice precision for decreased computational + * time. 
+ * @param beta The current beta + * @param betaMin The lower bound of beta + * @param betaMax The upper bound of beta + * @param iteration The current iteration + * @return Returns the affinity vector of the input vector. + */ + def binarySearch(dissimilarityVector: BreezeVector[Double], + logPerplexity: Double, + maxIterations: Int, + tolerance: Double, + beta: Double = 1.0, + betaMin: Double = Double.NegativeInfinity, + betaMax: Double = Double.PositiveInfinity, + iteration: Int = 0): BreezeVector[Double] = { --- End diff -- Check > Implement Stochastic Outlier Selection > -------------------------------------- > > Key: FLINK-5423 > URL: https://issues.apache.org/jira/browse/FLINK-5423 > Project: Flink > Issue Type: Improvement > Components: Machine Learning Library > Reporter: Fokko Driesprong > Assignee: Fokko Driesprong > > I've implemented the Stochastic Outlier Selection (SOS) algorithm by Jeroen > Janssens. > http://jeroenjanssens.com/2013/11/24/stochastic-outlier-selection.html > Integrated as much as possible with the components from the machine learning > library. > The algorithm itself has been compared to four other algorithms and it > shows that SOS has a higher performance on most of these real-world datasets. -- This message was sent by Atlassian JIRA (v6.3.4#6332)