LiveListenerBus is occupying most of the Driver Memory and frequent GC is degrading the performance

2020-08-11 Thread Teja
We have ~120 executors with 5 cores each, for a very long-running job that
crunches ~2.5 TB of data with a query that has many filters. Currently, we
have ~30k partitions, which works out to ~90 MB per partition.

We are using Spark v2.2.2 as of now. The major problem we are facing is GC
on the driver. All of the driver memory (30G) is getting filled, and GC is
very active, with full GC evacuation taking more than 50% of the runtime.
The heap dump indicates that 80% of the memory is occupied by
LiveListenerBus and is not being reclaimed by GC; frequent GC runs are only
clearing newly created objects.

From the Jira tickets, I got to know that memory consumption by
LiveListenerBus has been addressed in v2.3 (not sure of the specifics). But
until we evaluate migrating to v2.3, is there any quick fix or workaround,
either to prevent listener events from piling up in the driver's memory or
to identify and disable the listener that is causing the delay in
processing events?






Support for group aggregate pandas UDF in streaming aggregation for SPARK 3.0 python

2020-08-11 Thread Aesha Dhar Roy
Hi,

Is there any plan to remove the limitation mentioned below?

*Streaming aggregation doesn't support group aggregate pandas UDF*

We want to run our data modelling jobs in real time using Spark 3.0 and
Kafka 2.4, and need support for custom aggregate pandas UDFs on stream
windows.
Is there any plan for this in an upcoming release?
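
For context, this is roughly the pattern we are trying to run. A minimal
sketch, using the built-in rate source in place of our Kafka topic; the
column names and the aggregation itself are only illustrative:

from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType, window, col

spark = SparkSession.builder.appName("streaming-pandas-agg").getOrCreate()

# A group aggregate (GROUPED_AGG) pandas UDF: a custom aggregation over a pandas Series.
@pandas_udf("double", PandasUDFType.GROUPED_AGG)
def weighted_score(v):
    return (v * 0.5).mean()

# The rate source stands in for the Kafka stream; it produces 'timestamp' and 'value'.
events = (spark.readStream.format("rate").option("rowsPerSecond", 10).load()
          .selectExpr("timestamp", "CAST(value AS DOUBLE) AS score"))

# Using the group aggregate pandas UDF inside a windowed streaming aggregation is the
# unsupported combination: starting a query over this plan fails with
# AnalysisException: Streaming aggregation doesn't support group aggregate pandas UDF.
agg = (events
       .withWatermark("timestamp", "10 minutes")
       .groupBy(window(col("timestamp"), "5 minutes"))
       .agg(weighted_score(col("score"))))

query = agg.writeStream.outputMode("update").format("console").start()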

Regards,
Aesha


Re: LiveListenerBus is occupying most of the Driver Memory and frequent GC is degrading the performance

2020-08-11 Thread Waleed Fateem
Hi Teja,

The only thought I have is to consider decreasing
the spark.scheduler.listenerbus.eventqueue.capacity parameter. That should
reduce driver memory pressure, but of course you'll probably end up
dropping events more frequently, meaning you can't really trust anything
you see in the UI anymore.
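
For example, something like this when creating the session. A minimal
sketch; 5000 is just an illustrative value, and the setting has to be in
place before the SparkContext is created:

from pyspark.sql import SparkSession

# Smaller queues hold fewer pending listener events on the driver,
# at the cost of dropping events sooner under load.
spark = (SparkSession.builder
         .config("spark.scheduler.listenerbus.eventqueue.capacity", "5000")
         .getOrCreate())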

I'm not sure what other options there are other than trying things like
increasing driver memory and tuning GC.

Have you looked at the GC logs? For example, are both the young and old
generation portions of the heap heavily utilized, or is it just the young
generation? Depending on what you see in the GC log, your particular
application might just need a larger young generation, for example.
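
If GC logging isn't already enabled on the driver, a sketch of the kind of
flags to add (they must be set when the driver JVM is launched, e.g. in
spark-defaults.conf or via --conf on spark-submit; the flags below assume a
Java 8 HotSpot JVM and the log path is illustrative):

spark.driver.extraJavaOptions  -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -Xloggc:/tmp/driver-gc.log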

Just some ideas for you to consider till you make the move to 2.3 or later.



Re: LiveListenerBus is occupying most of the Driver Memory and frequent GC is degrading the performance

2020-08-11 Thread Mridul Muralidharan
Hi,

50% of driver time being spent in GC just for the listener bus sounds very
high for a 30G heap.
Did you try taking a heap dump to see what is occupying so much memory?

This will help us determine whether the memory usage is due to some user
code/library holding references to large objects (or graphs of objects), or
whether the memory usage is actually in the listener-related code.

Regards,
Mridul




Re: S3 read/write from PySpark

2020-08-11 Thread Stephen Coy
Hi there,

Also for the benefit of others, if you attempt to use any version of Hadoop > 
3.2.0 (such as 3.2.1), you will need to update the version of Google Guava used 
by Apache Spark to that consumed by Hadoop.

Hadoop 3.2.1 requires guava-27.0-jre.jar. The latest is guava-29.0-jre.jar,
which also works.

This Guava update will resolve the java.lang.NoSuchMethodError issue.
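
If it helps, a quick way to confirm which Guava jar the driver JVM actually
loaded. A diagnostic sketch that reaches through py4j internals, so treat
it as best-effort:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
jvm = spark.sparkContext._jvm

# Prints the jar that Guava's Preconditions class was loaded from,
# e.g. .../jars/guava-27.0-jre.jar
cls = jvm.java.lang.Class.forName("com.google.common.base.Preconditions")
print(cls.getProtectionDomain().getCodeSource().getLocation().toString())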

Cheers,

Steve C

On 6 Aug 2020, at 6:06 pm, Daniel Stojanov <m...@danielstojanov.com> wrote:

Hi,

Thanks for your help. Problem solved, but I thought I should add something in 
case this problem is encountered by others.

Both responses are correct; BasicAWSCredentialsProvider is gone, but simply
making the substitution leads to the traceback just below:

java.lang.NoSuchMethodError: 'void com.google.common.base.Preconditions.checkArgument(boolean, java.lang.String, java.lang.Object, java.lang.Object)'

I found that changing all of the Hadoop packages from 3.3.0 to 3.2.0 *and*
switching the credentials options away from BasicAWSCredentialsProvider
solved the problem.
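
For anyone hitting the same thing, a minimal sketch of the combination that
worked. The bucket, credentials, and the aws-java-sdk-bundle version are
illustrative; check the hadoop-aws 3.2.0 POM on Maven for the exact SDK
version it declares:

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         # hadoop-aws must match the Hadoop version bundled with Spark
         # (3.2.0 for spark-3.0.0-bin-hadoop3.2).
         .config("spark.jars.packages",
                 "org.apache.hadoop:hadoop-aws:3.2.0,"
                 "com.amazonaws:aws-java-sdk-bundle:1.11.375")
         # SimpleAWSCredentialsProvider in place of the old BasicAWSCredentialsProvider.
         .config("spark.hadoop.fs.s3a.aws.credentials.provider",
                 "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
         .config("spark.hadoop.fs.s3a.access.key", "YOUR_ACCESS_KEY")
         .config("spark.hadoop.fs.s3a.secret.key", "YOUR_SECRET_KEY")
         .getOrCreate())

df = spark.read.csv("s3a://your-bucket/some/prefix/", header=True)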

Thanks.
Regards,



Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/daniel/packages/spark-3.0.0-bin-hadoop3.2/python/pyspark/sql/readwriter.py", line 535, in csv
    return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
  File "/home/daniel/packages/spark-3.0.0-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
  File "/home/daniel/packages/spark-3.0.0-bin-hadoop3.2/python/pyspark/sql/utils.py", line 131, in deco
    return f(*a, **kw)
  File "/home/daniel/packages/spark-3.0.0-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o39.csv.
: java.lang.NoSuchMethodError: 'void com.google.common.base.Preconditions.checkArgument(boolean, java.lang.String, java.lang.Object, java.lang.Object)'
        at org.apache.hadoop.fs.s3a.S3AUtils.lookupPassword(S3AUtils.java:893)
        at org.apache.hadoop.fs.s3a.S3AUtils.lookupPassword(S3AUtils.java:869)
        at org.apache.hadoop.fs.s3a.S3AUtils.getEncryptionAlgorithm(S3AUtils.java:1580)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:341)
        at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3303)
        at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:124)
        at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3352)
        at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3320)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:479)
        at org.apache.hadoop.fs.Path.getFileSystem(Path.java:361)
        at org.apache.spark.sql.execution.streaming.FileStreamSink$.hasMetadata(FileStreamSink.scala:46)
        at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:361)
        at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:279)
        at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:268)
        at scala.Option.getOrElse(Option.scala:189)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:268)
        at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:705)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.base/java.lang.Thread.run(Thread.java:834)








On Thu, 6 Aug 2020 at 17:19, Stephen Coy <s...@infomedia.com.au> wrote:
Hi Daniel,

It looks like …BasicAWSCredentialsProvider has become 
org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider.

However, the way that the username and password are provided appears to have
changed, so you will probably need to look into that.

Cheers,

Steve C

On 6 Aug 2020, at 11:15 am, Daniel Stojanov <m...@danielstojanov.com> wrote:

Hi,

I am trying to read/write files to S3 from PySpark. The procedure that I have
used is to download Spark and start PySpark with the hadoop-aws, guava, and
aws-java-sdk-bundle packages. The versions are explicitly specified by looking
up the exact dependency versions on Maven; allowing dependencies to be
auto-determined does not work. This procedur