Hi Mich, I don't think it's a good idea... I believe your IDE is playing tricks on you. Take Spark out of the equation... this is a Python-only issue. I am guessing your IDE is somehow messing up your environment.

If you take out the whole Spark code and replace it with this:

map(lambda x: (x, uf.clustered(x, numRows), \
               uf.scattered(x, numRows), \
               uf.randomised(x, numRows), \
               uf.randomString(50), \
               uf.padString(x, " ", 50), \
               uf.padSingleChar("x", 4000)), [1, 2, 3, 4, 5])

you should get exactly the same error...

Send me a zip with the tfconstants.py and a trimmed-down version of your main.py and I'll plug it into my IDE and see if I can reproduce it.

It worked fine in Jupyter, but then I have all functions in the same notebook.

hth
marco
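One caveat on that test: run at module level, with numRows already defined, the map() above may well work. In the main.py that was posted, numRows and the lambda both sit inside the body of class main, and that is where the error shows up. Here is a minimal, Spark-free sketch of that situation (the class layout mirrors the posted main.py; the rest of the names are illustrative only):

import math

def clustered(x, numRows):
    return math.floor(x - 1) / numRows

class main:
    numRows = 10
    # The lambda is called while the class body executes, but its body resolves
    # names as local -> enclosing functions -> module globals -> builtins.
    # A class body is NOT an enclosing scope, so numRows is not found here.
    lst = list(map(lambda x: (x, clustered(x, numRows)), [1, 2, 3, 4, 5]))
    # -> NameError: name 'numRows' is not defined

If that reproduces what you see, it is just Python's class-scope rule at work rather than the environment, and it would also explain why declaring numRows global made the error go away: the lambda's lookup skips the class namespace and falls through to the module globals.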
On Sat, Dec 12, 2020 at 9:02 PM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

> I solved the issue of the variable numRows not being defined within the lambda function by defining it as a global variable:
>
> global numRows
> numRows = 10  ## do in increments of 50K rows otherwise you blow up driver memory!
> #
>
> Then I could call it within the lambda function as follows:
>
> rdd = sc.parallelize(Range). \
>     map(lambda x: (x, uf.clustered(x, numRows), \
>         uf.scattered(x, numRows), \
>         uf.randomised(x, numRows), \
>         uf.randomString(50), \
>         uf.padString(x, " ", 50), \
>         uf.padSingleChar("x", 4000)))
>
> This then worked. I am not convinced this is *the correct* solution, but somehow it worked.
>
> Thanks
>
> Mich
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.
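On the quoted global-variable workaround: it works because the global statement forces numRows into the module namespace, which the lambda can always see. If you would rather avoid the global, two lighter-touch alternatives, sketched under the assumption that sc, Range, numRows and import UsedFunctions as uf stay as in the posted main.py (not tested against the full job):

# Option 1: bind numRows at lambda-definition time via a default argument.
# The default expression is evaluated while the class body runs, where numRows
# is still visible, and the captured value travels to the workers with the lambda.
rdd = sc.parallelize(Range). \
    map(lambda x, numRows=numRows: (x, uf.clustered(x, numRows), \
        uf.scattered(x, numRows), \
        uf.randomised(x, numRows), \
        uf.randomString(50), \
        uf.padString(x, " ", 50), \
        uf.padSingleChar("x", 4000)))

# Option 2: move the driver code out of the class body into a plain function;
# a lambda defined inside a function does close over that function's locals.
def run(sc, Range, numRows):
    return sc.parallelize(Range). \
        map(lambda x: (x, uf.clustered(x, numRows), \
            uf.scattered(x, numRows), \
            uf.randomised(x, numRows), \
            uf.randomString(50), \
            uf.padString(x, " ", 50), \
            uf.padSingleChar("x", 4000)))

Of the two, moving the code into a function is probably the cleaner long-term shape, since nothing in that script really needs to live in a class body.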
> On Fri, 11 Dec 2020 at 18:52, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>
>> Many thanks KR.
>>
>> If I call the clustered function on its own it works:
>>
>> numRows = 100000
>> print(uf.clustered(200, numRows))
>>
>> and returns
>>
>> 0.00199
>>
>> If I run it all in one, including the UsedFunctions class in the same .py file, it works. The code is attached.
>>
>> However, in PyCharm, I do the following.
>>
>> UsedFunctions.py. Note that this file only contains functions and no class:
>>
>> import logging
>> import random
>> import string
>> import math
>>
>> def randomString(length):
>>     letters = string.ascii_letters
>>     result_str = ''.join(random.choice(letters) for i in range(length))
>>     return result_str
>>
>> def clustered(x, numRows):
>>     return math.floor(x - 1) / numRows
>>
>> def scattered(x, numRows):
>>     return abs((x - 1 % numRows)) * 1.0
>>
>> def randomised(seed, numRows):
>>     random.seed(seed)
>>     return abs(random.randint(0, numRows) % numRows) * 1.0
>>
>> def padString(x, chars, length):
>>     n = int(math.log10(x) + 1)
>>     result_str = ''.join(random.choice(chars) for i in range(length - n)) + str(x)
>>     return result_str
>>
>> def padSingleChar(chars, length):
>>     result_str = ''.join(chars for i in range(length))
>>     return result_str
>>
>> def println(lst):
>>     for ll in lst:
>>         print(ll[0])
>>
>> In main.py (PyCharm) I have this code, which is failing:
>>
>> from pyspark import SparkContext, SparkConf
>> from pyspark.sql import SQLContext
>> from pyspark.sql import HiveContext
>> from pyspark.sql import SparkSession
>> from pyspark.sql import Row
>> from pyspark.sql.types import StringType, ArrayType
>> from pyspark.sql.functions import udf, col, max as max, to_date, date_add, \
>>     add_months
>> from datetime import datetime, timedelta
>> import os
>> from os.path import join, abspath
>> from typing import Optional
>> import logging
>> import random
>> import string
>> import math
>> import mathOperations as mo
>> import UsedFunctions as uf
>> ##import test_oracle as to
>>
>> class main:
>>     rec = {}
>>     settings = [
>>         ("hive.exec.dynamic.partition", "true"),
>>         ("hive.exec.dynamic.partition.mode", "nonstrict"),
>>         ("spark.sql.orc.filterPushdown", "true"),
>>         ("hive.msck.path.validation", "ignore"),
>>         ("spark.sql.caseSensitive", "true"),
>>         ("spark.speculation", "false"),
>>         ("hive.metastore.authorization.storage.checks", "false"),
>>         ("hive.metastore.client.connect.retry.delay", "5s"),
>>         ("hive.metastore.client.socket.timeout", "1800s"),
>>         ("hive.metastore.connect.retries", "12"),
>>         ("hive.metastore.execute.setugi", "false"),
>>         ("hive.metastore.failure.retries", "12"),
>>         ("hive.metastore.schema.verification", "false"),
>>         ("hive.metastore.schema.verification.record.version", "false"),
>>         ("hive.metastore.server.max.threads", "100000"),
>>         ("hive.metastore.authorization.storage.checks", "/apps/hive/warehouse")
>>     ]
>>     configs = {"DB": "pycharm",
>>                "tableName": "randomDataPy"}
>>     DB = "pycharm"
>>     tableName = "randomDataPy"
>>     fullyQualifiedTableName = DB + "." + tableName
>>     spark = SparkSession.builder \
>>         .appName("app1") \
>>         .enableHiveSupport() \
>>         .getOrCreate()
>>     spark.sparkContext._conf.setAll(settings)
>>     sc = SparkContext.getOrCreate()
>>     print(sc.getConf().getAll())
>>     sqlContext = SQLContext(sc)
>>     HiveContext = HiveContext(sc)
>>     lst = (spark.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ")).collect()
>>     print("\nStarted at"); uf.println(lst)
>>
>>     numRows = 100000  ## do in increments of 50K rows otherwise you blow up driver memory!
>>     #
>>     ## Check if table exists otherwise create it
>>
>>     rows = 0
>>     sqltext = ""
>>     if (spark.sql(f"SHOW TABLES IN {DB} like '{tableName}'").count() == 1):
>>         rows = spark.sql(f"""SELECT COUNT(1) FROM {fullyQualifiedTableName}""").collect()[0][0]
>>         print("number of rows is ", rows)
>>     else:
>>         print(f"\nTable {fullyQualifiedTableName} does not exist, creating table ")
>>         sqltext = f"""
>>         CREATE TABLE {DB}.{tableName}(
>>               ID INT
>>             , CLUSTERED INT
>>             , SCATTERED INT
>>             , RANDOMISED INT
>>             , RANDOM_STRING VARCHAR(50)
>>             , SMALL_VC VARCHAR(50)
>>             , PADDING VARCHAR(4000)
>>         )
>>         STORED AS PARQUET
>>         """
>>         spark.sql(sqltext)
>>
>>     start = 0
>>     if (rows == 0):
>>         start = 1
>>     else:
>>         maxID = spark.sql(f"SELECT MAX(id) FROM {fullyQualifiedTableName}").collect()[0][0]
>>         start = maxID + 1
>>     end = start + numRows - 1
>>     print("starting at ID = ", start, ",ending on = ", end)
>>     Range = range(start, end + 1)
>>     ## This traverses through the Range and increments "x" by one unit each time; that x value is used in the code to generate random data through Python functions in a class
>>     print(numRows)
>>     print(uf.clustered(200, numRows))
>>     rdd = sc.parallelize(Range). \
>>         map(lambda x: (x, uf.clustered(x, numRows), \
>>             uf.scattered(x, 10000), \
>>             uf.randomised(x, 10000), \
>>             uf.randomString(50), \
>>             uf.padString(x, " ", 50), \
>>             uf.padSingleChar("x", 4000)))
>>     df = rdd.toDF(). \
>>         withColumnRenamed("_1", "ID"). \
>>         withColumnRenamed("_2", "CLUSTERED"). \
>>         withColumnRenamed("_3", "SCATTERED"). \
>>         withColumnRenamed("_4", "RANDOMISED"). \
>>         withColumnRenamed("_5", "RANDOM_STRING"). \
>>         withColumnRenamed("_6", "SMALL_VC"). \
>>         withColumnRenamed("_7", "PADDING")
>>     df.write.mode("overwrite").saveAsTable("pycharm.ABCD")
>>     df.printSchema()
>>     df.explain()
>>     df.createOrReplaceTempView("tmp")
>>     sqltext = f"""
>>         INSERT INTO TABLE {fullyQualifiedTableName}
>>         SELECT
>>               ID
>>             , CLUSTERED
>>             , SCATTERED
>>             , RANDOMISED
>>             , RANDOM_STRING
>>             , SMALL_VC
>>             , PADDING
>>         FROM tmp
>>     """
>>     spark.sql(sqltext)
>>     spark.sql(f"SELECT MIN(id) AS minID, MAX(id) AS maxID FROM {fullyQualifiedTableName}").show(n=20, truncate=False, vertical=False)
>>     ##sqlContext.sql("""SELECT * FROM pycharm.randomDataPy ORDER BY id""").show(n=20,truncate=False,vertical=False)
>>     lst = (spark.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ")).collect()
>>     print("\nFinished at"); uf.println(lst)
>>
>> On Fri, 11 Dec 2020 at 18:04, Sofia’s World <mmistr...@gmail.com> wrote:
>>
>>> Copying and pasting your code in a Jupyter notebook works fine; that is, using my own version of Range, which is simply a list of numbers.
>>>
>>> How about this... does this work fine?
>>>
>>> list(map(lambda x: (x, clustered(x, numRows)), [1, 2, 3, 4]))
>>>
>>> If it does, I'd look into what's inside your Range and what you get out of it. I suspect something is wrong in there.
>>>
>>> If there was something wrong with the clustered function, then you should be able to take it out of the map() and still have the code working. Could you try that as well?
>>>
>>> kr
>>>
>>> On Fri, Dec 11, 2020 at 5:04 PM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>>
>>>> Sorry, part of the code is not that visible:
>>>>
>>>> rdd = sc.parallelize(Range). \
>>>>     map(lambda x: (x, uf.clustered(x, numRows), \
>>>>         uf.scattered(x, 10000), \
>>>>         uf.randomised(x, 10000), \
>>>>         uf.randomString(50), \
>>>>         uf.padString(x, " ", 50), \
>>>>         uf.padSingleChar("x", 4000)))
>>>>
>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.
>>>>
>>>> On Fri, 11 Dec 2020 at 16:56, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>>>
>>>>> Thanks Sean,
>>>>>
>>>>> This is the code:
>>>>>
>>>>> numRows = 100000  ## do in increments of 50K rows otherwise you blow up driver memory!
>>>>> #
>>>>> ## Check if table exists otherwise create it
>>>>>
>>>>> rows = 0
>>>>> sqltext = ""
>>>>> if (spark.sql(f"SHOW TABLES IN {DB} like '{tableName}'").count() == 1):
>>>>>     rows = spark.sql(f"""SELECT COUNT(1) FROM {fullyQualifiedTableName}""").collect()[0][0]
>>>>>     print("number of rows is ", rows)
>>>>> else:
>>>>>     print(f"\nTable {fullyQualifiedTableName} does not exist, creating table ")
>>>>>     sqltext = f"""
>>>>>     CREATE TABLE {DB}.{tableName}(
>>>>>           ID INT
>>>>>         , CLUSTERED INT
>>>>>         , SCATTERED INT
>>>>>         , RANDOMISED INT
>>>>>         , RANDOM_STRING VARCHAR(50)
>>>>>         , SMALL_VC VARCHAR(50)
>>>>>         , PADDING VARCHAR(4000)
>>>>>     )
>>>>>     STORED AS PARQUET
>>>>>     """
>>>>>     spark.sql(sqltext)
>>>>>
>>>>> start = 0
>>>>> if (rows == 0):
>>>>>     start = 1
>>>>> else:
>>>>>     maxID = spark.sql(f"SELECT MAX(id) FROM {fullyQualifiedTableName}").collect()[0][0]
>>>>>     start = maxID + 1
>>>>> end = start + numRows - 1
>>>>> print("starting at ID = ", start, ",ending on = ", end)
>>>>> Range = range(start, end + 1)
>>>>> ## This traverses through the Range and increments "x" by one unit each time; that x value is used in the code to generate random data through Python functions in a class
>>>>> print(numRows)
>>>>> print(uf.clustered(200, numRows))
>>>>> rdd = sc.parallelize(Range). \
>>>>>     map(lambda x: (x, uf.clustered(x, numRows), \
>>>>>         uf.scattered(x, 10000), \
>>>>>         uf.randomised(x, 10000), \
>>>>>         uf.randomString(50), \
>>>>>         uf.padString(x, " ", 50), \
>>>>>         uf.padSingleChar("x", 4000)))
>>>>> df = rdd.toDF(). \
>>>>>     withColumnRenamed("_1", "ID"). \
>>>>>     withColumnRenamed("_2", "CLUSTERED"). \
>>>>>     withColumnRenamed("_3", "SCATTERED"). \
>>>>>     withColumnRenamed("_4", "RANDOMISED"). \
>>>>>     withColumnRenamed("_5", "RANDOM_STRING"). \
>>>>>     withColumnRenamed("_6", "SMALL_VC"). \
>>>>>     withColumnRenamed("_7", "PADDING")
>>>>>
>>>>> And this is the run with the error:
>>>>>
>>>>> Started at
>>>>> 11/12/2020 14:42:45.45
>>>>> number of rows is  4500000
>>>>> starting at ID =  4500001 ,ending on =  4600000
>>>>> 100000
>>>>> 0.00199
>>>>> 20/12/11 14:42:56 ERROR Executor: Exception in task 0.0 in stage 7.0 (TID 33)
>>>>> org.apache.spark.api.python.PythonException: Traceback (most recent call last):
>>>>>   File "C:\spark-3.0.1-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 605, in main
>>>>>   File "C:\spark-3.0.1-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 597, in process
>>>>>   File "C:\spark-3.0.1-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 271, in dump_stream
>>>>>     vs = list(itertools.islice(iterator, batch))
>>>>>   File "C:\spark-3.0.1-bin-hadoop2.7\python\pyspark\rdd.py", line 1440, in takeUpToNumLeft
>>>>>     yield next(iterator)
>>>>>   File "C:\spark-3.0.1-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\util.py", line 107, in wrapper
>>>>>     return f(*args, **kwargs)
>>>>>   File "C:/Users/admin/PycharmProjects/pythonProject2/pilot/src/main.py", line 101, in <lambda>
>>>>>     map(lambda x: (x, uf.clustered(x, numRows), \
>>>>> NameError: name 'numRows' is not defined
>>>>>
>>>>> Regards,
>>>>>
>>>>> Mich
>>>>>
>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.
>>>>>
>>>>> On Fri, 11 Dec 2020 at 16:47, Sean Owen <sro...@gmail.com> wrote:
>>>>>
>>>>>> Looks like a simple Python error - you haven't shown the code that produces it. Indeed, I suspect you'll find there is no such symbol.
>>>>>>
>>>>>> On Fri, Dec 11, 2020 at 9:09 AM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> This used to work but not anymore.
>>>>>>> I have a UsedFunctions.py file that has these functions:
>>>>>>>
>>>>>>> import random
>>>>>>> import string
>>>>>>> import math
>>>>>>>
>>>>>>> def randomString(length):
>>>>>>>     letters = string.ascii_letters
>>>>>>>     result_str = ''.join(random.choice(letters) for i in range(length))
>>>>>>>     return result_str
>>>>>>>
>>>>>>> def clustered(x, numRows):
>>>>>>>     return math.floor(x - 1) / numRows
>>>>>>>
>>>>>>> def scattered(x, numRows):
>>>>>>>     return abs((x - 1 % numRows)) * 1.0
>>>>>>>
>>>>>>> def randomised(seed, numRows):
>>>>>>>     random.seed(seed)
>>>>>>>     return abs(random.randint(0, numRows) % numRows) * 1.0
>>>>>>>
>>>>>>> def padString(x, chars, length):
>>>>>>>     n = int(math.log10(x) + 1)
>>>>>>>     result_str = ''.join(random.choice(chars) for i in range(length - n)) + str(x)
>>>>>>>     return result_str
>>>>>>>
>>>>>>> def padSingleChar(chars, length):
>>>>>>>     result_str = ''.join(chars for i in range(length))
>>>>>>>     return result_str
>>>>>>>
>>>>>>> def println(lst):
>>>>>>>     for ll in lst:
>>>>>>>         print(ll[0])
>>>>>>>
>>>>>>> Now in the main.py module I import this file as follows:
>>>>>>>
>>>>>>> import UsedFunctions as uf
>>>>>>>
>>>>>>> Then I try the following:
>>>>>>>
>>>>>>> import UsedFunctions as uf
>>>>>>>
>>>>>>> numRows = 100000  ## do in increments of 100K rows
>>>>>>> rdd = sc.parallelize(Range). \
>>>>>>>     map(lambda x: (x, uf.clustered(x, numRows), \
>>>>>>>         uf.scattered(x, 10000), \
>>>>>>>         uf.randomised(x, 10000), \
>>>>>>>         uf.randomString(50), \
>>>>>>>         uf.padString(x, " ", 50), \
>>>>>>>         uf.padSingleChar("x", 4000)))
>>>>>>>
>>>>>>> The problem is that now it throws an error for numRows, as below:
>>>>>>>
>>>>>>>   File "C:/Users/admin/PycharmProjects/pythonProject2/pilot/src/main.py", line 101, in <lambda>
>>>>>>>     map(lambda x: (x, uf.clustered(x, numRows), \
>>>>>>> NameError: name 'numRows' is not defined
>>>>>>>
>>>>>>> I don't know why this error is coming up!
>>>>>>>
>>>>>>> Appreciate any ideas.
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> Mich
>>>>>>>
>>>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.