Hello all,

I'm using Spark for regression analysis on medium to large datasets, and its performance is very good with random forests and decision trees. Continuing my experimentation, I started using GBTRegressor and found it extremely slow compared to R, whereas the other two methods were very fast.
Two examples to illustrate:
- on a 300k-row dataset, R takes 3 minutes and GBTRegressor 15 minutes to process 2000 iterations, with maxDepth = 1 and minInstancesPerNode = 50
- on a 3M-row dataset, R takes 3 minutes and GBTRegressor 47 minutes to process 10 iterations, with maxDepth = 2 and minInstancesPerNode = 50

I placed the code for the first example at the end of this message.

For the 300k dataset, I understand that there is a setup cost associated with Spark, which means small datasets may not be processed as efficiently as in R, even though my testing with DecisionTree and RandomForest suggests otherwise. When I watch CPU usage during the GBT run, it spikes to 90% (7 of my 8 cores) in relatively short bursts, then drops back to 8-10% (less than one core) for quite a while. Compared to R, which uses a single core for its whole 3-minute run, this is quite surprising.
What have I missed in my setup?
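
To separate the one-time indexing cost from the boosting itself, my next test is to fit the feature stages once, cache the transformed DataFrame, and time the GBT fit on its own. A rough sketch, reusing the variable names from the full code below (prepared, t0 and gbtModel are mine):

// Fit only the indexers and the assembler, cache the result,
// then time the boosting step in isolation.
val featureStages = Array(X1Indexer, X2Indexer, X3Indexer, X4Indexer, X5Indexer, X6Indexer, X7Indexer, X8Indexer, X9Indexer, assembler)
val prepared = new Pipeline().setStages(featureStages).fit(data).transform(data).cache()
prepared.count() // force materialization before timing
val t0 = System.nanoTime()
val gbtModel = gbt.fit(prepared)
println(s"GBT fit alone took ${(System.nanoTime() - t0) / 1e9} s")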

I've been told that the behavior I'm observing may be related to data skew, but I'm not sure what is at play here.
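
If skew were the culprit, I would expect it to show up as uneven partition sizes, so I put together this quick diagnostic (my own sketch, nothing official):

// Count rows per partition to see whether the data is spread evenly.
val partitionSizes = data.rdd
  .mapPartitionsWithIndex((i, rows) => Iterator((i, rows.size)))
  .collect()
partitionSizes.foreach { case (i, n) => println(s"partition $i: $n rows") }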

To my untrained eye, it looks as if there were an issue in the GBTRegressor class itself, but I can't pin it down.

Any help would be most welcome.

Regards

=============================
R code

train <- read.table("c:/Path/to/file.csv", header=T, sep=";",dec=".")
train$X1 <- factor(train$X1)
train$X2 <- factor(train$X2)
train$X3 <- factor(train$X3)
train$X4 <- factor(train$X4)
train$X5 <- factor(train$X5)
train$X6 <- factor(train$X6)
train$X7 <- factor(train$X7)
train$X8 <- factor(train$X8)
train$X9 <- factor(train$X9)

library(gbm)
boost <- gbm(Freq ~ X1 + X2 + X3 + X4 + X5 + X6 + X7 + X8 + X9 + Y1,
             distribution = "gaussian", data = train,
             n.trees = 2000, bag.fraction = 1, shrinkage = 1,
             interaction.depth = 1, n.minobsinnode = 50,
             train.fraction = 1.0, cv.folds = 0, keep.data = TRUE)

=============================
Scala code for Spark

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.regression.GBTRegressor

val conf = new SparkConf()
  .setAppName("GBTExample")
  .setMaster("local[8]")
  .set("spark.driver.memory", "8g")
  .set("spark.executor.memory", "8g")
  .set("spark.network.timeout", "120s")
val spark = SparkSession.builder().config(conf).getOrCreate()
val sc = spark.sparkContext
import spark.implicits._

val sourceData = spark.read
  .option("header", "true")
  .option("delimiter", ";")
  .option("inferSchema", "true")
  .csv("c:/Path/to/file.csv")

val data = sourceData.select($"X1", $"X2", $"X3", $"X4", $"X5", $"X6", $"X7", $"X8", $"X9", $"Y1".cast("double"), $"Freq".cast("double"))

val X1Indexer = new StringIndexer().setInputCol("X1").setOutputCol("X1Idx")
val X2Indexer = new StringIndexer().setInputCol("X2").setOutputCol("X2Idx")
val X3Indexer = new StringIndexer().setInputCol("X3").setOutputCol("X3Idx")
val X4Indexer = new StringIndexer().setInputCol("X4").setOutputCol("X4Idx")
val X5Indexer = new StringIndexer().setInputCol("X5").setOutputCol("X5Idx")
val X6Indexer = new StringIndexer().setInputCol("X6").setOutputCol("X6Idx")
val X7Indexer = new StringIndexer().setInputCol("X7").setOutputCol("X7Idx")
val X8Indexer = new StringIndexer().setInputCol("X8").setOutputCol("X8Idx")
val X9Indexer = new StringIndexer().setInputCol("X9").setOutputCol("X9Idx")

val assembler = new VectorAssembler()
  .setInputCols(Array("X1Idx", "X2Idx", "X3Idx", "X4Idx", "X5Idx", "X6Idx", "X7Idx", "X8Idx", "X9Idx", "Y1"))
  .setOutputCol("features")

val gbt = new GBTRegressor()
  .setLabelCol("Freq")
  .setFeaturesCol("features")
  .setImpurity("variance")
  .setMaxIter(2000)
  .setMinInstancesPerNode(50)
  .setMaxDepth(1)
  .setStepSize(1.0)
  .setSubsamplingRate(1.0)
  .setMaxBins(32)

val pipeline = new Pipeline()
  .setStages(Array(X1Indexer, X2Indexer, X3Indexer, X4Indexer, X5Indexer, X6Indexer, X7Indexer, X8Indexer, X9Indexer, assembler, gbt))

val model = pipeline.fit(data)
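
For completeness, here is a variant of the regressor configuration I intend to try next. setCacheNodeIds and setCheckpointInterval are real GBTRegressor setters, though I don't know yet how much they help at maxDepth = 1; the checkpoint directory below is a placeholder of my own:

sc.setCheckpointDir("c:/Path/to/checkpoints") // placeholder; required before checkpointing
// Same regressor, but caching node IDs and checkpointing every 10
// iterations to keep the per-iteration lineage short.
val gbtVariant = new GBTRegressor()
  .setLabelCol("Freq")
  .setFeaturesCol("features")
  .setMaxIter(2000)
  .setMinInstancesPerNode(50)
  .setMaxDepth(1)
  .setStepSize(1.0)
  .setCacheNodeIds(true)
  .setCheckpointInterval(10)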
