On Mon, Aug 8, 2016 at 2:24 PM, Zoltan Fedor <zoltan.1.fe...@gmail.com> wrote:
> Hi all,
>
> I have an interesting issue trying to use UDFs from SparkSQL in Spark 2.0.0
> using pyspark.
>
> There is a big table (5.6 billion rows, 450 GB in memory) loaded into 300
> executors' memory in SparkSQL, on which we do some calculations using
> UDFs in pyspark.
> If I run my SQL on only a portion of the data (filtering by one of the
> attributes), let's say 800 million records, then all works well. But when I
> run the same SQL on all the data, then I receive
> "java.lang.OutOfMemoryError: GC overhead limit exceeded" from basically all
> of the executors.
>
> It seems to me that pyspark UDFs in SparkSQL might have a memory leak,
> causing this "GC overhead limit exceeded" error.
>
> Details:
>
> - using Spark 2.0.0 on a Hadoop YARN cluster
>
> - 300 executors, each with 2 CPU cores and 8 GB memory
> ( spark.yarn.executor.memoryOverhead=6400 )

Does this mean you only have 1.6 GB of memory for the executor JVM (with the rest left for Python)?
The cached table could take 1.5 GB, which leaves almost nothing for anything else.
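The arithmetic behind this question can be sketched quickly. A rough model, assuming the 8 GB figure is the total YARN container size per executor and that spark.yarn.executor.memoryOverhead carves its portion out of that total:

```python
# Rough per-executor memory budget under the settings quoted above.
container_mb = 8 * 1024            # total memory per executor container (8 GB)
overhead_mb = 6400                 # spark.yarn.executor.memoryOverhead
jvm_heap_mb = container_mb - overhead_mb
print(jvm_heap_mb)                 # 1792 MB (~1.75 GB) left for the JVM heap

# The 450 GB cached table spread evenly across 300 executors:
cached_per_executor_gb = 450 / 300
print(cached_per_executor_gb)      # 1.5 GB of that heap for the cached partition alone
```

With roughly 1.5 GB of a sub-2 GB heap pinned by the cache, any additional JVM-side buffering can be enough to push the executor into GC thrashing.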

Python UDFs do require some buffering in the JVM; the size of that buffer depends on
how many rows are being processed by the Python process at a time.
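That buffering can be pictured with a toy model. This is not Spark code (the names are illustrative); it just mimics how the JVM side copies each input row (the UnsafeRow.copy calls in the stack trace below) and queues it until the Python worker returns the UDF result for it, so JVM memory grows with the number of rows in flight:

```python
from collections import deque

# Toy model (illustrative names, not Spark APIs) of the JVM-side queue
# around a Python UDF: each input row is copied and held until the Python
# worker returns the UDF result for it, then joined with that result.
in_flight = deque()

def send_to_python(row):
    in_flight.append(dict(row))       # the JVM copies the row before queueing

def receive_from_python(udf_result):
    row = in_flight.popleft()         # oldest queued row matches this result
    return {**row, "TEST_UDF_OP": udf_result}

rows = [{"SOURCE": i} for i in range(4)]
for r in rows:
    send_to_python(r)                 # Python worker hasn't replied yet...
print(len(in_flight))                 # 4 copied rows sitting in JVM memory

joined = receive_from_python("a")     # one result arrives, one row is released
print(joined["TEST_UDF_OP"], len(in_flight))
```

If the Python workers fall behind (or the heap is already nearly full of cached data), those queued row copies are what the garbage collector churns on.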

> - a table of 5.6 billion rows loaded into the memory of the executors
> (taking up 450 GB of memory), partitioned evenly across the executors
>
> - creating even the simplest UDF in SparkSQL causes a 'GC overhead limit
> exceeded' error if run on all records. Running the same on a smaller
> dataset (~800 million rows) succeeds. Without the UDF, the query succeeds
> on the whole dataset.
>
> - simplified pyspark code:
>
> from pyspark.sql.types import StringType
>
> def test_udf(var):
>     """test udf that will always return a"""
>     return "a"
> sqlContext.registerFunction("test_udf", test_udf, StringType())
>
> sqlContext.sql("""CACHE TABLE ma""")
>
> results_df = sqlContext.sql("""
>     SELECT SOURCE, SOURCE_SYSTEM,
>            test_udf(STANDARD_ACCOUNT_STREET_SRC) AS TEST_UDF_OP,
>            ROUND(1.0 - (levenshtein(STANDARD_ACCOUNT_CITY_SRC, STANDARD_ACCOUNT_CITY_SRC)
>                         / CASE WHEN LENGTH(STANDARD_ACCOUNT_CITY_SRC) > LENGTH(STANDARD_ACCOUNT_CITY_SRC)
>                                THEN LENGTH(STANDARD_ACCOUNT_CITY_SRC)
>                                ELSE LENGTH(STANDARD_ACCOUNT_CITY_SRC)
>                           END), 2) AS SCORE_ED_STANDARD_ACCOUNT_CITY,
>            STANDARD_ACCOUNT_STATE_SRC, STANDARD_ACCOUNT_STATE_UNIV
>     FROM ma""")
>
> results_df.registerTempTable("m")
> sqlContext.cacheTable("m")
>
> results_df = sqlContext.sql("""SELECT COUNT(*) FROM m""")
> print(results_df.take(1))
>
>
> - the error thrown on the executors:
>
> 16/08/08 15:38:17 ERROR util.Utils: Uncaught exception in thread stdout
> writer for /hadoop/cloudera/parcels/Anaconda/bin/python
> java.lang.OutOfMemoryError: GC overhead limit exceeded
>     at org.apache.spark.sql.catalyst.expressions.UnsafeRow.copy(UnsafeRow.java:503)
>     at org.apache.spark.sql.catalyst.expressions.UnsafeRow.copy(UnsafeRow.java:61)
>     at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$1.apply(BatchEvalPythonExec.scala:64)
>     at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$1.apply(BatchEvalPythonExec.scala:64)
>     at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>     at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:1076)
>     at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:1091)
>     at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1129)
>     at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1132)
>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>     at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:504)
>     at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:328)
>     at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1857)
>     at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269)
> 16/08/08 15:38:17 ERROR executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM
>
>
> Has anybody experienced these "GC overhead limit exceeded" errors with
> pyspark UDFs before?
>
> Thanks,
> Zoltan
>
