Hi, super duper. Please try to see if you can write out the data to S3, and then write a load script to load that data from S3 to HBase.
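For what it's worth, a rough sketch of what that could look like (bucket, paths, table name and the connector options are made-up placeholders; it assumes S3A is configured and the hbase-spark connector is on the classpath, and option names differ between connector versions):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("stage-day-to-s3").getOrCreate()

    # Step 1: write the day's result to S3 instead of straight to HBase.
    day_df = spark.read.parquet("hdfs:///data/output/day=2022-04-06")  # stand-in for the day's result
    day_df.write.mode("overwrite").parquet("s3a://my-bucket/staging/day=2022-04-06")

    # Step 2, as a separate load script: read the staged day back from S3 and
    # load it into HBase through the hbase-spark connector (illustrative options;
    # check the mapping syntax of the connector version you use).
    staged = spark.read.parquet("s3a://my-bucket/staging/day=2022-04-06")
    (staged.write
        .format("org.apache.hadoop.hbase.spark")
        .option("hbase.table", "my_table")
        .option("hbase.columns.mapping", "key STRING :key, value STRING cf:value")
        .save())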
Regards,
Gourav Sengupta

On Wed, Apr 6, 2022 at 4:39 PM Joris Billen <joris.bil...@bigindustries.be> wrote:

Hi,
thanks for your reply.

I believe I have found the issue: the job writes data to HBase, which is on the same cluster. When I keep on processing data and writing with Spark to HBase, eventually the garbage collection cannot keep up anymore for HBase, and the HBase memory consumption increases. As the cluster hosts both HBase and Spark, this leads to an overall increase, and at some point you hit the limit of the available memory on each worker. I don't think the Spark memory is increasing over time.

Here are more details:

- Spark: 2.4
- Operation: many Spark SQL statements followed by writing the data to a NoSQL DB from Spark, like this:

    df = read(from hdfs)
    df2 = spark.sql(using df1)
    ...
    df10 = spark.sql(using df9)
    spark.sql(CACHE TABLE df10)
    df11 = spark.sql(using df10)
    df11.write
    df12 = spark.sql(using df10)
    df12.write
    df13 = spark.sql(using df10)
    df13.write

- Caching: yes, one df that I will eventually use to write 3x to a DB (those 3 writes are different).
- Loops: I need to process several years, and processing 1 day is already a complex job (40 minutes on a 9-node cluster running quite a few executors). In the end Spark does everything in one go, and there is a limit to how much data I can process in one go with the available resources. Some people here pointed out they believe this looping should not be necessary. But what is the alternative?
-> Maybe I can write to disk somewhere in the middle, and read again from there, so that in the end not everything must happen in one go in memory.

On 5 Apr 2022, at 14:58, Gourav Sengupta <gourav.sengu...@gmail.com> wrote:

Hi,

can you please give details around: Spark version, what operation you are running, why in loops, whether you are caching any data or not, and whether you are referencing the variables to create themselves, as in the expression x = x + 1, where we reference x to create x.

Thanks and Regards,
Gourav Sengupta

On Mon, Apr 4, 2022 at 10:51 AM Joris Billen <joris.bil...@bigindustries.be> wrote:

Clear - probably not a good idea.

But a previous comment said "you are doing everything in the end in one go". So this made me wonder: if your only action is a write at the end, after lots of complex transformations, then what is the alternative to writing at the end, which means doing everything all at once at the end? My understanding is that if there is no need for an action earlier, you will do everything at the end, which means there is a limit to how many days you can process at once. And hence the solution is to loop over a couple of days at a time, always submitting the same Spark job, just with different input.

Thanks!

On 1 Apr 2022, at 15:26, Sean Owen <sro...@gmail.com> wrote:

This feels like premature optimization, and it's not clear it's optimizing, but maybe. Caching things that are used once is worse than not caching. It looks like a straight line through to the write, so I doubt caching helps anything here.

On Fri, Apr 1, 2022 at 2:49 AM Joris Billen <joris.bil...@bigindustries.be> wrote:

Hi,
as said, thanks for the little discussion over mail. I understand that the action is triggered at the end, at the write, and then all of a sudden everything is executed at once. But I don't really need to trigger an action before that.
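For reference, a rough sketch of the "write to disk somewhere in the middle, and read again from there" idea mentioned further up (the path and the frame name are made-up placeholders):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("materialize-intermediate-sketch").getOrCreate()

    df10 = spark.range(1000)   # stand-in for the expensive intermediate frame

    # Persist the intermediate result to durable storage and read it back.
    # The frames that produced df10 are no longer referenced afterwards, and
    # the final writes start from the materialized copy instead of replaying
    # the whole chain of transformations in one go.
    df10.write.mode("overwrite").parquet("hdfs:///tmp/checkpoints/df10")
    df10 = spark.read.parquet("hdfs:///tmp/checkpoints/df10")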
I am caching somewhere a df that will be reused several times (slightly updated pseudocode below).

Question: is it then better practice to already trigger some actions on intermediate data frames (like df4 and df8) and cache them? So that these actions will not be that expensive yet, and the actions to write at the end will require fewer resources, which would allow processing more days in one go? Like what is added in the IMPROVEMENT section of the pseudocode below?

Pseudocode:

    loop over all days:
        spark-submit 1 day

with spark-submit (overly simplified) =

    df = spark.read(hdfs://somepath)
    ...
    ## IMPROVEMENT START
    df4 = spark.sql(some stuff with df3)
    spark.sql(CACHE TABLE df4)
    ...
    df8 = spark.sql(some stuff with df7)
    spark.sql(CACHE TABLE df8)
    ## IMPROVEMENT END
    ...
    df12 = spark.sql(complex stuff with df11)
    spark.sql(CACHE TABLE df12)
    ...
    df13 = spark.sql(complex stuff with df12)
    df13.write
    df14 = spark.sql(some other complex stuff with df12)
    df14.write
    df15 = spark.sql(some completely other complex stuff with df12)
    df15.write

Thanks!

On 31 Mar 2022, at 14:37, Sean Owen <sro...@gmail.com> wrote:

If that is your loop unrolled, then you are not doing parts of the work at a time. That will execute all operations in one go when the write finally happens. That's OK, but it may be part of the problem. For example, if you are filtering for a subset, processing, and unioning, then that is just a harder and slower way of applying the transformation to all the data at once.

On Thu, Mar 31, 2022 at 3:30 AM Joris Billen <joris.bil...@bigindustries.be> wrote:

Thanks for the reply :-)

I am using pyspark. Basically my code (simplified) is:

    df = spark.read.csv(hdfs://somehdfslocation)
    df1 = spark.sql(complex statement using df)
    ...
    dfx = spark.sql(complex statement using dfx-1)
    ...
    df15.write()

What exactly is meant by "closing resources"? Is it just unpersisting cached dataframes at the end and stopping the Spark context explicitly with sc.stop()?

For processing many years at once versus a chunk in a loop: I see that if I go up to a certain number of days, one iteration will start to have tasks that fail. So I only take a limited number of days, and do this process several times. Isn't this normal, as you are always somehow limited in terms of resources (I have 9 nodes with 32 GB)? Or is it the case that in theory you could process any volume, if you wait long enough? I guess Spark can only break the tasks down to a certain level (based on the partitions of the datasets and the intermediate results), and at some point you hit the limit where your resources are not sufficient anymore to process one such task? Maybe you can tweak it a bit, but in the end you'll hit a limit?
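As a rough sketch of the unpersist-and-stop approach asked about above (the view name is a stand-in; note that CACHE TABLE in Spark SQL is eager, unlike df.cache()):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("daily-job-cleanup-sketch").getOrCreate()

    df10 = spark.range(1000)                    # stand-in for the reused frame
    df10.createOrReplaceTempView("df10_view")
    spark.sql("CACHE TABLE df10_view")          # eager: materializes the cache now

    try:
        pass  # ... the transformations and the three writes would go here ...
    finally:
        # Drop what was cached and stop the session explicitly, so nothing
        # from this run is still held when the next day is submitted.
        spark.sql("UNCACHE TABLE df10_view")
        spark.catalog.clearCache()              # drop anything else still cached
        spark.stop()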
Concretely, the following topics would be interesting to find out more about (links welcome):
- where to see what you are still consuming after a Spark job has ended, if you didn't close resources
- memory leaks for pyspark
- a good article about closing resources (you find tons of snippets on how to start a Spark context and configure the number/cores/memory of workers/executors etc., but I never saw a focus on making sure you clean up -> or is it just stopping the Spark context?)

On 30 Mar 2022, at 21:24, Bjørn Jørgensen <bjornjorgen...@gmail.com> wrote:

It's quite impossible for anyone to answer your question about what is eating your memory without even knowing what language you are using.

If you are using C, then it's always pointers; that's the memory issue.
If you are using Python, it can be something like not using context managers - see Context Managers and Python's with Statement <https://realpython.com/python-with-statement/>.
And another one can be not closing resources after use.

In my experience you can process 3 years or more of data, IF you are closing opened resources.
I use the web GUI http://spark:4040 to follow what Spark is doing.

On Wed, 30 Mar 2022 at 17:41, Joris Billen <joris.bil...@bigindustries.be> wrote:

Thanks for the answer - much appreciated! This forum is very useful :-)

I didn't know the SparkContext stays alive. I guess this is eating up memory. The eviction means that it knows it should clear some of the old cached memory to be able to store new data. In case anyone has good articles about memory leaks, I would be interested to read them.
I will try to add the following lines at the end of my job (as I cached the table in Spark SQL):

    sqlContext.sql("UNCACHE TABLE mytableofinterest")
    spark.stop()

Wrt looping: if I want to process 3 years of data, my modest cluster will never do it in one go, I would expect? I have to break it down into smaller pieces and run those in a loop (1 day is already lots of data).

Thanks!

On 30 Mar 2022, at 17:25, Sean Owen <sro...@gmail.com> wrote:

The Spark context does not stop when a job does. It stops when you stop it. There could be many ways memory can leak. Caching maybe - but it will evict. You should be clearing caches when no longer needed.
I would guess it is something else your program holds on to in its logic.
Also consider not looping; there is probably a faster way to do it in one go.

On Wed, Mar 30, 2022, 10:16 AM Joris Billen <joris.bil...@bigindustries.be> wrote:

Hi,
I have a pyspark job submitted through spark-submit that does some heavy processing for 1 day of data. It runs with no errors. I have to loop over many days, so I run this Spark job in a loop.
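As an illustration of that outer loop, a rough sketch where each day runs as its own spark-submit and therefore gets a fresh driver and executors (the script name and arguments are made up):

    import subprocess
    from datetime import date, timedelta

    day, end = date(2019, 1, 1), date(2022, 1, 1)
    while day < end:
        # One application per day: its driver, executors and any cached data
        # they held are released when the application finishes.
        subprocess.run(
            ["spark-submit", "--master", "yarn", "process_one_day.py", day.isoformat()],
            check=True,  # stop looping if a day fails
        )
        day += timedelta(days=1)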
I notice that after a couple of executions the memory is increasing on all worker nodes, and eventually this leads to failures. My job does some caching, but I understand that when the job ends successfully, the SparkContext is destroyed and the cache should be cleared. However, it seems that something keeps on filling the memory a bit more after each run. This is the memory behaviour over time, which in the end will start leading to failures:

(what we see is: green = physical memory used, green-blue = physical memory cached, grey = memory capacity, a straight line around 31 GB)

This runs on a healthy Spark 2.4 and was already optimized to get a stable job in terms of spark-submit resource parameters (driver-memory / num-executors / executor-memory / executor-cores / spark.locality.wait).

Any clue how to "really" clear the memory in between jobs? Basically, at the moment I can loop 10x and then need to restart my cluster so all memory is cleared completely.

Thanks for any info!

<Screenshot 2022-03-30 at 15.28.24.png>

--
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297