Apologies if this email is a duplicate; I realised that I was not registered with
the mailing list.
I am having a problem with a Spark SQL script running on a Spark 1.2
CDH 5.3 CentOS mini cluster of five nodes. The script processes image CSV data;
each record/line has 28x28 integer elements followed by an integer label
value that describes the record.
I have checked the data file rows to ensure that they contain the correct
number of elements.
[hadoop@hc2r1m2 h2o_spark_1_2]$ cat chk_csv.bash
#!/bin/bash
MYFILE="$1"
echo ""
echo "check $MYFILE"
echo ""
while read -r line
do
  echo -n "Line records = "
  echo "$line" | tr "," "\n" | wc -l
done < "$MYFILE"
[hadoop@hc2r1m2 h2o_spark_1_2]$ ./chk_csv.bash mnist_train_1x.csv
check mnist_train_1x.csv
Line records = 785
Line records = 785
Line records = 785
Line records = 785
Line records = 785
Line records = 785
Line records = 785
Line records = 785
Line records = 785
Line records = 785
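The same check can also be reproduced in plain Scala, without Spark, which rules out any difference between how bash and the JVM see the file. This is just a sketch; note the `-1` limit, which stops `split` from silently dropping trailing empty fields:

```scala
// Count the comma-separated fields on each line of the CSV (no Spark needed).
// The -1 limit stops split() from discarding trailing empty strings.
import scala.io.Source

val counts = Source.fromFile("mnist_train_1x.csv").getLines()
  .map(_.split(",", -1).length)
  .toList

// Every line should report the same count: 28 x 28 + 1 = 785
println(counts.distinct)
```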
The basic script looks like this:
// create a spark conf and context
val sparkMaster = "spark://hc2nn.semtech-solutions.co.nz:7077"
val appName = "Spark ex3"
val conf = new SparkConf()
conf.setMaster(sparkMaster)
conf.setAppName(appName)
val sparkCxt = new SparkContext(conf)
// Initialize SQL context
import org.apache.spark.sql._
implicit val sqlContext = new SQLContext(sparkCxt)
// prep the hdfs based data
val server = "hdfs://hc2nn.semtech-solutions.co.nz:8020"
val path = "/data/spark/h2o/"
val train_csv = server + path + "mnist_train_1x.csv"
val schemaString = getSchema() // schema string: 28 x 28 ints plus an int label
val schema = StructType( schemaString.split(" ")
.map(fieldName => StructField(fieldName, IntegerType, false)))
val rawTrainData = sparkCxt.textFile(train_csv)
val trainRDD = rawTrainData.map( rawRow => Row( rawRow.split(",")
.map(_.toInt) ) )
val trainSchemaRDD = sqlContext.applySchema(trainRDD, schema)
trainSchemaRDD.registerTempTable("trainingTable")
val resSchemaRddTrain = sqlContext.sql("SELECT P1,Label FROM trainingTable")
println( " >>>>> sqlResult rows = " + resSchemaRddTrain.count() )
val resArray = resSchemaRddTrain.collect()
// collect() results in java.lang.ArrayIndexOutOfBoundsException
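One variation I intend to try builds each Row from the sequence of split values rather than passing the split array as a single argument; as far as I understand, `Row(...)` is a varargs call, so an `Array` passed without expansion would become one field instead of 785. A sketch, untested:

```scala
// Sketch: expand the split values into individual Row fields instead of
// wrapping the whole array as a single element.
val trainRDD2 = rawTrainData.map( rawRow =>
  Row.fromSeq( rawRow.split(",").map(_.toInt) ) )

// equivalently, expand the array explicitly as varargs:
// Row( rawRow.split(",").map(_.toInt): _* )
```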
No matter what I try, I get an ArrayIndexOutOfBoundsException when I try to
examine the result of the select, even though I know that there are 10 result
rows. I don't think it's a data issue, as both the schema and all the data rows
have 28x28 + 1 = 785 elements.
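If it helps narrow things down, one quick check is to count the fields inside the first parsed Row itself; since Row behaves like a Seq in Spark 1.2, its length should match the schema if each CSV value became its own column (sketch):

```scala
// Sketch: Row extends Seq[Any] in Spark 1.2, so length is the column count.
// 785 means every CSV value is a separate column; 1 would mean the whole
// split array went in as a single field.
val firstRow = trainRDD.first
println( "fields in first Row = " + firstRow.length )
```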
Advice appreciated.