The stack trace for the OOM:

21/04/21 21:40:43 WARN TaskSetManager: Lost task 1.2 in stage 2.0 (TID 57,
> 10.139.64.6, executor 3): org.apache.beam.sdk.util.UserCodeException:
> java.lang.OutOfMemoryError: GC overhead limit exceeded
> at
> org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
> at
> org.apache.beam.sdk.io.jdbc.JdbcIO$WriteVoid$WriteFn$DoFnInvoker.invokeProcessElement(Unknown
> Source)
> at
> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:232)
> at
> org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:191)
> at
> org.apache.beam.runners.spark.translation.DoFnRunnerWithMetrics.processElement(DoFnRunnerWithMetrics.java:65)
> at
> org.apache.beam.runners.spark.translation.SparkProcessContext$ProcCtxtIterator.computeNext(SparkProcessContext.java:140)
> at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:141)
> at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:136)
> at
> scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:42)
> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at
> org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$27.apply(RDD.scala:979)
> at
> org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$27.apply(RDD.scala:979)
> at
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2323)
> at
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2323)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
> at org.apache.spark.scheduler.Task.doRunTask(Task.scala:140)
> at org.apache.spark.scheduler.Task.run(Task.scala:113)
> at
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:537)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1541)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:543)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
>

I should note that this exception is not always printed. More often the
failure shows up as an ExecutorLostFailure (which I assume is also caused
by an OOM error):

>
> 21/04/21 21:44:52 ERROR ScalaDriverLocal: User Code Stack Trace:
> java.lang.RuntimeException: org.apache.spark.SparkException: Job aborted
> due to stage failure: Task 5 in stage 2.0 failed 4 times, most recent
> failure: Lost task 5.3 in stage 2.0 (TID 62, 10.139.64.6, executor 3):
> ExecutorLostFailure (executor 3 exited caused by one of the running tasks)
> Reason: Executor heartbeat timed out after 240254 ms
> Driver stacktrace:
> at
> org.apache.beam.runners.spark.SparkPipelineResult.runtimeExceptionFrom(SparkPipelineResult.java:60)
> at
> org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:77)
> at
> org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:104)
> at
> org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:92)
> at org.odp.beam.sdk.OdpPipeline.run(OdpPipeline.java:79)
> at org.odp.beam.sdk.OdpPipeline.run(OdpPipeline.java:60)
> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:308)
> at org.odp.beam.sdk.OdpPipeline.runThenExit(OdpPipeline.java:93)
> at
> org.odp.pipelines.emodnet_bronze.EmodNetBronze.runPipeline(EmodNetBronze.java:203)
> at
> org.odp.pipelines.emodnet_bronze.EmodNetBronze.main(EmodNetBronze.java:209)
> at
> lineb2837a4aea8b4382bd297a3df4a6a20d25.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command--1:1)
> at
> lineb2837a4aea8b4382bd297a3df4a6a20d25.$read$$iw$$iw$$iw$$iw$$iw.<init>(command--1:44)
> at
> lineb2837a4aea8b4382bd297a3df4a6a20d25.$read$$iw$$iw$$iw$$iw.<init>(command--1:46)
> at
> lineb2837a4aea8b4382bd297a3df4a6a20d25.$read$$iw$$iw$$iw.<init>(command--1:48)
> at
> lineb2837a4aea8b4382bd297a3df4a6a20d25.$read$$iw$$iw.<init>(command--1:50)
> at lineb2837a4aea8b4382bd297a3df4a6a20d25.$read$$iw.<init>(command--1:52)
> at lineb2837a4aea8b4382bd297a3df4a6a20d25.$read.<init>(command--1:54)
> at lineb2837a4aea8b4382bd297a3df4a6a20d25.$read$.<init>(command--1:58)
> at lineb2837a4aea8b4382bd297a3df4a6a20d25.$read$.<clinit>(command--1)
> at
> lineb2837a4aea8b4382bd297a3df4a6a20d25.$eval$.$print$lzycompute(<notebook>:7)
> at lineb2837a4aea8b4382bd297a3df4a6a20d25.$eval$.$print(<notebook>:6)
> at lineb2837a4aea8b4382bd297a3df4a6a20d25.$eval.$print(<notebook>)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:793)
> at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1054)
> at
> scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:645)
> at
> scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:644)
> at
> scala.reflect.internal.util.ScalaClassLoader$class.asContext(ScalaClassLoader.scala:31)
> at
> scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:19)
> at
> scala.tools.nsc.interpreter.IMain$WrappedRequest.loadAndRunReq(IMain.scala:644)
> at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:576)
> at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:572)
> at
> com.databricks.backend.daemon.driver.DriverILoop.execute(DriverILoop.scala:215)
> at
> com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$repl$1.apply$mcV$sp(ScalaDriverLocal.scala:202)
> at
> com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$repl$1.apply(ScalaDriverLocal.scala:202)
> at
> com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$repl$1.apply(ScalaDriverLocal.scala:202)
> at
> com.databricks.backend.daemon.driver.DriverLocal$TrapExitInternal$.trapExit(DriverLocal.scala:714)
> at
> com.databricks.backend.daemon.driver.DriverLocal$TrapExit$.apply(DriverLocal.scala:667)
> at
> com.databricks.backend.daemon.driver.ScalaDriverLocal.repl(ScalaDriverLocal.scala:202)
> at
> com.databricks.backend.daemon.driver.DriverLocal$$anonfun$execute$9.apply(DriverLocal.scala:396)
> at
> com.databricks.backend.daemon.driver.DriverLocal$$anonfun$execute$9.apply(DriverLocal.scala:373)
> at
> com.databricks.logging.UsageLogging$$anonfun$withAttributionContext$1.apply(UsageLogging.scala:238)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
> at
> com.databricks.logging.UsageLogging$class.withAttributionContext(UsageLogging.scala:233)
> at
> com.databricks.backend.daemon.driver.DriverLocal.withAttributionContext(DriverLocal.scala:49)
> at
> com.databricks.logging.UsageLogging$class.withAttributionTags(UsageLogging.scala:275)
> at
> com.databricks.backend.daemon.driver.DriverLocal.withAttributionTags(DriverLocal.scala:49)
> at
> com.databricks.backend.daemon.driver.DriverLocal.execute(DriverLocal.scala:373)
> at
> com.databricks.backend.daemon.driver.DriverWrapper$$anonfun$tryExecutingCommand$2.apply(DriverWrapper.scala:644)
> at
> com.databricks.backend.daemon.driver.DriverWrapper$$anonfun$tryExecutingCommand$2.apply(DriverWrapper.scala:644)
> at scala.util.Try$.apply(Try.scala:192)
> at
> com.databricks.backend.daemon.driver.DriverWrapper.tryExecutingCommand(DriverWrapper.scala:639)
> at
> com.databricks.backend.daemon.driver.DriverWrapper.getCommandOutputAndError(DriverWrapper.scala:485)
> at
> com.databricks.backend.daemon.driver.DriverWrapper.executeCommand(DriverWrapper.scala:597)
> at
> com.databricks.backend.daemon.driver.DriverWrapper.runInnerLoop(DriverWrapper.scala:390)
> at
> com.databricks.backend.daemon.driver.DriverWrapper.runInner(DriverWrapper.scala:337)
> at
> com.databricks.backend.daemon.driver.DriverWrapper.run(DriverWrapper.scala:219)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.spark.SparkException: Job aborted due to stage
> failure: Task 5 in stage 2.0 failed 4 times, most recent failure: Lost task
> 5.3 in stage 2.0 (TID 62, 10.139.64.6, executor 3): ExecutorLostFailure
> (executor 3 exited caused by one of the running tasks) Reason: Executor
> heartbeat timed out after 240254 ms
> Driver stacktrace:
> at org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2362)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2350)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2349)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2349)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1102)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1102)
> at scala.Option.foreach(Option.scala:257)
> at
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1102)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2582)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2529)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2517)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
> at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:897)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2282)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2304)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2323)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2348)
> at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:979)
> at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:977)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:392)
> at org.apache.spark.rdd.RDD.foreach(RDD.scala:977)
> at
> org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:359)
> at
> org.apache.spark.api.java.AbstractJavaRDDLike.foreach(JavaRDDLike.scala:45)
> at
> org.apache.beam.runners.spark.translation.BoundedDataset.action(BoundedDataset.java:127)
> at
> org.apache.beam.runners.spark.translation.EvaluationContext.computeOutputs(EvaluationContext.java:228)
> at
> org.apache.beam.runners.spark.SparkRunner.lambda$run$1(SparkRunner.java:241)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> ... 1 more
> 21/04/21 21:44:52 INFO ProgressReporter$: Removed result fetcher for
> 4169729928902178844_6376508440917463187_job-95-run-16-action-295
>
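
Since the first trace shows the OutOfMemoryError surfacing inside the
JdbcIO write step (JdbcIO$WriteVoid$WriteFn), one thing that may be worth
checking is how many rows are held in memory before they are flushed to
the database. Below is a minimal plain-Java sketch of bounded batching,
under the assumption that rows can be consumed lazily from an iterator.
It is not Beam's or JdbcIO's actual implementation; the class name
BoundedBatchWriter, the method writeInBatches and the sink callback are
all hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.IntStream;

// Hypothetical helper, for illustration only (not part of Beam or JdbcIO).
public class BoundedBatchWriter {

    /**
     * Pulls rows lazily from the iterator and hands them to the sink in
     * batches of at most batchSize, so that no more than one batch is
     * buffered at a time. Returns the number of batches flushed.
     */
    public static <T> int writeInBatches(
            Iterator<T> rows, int batchSize, Consumer<List<T>> sink) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<T> buffer = new ArrayList<>(batchSize);
        int flushed = 0;
        while (rows.hasNext()) {
            buffer.add(rows.next());
            if (buffer.size() == batchSize) {
                // Hand over a copy so the sink never sees the buffer being reused.
                sink.accept(new ArrayList<>(buffer));
                buffer.clear();
                flushed++;
            }
        }
        // Flush the final, possibly smaller, batch.
        if (!buffer.isEmpty()) {
            sink.accept(new ArrayList<>(buffer));
            flushed++;
        }
        return flushed;
    }

    public static void main(String[] args) {
        // 2500 made-up rows, generated lazily rather than materialised up front.
        Iterator<String> rows =
                IntStream.range(0, 2500).mapToObj(i -> "row-" + i).iterator();
        // A real sink would execute a JDBC batch; here it only reports the size.
        int batches = writeInBatches(rows, 1000,
                batch -> System.out.println("flushing " + batch.size() + " rows"));
        System.out.println(batches + " batches");
    }
}
```

In the pipeline itself the equivalent knob would presumably be the batch
size on the JdbcIO write transform (withBatchSize), together with making
sure the upstream step emits rows one at a time instead of whole parsed
files. Whether that is enough to avoid the OOM here is an assumption that
would need to be tested.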

On Mon, Apr 26, 2021 at 1:03 PM Alexey Romanenko <aromanenko....@gmail.com>
wrote:

> Hi Thomas,
>
> Could you share the stack trace of your OOM and, if possible, the code
> snippet of your pipeline?
> AFAIK, usually only “large” GroupByKey transforms, caused by “hot keys”,
> may lead to OOM with SparkRunner.
>
> —
> Alexey
>
>
> > On 26 Apr 2021, at 08:23, Thomas Fredriksen(External) <
> thomas.fredrik...@cognite.com> wrote:
> >
> > Good morning,
> >
> > We are ingesting a very large dataset into our database using Beam on
> Spark. The dataset is available through a REST-like API and is sliced in
> such a way that, in order to obtain the whole dataset, we must make around
> 24000 API calls.
> >
> > All in all, this results in 24000 CSV files that need to be parsed then
> written to our database.
> >
> > Unfortunately, we are encountering some OutOfMemoryErrors along the way.
> From what we have gathered, this is due to the data being queued between
> transforms in the pipeline. In order to mitigate this, we have tried to
> implement a streaming scheme where the requests are streamed to the request
> executor and the results then flow to the database. This too produced the
> OOM error.
> >
> > What are the best ways of implementing such pipelines so as to minimize
> the memory footprint? Are there any differences between runners we should
> be aware of here? (e.g. between Dataflow and Spark)
>
>
