Interesting unitest not pytest :)

What is data in [11] reused compared to 5  -- list()?

HTH



   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Fri, 9 Apr 2021 at 16:44, Attila Zsolt Piros <
[email protected]> wrote:

> Hi!
>
> I looked into the code and find a way to improve it.
>
> With the improvement your test runs just fine:
>
> Welcome to
>       ____              __
>      / __/__  ___ _____/ /__
>     _\ \/ _ \/ _ `/ __/  '_/
>    /__ / .__/\_,_/_/ /_/\_\   version 3.2.0-SNAPSHOT
>       /_/
>
> Using Python version 3.8.1 (default, Dec 30 2020 22:53:18)
> Spark context Web UI available at http://192.168.0.199:4040
> Spark context available as 'sc' (master = local, app id =
> local-1617982367872).
> SparkSession available as 'spark'.
>
> In [1]:     import pyspark
>
> In [2]:
> conf=pyspark.SparkConf().setMaster("local[64]").setAppName("Test1")
>
> In [3]:     sc=pyspark.SparkContext.getOrCreate(conf)
>
> In [4]:     rows=70000
>
> In [5]:     data=list(range(rows))
>
> In [6]:     rdd=sc.parallelize(data,rows)
>
> In [7]:     assert rdd.getNumPartitions()==rows
>
> In [8]:     rdd0=rdd.filter(lambda x:False)
>
> In [9]:     assert rdd0.getNumPartitions()==rows
>
> In [10]:     rdd00=rdd0.coalesce(1)
>
> In [11]:     data=rdd00.collect()
> 21/04/09 17:32:54 WARN TaskSetManager: Stage 0 contains a task of very
> large siz
> e (4729 KiB). The maximum recommended task size is 1000 KiB.
>
> In [12]:     assert data==[]
>
> In [13]:
>
>
> I will create a jira and need to add some unittest before opening the PR.
>
> Best Regards,
> Attila
>
> On Fri, Apr 9, 2021 at 7:04 AM Weiand, Markus, NMA-CFD <
> [email protected]> wrote:
>
>> I’ve changed the code to set driver memory to 100g, changed python code:
>>
>>     import pyspark
>>
>>
>> conf=pyspark.SparkConf().setMaster("local[64]").setAppName("Test1").set(key="spark.driver.memory",
>> value="100g")
>>
>>     sc=pyspark.SparkContext.getOrCreate(conf)
>>
>>     rows=70000
>>
>>     data=list(range(rows))
>>
>>     rdd=sc.parallelize(data,rows)
>>
>>     assert rdd.getNumPartitions()==rows
>>
>>     rdd0=rdd.filter(lambda x:False)
>>
>>     assert rdd0.getNumPartitions()==rows
>>
>>     rdd00=rdd0.coalesce(1)
>>
>>     data=rdd00.collect()
>>
>> assert data==[]
>>
>>
>>
>> Still the same error happens:
>>
>>
>>
>> 21/04/09 04:48:38 WARN TaskSetManager: Stage 0 contains a task of very
>> large size (4732 KiB). The maximum recommended task size is 1000 KiB.
>>
>> OpenJDK 64-Bit Server VM warning: INFO:
>> os::commit_memory(0x00007f4643550000, 16384, 0) failed; error='Not enough
>> space' (errno=12)
>>
>> [423.701s][warning][os,thread] Attempt to protect stack guard pages
>> failed (0x00007f4640d28000-0x00007f4640d2c000).
>>
>> [423.701s][warning][os,thread] Attempt to deallocate stack guard pages
>> failed.
>>
>> [423.704s][warning][os,thread] Failed to start thread - pthread_create
>> failed (EAGAIN) for attributes: stacksize: 1024k, guardsize: 0k, detached.
>>
>> #
>>
>> # There is insufficient memory for the Java Runtime Environment to
>> continue.
>>
>> # Native memory allocation (mmap) failed to map 16384 bytes for
>> committing reserved memory.
>>
>>
>>
>> A function which needs 423 seconds to crash with excessive memory
>> consumption when trying to coalesce 70000 empty partitions is not very
>> practical. As I do not know the limits in which coalesce without shuffling
>> can be used safely and with performance, I will now always use coalesce
>> with shuffling, even though in theory this will come with quite a
>> performance decrease.
>>
>>
>>
>> Markus
>>
>>
>>
>> *Von:* Russell Spitzer <[email protected]>
>> *Gesendet:* Donnerstag, 8. April 2021 15:24
>> *An:* Weiand, Markus, NMA-CFD <[email protected]>
>> *Cc:* [email protected]
>> *Betreff:* Re: possible bug
>>
>>
>>
>> Could be that the driver JVM cannot handle the metadata required to store
>> the partition information of a 70k partition RDD. I see you say you have a
>> 100GB driver but i'm not sure where you configured that?
>>
>> Did you set --driver-memory 100G ?
>>
>>
>>
>> On Thu, Apr 8, 2021 at 8:08 AM Weiand, Markus, NMA-CFD <
>> [email protected]> wrote:
>>
>> This is the reduction of an error in a complex program where allocated
>> 100 GB driver (=worker=executor as local mode) memory. In the example I
>> used the default size, as the puny example shouldn’t need more anyway.
>>
>> And without the coalesce or with coalesce(1,True) everything works fine.
>>
>> I’m trying to coalesce an empty rdd with 70000 partitions in an empty rdd
>> with 1 partition, why is this a problem without shuffling?
>>
>>
>>
>> *Von:* Sean Owen <[email protected]>
>> *Gesendet:* Donnerstag, 8. April 2021 15:00
>> *An:* Weiand, Markus, NMA-CFD <[email protected]>
>> *Cc:* [email protected]
>> *Betreff:* Re: possible bug
>>
>>
>>
>> That's a very low level error from the JVM. Any chance you are
>> misconfiguring the executor size? like to 10MB instead of 10GB, that kind
>> of thing. Trying to think of why the JVM would have very little memory to
>> operate.
>>
>> An app running out of mem would not look like this.
>>
>>
>>
>> On Thu, Apr 8, 2021 at 7:53 AM Weiand, Markus, NMA-CFD <
>> [email protected]> wrote:
>>
>> Hi all,
>>
>>
>>
>> I'm using spark on a c5a.16xlarge machine in amazon cloud (so having  64
>> cores and 128 GB RAM). I'm using spark 3.01.
>>
>>
>>
>> The following python code leads to an exception, is this a bug or is my
>> understanding of the API incorrect?
>>
>>
>>
>>     import pyspark
>>
>>     conf=pyspark.SparkConf().setMaster("local[64]").setAppName("Test1")
>>
>>     sc=pyspark.SparkContext.getOrCreate(conf)
>>
>>     rows=70000
>>
>>     data=list(range(rows))
>>
>>     rdd=sc.parallelize(data,rows)
>>
>>     assert rdd.getNumPartitions()==rows
>>
>>     rdd0=rdd.filter(lambda x:False)
>>
>>     assert rdd0.getNumPartitions()==rows
>>
>>     rdd00=rdd0.coalesce(1)
>>
>>     data=rdd00.collect()
>>
>>     assert data==[]
>>
>>
>>
>> output when starting from PyCharm:
>>
>>
>>
>> /home/ubuntu/PycharmProjects/<projekt-dir>/venv/bin/python
>> /opt/pycharm-2020.2.3/plugins/python/helpers/pydev/pydevconsole.py
>> --mode=client --port=41185
>>
>> import sys; print('Python %s on %s' % (sys.version, sys.platform))
>>
>> sys.path.extend(['/home/ubuntu/PycharmProjects/<projekt-dir>'])
>>
>> PyDev console: starting.
>>
>> Python 3.8.5 (default, Jan 27 2021, 15:41:15)
>>
>> [GCC 9.3.0] on linux
>>
>> import os
>>
>> os.environ['PYTHONHASHSEED'] = '0'
>>
>> runfile('/home/ubuntu/PycharmProjects/<projekt-dir>/tests/test.py',
>> wdir='/home/ubuntu/PycharmProjects/<projekt-dir>/tests')
>>
>> WARNING: An illegal reflective access operation has occurred
>>
>> WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform
>> (file:/opt/spark/jars/spark-unsafe_2.12-3.0.1.jar) to constructor
>> java.nio.DirectByteBuffer(long,int)
>>
>> WARNING: Please consider reporting this to the maintainers of
>> org.apache.spark.unsafe.Platform
>>
>> WARNING: Use --illegal-access=warn to enable warnings of further illegal
>> reflective access operations
>>
>> WARNING: All illegal access operations will be denied in a future release
>>
>> 21/04/08 12:12:26 WARN NativeCodeLoader: Unable to load native-hadoop
>> library for your platform... using builtin-java classes where applicable
>>
>> 21/04/08 12:12:29 WARN TaskSetManager: Stage 0 contains a task of very
>> large size (4732 KiB). The maximum recommended task size is 1000 KiB.
>>
>> [Stage 0:>                                                          (0 +
>> 1) / 1][423.190s][warning][os,thread] Attempt to protect stack guard pages
>> failed (0x00007f43d23ff000-0x00007f43d2403000).
>>
>> [423.190s][warning][os,thread] Attempt to deallocate stack guard pages
>> failed.
>>
>> OpenJDK 64-Bit Server VM warning: INFO:
>> os::commit_memory(0x00007f43d300b000, 16384, 0) failed; error='Not enough
>> space' (errno=12)
>>
>> [423.231s][warning][os,thread] Failed to start thread - pthread_create
>> failed (EAGAIN) for attributes: stacksize: 1024k, guardsize: 0k, detached.
>>
>> #
>>
>> # There is insufficient memory for the Java Runtime Environment to
>> continue.
>>
>> # Native memory allocation (mmap) failed to map 16384 bytes for
>> committing reserved memory.
>>
>> # An error report file with more information is saved as:
>>
>> # /home/ubuntu/PycharmProjects/<projekt-dir>/tests/hs_err_pid17755.log
>>
>> [thread 17966 also had an error]
>>
>> OpenJDK 64-Bit Server VM warning: INFO:
>> os::commit_memory(0x00007f4b7bd81000, 262144, 0) failed; error='Not enough
>> space' (errno=12)
>>
>> ERROR:root:Exception while sending command.
>>
>> Traceback (most recent call last):
>>
>>   File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py",
>> line 1207, in send_command
>>
>>     raise Py4JNetworkError("Answer from Java side is empty")
>>
>> py4j.protocol.Py4JNetworkError: Answer from Java side is empty
>>
>> During handling of the above exception, another exception occurred:
>>
>> Traceback (most recent call last):
>>
>>   File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py",
>> line 1033, in send_command
>>
>>     response = connection.send_command(command)
>>
>>   File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py",
>> line 1211, in send_command
>>
>>     raise Py4JNetworkError(
>>
>> py4j.protocol.Py4JNetworkError: Error while receiving
>>
>> ERROR:py4j.java_gateway:An error occurred while trying to connect to the
>> Java server (127.0.0.1:42439
>> <https://eur02.safelinks.protection.outlook.com/?url=http%3A%2F%2F127.0.0.1%3A42439%2F&data=04%7C01%7C%7C96a91861f12c46dc454b08d8fa91a005%7C1ca8bd943c974fc68955bad266b43f0b%7C0%7C0%7C637534850695450752%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C1000&sdata=dKfkds67mOAtrXSaWrv15vfMx7yUn%2Fjnwc58RdLhGt8%3D&reserved=0>
>> )
>>
>> Traceback (most recent call last):
>>
>>   File "/opt/spark/python/pyspark/rdd.py", line 889, in collect
>>
>>     sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
>>
>>   File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py",
>> line 1304, in __call__
>>
>>     return_value = get_return_value(
>>
>>   File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line
>> 334, in get_return_value
>>
>>     raise Py4JError(
>>
>> py4j.protocol.Py4JError: An error occurred while calling
>> z:org.apache.spark.api.python.PythonRDD.collectAndServe
>>
>> During handling of the above exception, another exception occurred:
>>
>> Traceback (most recent call last):
>>
>>   File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py",
>> line 977, in _get_connection
>>
>>     connection = self.deque.pop()
>>
>> IndexError: pop from an empty deque
>>
>> During handling of the above exception, another exception occurred:
>>
>> Traceback (most recent call last):
>>
>>   File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py",
>> line 1115, in start
>>
>>     self.socket.connect((self.address, self.port))
>>
>> ConnectionRefusedError: [Errno 111] Connection refused
>>
>> Traceback (most recent call last):
>>
>>   File "/opt/spark/python/pyspark/rdd.py", line 889, in collect
>>
>>     sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
>>
>>   File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py",
>> line 1304, in __call__
>>
>>     return_value = get_return_value(
>>
>>   File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line
>> 334, in get_return_value
>>
>>     raise Py4JError(
>>
>> py4j.protocol.Py4JError: An error occurred while calling
>> z:org.apache.spark.api.python.PythonRDD.collectAndServe
>>
>> During handling of the above exception, another exception occurred:
>>
>> Traceback (most recent call last):
>>
>>   File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py",
>> line 977, in _get_connection
>>
>>     connection = self.deque.pop()
>>
>> IndexError: pop from an empty deque
>>
>> During handling of the above exception, another exception occurred:
>>
>> Traceback (most recent call last):
>>
>>   File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py",
>> line 1115, in start
>>
>>     self.socket.connect((self.address, self.port))
>>
>> ConnectionRefusedError: [Errno 111] Connection refused
>>
>> During handling of the above exception, another exception occurred:
>>
>> Traceback (most recent call last):
>>
>>   File "<input>", line 3, in <module>
>>
>>   File
>> "/opt/pycharm-2020.2.3/plugins/python/helpers/pydev/_pydev_bundle/pydev_umd.py",
>> line 197, in runfile
>>
>>     pydev_imports.execfile(filename, global_vars, local_vars)  # execute
>> the script
>>
>>   File
>> "/opt/pycharm-2020.2.3/plugins/python/helpers/pydev/_pydev_imps/_pydev_execfile.py",
>> line 18, in execfile
>>
>>     exec(compile(contents+"\n", file, 'exec'), glob, loc)
>>
>>   File
>> "/home/ubuntu/PycharmProjects/SPO_as_a_Service/tests/test_modeling_paf.py",
>> line 992, in <module>
>>
>>     test_70000()
>>
>>   File
>> "/home/ubuntu/PycharmProjects/SPO_as_a_Service/tests/test_modeling_paf.py",
>> line 974, in test_70000
>>
>>     data=rdd00.collect()
>>
>>   File "/opt/spark/python/pyspark/rdd.py", line 889, in collect
>>
>>     sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
>>
>>   File "/opt/spark/python/pyspark/traceback_utils.py", line 78, in
>> __exit__
>>
>>     self._context._jsc.setCallSite(None)
>>
>>   File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py",
>> line 1303, in __call__
>>
>>     answer = self.gateway_client.send_command(command)
>>
>>   File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py",
>> line 1031, in send_command
>>
>>     connection = self._get_connection()
>>
>>   File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py",
>> line 979, in _get_connection
>>
>>     connection = self._create_connection()
>>
>>   File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py",
>> line 985, in _create_connection
>>
>>     connection.start()
>>
>>   File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py",
>> line 1127, in start
>>
>>     raise Py4JNetworkError(msg, e)
>>
>> py4j.protocol.Py4JNetworkError: An error occurred while trying to connect
>> to the Java server (127.0.0.1:42439
>> <https://eur02.safelinks.protection.outlook.com/?url=http%3A%2F%2F127.0.0.1%3A42439%2F&data=04%7C01%7C%7C96a91861f12c46dc454b08d8fa91a005%7C1ca8bd943c974fc68955bad266b43f0b%7C0%7C0%7C637534850695460748%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C1000&sdata=nluVRqUTQeAfMp7ZuF5AdO6IELaRcr86xGkaW6RWOF0%3D&reserved=0>
>> )
>>
>>
>>
>>
>>
>> report of free -m:
>>
>>               total        used        free      shared  buff/cache
>> available
>>
>> Mem:         127462        5548       22680          92       99233
>> 120605
>>
>> Swap:             0           0           0
>>
>>
>>
>> Thanks
>>
>> Markus
>>
>>

Reply via email to