Hi,
I am trying to train a random forest classifier. I have a predefined classification set (classifications.csv, ~300,000 lines). While fitting, I am getting a "Size exceeds Integer.MAX_VALUE" error. Here is the code:

    import java.util.Locale

    import scala.collection.mutable.ListBuffer

    import org.apache.spark.SparkConf
    import org.apache.spark.ml.Pipeline
    import org.apache.spark.ml.classification.RandomForestClassifier
    import org.apache.spark.ml.feature.{CountVectorizer, StopWordsRemover, StringIndexer, Tokenizer}
    import org.apache.spark.sql.SparkSession

    object Test1 {

      var savePath = "c:/Temp/SparkModel/"
      var stemmer = Resha.Instance
      var STOP_WORDS: Set[String] = Set()

      // Remove parenthesised parts, dashes, commas and extra whitespace.
      def cropSentence(s: String) = {
        s.replaceAll("\\([^\\)]*\\)", "")
          .replaceAll(" - ", " ")
          .replaceAll("-", " ")
          .replaceAll(" +", " ")
          .replaceAll(",", " ").trim()
      }

      def main(args: Array[String]): Unit = {

        val sc = new SparkConf().setAppName("Test").setMaster("local[*]")
          .set("spark.sql.warehouse.dir", "D:/Temp/wh")
          .set("spark.executor.memory", "12g")
          .set("spark.driver.memory", "4g")
          .set("spark.hadoop.validateOutputSpecs", "false")
        val spark = SparkSession.builder.appName("Java Spark").config(sc).getOrCreate()
        import spark.implicits._

        // Each CSV line is "productName;className": lower-case, crop and stem the product name.
        val mainDataset = spark.sparkContext.textFile("file:///C:/Temp/classifications.csv")
          .map(_.split(";"))
          .map { tokens =>
            val list = new ListBuffer[String]()
            val token0 = cropSentence(tokens(0).toLowerCase(Locale.forLanguageTag("TR-tr")))
            token0.split("\\s+").foreach { w => list += stemmer.stem(w) }
            (tokens(1), tokens(0), list.toList.mkString(" "))
          }.toDF("className", "rawProductName", "productName")

        // Index the class labels.
        val classIndexer = new StringIndexer()
          .setInputCol("className")
          .setOutputCol("label")

        val classIndexerModel = classIndexer.fit(mainDataset)
        val mainDS = classIndexerModel.transform(mainDataset)
        classIndexerModel.write.overwrite.save(savePath + "ClassIndexer")

        // Tokenizer
        val tokenizer = new Tokenizer()
          .setInputCol("productName")
          .setOutputCol("words_nonfiltered")

        // StopWords
        val remover = new StopWordsRemover()
          .setInputCol("words_nonfiltered")
          .setOutputCol("words")
          .setStopWords(Array[String]("stop1", "stop2", "stop3"))

        // CountVectorizer
        val countVectorizer = new CountVectorizer()
          .setInputCol("words")
          .setOutputCol("features")

        val rfc = new RandomForestClassifier()
          .setLabelCol("label")
          .setNumTrees(50)
          .setMaxDepth(15)
          .setFeatureSubsetStrategy("auto")
          .setFeaturesCol("features")
          .setImpurity("gini")
          .setMaxBins(32)

        val pipeline = new Pipeline().setStages(Array(tokenizer, remover, countVectorizer, rfc))

        val train = mainDS
        val model = pipeline.fit(train)   // <============= EXCEPTION
        model.write.overwrite.save(savePath + "RandomForestClassifier")
      }
    }

The log and stack trace:

    16/10/23 19:10:37 INFO scheduler.DAGScheduler: Job 8 failed: collectAsMap at RandomForest.scala:550, took 848.552917 s
    Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 13.0 failed 1 times, most recent failure: Lost task 1.0 in stage 13.0 (TID 36, localhost): java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
        at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:869)
        at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:103)
        at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:91)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1307)
        at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:105)
        at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:438)
        at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:674)
        at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
        at org.apache.spark.scheduler.Task.run(Task.scala:86)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

    Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1890)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1903)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1916)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1930)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:912)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
        at org.apache.spark.rdd.RDD.collect(RDD.scala:911)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$collectAsMap$1.apply(PairRDDFunctions.scala:745)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$collectAsMap$1.apply(PairRDDFunctions.scala:744)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
        at org.apache.spark.rdd.PairRDDFunctions.collectAsMap(PairRDDFunctions.scala:744)
        at org.apache.spark.ml.tree.impl.RandomForest$.findBestSplits(RandomForest.scala:550)
        at org.apache.spark.ml.tree.impl.RandomForest$.run(RandomForest.scala:187)
        at org.apache.spark.ml.classification.RandomForestClassifier.train(RandomForestClassifier.scala:118)
        at org.apache.spark.ml.classification.RandomForestClassifier.train(RandomForestClassifier.scala:45)
        at org.apache.spark.ml.Predictor.fit(Predictor.scala:90)
        at org.apache.spark.ml.Predictor.fit(Predictor.scala:71)
        at org.apache.spark.ml.Pipeline$$anonfun$fit$2.apply(Pipeline.scala:149)
        at org.apache.spark.ml.Pipeline$$anonfun$fit$2.apply(Pipeline.scala:145)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at scala.collection.IterableViewLike$Transformed$class.foreach(IterableViewLike.scala:44)
        at scala.collection.SeqViewLike$AbstractTransformed.foreach(SeqViewLike.scala:37)
        at org.apache.spark.ml.Pipeline.fit(Pipeline.scala:145)
        at main.Test1$.main(Test1.scala:162)
        at main.Test1.main(Test1.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:736)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
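
Would something along these lines be the right direction? This is only a minimal sketch of what I am thinking of trying (replacing the corresponding lines in main() above); the repartition count and the setVocabSize / setMaxDepth / setMaxMemoryInMB values are guesses on my part, not verified settings:

    // Sketch only -- the concrete numbers below are guesses, not verified.

    // 1) Spread the training data over more partitions so no single
    //    cached/shuffled block gets anywhere near the 2 GB limit.
    val train = mainDS.repartition(200)

    // 2) Cap the vocabulary so the feature vectors (and the per-node
    //    statistics the random forest aggregates) stay smaller.
    val countVectorizer = new CountVectorizer()
      .setInputCol("words")
      .setOutputCol("features")
      .setVocabSize(10000)          // default is 262144 (2^18)

    // 3) Shallower trees and a lower aggregation memory budget, so each
    //    findBestSplits pass works on fewer nodes / smaller statistics.
    val rfc = new RandomForestClassifier()
      .setLabelCol("label")
      .setFeaturesCol("features")
      .setNumTrees(50)
      .setMaxDepth(10)              // instead of 15
      .setMaxMemoryInMB(128)        // default is 256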