Hi,
super duper.

Please try to see if you can write out the data to S3, and then write a
load script to load that data from S3 to HBase.


Regards,
Gourav Sengupta


On Wed, Apr 6, 2022 at 4:39 PM Joris Billen <joris.bil...@bigindustries.be>
wrote:

> HI,
> thanks for your reply.
>
>
> I believe I have found the issue: the job writes data to hbase which is on
> the same cluster.
> When I keep on processing data and writing with spark to hbase ,
> eventually the garbage collection can not keep up anymore for hbase, and
> the hbase memory consumption increases. As the clusters hosts both hbase
> and spark, this leads to an overall increase and at some point you hit the
> limit of the available memory on each worker.
> I dont think the spark memory is increasing over time.
>
>
>
> Here more details:
>
> **Spark: 2.4
> **operation: many spark sql statements followed by writing data to a nosql
> db from spark
> like this:
> df=read(fromhdfs)
> df2=spark.sql(using df 1)
> ..df10=spark.sql(using df9)
> spark.sql(CACHE TABLE df10)
> df11 =spark.sql(using df10)
> df11.write
> Df12 =spark.sql(using df10)
> df12.write
> df13 =spark.sql(using df10)
> df13.write
> **caching: yes one df that I will use to eventually write 3 x to a db
> (those 3 are different)
> **Loops: since I need to process several years, and processing 1 day is
> already a complex process (40 minutes on 9 node cluster running quite a bit
> of executors). So in the end it will do all at one go and there is a limit
> of how much data I can process in one go with the available resources.
> Some people here pointed out they believe this looping should not be
> necessary. But what is the alternative?
> —> Maybe I can write to disk somewhere in the middle, and read again from
> there so that in the end not all must happen in one go in memory.
>
>
>
>
>
>
>
> On 5 Apr 2022, at 14:58, Gourav Sengupta <gourav.sengu...@gmail.com>
> wrote:
>
> Hi,
>
> can you please give details around:
> spark version, what is the operation that you are running, why in loops,
> and whether you are caching in any data or not, and whether you are
> referencing the variables to create them like in the following expression
> we are referencing x to create x, x = x + 1
>
> Thanks and Regards,
> Gourav Sengupta
>
> On Mon, Apr 4, 2022 at 10:51 AM Joris Billen <
> joris.bil...@bigindustries.be> wrote:
>
>> Clear-probably not a good idea.
>>
>> But a previous comment said “you are doing everything in the end in one
>> go”.
>> So this made me wonder: in case your only action is a write in the end
>> after lots of complex transformations, then what is the alternative for
>> writing in the end which means doing everything all at once in the end? My
>> understanding is that if there is no need for an action earlier, you will
>> do all at the end, which means there is a limitation to how many days you
>> can process at once. And hence the solution is to loop over a couple days,
>> and submit always the same spark job just for other input.
>>
>>
>> Thanks!
>>
>> On 1 Apr 2022, at 15:26, Sean Owen <sro...@gmail.com> wrote:
>>
>> This feels like premature optimization, and not clear it's optimizing,
>> but maybe.
>> Caching things that are used once is worse than not caching. It looks
>> like a straight-line through to the write, so I doubt caching helps
>> anything here.
>>
>> On Fri, Apr 1, 2022 at 2:49 AM Joris Billen <
>> joris.bil...@bigindustries.be> wrote:
>>
>>> Hi,
>>> as said thanks for little discussion over mail.
>>> I understand that the action is triggered in the end at the write and
>>> then all of a sudden everything is executed at once. But I dont really need
>>> to trigger an action before. I am caching somewherew a df that will be
>>> reused several times (slightly updated pseudocode below).
>>>
>>> Question: is it then better practice to already trigger some actions on
>>>  intermediate data frame (like df4 and df8), and cache them? So that these
>>> actions will not be that expensive yet, and the actions to write at the end
>>> will require less resources, which would allow to process more days in one
>>> go? LIke what is added in red in improvement section in the pseudo code
>>> below?
>>>
>>>
>>>
>>> *pseudocode:*
>>>
>>>
>>> *loop over all days:*
>>> *    spark submit 1 day*
>>>
>>>
>>>
>>> with spark submit (overly simplified)=
>>>
>>>
>>> *  df=spark.read(hfs://somepath)*
>>> *  …*
>>> *   ##IMPROVEMENT START*
>>> *   df4=spark.sql(some stuff with df3)*
>>> *   spark.sql(CACHE TABLE df4)*
>>> *   …*
>>> *   df8=spark.sql(some stuff with df7)*
>>> *   spark.sql(CACHE TABLE df8)*
>>> *  ##IMPROVEMENT END*
>>> *   ...*
>>> *   df12=df11.spark.sql(complex stufff)*
>>> *  spark.sql(CACHE TABLE df10)*
>>> *   ...*
>>> *  df13=spark.sql( complex stuff with df12)*
>>> *  df13.write *
>>> *  df14=spark.sql( some other complex stuff with df12)*
>>> *  df14.write *
>>> *  df15=spark.sql( some completely other complex stuff with df12)*
>>> *  df15.write *
>>>
>>>
>>>
>>>
>>>
>>>
>>> THanks!
>>>
>>>
>>>
>>> On 31 Mar 2022, at 14:37, Sean Owen <sro...@gmail.com> wrote:
>>>
>>> If that is your loop unrolled, then you are not doing parts of work at a
>>> time. That will execute all operations in one go when the write finally
>>> happens. That's OK, but may be part of the problem. For example if you are
>>> filtering for a subset, processing, and unioning, then that is just a
>>> harder and slower way of applying the transformation to all data at once.
>>>
>>> On Thu, Mar 31, 2022 at 3:30 AM Joris Billen <
>>> joris.bil...@bigindustries.be> wrote:
>>>
>>>> Thanks for reply :-)
>>>>
>>>> I am using pyspark. Basicially my code (simplified is):
>>>>
>>>> df=spark.read.csv(hdfs://somehdfslocation)
>>>> df1=spark.sql (complex statement using df)
>>>> ...
>>>> dfx=spark.sql(complex statement using df x-1)
>>>> ...
>>>> dfx15.write()
>>>>
>>>>
>>>> What exactly is meant by "closing resources"? Is it just unpersisting
>>>> cached dataframes at the end and stopping the spark context explicitly:
>>>> sc.stop()?
>>>>
>>>>
>>>> FOr processing many years at once versus a chunk in a loop: I see that
>>>> if I go up to certain number of days, one iteration will start to have
>>>> tasks that fail. So I only take a limited number of days, and do this
>>>> process several times. Isnt this normal as you are always somehow limited
>>>> in terms of resources (I have 9 nodes wiht 32GB). Or is it like this that
>>>> in theory you could process any volume, in case you wait long enough? I
>>>> guess spark can only break down the tasks up to a certain level (based on
>>>> the datasets' and the intermediate results’ partitions) and at some moment
>>>> you hit the limit where your resources are not sufficient anymore to
>>>> process such one task? Maybe you can tweak it a bit, but in the end you’ll
>>>> hit a limit?
>>>>
>>>>
>>>>
>>>> Concretely  following topics would be interesting to find out more
>>>> about (links):
>>>> -where to see what you are still consuming after spark job ended if you
>>>> didnt close resources
>>>> -memory leaks for pyspark
>>>> -good article about closing resources (you find tons of snippets on how
>>>> to start spark context+ config for number/cores/memory of worker/executors
>>>> etc, but never saw a focus on making sure you clean up —> or is it just
>>>> stopping the spark context)
>>>>
>>>>
>>>>
>>>>
>>>> On 30 Mar 2022, at 21:24, Bjørn Jørgensen <bjornjorgen...@gmail.com>
>>>> wrote:
>>>>
>>>> It`s quite impossible for anyone to answer your question about what is
>>>> eating your memory, without even knowing what language you are using.
>>>>
>>>> If you are using C then it`s always pointers, that's the mem issue.
>>>> If you are using python, there can be some like not using context
>>>> manager like With Context Managers and Python's with Statement
>>>> <https://eur02.safelinks.protection.outlook.com/?url=https%3A%2F%2Frealpython.com%2Fpython-with-statement%2F&data=04%7C01%7Cjoris.billen%40bigindustries.be%7C1052bbbb08ce41659e5d08da1703fc52%7C49c3d703357947bfa8887c913fbdced9%7C0%7C0%7C637847603163284467%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C3000&sdata=xs5S5MLsdtZAod%2FotAiT7Ta7ENXcA4N4lEaDKjxHzYg%3D&reserved=0>
>>>>
>>>> And another can be not to close resources after use.
>>>>
>>>> In my experience you can process 3 years or more of data, IF you are
>>>> closing opened resources.
>>>> I use the web GUI http://spark:4040 to follow what spark is doing.
>>>>
>>>>
>>>>
>>>>
>>>> ons. 30. mar. 2022 kl. 17:41 skrev Joris Billen <
>>>> joris.bil...@bigindustries.be>:
>>>>
>>>>> Thanks for answer-much appreciated! This forum is very useful :-)
>>>>>
>>>>> I didnt know the sparkcontext stays alive. I guess this is eating up
>>>>> memory.  The eviction means that he knows that he should clear some of the
>>>>> old cached memory to be able to store new one. In case anyone has good
>>>>> articles about memory leaks I would be interested to read.
>>>>> I will try to add following lines at the end of my job (as I cached
>>>>> the table in spark sql):
>>>>>
>>>>>
>>>>> *sqlContext.sql("UNCACHE TABLE mytableofinterest ")*
>>>>> *spark.stop()*
>>>>>
>>>>>
>>>>> Wrt looping: if I want to process 3 years of data, my modest cluster
>>>>> will never do it one go , I would expect? I have to break it down in
>>>>> smaller pieces and run that in a loop (1 day is already lots of data).
>>>>>
>>>>>
>>>>>
>>>>> Thanks!
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On 30 Mar 2022, at 17:25, Sean Owen <sro...@gmail.com> wrote:
>>>>>
>>>>> The Spark context does not stop when a job does. It stops when you
>>>>> stop it. There could be many ways mem can leak. Caching maybe - but it 
>>>>> will
>>>>> evict. You should be clearing caches when no longer needed.
>>>>>
>>>>> I would guess it is something else your program holds on to in its
>>>>> logic.
>>>>>
>>>>> Also consider not looping; there is probably a faster way to do it in
>>>>> one go.
>>>>>
>>>>> On Wed, Mar 30, 2022, 10:16 AM Joris Billen <
>>>>> joris.bil...@bigindustries.be> wrote:
>>>>>
>>>>>> Hi,
>>>>>> I have a pyspark job submitted through spark-submit that does some
>>>>>> heavy processing for 1 day of data. It runs with no errors. I have to 
>>>>>> loop
>>>>>> over many days, so I run this spark job in a loop. I notice after couple
>>>>>> executions the memory is increasing on all worker nodes and eventually 
>>>>>> this
>>>>>> leads to faillures. My job does some caching, but I understand that when
>>>>>> the job ends successfully, then the sparkcontext is destroyed and the 
>>>>>> cache
>>>>>> should be cleared. However it seems that something keeps on filling the
>>>>>> memory a bit more and more after each run. THis is the memory behaviour
>>>>>> over time, which in the end will start leading to failures :
>>>>>>
>>>>>> (what we see is: green=physical memory used, green-blue=physical
>>>>>> memory cached, grey=memory capacity =straight line around 31GB )
>>>>>> This runs on a healthy spark 2.4 and was optimized already to come to
>>>>>> a stable job in terms of spark-submit resources parameters like
>>>>>> driver-memory/num-executors/executor-memory/executor-cores/spark.locality.wait).
>>>>>> Any clue how to “really” clear the memory in between jobs? So
>>>>>> basically currently I can loop 10x and then need to restart my cluster so
>>>>>> all memory is cleared completely.
>>>>>>
>>>>>>
>>>>>> Thanks for any info!
>>>>>>
>>>>>> <Screenshot 2022-03-30 at 15.28.24.png>
>>>>>
>>>>>
>>>>>
>>>>
>>>> --
>>>> Bjørn Jørgensen
>>>> Vestre Aspehaug 4, 6010 Ålesund
>>>> Norge
>>>>
>>>> +47 480 94 297
>>>>
>>>>
>>>>
>>>
>>
>

Reply via email to