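Your parentheses don't look right: the filter is embedded inside
Row.fromSeq(), so it runs on the individual fields of each split line
(keeping only strings that are exactly 15 characters long) rather than on
the rows. Most rows then end up with no fields at all, which is why
GenericRow.apply fails on index 0.
Try this: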
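val trainRDD = rawTrainData
  .filter(!_.isEmpty)
  // split each line into trimmed fields first
  .map(_.split(",").map(_.trim))
  // keep only lines that have exactly 15 fields
  .filter(_.length == 15)
  // build the Row last, so the result is still an RDD[Row] for applySchema
  .map(fields => Row.fromSeq(fields))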
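
To see why the placement of the filter matters, here is a minimal
plain-Scala sketch that needs no Spark. The sample line and the object name
FilterPlacement are my own assumptions for illustration, not taken from your
files. With this sample, the original placement keeps no fields, because no
single field happens to be 15 characters long, while the corrected check
counts the 15 fields of the row.

```scala
// Plain-Scala sketch; the sample line and names are hypothetical.
object FilterPlacement {
  // Assumed 15-field line in the style of the adult data set.
  val line: String =
    "39, State-gov, 77516, Bachelors, 13, Never-married, Adm-clerical, " +
    "Not-in-family, White, Male, 2174, 0, 40, United-States, <=50K"

  // Original placement: the filter sees each field, so _.length is the
  // character count of a String, not the number of columns.
  val filteredFields: Array[String] =
    line.split(",").filter(_.length == 15).map(_.trim)

  // Corrected placement: split and trim first, then test the field count.
  val fields: Array[String] = line.split(",").map(_.trim)
  val keepRow: Boolean = fields.length == 15

  def main(args: Array[String]): Unit = {
    println(s"fields kept by the original filter: ${filteredFields.length}")
    println(s"fields in the row: ${fields.length}, row kept: $keepRow")
  }
}
```

The same reasoning applies to testRDD, which uses the identical expression.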
-Don
On Fri, May 15, 2015 at 11:17 PM, Mike Frampton <[email protected]>
wrote:
> Hi
>
> I'm getting the following error when trying to process a CSV-based data
> file.
>
> Exception in thread "main" org.apache.spark.SparkException: Job aborted
> due to stage failure: Task 1 in stage 10.0 failed 4 times, most recent
> failure: Lost task 1.3 in stage 10.0 (TID 262,
> hc2r1m3.semtech-solutions.co.nz):
> java.lang.ArrayIndexOutOfBoundsException: 0
> at org.apache.spark.sql.catalyst.expressions.GenericRow.apply(Row.scala:142)
> at org.apache.spark.sql.catalyst.expressions.BoundReference.eval(BoundAttribute.scala:37)
> at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:68)
> at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:52)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
>
> I have made sure that none of my data rows are empty and that they all
> have 15 fields. I have also physically checked the data. The error occurs
> when I run the actual Spark SQL query on the last line. The script is as
> follows.
>
> val server = "hdfs://hc2nn.semtech-solutions.co.nz:8020"
> val path = "/data/spark/h2o/"
>
> val train_csv = server + path + "adult.train.data" // 32,562 rows
> val test_csv = server + path + "adult.test.data" // 16,283 rows
>
> // load the data
>
> val rawTrainData = sparkCxt.textFile(train_csv)
> val rawTestData = sparkCxt.textFile(test_csv)
>
> // create a spark sql schema for the row
>
> val schemaString = "age workclass fnlwgt education educationalnum maritalstatus" +
> " occupation relationship race gender capitalgain capitalloss" +
> " hoursperweek nativecountry income"
>
> val schema = StructType( schemaString.split(" ")
> .map(fieldName => StructField(fieldName, StringType, false)))
>
> // create an RDD from the raw training data
>
> val trainRDD = rawTrainData
> .filter(!_.isEmpty)
> .map(rawRow => Row.fromSeq(rawRow.split(",")
> .filter(_.length == 15)
> .map(_.toString).map(_.trim) ))
>
> println( ">>>>> Raw Training Data Count = " + trainRDD.count() )
>
> val testRDD = rawTestData
> .filter(!_.isEmpty)
> .map(rawRow => Row.fromSeq(rawRow.split(",")
> .filter(_.length == 15)
> .map(_.toString).map(_.trim) ))
>
> println( ">>>>> Raw Testing Data Count = " + testRDD.count() )
>
> // create a schema RDD
>
> val trainSchemaRDD = sqlContext.applySchema(trainRDD, schema)
> val testSchemaRDD = sqlContext.applySchema(testRDD, schema)
>
> // register schema RDD as a table
>
> trainSchemaRDD.registerTempTable("trainingTable")
> testSchemaRDD.registerTempTable("testingTable")
>
> println( ">>>>> Schema RDD Training Data Count = " +
> trainSchemaRDD.count() )
> println( ">>>>> Schema RDD Testing Data Count = " +
> testSchemaRDD.count() )
>
> // now run sql against the table to filter the data
>
> val schemaRddTrain = sqlContext.sql( "SELECT "+
> "age,workclass,education,maritalstatus,occupation,relationship,race,"+
> "gender,hoursperweek,nativecountry,income "+ "FROM trainingTable LIMIT 5000")
>
> println( ">>>>> Training Data Count = " + schemaRddTrain.count() )
>
> Any advice is appreciated :)
>
>
--
Donald Drake
Drake Consulting
http://www.drakeconsulting.com/
http://www.MailLaunder.com/
http://www.DrudgeSiren.com/
http://plu.gd/
800-733-2143