Thanks Sean,

This is the code:

numRows = 100000   ## do in increments of 50K rows, otherwise you blow up driver memory!
#
## Check if the table exists, otherwise create it


rows = 0
sqltext  = ""
if (spark.sql(f"SHOW TABLES IN {DB} like '{tableName}'").count() == 1):
  rows = spark.sql(f"""SELECT COUNT(1) FROM
{fullyQualifiedTableName}""").collect()[0][0]
  print ("number of rows is ",rows)
else:
  print(f"\nTable {fullyQualifiedTableName} does not exist, creating table ")
  ## f-string, so that {DB} and {tableName} are substituted into the DDL
  sqltext = f"""
  CREATE TABLE {DB}.{tableName}(
  ID INT
  , CLUSTERED INT
  , SCATTERED INT
  , RANDOMISED INT
  , RANDOM_STRING VARCHAR(50)
  , SMALL_VC VARCHAR(50)
  , PADDING  VARCHAR(4000)
  )
  STORED AS PARQUET
  """
  spark.sql(sqltext)

start = 0
if (rows == 0):
  start = 1
else:
  maxID = spark.sql(f"SELECT MAX(id) FROM {fullyQualifiedTableName}").collect()[0][0]
  start = maxID + 1
## end is computed outside the if/else so it is also defined for an empty table
end = start + numRows - 1
print ("starting at ID = ",start, ",ending on = ",end)
Range = range(start, end+1)
## This traverses the Range, incrementing "x" by one each time; that x value
## is used to generate random data through the Python functions in the
## UsedFunctions module (imported as uf)
print(numRows)
print(uf.clustered(200,numRows))
rdd = sc.parallelize(Range). \
         map(lambda x: (x, uf.clustered(x, numRows), \
                           uf.scattered(x,10000), \
                           uf.randomised(x,10000), \
                           uf.randomString(50), \
                           uf.padString(x," ",50), \
                           uf.padSingleChar("x",4000)))
df = rdd.toDF(). \
     withColumnRenamed("_1","ID"). \
     withColumnRenamed("_2", "CLUSTERED"). \
     withColumnRenamed("_3", "SCATTERED"). \
     withColumnRenamed("_4", "RANDOMISED"). \
     withColumnRenamed("_5", "RANDOM_STRING"). \
     withColumnRenamed("_6", "SMALL_VC"). \
     withColumnRenamed("_7", "PADDING")
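
A side observation on print(uf.clustered(200,numRows)) above, separate from the NameError: in UsedFunctions.py (quoted further down) clustered is written as math.floor(x -1)/numRows, so floor is applied to x - 1 before the division and the result is a fraction rather than an integer bucket. scattered has a similar precedence pattern, since % binds tighter than -. Whether the floor of the whole ratio and (x - 1) % numRows were the intent is only an assumption; the function names below are made up purely for the comparison.

```python
import math

def clustered_as_written(x, numRows):
    # floor() is applied to (x - 1) alone, then the result is divided
    return math.floor(x - 1) / numRows

def clustered_floor_of_ratio(x, numRows):
    # Assumed intent: floor of the whole ratio, giving an integer bucket
    return math.floor((x - 1) / numRows)

def scattered_as_written(x, numRows):
    # % binds tighter than -, so this evaluates x - (1 % numRows)
    return abs((x - 1 % numRows)) * 1.0

def scattered_modulo_of_offset(x, numRows):
    # Assumed intent: wrap (x - 1) around numRows
    return abs((x - 1) % numRows) * 1.0

print(clustered_as_written(200, 100000))         # 0.00199
print(clustered_floor_of_ratio(200, 100000))     # 0
print(scattered_as_written(10002, 10000))        # 10001.0
print(scattered_modulo_of_offset(10002, 10000))  # 1.0
```

The first value matches the 0.00199 printed in the run, which suggests the code behaves as written rather than as a floored ratio.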


And this is the run that produces the error:


Started at

11/12/2020 14:42:45.45

number of rows is  4500000

starting at ID =  4500001 ,ending on =  4600000

100000

0.00199

20/12/11 14:42:56 ERROR Executor: Exception in task 0.0 in stage 7.0 (TID 33)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\spark-3.0.1-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 605, in main
  File "C:\spark-3.0.1-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 597, in process
  File "C:\spark-3.0.1-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 271, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "C:\spark-3.0.1-bin-hadoop2.7\python\pyspark\rdd.py", line 1440, in takeUpToNumLeft
    yield next(iterator)
  File "C:\spark-3.0.1-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\util.py", line 107, in wrapper
    return f(*args, **kwargs)
  File "C:/Users/admin/PycharmProjects/pythonProject2/pilot/src/main.py", line 101, in <lambda>
    map(lambda x: (x, uf.clustered(x, numRows), \
NameError: name 'numRows' is not defined
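
One thing that may be worth trying, as a sketch only and not a confirmed fix: the lambda looks up numRows by name at the moment it runs, so binding the value into the function when it is defined (a default argument, or functools.partial) removes that name lookup. The clustered function below is a hypothetical stand-in that only mirrors the signature of uf.clustered, and the example runs in plain Python without Spark.

```python
from functools import partial

def clustered(x, numRows):
    # Hypothetical stand-in with the same signature as uf.clustered
    return (x - 1) / numRows

numRows = 100000

# Late binding: numRows is looked up by name each time the lambda runs
by_name = lambda x: (x, clustered(x, numRows))

# Early binding: the current value is stored inside the function object
by_default = lambda x, n=numRows: (x, clustered(x, n))
by_partial = partial(clustered, numRows=numRows)

del numRows  # simulate a process in which the global name is missing

try:
    by_name(200)
    late_binding_failed = False
except NameError:
    late_binding_failed = True

row = by_default(200)    # still works, the value travels with the function
value = by_partial(200)  # same idea using functools.partial
```

In the job above this would correspond to map(lambda x, n=numRows: (x, uf.clustered(x, n), ...)). Whether it removes the error in this particular setup is an assumption until it has been run against the cluster.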

Regards,

Mich


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Fri, 11 Dec 2020 at 16:47, Sean Owen <sro...@gmail.com> wrote:

> Looks like a simple Python error - you haven't shown the code that
> produces it. Indeed, I suspect you'll find there is no such symbol.
>
> On Fri, Dec 11, 2020 at 9:09 AM Mich Talebzadeh <mich.talebza...@gmail.com>
> wrote:
>
>> Hi,
>>
>> This used to work but not anymore.
>>
>> I have UsedFunctions.py file that has these functions
>>
>> import random
>> import string
>> import math
>>
>> def randomString(length):
>>     letters = string.ascii_letters
>>     result_str = ''.join(random.choice(letters) for i in range(length))
>>     return result_str
>>
>> def clustered(x,numRows):
>>     return math.floor(x -1)/numRows
>>
>> def scattered(x,numRows):
>>     return abs((x -1 % numRows))* 1.0
>>
>> def randomised(seed,numRows):
>>     random.seed(seed)
>>     return abs(random.randint(0, numRows) % numRows) * 1.0
>>
>> def padString(x,chars,length):
>>     n = int(math.log10(x) + 1)
>>     result_str = ''.join(random.choice(chars) for i in range(length-n)) + str(x)
>>     return result_str
>>
>> def padSingleChar(chars,length):
>>     result_str = ''.join(chars for i in range(length))
>>     return result_str
>>
>> def println(lst):
>>     for ll in lst:
>>       print(ll[0])
>>
>> Now in the main().py module I import this file as follows:
>>
>> import UsedFunctions as uf
>>
>> Then I try the following
>>
>> import UsedFunctions as uf
>>
>>  numRows = 100000   ## do in increment of 100K rows
>>  rdd = sc.parallelize(Range). \
>>            map(lambda x: (x, uf.clustered(x, numRows), \
>>                              uf.scattered(x,10000), \
>>                              uf.randomised(x,10000), \
>>                              uf.randomString(50), \
>>                              uf.padString(x," ",50), \
>>                              uf.padSingleChar("x",4000)))
>> The problem is that now it throws error for numRows as below
>>
>>
>>   File
>> "C:/Users/admin/PycharmProjects/pythonProject2/pilot/src/main.py", line
>> 101, in <lambda>
>>     map(lambda x: (x, uf.clustered(x, numRows), \
>> NameError: name 'numRows' is not defined
>>
>> I don't know why this error is coming!
>>
>> Appreciate any ideas
>>
>> Thanks,
>>
>> Mich
>
