I don't believe you'll be able to use globals in a Spark task, as they won't exist on the remote executor machines.
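A minimal sketch of one way around that, assuming the same UsedFunctions module (imported as uf), SparkContext sc and Range that appear further down this thread; the helper name make_row is just illustrative. The idea is to bind the value to the function itself, e.g. via a default argument, so it travels with the serialized function instead of being looked up as a free name when the lambda runs on an executor:

numRows = 100000

def make_row(x, numRows=numRows):  # value captured when the function is defined
    return (x,
            uf.clustered(x, numRows),
            uf.scattered(x, numRows),
            uf.randomised(x, numRows),
            uf.randomString(50),
            uf.padString(x, " ", 50),
            uf.padSingleChar("x", 4000))

rdd = sc.parallelize(Range).map(make_row)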
On Sun, Dec 13, 2020 at 3:46 AM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

Thanks Marco.

When I stripped down Spark etc. and ran your map, it came back OK (no errors) WITHOUT global numRows.

However, with the full code, this is the unresolved reference notification I am getting (attached), with your code embedded, WITHOUT global numRows.

Regards,

Mich

On Sat, 12 Dec 2020 at 21:48, Sofia’s World <mmistr...@gmail.com> wrote:

Hi Mich,

I don't think it's a good idea... I believe your IDE is playing tricks on you. Take Spark out of the equation: this is a Python issue only. I am guessing your IDE is somehow messing up your environment.

If you take out the whole Spark code and replace it with this code

map(lambda x: (x, uf.clustered(x,numRows), \
               uf.scattered(x,numRows), \
               uf.randomised(x, numRows), \
               uf.randomString(50), \
               uf.padString(x," ",50), \
               uf.padSingleChar("x",4000)), [1,2,3,4,5])

you should get exactly the same error.

Send me a zip with tfconstants.py and a trimmed-down version of your main.py and I'll plug it into my IDE and see if I can reproduce it. It worked fine in Jupyter, but there I have all functions in the same notebook.

hth
marco

On Sat, Dec 12, 2020 at 9:02 PM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

I solved the issue of the variable numRows not being defined within the lambda function by declaring it as a global variable:

global numRows
numRows = 10  ## do in increments of 50K rows otherwise you blow up driver memory!
#

Then I could call it within the lambda function as follows:

rdd = sc.parallelize(Range). \
    map(lambda x: (x, uf.clustered(x,numRows), \
                   uf.scattered(x,numRows), \
                   uf.randomised(x, numRows), \
                   uf.randomString(50), \
                   uf.padString(x," ",50), \
                   uf.padSingleChar("x",4000)))

This then worked. I am not convinced this is *the correct* solution, but somehow it worked.

Thanks

Mich

On Fri, 11 Dec 2020 at 18:52, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

Many thanks KR.

If I call the clustered function on its own, it works:

numRows = 100000
print(uf.clustered(200,numRows))

and returns

0.00199

If I run it all in one py file, including the UsedFunctions class, it works. The code is attached.

However, in PyCharm, I do the following.

UsedFunctions.py:
(Note that this file only contains functions and no class.)

import logging
import random
import string
import math

def randomString(length):
    letters = string.ascii_letters
    result_str = ''.join(random.choice(letters) for i in range(length))
    return result_str

def clustered(x,numRows):
    return math.floor(x -1)/numRows

def scattered(x,numRows):
    return abs((x -1 % numRows))* 1.0

def randomised(seed,numRows):
    random.seed(seed)
    return abs(random.randint(0, numRows) % numRows) * 1.0

def padString(x,chars,length):
    n = int(math.log10(x) + 1)
    result_str = ''.join(random.choice(chars) for i in range(length-n)) + str(x)
    return result_str

def padSingleChar(chars,length):
    result_str = ''.join(chars for i in range(length))
    return result_str

def println(lst):
    for ll in lst:
        print(ll[0])

In main.py (PyCharm) I have this code, which is failing:

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.sql import HiveContext
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import StringType, ArrayType
from pyspark.sql.functions import udf, col, max as max, to_date, date_add, \
     add_months
from datetime import datetime, timedelta
import os
from os.path import join, abspath
from typing import Optional
import logging
import random
import string
import math
import mathOperations as mo
import UsedFunctions as uf
##import test_oracle as to

class main:
    rec = {}
    settings = [
        ("hive.exec.dynamic.partition", "true"),
        ("hive.exec.dynamic.partition.mode", "nonstrict"),
        ("spark.sql.orc.filterPushdown", "true"),
        ("hive.msck.path.validation", "ignore"),
        ("spark.sql.caseSensitive", "true"),
        ("spark.speculation", "false"),
        ("hive.metastore.authorization.storage.checks", "false"),
        ("hive.metastore.client.connect.retry.delay", "5s"),
        ("hive.metastore.client.socket.timeout", "1800s"),
        ("hive.metastore.connect.retries", "12"),
        ("hive.metastore.execute.setugi", "false"),
        ("hive.metastore.failure.retries", "12"),
        ("hive.metastore.schema.verification", "false"),
        ("hive.metastore.schema.verification.record.version", "false"),
        ("hive.metastore.server.max.threads", "100000"),
        ("hive.metastore.authorization.storage.checks", "/apps/hive/warehouse")
    ]
    configs = {"DB": "pycharm",
               "tableName": "randomDataPy"}
    DB = "pycharm"
    tableName = "randomDataPy"
    fullyQualifiedTableName = DB + "." + tableName
    spark = SparkSession.builder \
        .appName("app1") \
        .enableHiveSupport() \
        .getOrCreate()

    spark.sparkContext._conf.setAll(settings)

    sc = SparkContext.getOrCreate()
    print(sc.getConf().getAll())
    sqlContext = SQLContext(sc)
    HiveContext = HiveContext(sc)
    lst = (spark.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ")).collect()
    print("\nStarted at"); uf.println(lst)

    numRows = 100000  ## do in increments of 50K rows otherwise you blow up driver memory!
    #
    ## Check if table exists, otherwise create it

    rows = 0
    sqltext = ""
    if (spark.sql(f"SHOW TABLES IN {DB} like '{tableName}'").count() == 1):
        rows = spark.sql(f"""SELECT COUNT(1) FROM {fullyQualifiedTableName}""").collect()[0][0]
        print("number of rows is ", rows)
    else:
        print(f"\nTable {fullyQualifiedTableName} does not exist, creating table ")
        sqltext = f"""
            CREATE TABLE {DB}.{tableName}(
              ID INT
            , CLUSTERED INT
            , SCATTERED INT
            , RANDOMISED INT
            , RANDOM_STRING VARCHAR(50)
            , SMALL_VC VARCHAR(50)
            , PADDING  VARCHAR(4000)
            )
            STORED AS PARQUET
            """
        spark.sql(sqltext)

    start = 0
    if (rows == 0):
        start = 1
    else:
        maxID = spark.sql(f"SELECT MAX(id) FROM {fullyQualifiedTableName}").collect()[0][0]
        start = maxID + 1
    end = start + numRows - 1
    print("starting at ID = ", start, ",ending on = ", end)
    Range = range(start, end + 1)
    ## This traverses through the Range and increments "x" by one unit each time;
    ## that x value is used in the code to generate random data through Python functions in a class
    print(numRows)
    print(uf.clustered(200, numRows))
    rdd = sc.parallelize(Range). \
        map(lambda x: (x, uf.clustered(x, numRows), \
                       uf.scattered(x, 10000), \
                       uf.randomised(x, 10000), \
                       uf.randomString(50), \
                       uf.padString(x, " ", 50), \
                       uf.padSingleChar("x", 4000)))
    df = rdd.toDF(). \
        withColumnRenamed("_1", "ID"). \
        withColumnRenamed("_2", "CLUSTERED"). \
        withColumnRenamed("_3", "SCATTERED"). \
        withColumnRenamed("_4", "RANDOMISED"). \
        withColumnRenamed("_5", "RANDOM_STRING"). \
        withColumnRenamed("_6", "SMALL_VC"). \
        withColumnRenamed("_7", "PADDING")
    df.write.mode("overwrite").saveAsTable("pycharm.ABCD")
    df.printSchema()
    df.explain()
    df.createOrReplaceTempView("tmp")
    sqltext = f"""
        INSERT INTO TABLE {fullyQualifiedTableName}
        SELECT
              ID
            , CLUSTERED
            , SCATTERED
            , RANDOMISED
            , RANDOM_STRING
            , SMALL_VC
            , PADDING
        FROM tmp
        """
    spark.sql(sqltext)
    spark.sql(f"SELECT MIN(id) AS minID, MAX(id) AS maxID FROM {fullyQualifiedTableName}").show(n=20, truncate=False, vertical=False)
    ##sqlContext.sql("""SELECT * FROM pycharm.randomDataPy ORDER BY id""").show(n=20,truncate=False,vertical=False)
    lst = (spark.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ")).collect()
    print("\nFinished at"); uf.println(lst)

On Fri, 11 Dec 2020 at 18:04, Sofia’s World <mmistr...@gmail.com> wrote:

Copying and pasting your code into a Jupyter notebook works fine; that is, using my own version of Range, which is simply a list of numbers.

How about this... does this work fine?

list(map(lambda x: (x, clustered(x, numRows)), [1,2,3,4]))

If it does, I'd look at what's inside your Range and what you get out of it.
I suspect something is wrong in there.

If there were something wrong with the clustered function, then you should be able to take it out of the map() and still have the code working. Could you try that as well?

kr

On Fri, Dec 11, 2020 at 5:04 PM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

Sorry, part of the code is not that visible:

rdd = sc.parallelize(Range). \
    map(lambda x: (x, uf.clustered(x, numRows), \
                   uf.scattered(x,10000), \
                   uf.randomised(x,10000), \
                   uf.randomString(50), \
                   uf.padString(x," ",50), \
                   uf.padSingleChar("x",4000)))

On Fri, 11 Dec 2020 at 16:56, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

Thanks Sean,

This is the code:

numRows = 100000  ## do in increments of 50K rows otherwise you blow up driver memory!
#
## Check if table exists, otherwise create it

rows = 0
sqltext = ""
if (spark.sql(f"SHOW TABLES IN {DB} like '{tableName}'").count() == 1):
    rows = spark.sql(f"""SELECT COUNT(1) FROM {fullyQualifiedTableName}""").collect()[0][0]
    print("number of rows is ", rows)
else:
    print(f"\nTable {fullyQualifiedTableName} does not exist, creating table ")
    sqltext = f"""
        CREATE TABLE {DB}.{tableName}(
          ID INT
        , CLUSTERED INT
        , SCATTERED INT
        , RANDOMISED INT
        , RANDOM_STRING VARCHAR(50)
        , SMALL_VC VARCHAR(50)
        , PADDING  VARCHAR(4000)
        )
        STORED AS PARQUET
        """
    spark.sql(sqltext)

start = 0
if (rows == 0):
    start = 1
else:
    maxID = spark.sql(f"SELECT MAX(id) FROM {fullyQualifiedTableName}").collect()[0][0]
    start = maxID + 1
end = start + numRows - 1
print("starting at ID = ", start, ",ending on = ", end)
Range = range(start, end + 1)
## This traverses through the Range and increments "x" by one unit each time;
## that x value is used in the code to generate random data through Python functions in a class
print(numRows)
print(uf.clustered(200, numRows))
rdd = sc.parallelize(Range). \
    map(lambda x: (x, uf.clustered(x, numRows), \
                   uf.scattered(x,10000), \
                   uf.randomised(x,10000), \
                   uf.randomString(50), \
                   uf.padString(x," ",50), \
                   uf.padSingleChar("x",4000)))
df = rdd.toDF(). \
    withColumnRenamed("_1","ID"). \
    withColumnRenamed("_2", "CLUSTERED"). \
    withColumnRenamed("_3", "SCATTERED"). \
    withColumnRenamed("_4", "RANDOMISED"). \
    withColumnRenamed("_5", "RANDOM_STRING"). \
    withColumnRenamed("_6", "SMALL_VC"). \
    withColumnRenamed("_7", "PADDING")

And this is the run with the error:

Started at
11/12/2020 14:42:45.45
number of rows is  4500000
starting at ID =  4500001 ,ending on =  4600000
100000
0.00199
20/12/11 14:42:56 ERROR Executor: Exception in task 0.0 in stage 7.0 (TID 33)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\spark-3.0.1-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 605, in main
  File "C:\spark-3.0.1-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 597, in process
  File "C:\spark-3.0.1-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 271, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "C:\spark-3.0.1-bin-hadoop2.7\python\pyspark\rdd.py", line 1440, in takeUpToNumLeft
    yield next(iterator)
  File "C:\spark-3.0.1-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\util.py", line 107, in wrapper
    return f(*args, **kwargs)
  File "C:/Users/admin/PycharmProjects/pythonProject2/pilot/src/main.py", line 101, in <lambda>
    map(lambda x: (x, uf.clustered(x, numRows), \
NameError: name 'numRows' is not defined

Regards,

Mich

On Fri, 11 Dec 2020 at 16:47, Sean Owen <sro...@gmail.com> wrote:

Looks like a simple Python error - you haven't shown the code that produces it. Indeed, I suspect you'll find there is no such symbol.

On Fri, Dec 11, 2020 at 9:09 AM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

Hi,

This used to work but not anymore.
I have a UsedFunctions.py file that has these functions:

import random
import string
import math

def randomString(length):
    letters = string.ascii_letters
    result_str = ''.join(random.choice(letters) for i in range(length))
    return result_str

def clustered(x,numRows):
    return math.floor(x -1)/numRows

def scattered(x,numRows):
    return abs((x -1 % numRows))* 1.0

def randomised(seed,numRows):
    random.seed(seed)
    return abs(random.randint(0, numRows) % numRows) * 1.0

def padString(x,chars,length):
    n = int(math.log10(x) + 1)
    result_str = ''.join(random.choice(chars) for i in range(length-n)) + str(x)
    return result_str

def padSingleChar(chars,length):
    result_str = ''.join(chars for i in range(length))
    return result_str

def println(lst):
    for ll in lst:
        print(ll[0])

Now in the main.py module I import this file as follows:

import UsedFunctions as uf

Then I try the following:

import UsedFunctions as uf

numRows = 100000  ## do in increments of 100K rows
rdd = sc.parallelize(Range). \
    map(lambda x: (x, uf.clustered(x, numRows), \
                   uf.scattered(x,10000), \
                   uf.randomised(x,10000), \
                   uf.randomString(50), \
                   uf.padString(x," ",50), \
                   uf.padSingleChar("x",4000)))

The problem is that now it throws an error for numRows as below:

  File "C:/Users/admin/PycharmProjects/pythonProject2/pilot/src/main.py", line 101, in <lambda>
    map(lambda x: (x, uf.clustered(x, numRows), \
NameError: name 'numRows' is not defined

I don't know why this error is coming up!

Appreciate any ideas.

Thanks,

Mich
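As a footnote to the thread, a minimal pure-Python sketch (no Spark involved) of why the lambda cannot resolve numRows when the whole script body sits inside class main: as in the main.py above. It assumes that layout, with the map forced eagerly so the NameError shows up locally; names assigned in a class body become class attributes, and a lambda defined in that body does not close over them:

class main:
    numRows = 100000   # becomes a class attribute, main.numRows
    # The lambda below does not see class attributes as free variables;
    # 'numRows' is looked up as a module global and is not found.
    rows = list(map(lambda x: (x, x / numRows), [1, 2, 3]))
    # NameError: name 'numRows' is not defined

Declaring global numRows in the class body moves the assignment into the module namespace instead, which is why the workaround earlier in the thread made the name resolvable.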