LiveListenerBus is occupying most of the Driver Memory and frequent GC is degrading the performance
We have ~120 executors with 5 cores each for a very long-running job that crunches ~2.5 TB of data and has too many filters in the query. Currently we have ~30k partitions, which works out to ~90 MB per partition. We are using Spark v2.2.2 as of now.

The major problem we are facing is GC on the driver. All of the driver memory (30 GB) is getting filled, and GC is very active, taking more than 50% of the runtime for full GC evacuation. The heap dump indicates that 80% of the memory is occupied by LiveListenerBus and is not being cleared by GC; frequent GC runs are clearing newly created objects only.

From the Jira tickets, I gather that memory consumption by LiveListenerBus has been addressed in v2.3 (I am not sure of the specifics). But until we evaluate migrating to v2.3, is there any quick fix or workaround, either to prevent the various listener events from piling up in the driver's memory, or to identify and disable the listener that is causing the delay in processing events?
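For readers in the same situation, the sketch below shows the kind of driver-side retention settings that are sometimes lowered as a stopgap so that the UI listeners keep less completed job/stage/task state around. The config keys are standard Spark 2.x settings, but the values are arbitrary examples and this is not a confirmed fix for the LiveListenerBus growth described above; lowering them also discards UI history.

    # Hedged stopgap sketch only: reduce how much finished job/stage/task state the
    # driver's UI listeners retain. Values are arbitrary examples, not recommendations.
    from pyspark.sql import SparkSession

    spark = (SparkSession.builder
             .appName("listener-retention-sketch")              # hypothetical app name
             .config("spark.ui.retainedJobs", "200")            # default 1000
             .config("spark.ui.retainedStages", "200")          # default 1000
             .config("spark.ui.retainedTasks", "10000")         # default 100000
             .config("spark.sql.ui.retainedExecutions", "200")  # default 1000
             .getOrCreate())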
Support for group aggregate pandas UDF in streaming aggregation for Spark 3.0 Python
Hi,

Is there any plan to remove the limitation mentioned below?

*Streaming aggregation doesn't support group aggregate pandas UDF*

We want to run our data modelling jobs in real time using Spark 3.0 and Kafka 2.4, and we need support for custom aggregate pandas UDFs on stream windows. Is there any plan for this in the upcoming releases?

Regards,
Aesha
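For illustration, here is a minimal sketch of the pattern this limitation blocks: a group-aggregate ("Series to Scalar") pandas UDF applied inside a windowed streaming aggregation. The Kafka broker, topic, and value handling are assumptions made for the example; the same UDF works in a batch groupBy().agg(), but as of Spark 3.0 the streaming version is rejected with an AnalysisException because streaming aggregation does not support group aggregate pandas UDFs.

    import pandas as pd
    from pyspark.sql import SparkSession, functions as F
    from pyspark.sql.functions import pandas_udf

    spark = SparkSession.builder.appName("pandas-udf-streaming-sketch").getOrCreate()

    # Group-aggregate ("Series to Scalar") pandas UDF; fine in a batch groupBy().agg().
    @pandas_udf("double")
    def custom_mean(v: pd.Series) -> float:
        return v.mean()

    # Hypothetical Kafka source; broker, topic and value encoding are assumptions.
    events = (spark.readStream
              .format("kafka")
              .option("kafka.bootstrap.servers", "broker:9092")
              .option("subscribe", "events")
              .load()
              .withColumn("metric", F.col("value").cast("string").cast("double")))

    # Windowed streaming aggregation using the pandas UDF: this is the unsupported
    # combination that Spark 3.0 rejects with an AnalysisException.
    windowed = (events
                .withWatermark("timestamp", "10 minutes")
                .groupBy(F.window("timestamp", "5 minutes"))
                .agg(custom_mean("metric").alias("avg_metric")))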
Re: LiveListenerBus is occupying most of the Driver Memory and frequent GC is degrading the performance
Hi Teja,

The only thought I have is to consider decreasing the spark.scheduler.listenerbus.eventqueue.capacity parameter. That should reduce driver memory pressure, but you'll probably end up dropping events more frequently, meaning you can't really trust anything you see in the UI anymore.

I'm not sure what other options there are beyond things like increasing driver memory and tuning GC. Have you looked at the GC logs? For example, are both the young and old generations of the heap heavily utilized, or just the young generation? Depending on what you see in the GC log, your particular application might simply need a larger young generation, for example.

Just some ideas for you to consider until you make the move to 2.3 or later.
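For illustration, a minimal sketch of the queue-capacity workaround mentioned above, set when the session is built. The value 5000 is an arbitrary example; a smaller queue means the listener bus buffers fewer events but drops them more often.

    # Hedged sketch of the capacity workaround; the value is an arbitrary example.
    # This is a static setting, so it must be in place before any SparkContext
    # exists (or be passed via spark-submit --conf instead).
    from pyspark.sql import SparkSession

    spark = (SparkSession.builder
             .appName("listenerbus-capacity-sketch")  # hypothetical app name
             .config("spark.scheduler.listenerbus.eventqueue.capacity", "5000")
             .getOrCreate())

If the .capacity key is not recognized on 2.2.x, the older spark.scheduler.listenerbus.eventqueue.size name may be the one that release honours.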
Re: LiveListenerBus is occupying most of the Driver Memory and frequent GC is degrading the performance
Hi,

50% of driver time being spent in GC just for the listener bus sounds very high for a 30 GB heap. Did you try taking a heap dump and looking at what is occupying so much memory? That will help us rule out whether the memory usage is due to some user code or library holding references to large objects (or object graphs), or whether the usage really is in the listener-related code.

Regards,
Mridul
Re: S3 read/write from PySpark
Hi there,

Also, for the benefit of others: if you attempt to use any version of Hadoop > 3.2.0 (such as 3.2.1), you will need to update the version of Google Guava used by Apache Spark to the one consumed by Hadoop. Hadoop 3.2.1 requires guava-27.0-jre.jar. The latest is guava-29.0-jre.jar, which also works. This Guava update will resolve the java.lang.NoSuchMethodError issue.

Cheers,
Steve C

On 6 Aug 2020, at 6:06 pm, Daniel Stojanov <m...@danielstojanov.com> wrote:

Hi,

Thanks for your help. Problem solved, but I thought I should add something in case this problem is encountered by others. Both responses are correct; BasicAWSCredentialsProvider is gone, but simply making the substitution leads to the traceback just below:

java.lang.NoSuchMethodError: 'void com.google.common.base.Preconditions.checkArgument(boolean, java.lang.String, java.lang.Object, java.lang.Object)'

I found that changing all of the Hadoop packages from 3.3.0 to 3.2.0 *and* changing the options away from BasicAWSCredentialsProvider solved the problem. Thanks.

Regards,

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/daniel/packages/spark-3.0.0-bin-hadoop3.2/python/pyspark/sql/readwriter.py", line 535, in csv
    return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
  File "/home/daniel/packages/spark-3.0.0-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
  File "/home/daniel/packages/spark-3.0.0-bin-hadoop3.2/python/pyspark/sql/utils.py", line 131, in deco
    return f(*a, **kw)
  File "/home/daniel/packages/spark-3.0.0-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o39.csv.
: java.lang.NoSuchMethodError: 'void com.google.common.base.Preconditions.checkArgument(boolean, java.lang.String, java.lang.Object, java.lang.Object)'
        at org.apache.hadoop.fs.s3a.S3AUtils.lookupPassword(S3AUtils.java:893)
        at org.apache.hadoop.fs.s3a.S3AUtils.lookupPassword(S3AUtils.java:869)
        at org.apache.hadoop.fs.s3a.S3AUtils.getEncryptionAlgorithm(S3AUtils.java:1580)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:341)
        at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3303)
        at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:124)
        at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3352)
        at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3320)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:479)
        at org.apache.hadoop.fs.Path.getFileSystem(Path.java:361)
        at org.apache.spark.sql.execution.streaming.FileStreamSink$.hasMetadata(FileStreamSink.scala:46)
        at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:361)
        at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:279)
        at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:268)
        at scala.Option.getOrElse(Option.scala:189)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:268)
        at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:705)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.base/java.lang.Thread.run(Thread.java:834)

On Thu, 6 Aug 2020 at 17:19, Stephen Coy <s...@infomedia.com.au> wrote:

Hi Daniel,

It looks like …BasicAWSCredentialsProvider has become org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider. However, the way that the username and password are provided appears to have changed, so you will probably need to look into that.

Cheers,
Steve C

On 6 Aug 2020, at 11:15 am, Daniel Stojanov <m...@danielstojanov.com> wrote:

Hi,

I am trying to read/write files to S3 from PySpark. The procedure that I have used is to download Spark and start PySpark with the hadoop-aws, guava and aws-java-sdk-bundle packages. The versions are explicitly specified by looking up the exact dependency version on Maven; allowing dependencies to be auto-determined does not work. This procedur
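To tie the thread together, here is a hedged sketch of the combination described above for Spark 3.0.0 built against Hadoop 3.2: hadoop-aws pinned to 3.2.0 and credentials supplied through SimpleAWSCredentialsProvider rather than the removed BasicAWSCredentialsProvider. The aws-java-sdk-bundle version, bucket and object path are assumptions for illustration; verify the SDK version against what your hadoop-aws release actually depends on.

    # Hedged sketch only: package versions, bucket and key values are illustrative
    # assumptions, not values confirmed in the thread above.
    from pyspark.sql import SparkSession

    spark = (SparkSession.builder
             .appName("s3a-read-sketch")
             .config("spark.jars.packages",
                     "org.apache.hadoop:hadoop-aws:3.2.0,"
                     "com.amazonaws:aws-java-sdk-bundle:1.11.375")
             .config("spark.hadoop.fs.s3a.aws.credentials.provider",
                     "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
             .config("spark.hadoop.fs.s3a.access.key", "YOUR_ACCESS_KEY")   # placeholder
             .config("spark.hadoop.fs.s3a.secret.key", "YOUR_SECRET_KEY")   # placeholder
             .getOrCreate())

    # Hypothetical object path; any s3a:// URI readable with the credentials above works.
    df = spark.read.csv("s3a://your-bucket/path/data.csv", header=True)
    df.show()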