"But it will be faster to use S3 (or GCS) through some network and it will be faster than writing to the local SSD. I don't understand the point here." Minio is a S3 mock, so you run minio local.
On Thu, 7 Apr 2022 at 09:27, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

OK, so that is your assumption. The whole thing is on-premise, on JBOD (including the Hadoop cluster, which, as I understand it, has Spark binaries on each node). But will it be faster to use S3 (or GCS) through some network than writing to the local SSD? I don't understand the point here.

Also, it appears the thread owner is talking about having HBase on the Hadoop cluster, with some nodes eating memory. This can be easily sorted by moving HBase to its own cluster, which will ease up Hadoop, Spark and HBase competing for resources. It is possible that the issue is with the HBase setup as well.

HTH

view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>

https://en.everybodywiki.com/Mich_Talebzadeh

Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.

On Thu, 7 Apr 2022 at 08:11, Bjørn Jørgensen <bjornjorgen...@gmail.com> wrote:

"1. Where does S3 come into this"

He is processing data one day at a time. So to dump each day to fast storage he can use Parquet files and write them to S3.
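For example, something along these lines at the end of each daily run (df_final, the bucket and the path are placeholders):

    # "day" is the date the current spark-submit run is processing, e.g. "2022-04-07".
    (df_final
        .write
        .mode("overwrite")
        .parquet(f"s3a://mybucket/daily/day={day}/"))

    # A later run, or another job, can pick that day up again cheaply:
    df_day = spark.read.parquet(f"s3a://mybucket/daily/day={day}/")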
On Wed, 6 Apr 2022 at 22:27, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

Your statement below:

"I believe I have found the issue: the job writes data to HBase, which is on the same cluster. When I keep on processing data and writing with Spark to HBase, eventually the garbage collection cannot keep up any more for HBase, and the HBase memory consumption increases. As the cluster hosts both HBase and Spark, this leads to an overall increase, and at some point you hit the limit of the available memory on each worker. I don't think the Spark memory is increasing over time."

1. Where is your cluster? On prem? Do you have a Hadoop cluster with Spark using the same nodes as HDFS?
2. Is your HBase clustered or standalone, and has it been created on the HDFS nodes?
3. Are you writing to HBase through Phoenix or straight to HBase?
4. Where does S3 come into this?

HTH

On Wed, 6 Apr 2022 at 16:41, Joris Billen <joris.bil...@bigindustries.be> wrote:

Hi,
thanks for your reply.

I believe I have found the issue: the job writes data to HBase, which is on the same cluster. When I keep on processing data and writing with Spark to HBase, eventually the garbage collection cannot keep up any more for HBase, and the HBase memory consumption increases. As the cluster hosts both HBase and Spark, this leads to an overall increase, and at some point you hit the limit of the available memory on each worker. I don't think the Spark memory is increasing over time.

Here are more details:

- Spark: 2.4
- Operation: many Spark SQL statements followed by writing data to a NoSQL DB from Spark, like this:

    df = read(from hdfs)
    df2 = spark.sql(using df1)
    ...
    df10 = spark.sql(using df9)
    spark.sql(CACHE TABLE df10)
    df11 = spark.sql(using df10)
    df11.write
    df12 = spark.sql(using df10)
    df12.write
    df13 = spark.sql(using df10)
    df13.write

- Caching: yes, one df that I will use to eventually write 3x to a DB (those 3 writes are different).
- Loops: I need to process several years, and processing one day is already a complex process (40 minutes on a 9-node cluster running quite a few executors). So in the end it does everything in one go, and there is a limit to how much data I can process in one go with the available resources. Some people here pointed out they believe this looping should not be necessary. But what is the alternative?

-> Maybe I can write to disk somewhere in the middle, and read again from there, so that in the end not everything must happen in one go in memory.
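Roughly what I have in mind (the HDFS path is just a placeholder):

    # Materialise the intermediate result instead of carrying the whole lineage in memory.
    df10.write.mode("overwrite").parquet("hdfs:///tmp/intermediate/df10")

    # Read it back; the three downstream writes now start from the files
    # rather than from the full chain of SQL transformations.
    df10 = spark.read.parquet("hdfs:///tmp/intermediate/df10")
    df10.createOrReplaceTempView("df10")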
On 5 Apr 2022, at 14:58, Gourav Sengupta <gourav.sengu...@gmail.com> wrote:

Hi,

Can you please give details around: the Spark version, what operation you are running, why in loops, whether you are caching any data or not, and whether you are referencing variables to create themselves, like in the expression x = x + 1, where we reference x to create x.

Thanks and Regards,
Gourav Sengupta

On Mon, Apr 4, 2022 at 10:51 AM Joris Billen <joris.bil...@bigindustries.be> wrote:

Clear: probably not a good idea.

But a previous comment said "you are doing everything in the end in one go". So this made me wonder: in case your only action is a write at the end, after lots of complex transformations, then what is the alternative to writing at the end, which means doing everything all at once at the end? My understanding is that if there is no need for an action earlier, you will do everything at the end, which means there is a limit to how many days you can process at once. And hence the solution is to loop over a couple of days at a time, and always submit the same Spark job, just with different input.

Thanks!

On 1 Apr 2022, at 15:26, Sean Owen <sro...@gmail.com> wrote:

This feels like premature optimization, and it's not clear it's optimizing, but maybe. Caching things that are used once is worse than not caching. It looks like a straight line through to the write, so I doubt caching helps anything here.

On Fri, Apr 1, 2022 at 2:49 AM Joris Billen <joris.bil...@bigindustries.be> wrote:

Hi,
as said, thanks for the little discussion over mail. I understand that the action is triggered at the end, at the write, and then all of a sudden everything is executed at once. But I don't really need to trigger an action before that. I am caching somewhere a df that will be reused several times (slightly updated pseudocode below).

Question: is it then better practice to already trigger some actions on intermediate data frames (like df4 and df8), and cache them? So that these actions will not be that expensive yet, and the actions to write at the end will require fewer resources, which would allow processing more days in one go? Like what is added in the IMPROVEMENT section of the pseudocode below?

pseudocode:

    loop over all days:
        spark-submit 1 day

with spark-submit (overly simplified) =

    df = spark.read(hdfs://somepath)
    ...
    ## IMPROVEMENT START
    df4 = spark.sql(some stuff with df3)
    spark.sql(CACHE TABLE df4)
    ...
    df8 = spark.sql(some stuff with df7)
    spark.sql(CACHE TABLE df8)
    ## IMPROVEMENT END
    ...
    df12 = spark.sql(complex stuff with df11)
    spark.sql(CACHE TABLE df12)
    ...
    df13 = spark.sql(complex stuff with df12)
    df13.write
    df14 = spark.sql(some other complex stuff with df12)
    df14.write
    df15 = spark.sql(some completely other complex stuff with df12)
    df15.write
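In actual PySpark, the IMPROVEMENT part would be something like this (assuming df3 and df7 have already been registered as temp views; the queries are just stand-ins for the real transformations):

    df4 = spark.sql("SELECT * FROM df3")   # stand-in for the real transformation
    df4.createOrReplaceTempView("df4")
    spark.sql("CACHE TABLE df4")           # CACHE TABLE is eager, so df4 is materialised here

    df8 = spark.sql("SELECT * FROM df7")   # stand-in for the real transformation
    df8.createOrReplaceTempView("df8")
    spark.sql("CACHE TABLE df8")           # later statements that use df8 read from the cache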
Thanks!

On 31 Mar 2022, at 14:37, Sean Owen <sro...@gmail.com> wrote:

If that is your loop unrolled, then you are not doing parts of the work at a time. That will execute all operations in one go when the write finally happens. That's OK, but it may be part of the problem. For example, if you are filtering for a subset, processing, and unioning, then that is just a harder and slower way of applying the transformation to all the data at once.

On Thu, Mar 31, 2022 at 3:30 AM Joris Billen <joris.bil...@bigindustries.be> wrote:

Thanks for the reply :-)

I am using pyspark. Basically my code (simplified) is:

    df = spark.read.csv(hdfs://somehdfslocation)
    df1 = spark.sql(complex statement using df)
    ...
    dfx = spark.sql(complex statement using dfx-1)
    ...
    dfx15.write()

What exactly is meant by "closing resources"? Is it just unpersisting cached dataframes at the end and stopping the Spark context explicitly with sc.stop()?

For processing many years at once versus a chunk in a loop: I see that if I go up to a certain number of days, one iteration will start to have tasks that fail. So I only take a limited number of days, and do this process several times. Isn't this normal, as you are always somehow limited in terms of resources (I have 9 nodes with 32 GB)? Or could you in theory process any volume, if you wait long enough? I guess Spark can only break the tasks down to a certain level (based on the datasets' and the intermediate results' partitions), and at some moment you hit the limit where your resources are no longer sufficient to process one such task? Maybe you can tweak it a bit, but in the end you'll hit a limit?

Concretely, the following topics would be interesting to find out more about (links welcome):
- where to see what you are still consuming after a Spark job has ended, if you didn't close resources
- memory leaks for pyspark
- a good article about closing resources (you find tons of snippets on how to start the Spark context and configure the number/cores/memory of workers/executors etc., but I never saw a focus on making sure you clean up, or is it just stopping the Spark context)

On 30 Mar 2022, at 21:24, Bjørn Jørgensen <bjornjorgen...@gmail.com> wrote:

It's quite impossible for anyone to answer your question about what is eating your memory without even knowing what language you are using.

If you are using C, then it's always pointers; that's the memory issue. If you are using Python, it can be something like not using context managers; see "Context Managers and Python's with Statement" <https://realpython.com/python-with-statement/>.

Another one can be not closing resources after use.

In my experience you can process 3 years or more of data, IF you are closing opened resources. I use the web GUI http://spark:4040 to follow what Spark is doing.

On Wed, 30 Mar 2022 at 17:41, Joris Billen <joris.bil...@bigindustries.be> wrote:

Thanks for the answer, much appreciated! This forum is very useful :-)

I didn't know the SparkContext stays alive. I guess this is eating up memory. The eviction means that it knows it should clear some of the old cached memory to be able to store new data. In case anyone has good articles about memory leaks, I would be interested to read them. I will try to add the following lines at the end of my job (as I cached the table in Spark SQL):

    sqlContext.sql("UNCACHE TABLE mytableofinterest")
    spark.stop()
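Or, a bit more completely, something along these lines (mytableofinterest is the table I cached earlier; clearCache is just a catch-all for anything else still cached):

    # Drop the specific cached table, then anything else still cached,
    # and stop the session so the next spark-submit starts from a clean state.
    spark.sql("UNCACHE TABLE mytableofinterest")
    spark.catalog.clearCache()
    spark.stop()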
Wrt looping: if I want to process 3 years of data, my modest cluster will never do it in one go, I would expect? I have to break it down into smaller pieces and run those in a loop (1 day is already lots of data).

Thanks!

On 30 Mar 2022, at 17:25, Sean Owen <sro...@gmail.com> wrote:

The Spark context does not stop when a job does. It stops when you stop it. There could be many ways memory can leak. Caching, maybe, but it will evict. You should be clearing caches when they are no longer needed.

I would guess it is something else your program holds on to in its logic.

Also consider not looping; there is probably a faster way to do it in one go.

On Wed, Mar 30, 2022, 10:16 AM Joris Billen <joris.bil...@bigindustries.be> wrote:

Hi,
I have a pyspark job submitted through spark-submit that does some heavy processing for 1 day of data. It runs with no errors. I have to loop over many days, so I run this Spark job in a loop. I notice that after a couple of executions the memory is increasing on all worker nodes, and eventually this leads to failures. My job does some caching, but I understand that when the job ends successfully, the SparkContext is destroyed and the cache should be cleared. However, it seems that something keeps filling the memory a bit more after each run. This is the memory behaviour over time, which in the end starts leading to failures:

(what we see is: green = physical memory used, green-blue = physical memory cached, grey = memory capacity, a straight line around 31 GB)

This runs on a healthy Spark 2.4 and was already optimized to come to a stable job in terms of spark-submit resource parameters (driver-memory, num-executors, executor-memory, executor-cores, spark.locality.wait). Any clue how to "really" clear the memory in between jobs? So basically, currently I can loop 10x and then need to restart my cluster so all memory is cleared completely.

Thanks for any info!

<Screenshot 2022-03-30 at 15.28.24.png>

--
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297