"But it will be faster to use S3 (or GCS) through some network and it will
be faster than writing to the local SSD. I don't understand the point
here."
MinIO is an S3-compatible server, so you can run MinIO locally.
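
A minimal sketch of what that could look like from the Spark side, assuming the hadoop-aws (S3A) connector is on the classpath. The endpoint, credentials and bucket name are placeholders, not values from this thread; the fs.s3a.* keys are standard Hadoop S3A settings.

```python
# Sketch: point Spark's S3A connector at a local MinIO server.
# Endpoint, credentials and bucket names are hypothetical placeholders.

def minio_s3a_conf(endpoint="http://localhost:9000",
                   access_key="minioadmin",
                   secret_key="minioadmin"):
    """Return Spark settings that route s3a:// paths to a MinIO endpoint."""
    use_ssl = str(endpoint.startswith("https")).lower()
    return {
        "spark.hadoop.fs.s3a.endpoint": endpoint,
        "spark.hadoop.fs.s3a.access.key": access_key,
        "spark.hadoop.fs.s3a.secret.key": secret_key,
        # MinIO is normally addressed as host/bucket, not bucket.host
        "spark.hadoop.fs.s3a.path.style.access": "true",
        # Plain HTTP is typical for a local test server
        "spark.hadoop.fs.s3a.connection.ssl.enabled": use_ssl,
    }


def apply_conf(builder, conf):
    """Apply each key/value to a SparkSession.builder and return it."""
    for key, value in conf.items():
        builder = builder.config(key, value)
    return builder


def day_path(bucket, day):
    """Build a per-day Parquet location, e.g. s3a://bucket/day=2022-04-07."""
    return f"s3a://{bucket}/day={day}"
```

With PySpark installed, this would be used roughly as spark = apply_conf(SparkSession.builder.appName("daily"), minio_s3a_conf()).getOrCreate(), followed by df.write.mode("overwrite").parquet(day_path("staging", "2022-04-07")), where "staging" is a made-up bucket name.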

On Thu, 7 Apr 2022 at 09:27, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

> OK, so that is your assumption. As I understand it, the whole thing is
> on-premises on JBOD (including the Hadoop cluster, which has Spark binaries
> on each node). But it will be faster to use S3 (or GCS)
> through some network and it will be faster than writing to the local SSD. I
> don't understand the point here.
>
> Also, it appears the thread owner is describing HBase running on the Hadoop
> cluster and eating memory on some nodes. This can easily be sorted out by
> moving HBase to its own cluster, which stops Hadoop, Spark and HBase
> competing for resources. It is possible that the issue is with the HBase
> setup as well.
>
> HTH
>
>
>
>    view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Thu, 7 Apr 2022 at 08:11, Bjørn Jørgensen <bjornjorgen...@gmail.com>
> wrote:
>
>>
>>    1. Where does S3 come into this
>>
>> He is processing the data one day at a time. To dump each day to fast
>> storage, he can write it as Parquet files to S3.
>>
>> On Wed, 6 Apr 2022 at 22:27, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>>
>>> Your statement below:
>>>
>>>
>>> I believe I have found the issue: the job writes data to HBase, which is
>>> on the same cluster.
>>> When I keep on processing data and writing with Spark to HBase,
>>> eventually the garbage collection cannot keep up anymore for HBase, and
>>> the HBase memory consumption increases. As the cluster hosts both HBase
>>> and Spark, this leads to an overall increase, and at some point you hit the
>>> limit of the available memory on each worker.
>>> I don't think the Spark memory is increasing over time.
>>>
>>>
>>>    1. Where is your cluster? On-prem? Do you have a Hadoop cluster
>>>    with Spark using the same nodes as HDFS?
>>>    2. Is your HBase clustered or standalone, and was it created on the
>>>    HDFS nodes?
>>>    3. Are you writing to HBase through Phoenix or straight to HBase?
>>>    4. Where does S3 come into this?
>>>
>>>
>>> HTH
>>>
>>>
>>>
>>> On Wed, 6 Apr 2022 at 16:41, Joris Billen <joris.bil...@bigindustries.be>
>>> wrote:
>>>
>>>> Hi,
>>>> thanks for your reply.
>>>>
>>>>
>>>> I believe I have found the issue: the job writes data to HBase, which is
>>>> on the same cluster.
>>>> When I keep on processing data and writing with Spark to HBase,
>>>> eventually the garbage collection cannot keep up anymore for HBase, and
>>>> the HBase memory consumption increases. As the cluster hosts both HBase
>>>> and Spark, this leads to an overall increase, and at some point you hit the
>>>> limit of the available memory on each worker.
>>>> I don't think the Spark memory is increasing over time.
>>>>
>>>>
>>>>
>>>> Here are more details:
>>>>
>>>> **Spark: 2.4
>>>> **Operation: many Spark SQL statements followed by writing data to a
>>>> NoSQL DB from Spark,
>>>> like this:
>>>> df = read(from HDFS)
>>>> df2 = spark.sql(using df)
>>>> ... df10 = spark.sql(using df9)
>>>> spark.sql(CACHE TABLE df10)
>>>> df11 = spark.sql(using df10)
>>>> df11.write
>>>> df12 = spark.sql(using df10)
>>>> df12.write
>>>> df13 = spark.sql(using df10)
>>>> df13.write
>>>> **Caching: yes, one df (df10) that I eventually use for 3 different
>>>> writes to a DB.
>>>> **Loops: I need to process several years, and processing 1 day is
>>>> already a complex process (40 minutes on a 9-node cluster running quite a
>>>> few executors). In the end everything is done in one go, and there is a
>>>> limit to how much data I can process in one go with the available resources.
>>>> Some people here pointed out they believe this looping should not be
>>>> necessary. But what is the alternative?
>>>> -> Maybe I can write to disk somewhere in the middle, and read again
>>>> from there, so that in the end not everything has to happen in one go in
>>>> memory.
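
The "write to disk in the middle and read again" idea can be sketched as follows. This is an illustration under assumptions, not the job from this thread: the directory layout and the stage name are hypothetical, and spark/df stand for an existing SparkSession and DataFrame.

```python
# Sketch: break one long lineage into stages by writing an intermediate
# DataFrame to Parquet and reading it back. Paths are hypothetical.

def stage_path(base_dir, day, stage):
    """Location for one intermediate result of one day's run."""
    return f"{base_dir.rstrip('/')}/day={day}/{stage}"


def materialize(spark, df, path):
    """Write df to Parquet, then return a fresh DataFrame read from it.

    The returned DataFrame's plan starts at the files on disk, so later
    steps do not recompute everything that produced df.
    """
    df.write.mode("overwrite").parquet(path)
    return spark.read.parquet(path)
```

For example, df10 = materialize(spark, df10, stage_path("hdfs:///tmp/stages", "2022-04-06", "df10")) before the three writes would let each write start from the stored files instead of from the original input.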
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On 5 Apr 2022, at 14:58, Gourav Sengupta <gourav.sengu...@gmail.com>
>>>> wrote:
>>>>
>>>> Hi,
>>>>
>>>> can you please give details about the
>>>> Spark version, what operation you are running, why it runs in
>>>> loops, whether you are caching any data, and whether you are
>>>> referencing variables to create themselves, as in the expression
>>>> x = x + 1, where we reference x to create x?
>>>>
>>>> Thanks and Regards,
>>>> Gourav Sengupta
>>>>
>>>> On Mon, Apr 4, 2022 at 10:51 AM Joris Billen <
>>>> joris.bil...@bigindustries.be> wrote:
>>>>
>>>>> Clear, probably not a good idea.
>>>>>
>>>>> But a previous comment said “you are doing everything in the end in
>>>>> one go”.
>>>>> So this made me wonder: if your only action is a write at the end,
>>>>> after lots of complex transformations, then what is the alternative to
>>>>> doing everything all at once at the end? My
>>>>> understanding is that if there is no need for an action earlier, you will
>>>>> do it all at the end, which means there is a limit to how many days you
>>>>> can process at once. Hence the solution is to loop over a couple of days
>>>>> at a time and always submit the same Spark job, just with different input.
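
A minimal sketch of such an outer loop, written as a plain Python driver that launches one spark-submit per chunk of days. The script name daily_job.py and its --start/--end arguments are hypothetical; the real job would have to parse them itself.

```python
# Sketch: drive one spark-submit per chunk of days from a plain Python loop.
# "daily_job.py" and its --start/--end arguments are hypothetical.
from datetime import date, timedelta


def day_chunks(start, end, days_per_run):
    """Yield (first_day, last_day) pairs covering start..end inclusive."""
    current = start
    while current <= end:
        last = min(current + timedelta(days=days_per_run - 1), end)
        yield current, last
        current = last + timedelta(days=1)


def submit_command(first_day, last_day, script="daily_job.py"):
    """Build the argument list for one run; pass it to subprocess.run."""
    return ["spark-submit", script,
            "--start", first_day.isoformat(),
            "--end", last_day.isoformat()]
```

A driver would then call subprocess.run(submit_command(first, last), check=True) for each pair from day_chunks(...). Each call is a separate Spark application, so its driver and executors go away when that process exits.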
>>>>>
>>>>>
>>>>> Thanks!
>>>>>
>>>>> On 1 Apr 2022, at 15:26, Sean Owen <sro...@gmail.com> wrote:
>>>>>
>>>>> This feels like premature optimization, and not clear it's optimizing,
>>>>> but maybe.
>>>>> Caching things that are used once is worse than not caching. It looks
>>>>> like a straight-line through to the write, so I doubt caching helps
>>>>> anything here.
>>>>>
>>>>> On Fri, Apr 1, 2022 at 2:49 AM Joris Billen <
>>>>> joris.bil...@bigindustries.be> wrote:
>>>>>
>>>>>> Hi,
>>>>>> as said, thanks for the little discussion over mail.
>>>>>> I understand that the action is triggered at the end, at the write, and
>>>>>> then all of a sudden everything is executed at once. But I don't really
>>>>>> need to trigger an action before that. I am caching a df somewhere that
>>>>>> will be reused several times (slightly updated pseudocode below).
>>>>>>
>>>>>> Question: is it then better practice to already trigger some actions
>>>>>> on intermediate data frames (like df4 and df8), and cache them? That way
>>>>>> these actions would not be that expensive yet, and the write actions at
>>>>>> the end would require fewer resources, which would allow processing more
>>>>>> days in one go. Like what is added between the IMPROVEMENT markers in the
>>>>>> pseudocode below?
>>>>>>
>>>>>>
>>>>>>
>>>>>> pseudocode:
>>>>>>
>>>>>>
>>>>>> loop over all days:
>>>>>>     spark submit 1 day
>>>>>>
>>>>>>
>>>>>>
>>>>>> with spark submit (overly simplified) =
>>>>>>
>>>>>>
>>>>>>   df = spark.read(hdfs://somepath)
>>>>>>   …
>>>>>>   ## IMPROVEMENT START
>>>>>>   df4 = spark.sql(some stuff with df3)
>>>>>>   spark.sql(CACHE TABLE df4)
>>>>>>   …
>>>>>>   df8 = spark.sql(some stuff with df7)
>>>>>>   spark.sql(CACHE TABLE df8)
>>>>>>   ## IMPROVEMENT END
>>>>>>   ...
>>>>>>   df12 = spark.sql(complex stuff with df11)
>>>>>>   spark.sql(CACHE TABLE df10)
>>>>>>   ...
>>>>>>   df13 = spark.sql(complex stuff with df12)
>>>>>>   df13.write
>>>>>>   df14 = spark.sql(some other complex stuff with df12)
>>>>>>   df14.write
>>>>>>   df15 = spark.sql(some completely other complex stuff with df12)
>>>>>>   df15.write
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> THanks!
>>>>>>
>>>>>>
>>>>>>
>>>>>> On 31 Mar 2022, at 14:37, Sean Owen <sro...@gmail.com> wrote:
>>>>>>
>>>>>> If that is your loop unrolled, then you are not doing parts of work
>>>>>> at a time. That will execute all operations in one go when the write
>>>>>> finally happens. That's OK, but may be part of the problem. For example,
>>>>>> if you are filtering for a subset, processing, and unioning, then that is
>>>>>> just a harder and slower way of applying the transformation to all data
>>>>>> at once.
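
To illustrate the filter/process/union point with a toy aggregation: the two functions below return the same rows, but the first builds one filter and one aggregation per day before unioning them. The column names "day" and "amount" are hypothetical, and df stands for any PySpark DataFrame that has them.

```python
# Sketch: two ways to get per-day totals. Column names "day" and "amount"
# are hypothetical; df is any PySpark DataFrame that has them.
from functools import reduce


def totals_by_loop(df, days):
    """Filter one day at a time, aggregate, then union the pieces."""
    parts = [df.filter(df["day"] == d).groupBy("day").sum("amount")
             for d in days]
    return reduce(lambda a, b: a.union(b), parts)


def totals_in_one_pass(df):
    """Same rows from a single aggregation over all days."""
    return df.groupBy("day").sum("amount")
```

Whether the real 15-step job can be expressed this way depends on transformations that are not shown in this thread.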
>>>>>>
>>>>>> On Thu, Mar 31, 2022 at 3:30 AM Joris Billen <
>>>>>> joris.bil...@bigindustries.be> wrote:
>>>>>>
>>>>>>> Thanks for the reply :-)
>>>>>>>
>>>>>>> I am using PySpark. Basically, my code (simplified) is:
>>>>>>>
>>>>>>> df=spark.read.csv(hdfs://somehdfslocation)
>>>>>>> df1=spark.sql (complex statement using df)
>>>>>>> ...
>>>>>>> dfx=spark.sql(complex statement using df x-1)
>>>>>>> ...
>>>>>>> dfx15.write()
>>>>>>>
>>>>>>>
>>>>>>> What exactly is meant by "closing resources"? Is it just
>>>>>>> unpersisting cached dataframes at the end and stopping the spark context
>>>>>>> explicitly: sc.stop()?
>>>>>>>
>>>>>>>
>>>>>>> For processing many years at once versus a chunk in a loop: I see
>>>>>>> that if I go up to a certain number of days, one iteration will start to
>>>>>>> have tasks that fail. So I only take a limited number of days, and do this
>>>>>>> process several times. Isn't this normal, as you are always somehow
>>>>>>> limited in terms of resources (I have 9 nodes with 32GB)? Or is it the
>>>>>>> case that in theory you could process any volume, provided you wait long
>>>>>>> enough? I guess Spark can only break down the tasks up to a certain level
>>>>>>> (based on the datasets' and the intermediate results’ partitions), and at
>>>>>>> some point you hit the limit where your resources are no longer
>>>>>>> sufficient to process one such task? Maybe you can tweak it a bit, but in
>>>>>>> the end you’ll hit a limit?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Concretely, it would be interesting to find out more about the
>>>>>>> following topics (links):
>>>>>>> - where to see what you are still consuming after a Spark job has ended
>>>>>>> if you didn't close resources
>>>>>>> - memory leaks in PySpark
>>>>>>> - a good article about closing resources (you find tons of snippets on
>>>>>>> how to start a Spark context plus config for number/cores/memory of
>>>>>>> workers/executors etc., but I never saw a focus on making sure you clean
>>>>>>> up; or is it just stopping the Spark context?)
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On 30 Mar 2022, at 21:24, Bjørn Jørgensen <bjornjorgen...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>> It's quite impossible for anyone to answer your question about what
>>>>>>> is eating your memory without even knowing what language you are using.
>>>>>>>
>>>>>>> If you are using C, then it's always pointers; that's the memory issue.
>>>>>>> If you are using Python, one cause can be not using a context
>>>>>>> manager; see Context Managers and Python's with Statement
>>>>>>> <https://realpython.com/python-with-statement/>
>>>>>>>
>>>>>>> Another can be not closing resources after use.
>>>>>>>
>>>>>>> In my experience you can process 3 years or more of data, IF you are
>>>>>>> closing opened resources.
>>>>>>> I use the web GUI http://spark:4040 to follow what Spark is doing.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Wed, 30 Mar 2022 at 17:41, Joris Billen <
>>>>>>> joris.bil...@bigindustries.be> wrote:
>>>>>>>
>>>>>>>> Thanks for the answer, much appreciated! This forum is very useful :-)
>>>>>>>>
>>>>>>>> I didn't know the SparkContext stays alive. I guess this is eating
>>>>>>>> up memory. The eviction means that it knows it should clear some
>>>>>>>> of the old cached memory to be able to store new data. If anyone has
>>>>>>>> good articles about memory leaks, I would be interested to read them.
>>>>>>>> I will try to add the following lines at the end of my job (as I cached
>>>>>>>> the table in Spark SQL):
>>>>>>>>
>>>>>>>>
>>>>>>>> sqlContext.sql("UNCACHE TABLE mytableofinterest ")
>>>>>>>> spark.stop()
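
One way to make sure that cleanup also runs when the job fails is to wrap the session in a context manager. This is a sketch, not something taken from the job in this thread: managed_session and make_session are hypothetical names, while spark.catalog.clearCache() and spark.stop() are regular PySpark calls.

```python
# Sketch: make sure cached data is dropped and the session is stopped,
# even if the job raises. make_session is any zero-argument callable that
# returns a SparkSession, e.g. lambda: SparkSession.builder.getOrCreate().
from contextlib import contextmanager


@contextmanager
def managed_session(make_session):
    spark = make_session()
    try:
        yield spark
    finally:
        spark.catalog.clearCache()  # drop every cached table/DataFrame
        spark.stop()                # shut down the underlying SparkContext
```

The job body then goes inside "with managed_session(...) as spark:". Here clearCache() drops all cached tables rather than the single table named in the UNCACHE statement.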
>>>>>>>>
>>>>>>>>
>>>>>>>> Wrt looping: if I want to process 3 years of data, my modest
>>>>>>>> cluster will never do it in one go, I would expect? I have to break it
>>>>>>>> down into smaller pieces and run that in a loop (1 day is already lots
>>>>>>>> of data).
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Thanks!
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On 30 Mar 2022, at 17:25, Sean Owen <sro...@gmail.com> wrote:
>>>>>>>>
>>>>>>>> The Spark context does not stop when a job does. It stops when you
>>>>>>>> stop it. There could be many ways mem can leak. Caching maybe - but it
>>>>>>>> will evict. You should be clearing caches when no longer needed.
>>>>>>>>
>>>>>>>> I would guess it is something else your program holds on to in its
>>>>>>>> logic.
>>>>>>>>
>>>>>>>> Also consider not looping; there is probably a faster way to do it
>>>>>>>> in one go.
>>>>>>>>
>>>>>>>> On Wed, Mar 30, 2022, 10:16 AM Joris Billen <
>>>>>>>> joris.bil...@bigindustries.be> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>> I have a PySpark job submitted through spark-submit that does some
>>>>>>>>> heavy processing for 1 day of data. It runs with no errors. I have to
>>>>>>>>> loop over many days, so I run this Spark job in a loop. I notice that
>>>>>>>>> after a couple of executions the memory is increasing on all worker
>>>>>>>>> nodes, and eventually this leads to failures. My job does some caching,
>>>>>>>>> but I understand that when the job ends successfully, the SparkContext
>>>>>>>>> is destroyed and the cache should be cleared. However, it seems that
>>>>>>>>> something keeps on filling the memory a bit more after each run. This
>>>>>>>>> is the memory behaviour over time, which in the end will start leading
>>>>>>>>> to failures:
>>>>>>>>>
>>>>>>>>> (what we see is: green=physical memory used, green-blue=physical
>>>>>>>>> memory cached, grey=memory capacity =straight line around 31GB )
>>>>>>>>> This runs on a healthy Spark 2.4 and was already optimized to arrive
>>>>>>>>> at a stable job in terms of spark-submit resource parameters (like
>>>>>>>>> driver-memory/num-executors/executor-memory/executor-cores/spark.locality.wait).
>>>>>>>>> Any clue how to “really” clear the memory in between jobs?
>>>>>>>>> Currently I can loop 10x and then need to restart my
>>>>>>>>> cluster so all memory is cleared completely.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Thanks for any info!
>>>>>>>>>
>>>>>>>>> <Screenshot 2022-03-30 at 15.28.24.png>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Bjørn Jørgensen
>>>>>>> Vestre Aspehaug 4, 6010 Ålesund
>>>>>>> Norge
>>>>>>>
>>>>>>> +47 480 94 297
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>
>>
>

-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297
