[ 
https://issues.apache.org/jira/browse/FLINK-5423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15824451#comment-15824451
 ] 

ASF GitHub Bot commented on FLINK-5423:
---------------------------------------

Github user Fokko commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3077#discussion_r96290365
  
    --- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/outlier/StochasticOutlierSelection.scala
 ---
    @@ -0,0 +1,367 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.ml.outlier
    +
    +/** An implementation of the Stochastic Outlier Selection algorithm by 
Jeroen Jansen
    +  *
    +  * For more information about SOS, see 
https://github.com/jeroenjanssens/sos
    +  * J.H.M. Janssens, F. Huszar, E.O. Postma, and H.J. van den Herik. 
Stochastic
    +  * Outlier Selection. Technical Report TiCC TR 2012-001, Tilburg 
University,
    +  * Tilburg, the Netherlands, 2012.
    +  *
    +  * @example
    +  *          {{{
    +  *             val inputDS = env.fromCollection(List(
    +  *               LabeledVector(0.0, DenseVector(1.0, 1.0)),
    +  *               LabeledVector(1.0, DenseVector(2.0, 1.0)),
    +  *               LabeledVector(2.0, DenseVector(1.0, 2.0)),
    +  *               LabeledVector(3.0, DenseVector(2.0, 2.0)),
    +  *               LabeledVector(4.0, DenseVector(5.0, 8.0)) // The outlier!
    +  *             ))
    +  *
    +  *             val sos = StochasticOutlierSelection()
    +  *               .setPerplexity(3)
    +  *
    +  *             val outputDS = sos.transform(inputDS)
    +  *
    +  *             val expectedOutputDS = Array(
    +  *                0.2790094479202896,
    +  *                0.25775014551682535,
    +  *                0.22136130977995766,
    +  *                0.12707053787018444,
    +  *                0.9922779902453757 // The outlier!
    +  *             )
    +  *
    +  *             assert(outputDS == expectedOutputDS)
    +  *          }}}
    +  *
    +  * =Parameters=
    +  *
    +  *  - 
[[org.apache.flink.ml.outlier.StochasticOutlierSelection.Perplexity]]:
    +  *  Perplexity can be interpreted as the k in k-nearest neighbor 
algorithms. The difference is that
    +  *  in SOS being a neighbor is not a binary property, but a probabilistic 
one. Should be between
    +  *  1 and n-1, where n is the number of observations.
    +  *  (Default value: '''30''')
    +  *
    +  *  - 
[[org.apache.flink.ml.outlier.StochasticOutlierSelection.ErrorTolerance]]:
    +  *  The accepted error tolerance. When increasing this number, it will 
sacrifice accuracy in
    +  *  return for reduced computational time.
    +  *  (Default value: '''1e-20''')
    +  *
    +  *  - 
[[org.apache.flink.ml.outlier.StochasticOutlierSelection.MaxIterations]]:
    +  *  The maximum number of iterations to perform. (Default value: 
'''5000''')
    +  */
    +
    +import breeze.linalg.functions.euclideanDistance
    +import breeze.linalg.{sum, DenseVector => BreezeDenseVector, Vector => 
BreezeVector}
    +import org.apache.flink.api.common.operators.Order
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.scala._
    +import org.apache.flink.api.scala.utils._
    +import org.apache.flink.ml.common.{LabeledVector, Parameter, ParameterMap, 
WithParameters}
    +import org.apache.flink.ml.math.Breeze._
    +import org.apache.flink.ml.math.{BreezeVectorConverter, Vector}
    +import org.apache.flink.ml.pipeline.{TransformDataSetOperation, 
Transformer}
    +
    +import scala.language.implicitConversions
    +import scala.reflect.ClassTag
    +
    +class StochasticOutlierSelection extends 
Transformer[StochasticOutlierSelection] {
    +
    +  import StochasticOutlierSelection._
    +
    +
    +  /** Sets the perplexity of the outlier selection algorithm, can be seen 
as the k of kNN
    +    * For more information, please read the Stochastic Outlier Selection 
algorithm paper
    +    *
    +    * @param perplexity the perplexity of the affinity fit
    +    * @return
    +    */
    +  def setPerplexity(perplexity: Double): StochasticOutlierSelection = {
    +    require(perplexity >= 1, "Perplexity must be at least one.")
    +    parameters.add(Perplexity, perplexity)
    +    this
    +  }
    +
    +  /** The accepted error tolerance to save computational time when 
computing the affinity
    +    *
    +    * @param errorTolerance the accepted error tolerance with respect to 
the affinity
    +    * @return
    +    */
    +  def setErrorTolerance(errorTolerance: Double): 
StochasticOutlierSelection = {
    +    require(errorTolerance >= 0, "Error tolerance cannot be negative.")
    +    parameters.add(ErrorTolerance, errorTolerance)
    +    this
    +  }
    +
    +  /** The maximum number of iterations to approximate the affinity
    +    *
    +    * @param maxIterations the maximum number of iterations
    +    * @return
    +    */
    +  def setMaxIterations(maxIterations: Int): StochasticOutlierSelection = {
    +    require(maxIterations > 0, "Maximum iterations must be positive.")
    +    parameters.add(MaxIterations, maxIterations)
    +    this
    +  }
    +
    +}
    +
    +object StochasticOutlierSelection extends WithParameters {
    +
    +  // ========================================= Parameters 
==========================================
    +  case object Perplexity extends Parameter[Double] {
    +    val defaultValue: Option[Double] = Some(30)
    +  }
    +
    +  case object ErrorTolerance extends Parameter[Double] {
    +    val defaultValue: Option[Double] = Some(1e-20)
    +  }
    +
    +  case object MaxIterations extends Parameter[Int] {
    +    val defaultValue: Option[Int] = Some(5000)
    +  }
    +
    +  // ==================================== Factory methods 
==========================================
    +
    +  def apply(): StochasticOutlierSelection = {
    +    new StochasticOutlierSelection()
    +  }
    +
    +  // ===================================== Operations 
==============================================
    +  case class BreezeLabeledVector(idx: Int, data: BreezeVector[Double])
    +
    +  implicit val transformLabeledVectors = {
    +    new TransformDataSetOperation[StochasticOutlierSelection, 
LabeledVector, (Int, Double)] {
    +      override def transformDataSet(instance: StochasticOutlierSelection,
    +                                    transformParameters: ParameterMap,
    +                                    input: DataSet[LabeledVector]): 
DataSet[(Int, Double)] = {
    +
    +        val resultingParameters = instance.parameters ++ 
transformParameters
    +
    +        val vectorsWithIndex = input.map(labeledVector => {
    +          BreezeLabeledVector(labeledVector.label.toInt, 
labeledVector.vector.asBreeze)
    +        })
    +
    +        // Don't map back to a labeled-vector since the output of the 
algorithm is
    +        // a single double instead of vector
    +        outlierSelection(vectorsWithIndex, resultingParameters)
    +      }
    +    }
    +  }
    +
    +  /** [[TransformDataSetOperation]] applies the stochastic outlier 
selection algorithm on a
    +    * [[Vector]] which will transform the high-dimensionaly input to a 
single Double output.
    +    *
    +    * @tparam T Type of the input and output data which has to be a 
subtype of [[Vector]]
    +    * @return [[TransformDataSetOperation]] a single double which 
represents the oulierness of
    +    *         the input vectors, where the output is in [0, 1]
    +    */
    +  implicit def transformVectors[T <: Vector : BreezeVectorConverter : 
TypeInformation : ClassTag]
    +  = {
    +    new TransformDataSetOperation[StochasticOutlierSelection, T, Double] {
    +      override def transformDataSet(instance: StochasticOutlierSelection,
    +                                    transformParameters: ParameterMap,
    +                                    input: DataSet[T]): DataSet[Double] = {
    +
    +        val resultingParameters = instance.parameters ++ 
transformParameters
    +
    +        // Map to the right format
    +        val vectorsWithIndex = input.zipWithIndex.map(vector => {
    +          BreezeLabeledVector(vector._1.toInt, vector._2.asBreeze)
    +        })
    +
    +        outlierSelection(vectorsWithIndex, resultingParameters).map(_._2)
    +      }
    +    }
    +  }
    +
    +  /** Internal entry point which will execute the different stages of the 
algorithm using a single
    +    * interface
    +    *
    +    * @param inputVectors        Input vectors on which the stochastic 
outlier selection algorithm
    +    *                            will be applied
    +    * @param transformParameters The user defined parameters of the 
algorithm
    +    * @return The outlierness of the vectors compared to each other
    +    */
    +  private def outlierSelection(inputVectors: DataSet[BreezeLabeledVector],
    +                               transformParameters: ParameterMap): 
DataSet[(Int, Double)] = {
    +    val dissimilarityVectors = computeDissimilarityVectors(inputVectors)
    +    val affinityVectors = computeAffinity(dissimilarityVectors, 
transformParameters)
    +    val bindingProbabilityVectors = 
computeBindingProbabilities(affinityVectors)
    +    val outlierProbability = 
computeOutlierProbability(bindingProbabilityVectors)
    +
    +    outlierProbability
    +  }
    +
    +  /** Compute pair-wise distance from each vector, to all other vectors.
    +    *
    +    * @param inputVectors The input vectors, will compare the vector to 
all other vectors based
    +    *                     on an distance method.
    +    * @return Returns new set of [[BreezeLabeledVector]] with 
dissimilarity vector
    +    */
    +  def computeDissimilarityVectors(inputVectors: 
DataSet[BreezeLabeledVector]):
    +  DataSet[BreezeLabeledVector] =
    +  inputVectors.cross(inputVectors) {
    +    (a, b) => (a.idx, b.idx, euclideanDistance(a.data, b.data))
    +  }.filter(dist => dist._1 != dist._2) // Filter out the diagonal, this 
contains no information.
    +    .groupBy(0)
    +    .sortGroup(1, Order.ASCENDING)
    +    .reduceGroup {
    +      distancesIterator => {
    +        val distances = distancesIterator.toList
    +        val distanceVector = distances.map(_._3).toArray
    +
    +        BreezeLabeledVector(distances.head._1, 
BreezeDenseVector(distanceVector))
    +      }
    +    }
    +
    +  /** Approximate the affinity by fitting a Gaussian-like function
    +    *
    +    * @param dissimilarityVectors The dissimilarity vectors which 
represents the distance to the
    +    *                             other vectors in the data set.
    +    * @param resultingParameters  The user defined parameters of the 
algorithm
    +    * @return Returns new set of [[BreezeLabeledVector]] with 
dissimilarity vector
    +    */
    +  def computeAffinity(dissimilarityVectors: DataSet[BreezeLabeledVector],
    +                      resultingParameters: ParameterMap): 
DataSet[BreezeLabeledVector] = {
    +    val logPerplexity = Math.log(resultingParameters(Perplexity))
    +    val maxIterations = resultingParameters(MaxIterations)
    +    val errorTolerance = resultingParameters(ErrorTolerance)
    +
    +    dissimilarityVectors.map(vec => {
    +      val breezeVec = binarySearch(vec.data, logPerplexity, maxIterations, 
errorTolerance)
    +      BreezeLabeledVector(vec.idx, breezeVec)
    +    })
    +  }
    +
    +  /** Normalizes the input vectors so each row sums up to one.
    +    *
    +    * @param affinityVectors The affinity vectors which is the 
quantification of the relationship
    +    *                        between the original vectors.
    +    * @return Returns new set of [[BreezeLabeledVector]] with represents 
the binding
    +    *         probabilities, which is in fact the affinity where each row 
sums up to one.
    +    */
    +  def computeBindingProbabilities(affinityVectors: 
DataSet[BreezeLabeledVector]):
    +  DataSet[BreezeLabeledVector] =
    +  affinityVectors.map(vec => BreezeLabeledVector(vec.idx, vec.data :/ 
sum(vec.data)))
    +
    +  /** Compute the final outlier probability by taking the product of the 
column.
    +    *
    +    * @param bindingProbabilityVectors The binding probability vectors 
where the binding
    +    *                                  probability is based on the 
affinity and represents the
    +    *                                  probability of a vector binding 
with another vector.
    +    * @return Returns a single double which represents the final 
outlierness of the input vector.
    +    */
    +  def computeOutlierProbability(bindingProbabilityVectors: 
DataSet[BreezeLabeledVector]):
    +  DataSet[(Int, Double)] = bindingProbabilityVectors
    +    .flatMap(vec => vec.data.toArray.zipWithIndex.map(pair => {
    +
    +      // The DistanceMatrix removed the diagonal, but we need to compute 
the product
    +      // of the column, so we need to correct the offset.
    +      val columnIndex = if (pair._2 >= vec.idx) {
    +        1
    +      } else {
    +        0
    +      }
    +
    +      (columnIndex + pair._2, pair._1)
    +    })).groupBy(0).reduceGroup {
    +    probabilities => {
    +      var rowNumber = -1
    +      var outlierProbability = 1.0
    +      for (probability <- probabilities) {
    +        rowNumber = probability._1
    +        outlierProbability = outlierProbability * (1.0 - probability._2)
    +      }
    +
    +      (rowNumber, outlierProbability)
    +    }
    +  }
    +
    +  /** Performs a binary search to get affinities in such a way that each 
conditional Gaussian has
    +    *  the same perplexity.
    +    *
    +    * @param dissimilarityVector The input dissimilarity vector which 
represents the current
    +    *                            vector distance to the other vectors in 
the data set
    +    * @param logPerplexity The log of the perplexity, which represents the 
probability of having
    +    *                      affinity with another vector.
    +    * @param maxIterations The maximum iterations to limit the 
computational time.
    +    * @param tolerance The allowed tolerance to sacrifice precision for 
decreased computational
    +    *                  time.
    +    * @param beta: The current beta
    +    * @param betaMin The lower bound of beta
    +    * @param betaMax The upper bound of beta
    +    * @param iteration The current iteration
    +    * @return Returns the affinity vector of the input vector.
    +    */
    +  def binarySearch(dissimilarityVector: BreezeVector[Double],
    +                   logPerplexity: Double,
    +                   maxIterations: Int,
    +                   tolerance: Double,
    +                   beta: Double = 1.0,
    +                   betaMin: Double = Double.NegativeInfinity,
    +                   betaMax: Double = Double.PositiveInfinity,
    +                   iteration: Int = 0): BreezeVector[Double] = {
    --- End diff --
    
    Check


> Implement Stochastic Outlier Selection
> --------------------------------------
>
>                 Key: FLINK-5423
>                 URL: https://issues.apache.org/jira/browse/FLINK-5423
>             Project: Flink
>          Issue Type: Improvement
>          Components: Machine Learning Library
>            Reporter: Fokko Driesprong
>            Assignee: Fokko Driesprong
>
> I've implemented the Stochastic Outlier Selection (SOS) algorithm by Jeroen 
> Jansen.
> http://jeroenjanssens.com/2013/11/24/stochastic-outlier-selection.html
> Integrated as much as possible with the components from the machine learning 
> library.
> The algorithm itself has been compared to four other algorithms and it it 
> shows that SOS has a higher performance on most of these real-world datasets. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to