I think this might be problematic with the current way we define the
predict operations because they require that both the Testing and
PredictionValue types are available.

Here's what I had to do to get it to work (in ml/pipeline/Predictor.scala):

import org.apache.flink.ml.math.{Vector => FlinkVector}

implicit def labeledVectorEvaluateDataSetOperation[
    Instance <: Estimator[Instance],
    Model,
    FlinkVector,
    Double](
    implicit predictOperation: PredictOperation[Instance, Model, FlinkVector, Double],
    testingTypeInformation: TypeInformation[FlinkVector],
    predictionValueTypeInformation: TypeInformation[Double])
  : EvaluateDataSetOperation[Instance, LabeledVector, Double] = {
  new EvaluateDataSetOperation[Instance, LabeledVector, Double] {
    override def evaluateDataSet(
        instance: Instance,
        evaluateParameters: ParameterMap,
        testing: DataSet[LabeledVector])
      : DataSet[(Double, Double)] = {
      val resultingParameters = instance.parameters ++ evaluateParameters
      val model = predictOperation.getModel(instance, resultingParameters)

      implicit val resultTypeInformation =
        createTypeInformation[(FlinkVector, Double)]

      testing.mapWithBcVariable(model) {
        (element, model) => {
          (element.label.asInstanceOf[Double],
            predictOperation.predict(element.vector.asInstanceOf[FlinkVector], model))
        }
      }
    }
  }
}

I'm not a fan of casting objects, but the compiler complains here otherwise.

Does anyone have input on why the casting is necessary here, given that the
underlying types are correct? It probably has to do with some type erasure
I'm not seeing.
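For what it's worth, the need for a cast can be reproduced outside of Flink. The following is a minimal, self-contained sketch (all names hypothetical, not Flink code) of one possible cause: a type parameter that happens to be named `Double` shadows `scala.Double` inside the method, so the compiler treats it as an opaque abstract type and will not unify it with the concrete `scala.Double` without a cast.

```scala
object ShadowingSketch {
  // `Double` here is a *type parameter*, which shadows scala.Double
  // for the whole body of the method.
  def needsCast[Double](make: Double => String): String = {
    val x: scala.Double = 1.5
    // make(x) does not compile: scala.Double is not the abstract type `Double`.
    // The (unchecked) cast is the only way to hand x to `make`.
    make(x.asInstanceOf[Double])
  }

  def main(args: Array[String]): Unit =
    // Instantiating the parameter as scala.Double makes the cast a no-op at runtime.
    println(needsCast[scala.Double](d => "label=" + d.toString))
}
```

If that is what is happening, renaming or dropping the shadowing type parameters might make the casts unnecessary, but I haven't verified that against the Flink code above.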

--Theo

On Wed, Oct 19, 2016 at 10:30 PM, Thomas FOURNIER <
thomasfournier...@gmail.com> wrote:

> Hi,
>
> Two questions:
>
> 1- I was thinking of doing this:
>
> implicit def evaluateLabeledVector[T <: LabeledVector] = {
>   new EvaluateDataSetOperation[SVM, T, Double]() {
>     override def evaluateDataSet(
>         instance: SVM,
>         evaluateParameters: ParameterMap,
>         testing: DataSet[T]): DataSet[(Double, Double)] = {
>       val predictor = ...
>       testing.map(l => (l.label, predictor.predict(l.vector)))
>     }
>   }
> }
>
> How can I access my predictor object (predictor has type
> PredictOperation[SVM, DenseVector, T, Double])?
>
> 2- My first idea was to develop a predictOperation[T <: LabeledVector]
> so that I could use the implicit def defaultEvaluateDatasetOperation
> to get an EvaluateDataSetOperation object. Is that also valid or not?
>
> Thanks
> Regards
>
> Thomas
>
>
>
>
>
>
>
> 2016-10-19 16:26 GMT+02:00 Theodore Vasiloudis <
> theodoros.vasilou...@gmail.com>:
>
> > Hello Thomas,
> >
> > Since you are calling evaluate here, you should be creating an
> > EvaluateDataSetOperation that works with LabeledVector, but I see you
> > are creating a new PredictOperation instead.
> >
> > On Wed, Oct 19, 2016 at 3:05 PM, Thomas FOURNIER <
> > thomasfournier...@gmail.com> wrote:
> >
> > > Hi,
> > >
> > > I'd like to improve the SVM evaluate function so that it can use
> > > LabeledVector (and not only Vector). Indeed, what is currently done
> > > in the tests is the following (data is a DataSet[LabeledVector]):
> > >
> > > val test = data.map(l => (l.vector, l.label))
> > > svm.evaluate(test)
> > >
> > > We would like to do:
> > > svm.evaluate(data)
> > >
> > >
> > > Adding this "new" code:
> > >
> > > implicit def predictLabeledPoint[T <: LabeledVector] = {
> > >  new PredictOperation  ...
> > > }
> > >
> > > gives me a PredictOperation that should be used with
> > > defaultEvaluateDataSetOperation with the correct signature (i.e. with
> > > T <: LabeledVector and not T <: Vector).
> > >
> > > Nonetheless, tests are failing:
> > >
> > >
> > > it should "predict with LabeledDataPoint" in {
> > >
> > >   val env = ExecutionEnvironment.getExecutionEnvironment
> > >
> > >   val svm = SVM().
> > >     setBlocks(env.getParallelism).
> > >     setIterations(100).
> > >     setLocalIterations(100).
> > >     setRegularization(0.002).
> > >     setStepsize(0.1).
> > >     setSeed(0)
> > >
> > >   val trainingDS = env.fromCollection(Classification.trainingData)
> > >   svm.fit(trainingDS)
> > >   val predictionPairs = svm.evaluate(trainingDS)
> > >
> > >   ....
> > > }
> > >
> > > java.lang.RuntimeException: There is no PredictOperation defined for
> > > org.apache.flink.ml.classification.SVM which takes a
> > > DataSet[org.apache.flink.ml.common.LabeledVector] as input.
> > >
> > >
> > >
> > > Thanks
> > >
> > > Regards
> > > Thomas
> > >
> >
>
