The stack-trace for the OOM: 21/04/21 21:40:43 WARN TaskSetManager: Lost task 1.2 in stage 2.0 (TID 57, > 10.139.64.6, executor 3): org.apache.beam.sdk.util.UserCodeException: > java.lang.OutOfMemoryError: GC overhead limit exceeded > at > org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39) > at > org.apache.beam.sdk.io.jdbc.JdbcIO$WriteVoid$WriteFn$DoFnInvoker.invokeProcessElement(Unknown > Source) > at > org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:232) > at > org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:191) > at > org.apache.beam.runners.spark.translation.DoFnRunnerWithMetrics.processElement(DoFnRunnerWithMetrics.java:65) > at > org.apache.beam.runners.spark.translation.SparkProcessContext$ProcCtxtIterator.computeNext(SparkProcessContext.java:140) > at > org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:141) > at > org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:136) > at > scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:42) > at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > at > org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$27.apply(RDD.scala:979) > at > org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$27.apply(RDD.scala:979) > at > org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2323) > at > org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2323) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) > at org.apache.spark.scheduler.Task.doRunTask(Task.scala:140) > at org.apache.spark.scheduler.Task.run(Task.scala:113) > at > org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:537) > at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1541) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:543) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded >
I should note this exception is not always printed. The issue is usually represented by an ExecutorLostFailure (I am assuming this is caused by an OOM-error): > > 21/04/21 21:44:52 ERROR ScalaDriverLocal: User Code Stack Trace: > java.lang.RuntimeException: org.apache.spark.SparkException: Job aborted > due to stage failure: Task 5 in stage 2.0 failed 4 times, most recent > failure: Lost task 5.3 in stage 2.0 (TID 62, 10.139.64.6, executor 3): > ExecutorLostFailure (executor 3 exited caused by one of the running tasks) > Reason: Executor heartbeat timed out after 240254 ms > Driver stacktrace: > at > org.apache.beam.runners.spark.SparkPipelineResult.runtimeExceptionFrom(SparkPipelineResult.java:60) > at > org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:77) > at > org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:104) > at > org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:92) > at org.odp.beam.sdk.OdpPipeline.run(OdpPipeline.java:79) > at org.odp.beam.sdk.OdpPipeline.run(OdpPipeline.java:60) > at org.apache.beam.sdk.Pipeline.run(Pipeline.java:308) > at org.odp.beam.sdk.OdpPipeline.runThenExit(OdpPipeline.java:93) > at > org.odp.pipelines.emodnet_bronze.EmodNetBronze.runPipeline(EmodNetBronze.java:203) > at > org.odp.pipelines.emodnet_bronze.EmodNetBronze.main(EmodNetBronze.java:209) > at > lineb2837a4aea8b4382bd297a3df4a6a20d25.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command--1:1) > at > lineb2837a4aea8b4382bd297a3df4a6a20d25.$read$$iw$$iw$$iw$$iw$$iw.<init>(command--1:44) > at > lineb2837a4aea8b4382bd297a3df4a6a20d25.$read$$iw$$iw$$iw$$iw.<init>(command--1:46) > at > lineb2837a4aea8b4382bd297a3df4a6a20d25.$read$$iw$$iw$$iw.<init>(command--1:48) > at > lineb2837a4aea8b4382bd297a3df4a6a20d25.$read$$iw$$iw.<init>(command--1:50) > at lineb2837a4aea8b4382bd297a3df4a6a20d25.$read$$iw.<init>(command--1:52) > at lineb2837a4aea8b4382bd297a3df4a6a20d25.$read.<init>(command--1:54) > at lineb2837a4aea8b4382bd297a3df4a6a20d25.$read$.<init>(command--1:58) > at lineb2837a4aea8b4382bd297a3df4a6a20d25.$read$.<clinit>(command--1) > at > lineb2837a4aea8b4382bd297a3df4a6a20d25.$eval$.$print$lzycompute(<notebook>:7) > at lineb2837a4aea8b4382bd297a3df4a6a20d25.$eval$.$print(<notebook>:6) > at lineb2837a4aea8b4382bd297a3df4a6a20d25.$eval.$print(<notebook>) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:793) > at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1054) > at > scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:645) > at > scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:644) > at > scala.reflect.internal.util.ScalaClassLoader$class.asContext(ScalaClassLoader.scala:31) > at > scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:19) > at > scala.tools.nsc.interpreter.IMain$WrappedRequest.loadAndRunReq(IMain.scala:644) > at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:576) > at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:572) > at > com.databricks.backend.daemon.driver.DriverILoop.execute(DriverILoop.scala:215) > at > com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$repl$1.apply$mcV$sp(ScalaDriverLocal.scala:202) > at > com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$repl$1.apply(ScalaDriverLocal.scala:202) > at > com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$repl$1.apply(ScalaDriverLocal.scala:202) > at > com.databricks.backend.daemon.driver.DriverLocal$TrapExitInternal$.trapExit(DriverLocal.scala:714) > at > com.databricks.backend.daemon.driver.DriverLocal$TrapExit$.apply(DriverLocal.scala:667) > at > com.databricks.backend.daemon.driver.ScalaDriverLocal.repl(ScalaDriverLocal.scala:202) > at > com.databricks.backend.daemon.driver.DriverLocal$$anonfun$execute$9.apply(DriverLocal.scala:396) > at > com.databricks.backend.daemon.driver.DriverLocal$$anonfun$execute$9.apply(DriverLocal.scala:373) > at > com.databricks.logging.UsageLogging$$anonfun$withAttributionContext$1.apply(UsageLogging.scala:238) > at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) > at > com.databricks.logging.UsageLogging$class.withAttributionContext(UsageLogging.scala:233) > at > com.databricks.backend.daemon.driver.DriverLocal.withAttributionContext(DriverLocal.scala:49) > at > com.databricks.logging.UsageLogging$class.withAttributionTags(UsageLogging.scala:275) > at > com.databricks.backend.daemon.driver.DriverLocal.withAttributionTags(DriverLocal.scala:49) > at > com.databricks.backend.daemon.driver.DriverLocal.execute(DriverLocal.scala:373) > at > com.databricks.backend.daemon.driver.DriverWrapper$$anonfun$tryExecutingCommand$2.apply(DriverWrapper.scala:644) > at > com.databricks.backend.daemon.driver.DriverWrapper$$anonfun$tryExecutingCommand$2.apply(DriverWrapper.scala:644) > at scala.util.Try$.apply(Try.scala:192) > at > com.databricks.backend.daemon.driver.DriverWrapper.tryExecutingCommand(DriverWrapper.scala:639) > at > com.databricks.backend.daemon.driver.DriverWrapper.getCommandOutputAndError(DriverWrapper.scala:485) > at > com.databricks.backend.daemon.driver.DriverWrapper.executeCommand(DriverWrapper.scala:597) > at > com.databricks.backend.daemon.driver.DriverWrapper.runInnerLoop(DriverWrapper.scala:390) > at > com.databricks.backend.daemon.driver.DriverWrapper.runInner(DriverWrapper.scala:337) > at > com.databricks.backend.daemon.driver.DriverWrapper.run(DriverWrapper.scala:219) > at java.lang.Thread.run(Thread.java:748) > Caused by: org.apache.spark.SparkException: Job aborted due to stage > failure: Task 5 in stage 2.0 failed 4 times, most recent failure: Lost task > 5.3 in stage 2.0 (TID 62, 10.139.64.6, executor 3): ExecutorLostFailure > (executor 3 exited caused by one of the running tasks) Reason: Executor > heartbeat timed out after 240254 ms > Driver stacktrace: > at org.apache.spark.scheduler.DAGScheduler.org > $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2362) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2350) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2349) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > at > org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2349) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1102) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1102) > at scala.Option.foreach(Option.scala:257) > at > org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1102) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2582) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2529) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2517) > at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) > at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:897) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:2282) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:2304) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:2323) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:2348) > at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:979) > at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:977) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) > at org.apache.spark.rdd.RDD.withScope(RDD.scala:392) > at org.apache.spark.rdd.RDD.foreach(RDD.scala:977) > at > org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:359) > at > org.apache.spark.api.java.AbstractJavaRDDLike.foreach(JavaRDDLike.scala:45) > at > org.apache.beam.runners.spark.translation.BoundedDataset.action(BoundedDataset.java:127) > at > org.apache.beam.runners.spark.translation.EvaluationContext.computeOutputs(EvaluationContext.java:228) > at > org.apache.beam.runners.spark.SparkRunner.lambda$run$1(SparkRunner.java:241) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > ... 1 more > 21/04/21 21:44:52 INFO ProgressReporter$: Removed result fetcher for > 4169729928902178844_6376508440917463187_job-95-run-16-action-295 > xxx On Mon, Apr 26, 2021 at 1:03 PM Alexey Romanenko <aromanenko....@gmail.com> wrote: > Hi Thomas, > > Could you share the stack trace of your OOM and, if possible, the code > snippet of your pipeline? > Afaik, usually only “large" GroupByKey transforms, caused by “hot keys”, > may lead to OOM with SparkRunner. > > — > Alexey > > > > On 26 Apr 2021, at 08:23, Thomas Fredriksen(External) < > thomas.fredrik...@cognite.com> wrote: > > > > Good morning, > > > > We are ingesting a very large dataset into our database using Beam on > Spark. The dataset is available through a REST-like API and is splicedin > such a way so that in order to obtain the whole dataset, we must do around > 24000 API calls. > > > > All in all, this results in 24000 CSV files that need to be parsed then > written to our database. > > > > Unfortunately, we are encountering some OutOfMemoryErrors along the way. > From what we have gathered, this is due to the data being queued between > transforms in the pipeline. In order to mitigate this, we have tried to > implement a streaming-scheme where the requests streamed to the request > executor, the flows to the database. This too produced the OOM-error. > > > > What are the best ways of implementing such pipelines so as to minimize > the memory footprint? Are there any differences between runners we should > be aware of here? (e.g. between Dataflow and Spark) > >