I'm using PySpark (version 3.2) and I've encountered the following exception
while trying to slice an array column in a DataFrame:
"org.apache.spark.SparkRuntimeException: Unexpected value for length in
function slice: length must be greater than or equal to 0", even though I
filter beforehand so that the length is always greater than 0.
Here's the full exception I'm receiving:
```
Caused by: org.apache.spark.SparkRuntimeException: Unexpected value for length in function slice: length must be greater than or equal to 0.
    at org.apache.spark.sql.errors.QueryExecutionErrors$.unexpectedValueForLengthInFunctionError(QueryExecutionErrors.scala:1602)
    at org.apache.spark.sql.errors.QueryExecutionErrors.unexpectedValueForLengthInFunctionError(QueryExecutionErrors.scala)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.Slice_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.subExpr_1$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.eval(Unknown Source)
    at org.apache.spark.sql.execution.FilterExec.$anonfun$doExecute$3(basicPhysicalOperators.scala:276)
    at org.apache.spark.sql.execution.FilterExec.$anonfun$doExecute$3$adapted(basicPhysicalOperators.scala:275)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:515)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:888)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:888)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
    at org.apache.spark.scheduler.Task.run(Task.scala:139)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    ... 1 more
```
The DataFrame I'm working with has the following data:
```
'id': '2',
'arr2': [1, 1, 1, 2],
'arr1': [0.0, 1.0, 1.0, 1.0]
```
And the schema of the DataFrame is as follows:
```
root
|-- id: string (nullable = true)
|-- arr2: array (nullable = true)
| |-- element: long (containsNull = true)
|-- arr1: array (nullable = true)
| |-- element: double (containsNull = true)
```
The code is:
```
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

data = [
    {
        'id': '2',
        'arr2': [1, 1, 1, 2],
        'arr1': [0.0, 1.0, 1.0, 1.0]
    }
]
df = spark.createDataFrame(data)

# end_index is negative whenever arr2 has fewer than 7 elements
df = df.withColumn('end_index', F.size('arr2') - F.lit(6))
# keep only rows where the slice length would be positive  <-- Note HERE
df = df.filter(F.col('end_index') > 0)
df = df.withColumn("trimmed_arr2",
                   F.slice(F.col('arr2'), start=F.lit(1), length=F.col('end_index')))
df = df.withColumn("avg_trimmed",
                   F.expr('aggregate(trimmed_arr2, 0L, (acc, x) -> acc + x, acc -> acc / end_index)'))
df = df.filter(F.col('avg_trimmed') > 30)
df = df.withColumn('repeated_counts', F.size(F.array_distinct('trimmed_arr2')))
df = df.withColumn('ratio', F.col('repeated_counts') / F.size('trimmed_arr2'))
df = df.filter(F.col('ratio') > 0.6)
df.show(truncate=False, vertical=True)
```
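For the sample row, size(arr2) is 4, so end_index evaluates to 4 - 6 = -2, and the filter on end_index > 0 should remove that row before the slice is ever computed. A minimal diagnostic sketch (using the same data as above) that shows the intermediate value:
```
# Diagnostic sketch: what end_index evaluates to for the sample row.
# For arr2 = [1, 1, 1, 2], size(arr2) = 4, so end_index = 4 - 6 = -2,
# i.e. a negative length if slice were evaluated before the filter.
df_check = spark.createDataFrame(data)
df_check.select(
    F.size('arr2').alias('arr2_size'),
    (F.size('arr2') - F.lit(6)).alias('end_index')
).show()
```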
What's strange is that if I write the DataFrame to disk and read it back just
before the last line, `df = df.filter(F.col('ratio') > 0.6)`, the code
executes perfectly without any exceptions.
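Concretely, this is roughly the round trip I mean (the path below is just a placeholder, not the one I actually use):
```
# Sketch of the write/read round trip that makes the exception go away.
df.write.mode("overwrite").parquet("/tmp/trimmed_arr2_checkpoint")
df = spark.read.parquet("/tmp/trimmed_arr2_checkpoint")
df = df.filter(F.col('ratio') > 0.6)
df.show(truncate=False, vertical=True)
```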
**Note: the code also works if I create the SparkSession with the
`PushDownPredicates` optimizer rule excluded:**
```
spark = SparkSession.builder \
    .master("local") \
    .appName("test-app") \
    .config("spark.driver.bindAddress", "127.0.0.1") \
    .config("spark.sql.optimizer.excludedRules",
            "org.apache.spark.sql.catalyst.optimizer.PushDownPredicates") \
    .getOrCreate()
```
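For completeness, another workaround I'm considering is to clamp the slice length at zero so it can never be negative (just a sketch, not part of my original code):
```
# Sketch of an alternative workaround: clamp the slice length at 0 so slice
# never receives a negative value, no matter where the filter ends up in the plan.
df = df.withColumn(
    "trimmed_arr2",
    F.slice(F.col('arr2'), start=F.lit(1),
            length=F.greatest(F.col('end_index'), F.lit(0)))
)
```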
I would greatly appreciate any insights on why this SparkRuntimeException might
be occurring. Any idea what else I can check?
Thank you in advance for your help!
Daniel Bariudin
Software Developer | Cyber R&D | Cognyte