Hi Sandy, thanks for the reply. I tried to run this code without the cache and it worked. It also works if I cache before repartitioning, so the problem seems to be related to the combination of repartition and cache. My train is a SchemaRDD, and if I declare all of my columns as StringType the error doesn't happen, but if any column has another type, this exception is thrown.
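For clarity, here is a minimal sketch of the three orderings I tried (train is the SchemaRDD loaded in the notebook transcript below; column 1 is the click column, declared as LongType there; only the last ordering fails):

// works: repartition without caching
train.repartition(120).map(f => f(1).asInstanceOf[Int]).sum

// works: cache before repartitioning
val cachedFirst = train.cache
cachedFirst.repartition(120).map(f => f(1).asInstanceOf[Int]).sum

// fails: repartition first, then cache, then access the typed column
// java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Integer
val rep120 = train.repartition(120)
val cached120 = rep120.cache
cached120.map(f => f(1).asInstanceOf[Int]).sum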
2015-01-21 16:37 GMT-02:00 Sandy Ryza <sandy.r...@cloudera.com>:
> Hi Dirceu,
>
> Does the issue not show up if you run "map(f => f(1).asInstanceOf[Int]).sum"
> on the "train" RDD? It appears that f(1) is a String, not an Int. If you're
> looking to parse and convert it, "toInt" should be used instead of
> "asInstanceOf".
>
> -Sandy
>
> On Wed, Jan 21, 2015 at 8:43 AM, Dirceu Semighini Filho <
> dirceu.semigh...@gmail.com> wrote:
>
>> Hi guys, has anyone seen something like this?
>> I have a training set, and when I repartition it and then call cache, it
>> throws a ClassCastException when I try to execute anything that accesses it:
>>
>> val rep120 = train.repartition(120)
>> val cached120 = rep120.cache
>> cached120.map(f => f(1).asInstanceOf[Int]).sum
>>
>> In [1]:
>> ClusterSettings.executorMemory=Some("28g")
>> ClusterSettings.maxResultSize = "20g"
>> ClusterSettings.resume=true
>> ClusterSettings.coreInstanceType="r3.xlarge"
>> ClusterSettings.coreInstanceCount = 30
>> ClusterSettings.clusterName="UberdataContextCluster-Dirceu"
>> uc.applyDateFormat("YYMMddHH")
>>
>> Searching for existing cluster UberdataContextCluster-Dirceu ...
>> Spark standalone cluster started at
>> http://ec2-54-68-91-64.us-west-2.compute.amazonaws.com:8080
>> Found 1 master(s), 30 slaves
>> Ganglia started at
>> http://ec2-54-68-91-64.us-west-2.compute.amazonaws.com:5080/ganglia
>>
>> In [37]:
>> import org.apache.spark.sql.catalyst.types._
>> import eleflow.uberdata.util.IntStringImplicitTypeConverter._
>> import eleflow.uberdata.enums.SupportedAlgorithm._
>> import eleflow.uberdata.data._
>> import org.apache.spark.mllib.tree.DecisionTree
>> import eleflow.uberdata.enums.DateSplitType._
>> import org.apache.spark.mllib.regression.LabeledPoint
>> import org.apache.spark.mllib.linalg.Vectors
>> import org.apache.spark.mllib.classification._
>> import eleflow.uberdata.model._
>> import eleflow.uberdata.data.stat.Statistics
>> import eleflow.uberdata.enums.ValidationMethod._
>> import org.apache.spark.rdd.RDD
>>
>> In [5]:
>> val train = uc.load(uc.toHDFSURI("/tmp/data/input/train_rev4.csv"))
>>   .applyColumnTypes(Seq(DecimalType(), LongType, TimestampType, StringType,
>>     StringType, StringType, StringType, StringType,
>>     StringType, StringType, StringType, StringType,
>>     StringType, StringType, StringType, StringType,
>>     StringType, StringType, StringType, StringType,
>>     LongType, LongType, StringType, StringType, StringType,
>>     StringType, StringType))
>>   .formatDateValues(2, DayOfAWeek | Period).slice(excludes = Seq(12,13))
>>
>> Out[5]:
>> (first rows of train; columns: id, click, hour1, hour2, C1, banner_pos,
>> site_id, site_domain, site_category, app_id, app_domain, app_category,
>> device_model, device_type, device_conn_type, C14, C15, C16, C17, C18,
>> C19, C20, C21)
>>
>> In [7]:
>> val test = uc.load(uc.toHDFSURI("/tmp/data/input/test_rev4.csv"))
>>   .applyColumnTypes(Seq(DecimalType(), TimestampType, StringType,
>>     StringType, StringType, StringType, StringType,
>>     StringType, StringType, StringType, StringType,
>>     StringType, StringType, StringType, StringType,
>>     StringType, StringType, StringType, StringType,
>>     LongType, LongType, StringType, StringType, StringType,
>>     StringType, StringType))
>>   .formatDateValues(1, DayOfAWeek | Period).slice(excludes = Seq(11,12))
>>
>> Out[7]:
>> (first rows of test; columns: id, hour1, hour2, C1, banner_pos, site_id,
>> site_domain, site_category, app_id, app_domain, app_category, device_model,
>> device_type, device_conn_type, C14, C15, C16, C17, C18, C19, C20, C21)
>>
>> In [ ]:
>> val (validationPrediction2, logRegModel2, testDataSet2, validationDataSet2,
>>   trainDataSet2, testPrediction2) =
>>   eleflow.uberdata.data.Predictor.predict(train, test,
>>     excludes = Seq(6,7,9,10,12,13), iterations = 100,
>>     algorithm = BinaryLogisticRegressionBFGS)
>>
>> spent time 1943
>>
>> Out[5]:
>> MappedRDD[165] at map at Predictor.scala:265
>>
>> In [ ]:
>> val corr2 = eleflow.uberdata.data.stat.Statistics.targetCorrelation(validationDataSet2)
>>
>> In [ ]:
>> val correlated = corr2.filter(_._1>0.01)
>>
>> In [ ]:
>> val correlated2 = correlated.map(_._2)
>>
>> Out[8]:
>> Array(11768, 11285, 11278, 11289, 12051, 11279, 42, 11805, 11767, 46,
>> 22, 12063, 20, 8388, 11438, 11783, 8981, 11408, 8386, 11360, 11377,
>> 12059, 11418, 12044, 11771, 11359, 11839, 9118, 9116, 8666, 11986,
>> 8665, 8888, 8887, 18, 12058, 11925, 11468, 11336, 11769, 9254, 9029,
>> 11404, 9028, 71, 11982, 11798, 63, 7401, 8673, 12040, 8664, 4986, 452,
>> 11949, 12050, 76, 11800, 8975, 11189, 11743, 11956, 11801, 12026,
>> 8976, 11784, 2418, 11808, 12054, 11904, 1819, 7, 1840, 11429, 11608,
>> 11983, 11387, 9403, 11495, 11985, 8658, 1020, 11626, 8384, 41, 8387,
>> 11778, 4390, 7067, 11489, 11542, 3, 8381, 9154, 11766, 11479, 9077,
>> 10782, 11680, 11830, 12043, 8926, 8982, 11409, 11391, 11364, 8656,
>> 1274, 5523, 9, 12025, 8279, 1528, 10, 11490, 12046, 6771, 3937, 11450,
>> 11811, 8632, 38, 8898, 11382, 12028, 12053, 4563, 5040, 11330, 1983,
>> 11799, 11327, 11672, 8628, 11342, 11813, 6450, 11825, 8941, 10407,
>> 11806, 11643, 8940, 9405, 11757, 9075, 12056, 11522, 11688, 10406,
>> 11322, 9076, 29, 12064, 8637, 11347, 10831, 11406, 11773, 40, 10560,
>> 11645, 9404, 11789, 11651, 9743, 11835, 11843, 9382, 11971, 11646,
>> 12065, 11984, 8681, 10563, 12039, 9383, 8680, 8391, 3260, 5453, 10120,
>> 8602,
11649, 9385, 4320, 9384, 11210, 11750, 11319, 11787, 11506, >> 11628, 11415, 11777, 10576, 240, 12017, 0, 10121, 11644, 8929, 11392, >> 12024, 5602, 9280, 11473, 884, 11812, 10741, 11780, 11503, 8672, >> 11357, 11966, 12055, 11539, 8644, 11350, 11836, 9058, 11271, 11764, >> 5094, 7881, 11504, 11698, 11424, 11831, 11587, 11426, 2577, 11610, >> 8948, 11987, 10744, 9290, 11477, 11497, 11367, 8622, 11969, 12030, >> 8062, 11664, 11704, 10949, 11508, 10530, 10225, 7655, 4274, 10534, >> 11394, 8934, 15, 11671, 11845, 12069, 6767, 3713, 8979, 11310, 10670, >> 8978, 11498, 11281, 11291, 11549, 11840, 10119, 10419, 897, 5875, >> 11482, 10617, 9331, 10618, 11662, 12060, 11496, 10654, 9742, 11422, >> 12027, 11545, 6612, 9757, 11881, 19, 11321, 11402, 11256, 8389, 9379, >> 9741, 11705, 5188, 2780, 8593, 11325, 9452, 11255, 9304, 11990, 8393, >> 11853, 11619, 9312, 9061, 11425, 8385, 11642, 12023, 9303, 8885, >> 11375, 6807, 8576, 11528, 11485, 11786, 8518, 11834, 12066, 2257, >> 11345, 11333, 11903, 9918, 11992, 11257, 11488, 11637, 7215, 10556, >> 11744, 12018, 12031, 1990, 542, 6099, 9005, 11900, 9739, 11566, 11481, >> 11314, 12052, 11307, 1828, 12072, 5, 10020, 11413, 10138, 11295, 8959, >> 8025) >> >> In [ ]: >> >> val trained = trainDataSet2.map{f => >> >> val array = f._2.features.toArray >> >> new LabeledPoint(f._2.label,Vectors.dense( >> >> correlated2.map(i => array(i))))}.cache >> >> Out[9]: >> >> MappedRDD[175] at map at <console>:52 >> >> In [ ]: >> >> val m = Predictor.binaryLogisticRegressionModelSGD(trained,100) >> >> In [23]: >> >> val validated = validationDataSet2.map{f => >> >> val array = f._2.features.toArray >> >> (f._1,new >> LabeledPoint(f._2.label,Vectors.dense( >> >> correlated2.map(i => array(i)))))}.cache >> >> Out[23]: >> >> MappedRDD[682] at map at <console>:71 >> >> In [24]: >> >> val prediction = validated.map { >> >> case (ids, point) => >> >> (ids, >> m.model.asInstanceOf[LogisticRegressionModel].predict(point.features)) >> >> } >> >> Out[24]: >> >> MappedRDD[683] at map at <console>:79 >> >> In [20]: >> >> validated.take(2) >> >> Out[20]: >> >> >> 
Array((0.0,[0.0,0.0,0.0,1.0,0.0,1.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,2.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0]), >> >> (0.0,[0.0,0.0,0.0,1.0,0.0,1.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,2.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0])) >> >> In [26]: >> >> val logloss = eleflow.uberdata.data.stat.Statistics.logLoss(prediction) >> >> Out[26]: >> >> 5.861273254972684 >> >> In [17]: >> >> validationDataSet2.take(3) >> >> Out[17]: >> >> >> Array((0,(0.0,(12073,[0,1,4,9,18,42,4563,8382,8386,8575,11279,11289,11322,11766,11803,11904,12065],[2.0,3.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0]))), >> >> (0,(0.0,(12073,[0,1,4,9,18,42,3260,8382,8386,8577,11279,11289,11322,11766,11803,11904,12065],[2.0,3.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0]))), >> >> 
(0,(0.0,(12073,[0,1,4,10,40,42,4729,8382,8386,8672,11279,11289,11357,11768,11805,11852,12051],[2.0,3.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0]))))
>>
>> In [ ]:
>> trained.take(4)
>>
>> In [7]:
>> import org.apache.spark.mllib.classification._
>>
>> In [8]:
>> val steps = Seq(Step(10,2), new Step(7,3), new Step(6,4))
>>
>> Out[8]:
>> 0: Step(10,2)  1: Step(7,3)  2: Step(6,4)
>>
>> In [ ]:
>> val predictor = eleflow.uberdata.data.Predictor.evolutivePredict(train.repartition(240),
>>   test, algorithm = BinaryLogisticRegressionBFGS,
>>   validationMethod = LogarithmicLoss, steps = steps, iterations = 30)
>>
>> In [ ]:
>> uc.terminate
>>
>> In [11]:
>> train.partitions.size
>>
>> Out[11]:
>> 94
>>
>> In [20]:
>> val rep60 = train.repartition(120)
>>
>> Out[20]:
>> (first rows of rep60; columns: id, click, hour, C1, banner_pos, site_id,
>> site_domain, site_category, app_id, app_domain, app_category, device_id,
>> device_ip, device_model, device_type, device_conn_type, C14, C15, C16,
>> C17, C18, C19, C20, C21)
>>
>> In [ ]:
>> val cach60 = rep60.cache
>>
>> In [28]:
>> cach60.map(f => f(1).asInstanceOf[Int]).sum
>>
>> org.apache.spark.SparkException: Job aborted due to stage failure:
>> Task 53 in stage 53.0 failed 4 times, most recent failure: Lost task
>> 53.3 in stage 53.0 (TID 1322, ip-172-31-0-62.us-west-2.compute.internal):
>> java.lang.ClassCastException: java.lang.String cannot be cast to
>> java.lang.Integer
>> at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:106)
>> at
>> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:59)
>> at
>> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:59)
>> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>> at scala.collection.TraversableOnce$class.reduceLeft(TraversableOnce.scala:172)
>> at scala.collection.AbstractIterator.reduceLeft(Iterator.scala:1157)
>> at org.apache.spark.rdd.RDD$$anonfun$18.apply(RDD.scala:853)
>> at org.apache.spark.rdd.RDD$$anonfun$18.apply(RDD.scala:851)
>> at org.apache.spark.SparkContext$$anonfun$29.apply(SparkContext.scala:1350)
>> at org.apache.spark.SparkContext$$anonfun$29.apply(SparkContext.scala:1350)
>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>> at org.apache.spark.scheduler.Task.run(Task.scala:56)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> Driver stacktrace:
>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
>> scala.Option.foreach(Option.scala:236)
>> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
>> org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
>> akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>> org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
>> akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>> akka.actor.ActorCell.invoke(ActorCell.scala:487)
>> akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>> akka.dispatch.Mailbox.run(Mailbox.scala:220)
>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>
>> R
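P.S. Following Sandy's suggestion about parsing instead of casting, this is the kind of defensive conversion I can try in place of asInstanceOf (just a sketch, assuming the values of column 1 may come back as String or Long after the repartition + cache):

cached120.map { f =>
  f(1) match {
    case i: Int    => i        // value kept its declared type
    case l: Long   => l.toInt  // column 1 (click) is declared LongType
    case s: String => s.toInt  // what the failing tasks are actually seeing
  }
}.sum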