I have already posted this question on StackOverflow:
<http://stackoverflow.com/questions/43263942/how-to-convert-spark-mllib-vector-to-ml-vector>
However, I haven't received any response yet. I'm trying to use the
RandomForest algorithm for classification after applying PCA, since the
dataset is pretty high-dimensional. Here's my source code:
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.mllib.tree.RandomForest
import org.apache.spark.mllib.tree.model.RandomForestModel
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.ml.linalg.{Vectors, VectorUDT}
import org.apache.spark.sql._
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.feature.PCA
import org.apache.spark.rdd.RDD

object PCAExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .master("local[*]")
      .config("spark.sql.warehouse.dir", "E:/Exp/")
      .appName("PCAExample")
      .getOrCreate()

    val dataset = MLUtils.loadLibSVMFile(spark.sparkContext, "data/mnist.bz2")
    val splits = dataset.randomSplit(Array(0.7, 0.3), seed = 12345L)
    val (trainingData, testData) = (splits(0), splits(1))

    val sqlContext = new SQLContext(spark.sparkContext)
    import sqlContext.implicits._
    val trainingDF = trainingData.toDF("label", "features")

    val pca = new PCA()
      .setInputCol("features")
      .setOutputCol("pcaFeatures")
      .setK(100)
      .fit(trainingDF)

    val pcaTrainingData = pca.transform(trainingDF)
    //pcaTrainingData.show()

    val labeled = pca.transform(trainingDF).rdd.map(row => LabeledPoint(
      row.getAs[Double]("label"),
      row.getAs[org.apache.spark.mllib.linalg.Vector]("pcaFeatures")))

    //val labeled = pca.transform(trainingDF).rdd.map(row =>
    //  LabeledPoint(row.getAs[Double]("label"),
    //    Vector.fromML(row.getAs[org.apache.spark.ml.linalg.SparseVector]("features"))))

    val numClasses = 10
    val categoricalFeaturesInfo = Map[Int, Int]()
    val numTrees = 10 // Use more in practice.
    val featureSubsetStrategy = "auto" // Let the algorithm choose.
    val impurity = "gini"
    val maxDepth = 20
    val maxBins = 32

    val model = RandomForest.trainClassifier(labeled, numClasses,
      categoricalFeaturesInfo, numTrees, featureSubsetStrategy, impurity,
      maxDepth, maxBins)
  }
}
However, I'm getting the following error:

Exception in thread "main" java.lang.IllegalArgumentException: requirement
failed: Column features must be of type
org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7 but was actually
org.apache.spark.mllib.linalg.VectorUDT@f71b0bce.
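If I read the error correctly, ml.feature.PCA expects the new
org.apache.spark.ml.linalg vectors, while MLUtils.loadLibSVMFile produces the
old org.apache.spark.mllib.linalg vectors. Is converting the vector column the
right direction? Here is a sketch of what I have in mind (assuming Spark 2.x,
where MLUtils.convertVectorColumnsToML and asML are available):

```scala
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.sql.DataFrame

// Convert all mllib vector columns of trainingDF to the new
// ml.linalg vector type that the spark.ml transformers expect.
val convertedDF: DataFrame = MLUtils.convertVectorColumnsToML(trainingDF)

// Or convert a single vector value:
// val mlVec: org.apache.spark.ml.linalg.Vector = mllibVec.asML
```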
What am I doing wrong in my code? The exception is actually thrown at this
line:

val pca = new PCA()
  .setInputCol("features")
  .setOutputCol("pcaFeatures")
  .setK(100)
  .fit(trainingDF) // <-- exception thrown here

Could someone please help me solve this problem?
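Alternatively, would it be cleaner to skip the RDD-based loader entirely? As
far as I can tell, the DataFrame-based LibSVM reader already yields
label/features columns backed by ml.linalg vectors (same file path assumed):

```scala
// Load the LibSVM file directly as a DataFrame; the "features"
// column then already holds ml.linalg vectors, which PCA accepts.
val df = spark.read.format("libsvm").load("data/mnist.bz2")
```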
Kind regards,
Md. Rezaul Karim