AFAICT there might be data skew: some partitions get too many rows, which can push them past the memory limit. Running .groupBy(<key>).count() (or countByKey() on the RDD) may help check how many rows each group has.
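For example, a quick (untested) check along these lines, using the DataFrame and the `label_col` column from your example below, would show whether a few labels dominate:

    from pyspark.sql import functions as F

    # Count rows per grouping key and look at the largest groups;
    # a handful of very large counts usually means skew.
    group_sizes = df.groupBy("label_col").count()
    group_sizes.orderBy(F.desc("count")).show(20)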
If there is no data skew, increasing the `numPartitions` parameter of .groupBy() is
worth a try.
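Since you are on the DataFrame API, the equivalent knobs would be something like the following (untested sketch; the partition count of 2000 is only an illustration, and `df` stands for your input DataFrame):

    # Raise the number of partitions used for shuffles such as groupBy,
    # and/or repartition on the grouping key before applying the Pandas UDF.
    spark.conf.set("spark.sql.shuffle.partitions", "2000")
    df = df.repartition(2000, "label_col")

    # (On the RDD API, groupBy/aggregateByKey take numPartitions directly.)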

-- 
Cheers,
-z

On Wed, 6 May 2020 00:07:58 +0000
Gautham Acharya <gauth...@alleninstitute.org> wrote:

> Hi everyone,
> 
> I'm running a job that uses a Pandas UDF to GROUP BY a large matrix.
> 
> The GROUP BY function runs on a wide dataset. The first column of the dataset 
> contains string labels that are GROUPed on. The remaining columns are numeric 
> values that are aggregated in the Pandas UDF. The dataset is very wide, with 
> 50,000 columns and 3 million rows.
> 
> | label_col | num_col_0 | num_col_1 | num_col_2 | ... | num_col_50000 |
> | label_a   |       2.0 |       5.6 |     7.123 | ... |           ... |
> | label_b   |      11.0 |       1.4 |     2.345 | ... |           ... |
> | label_a   |       3.1 |       6.2 |     5.444 | ... |           ... |
> 
> 
> 
> My job runs fine on smaller datasets, with the same number of columns but 
> fewer rows. However, when run on a dataset with 3 million rows, I see the 
> following exception:
> 
> 20/05/05 23:36:27 ERROR Executor: Exception in task 66.1 in stage 12.0 (TID 
> 2358)
> org.apache.spark.api.python.PythonException: Traceback (most recent call 
> last):
>   File 
> "/mnt/yarn/usercache/hadoop/appcache/application_1588710822953_0001/container_1588710822953_0001_01_034974/pyspark.zip/pyspark/worker.py",
>  line 377, in main
>     process()
>   File 
> "/mnt/yarn/usercache/hadoop/appcache/application_1588710822953_0001/container_1588710822953_0001_01_034974/pyspark.zip/pyspark/worker.py",
>  line 372, in process
>     serializer.dump_stream(func(split_index, iterator), outfile)
>   File 
> "/mnt/yarn/usercache/hadoop/appcache/application_1588710822953_0001/container_1588710822953_0001_01_034974/pyspark.zip/pyspark/serializers.py",
>  line 286, in dump_stream
>     for series in iterator:
>   File 
> "/mnt/yarn/usercache/hadoop/appcache/application_1588710822953_0001/container_1588710822953_0001_01_034974/pyspark.zip/pyspark/serializers.py",
>  line 303, in load_stream
>     for batch in reader:
>   File "pyarrow/ipc.pxi", line 266, in __iter__
>   File "pyarrow/ipc.pxi", line 282, in 
> pyarrow.lib._CRecordBatchReader.read_next_batch
>   File "pyarrow/error.pxi", line 83, in pyarrow.lib.check_status
> pyarrow.lib.ArrowIOError: read length must be positive or -1
> 
> Looking at this issue (https://issues.apache.org/jira/browse/ARROW-4890), it 
> looks like PyArrow has a 2GB limit for each shard that is sent to the 
> grouping function.
> 
> I'm currently running this job on 4 nodes with 16 cores and 64GB of memory 
> each.
> 
> I've attached the full error log here as well. What are some workarounds I 
> can try to get this job running? Unfortunately, we are coming up on a 
> production release and this is becoming a severe blocker.
> 
> Thanks,
> Gautham
> 

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscr...@spark.apache.org
