(Sorry, first one sent to incubator maling list which probably doesn't come
here)

Hi, I have been stuck at this for a week.

I have a relatively simple dataframe like this:

+---------+---------+--------------------+-------------------+
|     item|  item_id|              target|              start|
+---------+---------+--------------------+-------------------+
|sensor123|sensor123|[0.005683, 0.0070...|2008-01-01 00:00:00|
|sensor249|sensor249|[0.009783, 0.0068...|2008-01-01 00:00:00|
|sensor379|sensor379|[0.001917, 0.0016...|2008-01-01 00:00:00|
| sensor52| sensor52|[0.016267, 0.0121...|2008-01-01 00:00:00|

target us a WrappedArray[Double]

This simple code runs on local spark but has stack overflow error on
EMR Spark.  I've tried playing with paritioning with no effect.

```scala
def transform(dataset: Dataset[_]): DataFrame = {
    var ds:Dataset[_] = dataset.
    var df:DataFrame = ds.toDF

    val sparkSession = dataset.sparkSession
    import sparkSession.implicits._

    val itemIndex:Int =
SparkJavaUtils.getColumnIndex(DataSources.FIELD_ITEM, df)
    val startIndex:Int =
SparkJavaUtils.getColumnIndex(DataSources.FIELD_START, df)
    val targetIndex:Int =
SparkJavaUtils.getColumnIndex(DataSources.FIELD_TARGET, df)

    val result = df.map(r => {
        val itemName = r.getAs[String](itemIndex)
        val start = r.getAs[Timestamp](startIndex)
        val targetArray = r.getAs[mutable.WrappedArray[Double]](targetIndex)
        (itemName, start, targetArray)
    })
    result.show

    // Get the schema ready
    val schema = StructType(
        Seq(
            StructField(DataSources.FIELD_ITEM, StringType, true,
Metadata.empty),
            StructField(DataSources.FIELD_START, TimestampType, true,
Metadata.empty),
            StructField(DataSources.FIELD_TARGET,
DataTypes.createArrayType(DoubleType), true, Metadata.empty)
        )
    )

    var nn = dataset.sqlContext.createDataFrame(result.toDF.rdd, schema)
    nn.show
    nn
}

```

Error is like:

Exception in thread "main" java.lang.StackOverflowError
        at 
scala.collection.IndexedSeqLike$Elements.next(IndexedSeqLike.scala:65)
        at scala.StringContext.standardInterpolator(StringContext.scala:123)
        at scala.StringContext.s(StringContext.scala:95)
        at 
org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.freshName(CodeGenerator.scala:565)
        at 
org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:105)
        at 
org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:104)
        at scala.Option.getOrElse(Option.scala:121)
        at 
org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:104)
        at 
org.apache.spark.sql.execution.CodegenSupport$$anonfun$4.apply(WholeStageCodegenExec.scala:153)
        at 
org.apache.spark.sql.execution.CodegenSupport$$anonfun$4.apply(WholeStageCodegenExec.scala:152)
        at 
scala.collection.immutable.Stream$$anonfun$map$1.apply(Stream.scala:418)
        at 
scala.collection.immutable.Stream$$anonfun$map$1.apply(Stream.scala:418)
        at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1233)
        at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1223)
        at scala.collection.immutable.Stream.drop(Stream.scala:858)
        at scala.collection.immutable.Stream.drop(Stream.scala:202)
        at 
scala.collection.LinearSeqOptimized$class.apply(LinearSeqOptimized.scala:64)
        at scala.collection.immutable.Stream.apply(Stream.scala:202)
        at 
org.apache.spark.sql.catalyst.expressions.BoundReference.doGenCode(BoundAttribute.scala:62)
        at 
org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:107)
        at 
org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:104)
        at scala.Option.getOrElse(Option.scala:121)
        at 
org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:104)
        at 
org.apache.spark.sql.execution.CodegenSupport$$anonfun$4.apply(WholeStageCodegenExec.scala:153)
        at 
org.apache.spark.sql.execution.CodegenSupport$$anonfun$4.apply(WholeStageCodegenExec.scala:152)
        at 
scala.collection.immutable.Stream$$anonfun$map$1.apply(Stream.scala:418)
        at 
scala.collection.immutable.Stream$$anonfun$map$1.apply(Stream.scala:418)
        at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1233)
        at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1223)
        at scala.collection.immutable.Stream.drop(Stream.scala:858)
        at scala.collection.immutable.Stream.drop(Stream.scala:202)
        at 
scala.collection.LinearSeqOptimized$class.apply(LinearSeqOptimized.scala:64)
        at scala.collection.immutable.Stream.apply(Stream.scala:202)
        at 
org.apache.spark.sql.catalyst.expressions.BoundReference.doGenCode(BoundAttribute.scala:62)
        at 
org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:107)
        at 
org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:104)
        at scala.Option.getOrElse(Option.scala:121)
        at 
org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:104)
        at 
org.apache.spark.sql.execution.CodegenSupport$$anonfun$4.apply(WholeStageCodegenExec.scala:153)
        at 
org.apache.spark.sql.execution.CodegenSupport$$anonfun$4.apply(WholeStageCodegenExec.scala:152)
        at 
scala.collection.immutable.Stream$$anonfun$map$1.apply(Stream.scala:418)
        at 
scala.collection.immutable.Stream$$anonfun$map$1.apply(Stream.scala:418)

-Chris

Reply via email to