Does it work for just a single path input and single output? Is the destPath a collection that is sitting on the driver?
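i.e. is it roughly the pattern below, where destPaths is an ordinary Scala collection held on the driver and each path kicks off its own read-and-write job? (A minimal sketch reconstructed from the snippet quoted below; the example HDFS paths and the Success/Failure logging are placeholders.)

```
import scala.util.{Failure, Success, Try}
import org.apache.spark.sql.{SaveMode, SparkSession}

// Sketch of the pattern under discussion: destPaths is a plain driver-side
// collection, and each destination path gets its own Spark write job.
val spark: SparkSession = SparkSession.builder().getOrCreate()

val sourceOutputPath = "hdfs://clusterA/path/to/output"            // placeholder path
val destPaths: Seq[String] =
  Seq("hdfs://clusterB/path/to/output", "hdfs://clusterC/path/to/output") // placeholder paths

destPaths.foreach { path =>
  Try(spark.read.parquet(sourceOutputPath).write.mode(SaveMode.Overwrite).parquet(path)) match {
    case Success(_) => println(s"wrote $path")                     // stand-in for "log success"
    case Failure(e) => println(s"failed to write $path: $e")       // stand-in for "log failure"
  }
}
```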
On Sun, 22 Dec 2019, 7:59 pm Ruijing Li, <liruijin...@gmail.com> wrote:

> I was experimenting and found something interesting. I have executor OOM
> even if I don't write to remote clusters. So it is purely a dataframe read
> and write issue.
> —————————————————————
> To recap, I have an ETL data pipeline that does some logic, repartitions
> to reduce the amount of files written, and writes the output to HDFS as
> parquet files. After, it reads the output and writes it to other locations;
> it doesn't matter if they are on the same hadoop cluster or multiple. This
> is a simple piece of code:
>
> ```
> destPaths.foreach(path =>
>   Try(spark.read.parquet(sourceOutputPath).write.mode(SaveMode.Overwrite).parquet(path))
>   match {
>     //log failure or success
>   }
> )
> ```
>
> However this stage - read from sourceOutput and write to different
> locations - is failing in Spark, despite all other stages succeeding,
> including the heavy duty logic. And the data is not too big for Spark to
> handle.
>
> Only bumping memoryOverhead, and also repartitioning the output to more
> partitions, 40 precisely (when it failed, we partitioned the output to 20
> after the logic finished but before writing to HDFS), have made the
> read-and-write stage succeed.
>
> I don't understand how a Spark read-and-write stage can experience OOM
> issues. Hoping someone can shed some light on why.
>
> On Sat, Dec 21, 2019 at 7:57 PM Chris Teoh <chris.t...@gmail.com> wrote:
>
>> I'm not entirely sure what the behaviour is when writing to a remote
>> cluster. It could be that connections are being established for every
>> element in your dataframe; perhaps using foreachPartition may reduce the
>> number of connections? You may have to look at what the executors do when
>> they reach out to the remote cluster.
>>
>> On Sun, 22 Dec 2019, 8:07 am Ruijing Li, <liruijin...@gmail.com> wrote:
>>
>>> I managed to make the failing stage work by increasing memoryOverhead
>>> to something ridiculous, > 50%. Our spark.executor.memory = 12gb and I
>>> bumped spark.mesos.executor.memoryOverhead=8G.
>>>
>>> *Can someone explain why this solved the issue?* As I understand it,
>>> memoryOverhead is for VM overhead and non-heap items, which a simple
>>> read and write should not use (albeit to different hadoop clusters, but
>>> network should be a nonissue since they are on the same machines).
>>>
>>> We use Spark defaults for everything else.
>>>
>>> We are calling df.repartition(20) in our write after the logic is done
>>> (before the failing stage of the multiple-cluster write) to prevent
>>> Spark's small files problem. We reduce from 4000 partitions to 20.
>>>
>>> On Sat, Dec 21, 2019 at 11:28 AM Ruijing Li <liruijin...@gmail.com>
>>> wrote:
>>>
>>>> Not for the stage that fails; all it does is read and write - the
>>>> number of tasks is # of cores * # of executor instances. For us that is
>>>> 60 (3 cores, 20 executors).
>>>>
>>>> The input partition size for the failing stage: when Spark reads the
>>>> 20 files, each 132M, it comes out to 40 partitions.
>>>>
>>>> On Fri, Dec 20, 2019 at 4:40 PM Chris Teoh <chris.t...@gmail.com>
>>>> wrote:
>>>>
>>>>> If you're using Spark SQL, that configuration setting causes a
>>>>> shuffle if the number of your input partitions to the write is larger
>>>>> than that configuration.
>>>>>
>>>>> Is there anything in the executor logs or the Spark UI DAG that
>>>>> indicates a shuffle? I don't expect a shuffle if it is a straight
>>>>> write. What's the input partition size?
>>>>>
>>>>> On Sat, 21 Dec 2019, 10:24 am Ruijing Li, <liruijin...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Could you explain why shuffle partitions might be a good starting
>>>>>> point?
>>>>>>
>>>>>> Some more details: when I write the output the first time after the
>>>>>> logic is complete, I repartition the files to 20 after having
>>>>>> spark.sql.shuffle.partitions = 2000, so we don't have too many small
>>>>>> files. The data is small, about 130MB per file. When Spark reads it,
>>>>>> it reads in 40 partitions and tries to output that to the different
>>>>>> cluster. Unfortunately, during that read and write stage executors
>>>>>> drop off.
>>>>>>
>>>>>> We keep the HDFS block size at 128MB.
>>>>>>
>>>>>> On Fri, Dec 20, 2019 at 3:01 PM Chris Teoh <chris.t...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> spark.sql.shuffle.partitions might be a start.
>>>>>>>
>>>>>>> Is there a difference between the number of partitions when the
>>>>>>> parquet is read and spark.sql.shuffle.partitions? Is it much higher
>>>>>>> than spark.sql.shuffle.partitions?
>>>>>>>
>>>>>>> On Fri, 20 Dec 2019, 7:34 pm Ruijing Li, <liruijin...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi all,
>>>>>>>>
>>>>>>>> I have encountered a strange executor OOM error. I have a data
>>>>>>>> pipeline using Spark 2.3 and Scala 2.11.12. This pipeline writes
>>>>>>>> the output to one HDFS location as parquet, then reads the files
>>>>>>>> back in and writes them to multiple hadoop clusters (all co-located
>>>>>>>> in the same datacenter). It should be a very simple task, but
>>>>>>>> executors are being killed off for exceeding container thresholds.
>>>>>>>> From the logs, it is exceeding the given memory (using Mesos as the
>>>>>>>> cluster manager).
>>>>>>>>
>>>>>>>> The ETL process works perfectly fine with the given resources,
>>>>>>>> doing joins and adding columns. The output is written successfully
>>>>>>>> the first time. *Only when the pipeline at the end reads the output
>>>>>>>> from HDFS and writes it to different HDFS cluster paths does it
>>>>>>>> fail.* (It does a spark.read.parquet(source).write.parquet(dest).)
>>>>>>>>
>>>>>>>> This doesn't really make sense and I'm wondering what
>>>>>>>> configurations I should start looking at.
>>>>>>>>
>>>>>>>> --
>>>>>>>> Cheers,
>>>>>>>> Ruijing Li
>>>>>>>>
>>>>>> --
>>>>>> Cheers,
>>>>>> Ruijing Li
>>>>>>
>>>> --
>>>> Cheers,
>>>> Ruijing Li
>>>>
>>> --
>>> Cheers,
>>> Ruijing Li
>>>
> --
> Cheers,
> Ruijing Li
>
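For reference, the settings quoted in this thread correspond to roughly the following session setup (a sketch only: the appName is invented, spark.mesos.executor.memoryOverhead takes a MiB value so 8192 stands in for the 8G mentioned above, and the repartition(20) line marks where the small-files coalescing happens before the first write).

```
import org.apache.spark.sql.SparkSession

// Sketch of the configuration described in the thread (values quoted above).
val spark = SparkSession.builder()
  .appName("multi-cluster-parquet-copy")                  // placeholder name
  .config("spark.executor.memory", "12g")                 // "spark.executor.memory = 12gb"
  .config("spark.mesos.executor.memoryOverhead", "8192")  // the 8G bump (this setting is in MiB)
  .config("spark.sql.shuffle.partitions", "2000")         // shuffle partitions used during the ETL logic
  .getOrCreate()

// The ETL output is coalesced before the first write to avoid many small files:
// df.repartition(20).write.mode(SaveMode.Overwrite).parquet(sourceOutputPath)
```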