Hello all,
I'm using Spark for regression analysis on medium to large datasets, and
its performance is very good with random forests and decision trees.
Continuing my experiments, I started using GBTRegressor and found it
extremely slow compared to R, while the other two methods were very fast.
Two examples to illustrate:
- on a 300k-row dataset, R takes 3 minutes and GBTRegressor 15 minutes to
run 2000 iterations with maxDepth = 1 and minInstancesPerNode = 50
- on a 3M-row dataset, R takes 3 minutes and GBTRegressor 47 minutes to
run 10 iterations with maxDepth = 2 and minInstancesPerNode = 50
(gbm's n.trees, interaction.depth and n.minobsinnode correspond to
GBTRegressor's maxIter, maxDepth and minInstancesPerNode.)
I placed the code for the first example at the end of this message.
For the 300k dataset, I understand that Spark has a setup cost, which
means small datasets may not be processed as efficiently as in R, even
though my tests with DecisionTree and RandomForest suggest otherwise.
When I look at CPU usage during the GBT run, it spikes to about 90% (7
of my 8 cores) in relatively short bursts, then drops back to 8-10%
(less than one core) for quite a while. Compared to R, which keeps a
single core busy for its whole 3-minute run, this is quite surprising.
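For what it's worth, here is a quick diagnostic sketch (not part of the
timed runs above) that checks how many partitions the training DataFrame
has, using the data val from the code below; with local[8], Spark can
only parallelize the tree building across partitions:

// number of partitions actually available for parallel work
println(s"partitions: ${data.rdd.getNumPartitions}")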
What have I missed in my setup?
I've been told that the behavior I'm observing may be related to data
skew, but I'm not sure what is at play here.
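In case skew is the culprit, here is a sketch of how I would check it on
the data DataFrame from the code below; the check is illustrative, not
something I ran as part of the timings above:

import org.apache.spark.sql.functions.spark_partition_id
// row counts per partition: a large imbalance would point to skew
data.groupBy(spark_partition_id()).count().show()
// distribution of the label
data.describe("Freq").show()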
To my untrained eye, it looks as if there were an issue in the
GBTRegressor class, but I can't figure it out.
Any help would be most welcome.
Regards
=============================
R code
train <- read.table("c:/Path/to/file.csv", header=T, sep=";",dec=".")
train$X1 <- factor(train$X1)
train$X2 <- factor(train$X2)
train$X3 <- factor(train$X3)
train$X4 <- factor(train$X4)
train$X5 <- factor(train$X5)
train$X6 <- factor(train$X6)
train$X7 <- factor(train$X7)
train$X8 <- factor(train$X8)
train$X9 <- factor(train$X9)
library(gbm)
boost <- gbm(Freq ~ X1 + X2 + X3 + X4 + X5 + X6 + X7 + X8 + X9 + Y1,
             distribution = "gaussian", data = train, n.trees = 2000,
             bag.fraction = 1, shrinkage = 1, interaction.depth = 1,
             n.minobsinnode = 50, train.fraction = 1.0, cv.folds = 0,
             keep.data = TRUE)
=============================
Scala code for Spark
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.regression.GBTRegressor
val conf = new SparkConf()
.setAppName("GBTExample")
.set("spark.driver.memory", "8g")
.set("spark.executor.memory", "8g")
.set("spark.network.timeout", "120s")
val sc = SparkContext.getOrCreate(conf.setMaster("local[8]"))
val spark = SparkSession.builder().getOrCreate()
import spark.implicits._
val sourceData = spark.read.format("csv") // built-in CSV source in Spark 2.x
.option("header", "true")
.option("delimiter", ";")
.option("inferSchema", "true")
.load("c:/Path/to/file.csv")
val data = sourceData.select($"X1", $"X2", $"X3", $"X4", $"X5", $"X6",
  $"X7", $"X8", $"X9", $"Y1".cast("double"), $"Freq".cast("double"))
val X1Indexer = new StringIndexer().setInputCol("X1").setOutputCol("X1Idx")
val X2Indexer = new StringIndexer().setInputCol("X2").setOutputCol("X2Idx")
val X3Indexer = new StringIndexer().setInputCol("X3").setOutputCol("X3Idx")
val X4Indexer = new StringIndexer().setInputCol("X4").setOutputCol("X4Idx")
val X5Indexer = new StringIndexer().setInputCol("X5").setOutputCol("X5Idx")
val X6Indexer = new StringIndexer().setInputCol("X6").setOutputCol("X6Idx")
val X7Indexer = new StringIndexer().setInputCol("X7").setOutputCol("X7Idx")
val X8Indexer = new StringIndexer().setInputCol("X8").setOutputCol("X8Idx")
val X9Indexer = new StringIndexer().setInputCol("X9").setOutputCol("X9Idx")
val assembler = new VectorAssembler()
.setInputCols(Array("X1Idx", "X2Idx", "X3Idx", "X4Idx", "X5Idx",
"X6Idx", "X7Idx", "X8Idx", "X9Idx", "Y1"))
.setOutputCol("features")
val gbt = new GBTRegressor()
.setLabelCol("Freq")
.setFeaturesCol("features")
.setImpurity("variance")
.setMaxIter(2000)           // gbm: n.trees
.setMinInstancesPerNode(50) // gbm: n.minobsinnode
.setMaxDepth(1)             // gbm: interaction.depth
.setStepSize(1.0)           // gbm: shrinkage
.setSubsamplingRate(1.0)    // gbm: bag.fraction
.setMaxBins(32)
val pipeline = new Pipeline()
.setStages(Array(X1Indexer, X2Indexer, X3Indexer, X4Indexer, X5Indexer,
  X6Indexer, X7Indexer, X8Indexer, X9Indexer, assembler, gbt))
val model = pipeline.fit(data)
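In case it helps the analysis, here is a sketch of a variant that caches
the assembled features and times only the GBT fit, so the 2000 boosting
passes do not pay for CSV reading and indexing on every pass; splitting
the pipeline this way is illustrative, it is not what I ran above:

// feature preparation only: indexers plus assembler, no GBT stage
val featurePipeline = new Pipeline().setStages(Array(X1Indexer, X2Indexer,
  X3Indexer, X4Indexer, X5Indexer, X6Indexer, X7Indexer, X8Indexer,
  X9Indexer, assembler))
val prepared = featurePipeline.fit(data).transform(data).cache()
prepared.count() // materialize the cache before timing
val t0 = System.nanoTime()
val gbtModel = gbt.fit(prepared)
println(s"GBT fit took ${(System.nanoTime() - t0) / 1e9} s")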