Hi Gourav
When I join, I get an OOM error. To address this, my thought was to split my tables into
small batches of rows, join each batch, and then union the results. My
assumption is that union is a narrow transformation and as such requires fewer
resources. Say I have 5 data frames I want to join together and each has
300 rows; I want to create 15 data frames.
Set1 = {11, 12, 13, 14, 15}
Set2 = {21, 22, 23, 24, 25}
Set3 = {31, 32, 33, 34, 35}
Then join the batches:
S1joinDF = 11.join(12).join(13).join(14).join(15)
S2joinDF = 21.join(22).join(23).join(24).join(25)
S3joinDF = 31.join(32).join(33).join(34).join(35)
resultDF = S1joinDF.union( S2joinDF ) .union( S3joinDF )
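Spelled out in PySpark, the batch joins might look roughly like the sketch below. The list names (s1_dfs, s2_dfs, s3_dfs) and the join key "Name" are placeholders for illustration only; I used unionByName so the columns line up by name rather than by position.

    # Rough sketch of the plan above. Each s*_dfs is assumed to be a
    # list of dataframes that share the join key "Name".
    from functools import reduce

    def join_batch(dfs, key="Name"):
        # join a list of dataframes left to right on the given key
        return reduce(lambda left, right: left.join(right, on=key), dfs)

    s1JoinDF = join_batch(s1_dfs)   # e.g. [df11, df12, df13, df14, df15]
    s2JoinDF = join_batch(s2_dfs)
    s3JoinDF = join_batch(s3_dfs)

    # unionByName aligns the columns by name rather than by position
    resultDF = s1JoinDF.unionByName(s2JoinDF).unionByName(s3JoinDF)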
The way I originally wrote my code is as follows. Based on my unit test, it turns
out I need to call orderBy on every iteration of the loop; sorting once
outside the while loop did not resolve the problem. Given the size of my
dataframes, that is going to crush performance. My unit test works, but I have
never run this on my real data set.
# Create a copy of the original dataframe
copyDF = df.orderBy("Name")
# copyDF.show()
i = 0
while i < numberOfSplits:
    self.logger.warn("i:{}".format(i))
    # Get the top `numRows` rows.
    # Note: take() is an action, limit() is a transformation.
    topDF = copyDF.limit(numRows)
    # Truncate `copyDF` to remove the rows fetched into `topDF`.
    # The original quant.sf files are sorted by Name, however we must
    # use orderBy, else the row names between GTEx samples will not
    # be the same. We cannot simply sort or orderBy once; we have to
    # do this on every iteration.
    copyDF = copyDF.subtract(topDF).orderBy("Name")
    retList[i] = topDF
    # Increment the split number
    i += 1

if remainingRows > 0:
    self.logger.info("AEDWIP writing last i:{} len(retList):{}".format(i, len(retList)))
    retList[i] = copyDF
    # copyDF.show()
    # retList[i].show()
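For reference, the variables the loop depends on would be set up roughly like this sketch; numRows is whatever batch size comfortably fits in memory, and the other names are just what the loop above assumes.

    # Sketch of the setup the loop above assumes
    totalRows = df.count()                  # count() is an action
    numberOfSplits = totalRows // numRows
    remainingRows = totalRows % numRows

    # one slot per full batch, plus one for the remainder if any
    retList = [None] * (numberOfSplits + (1 if remainingRows > 0 else 0))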
Okay, so that is the background. Rather than use orderBy, I thought that if I could add
a row number I could easily split up my data frames. My code would look a lot
like what I would write in pandas or R:
while i < numBatches:
    start = i * numRows
    end = start + numRows
    print("\ni:{} start:{} end:{}".format(i, start, end))
    df = trainDF.iloc[start:end]
There does not seem to be an easy way to do this.
https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.monotonically_increasing_id.html
The generated ID is guaranteed to be monotonically increasing and unique, but
not consecutive.
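The closest thing I could come up with is to drop down to the RDD API and use zipWithIndex, which numbers rows in their current order without pulling everything into a single window partition. This is only an untested sketch; with_row_index is my own helper name, and I have not tried it at the scale of my real data.

    # Untested sketch: add a consecutive 0-based index column using
    # RDD.zipWithIndex(), which indexes rows in their existing order.
    from pyspark.sql import SparkSession
    from pyspark.sql.types import LongType, StructField, StructType

    spark = SparkSession.builder.getOrCreate()

    def with_row_index(df, colName="row_idx"):
        # extend the schema with the new index column
        schema = StructType(df.schema.fields + [StructField(colName, LongType(), False)])
        # zipWithIndex returns (Row, index) pairs; flatten each pair into one tuple
        indexedRDD = df.rdd.zipWithIndex().map(lambda pair: (*pair[0], pair[1]))
        return spark.createDataFrame(indexedRDD, schema)

    # a batch could then be selected with a simple range filter, e.g.
    # batchDF = indexedDF.where((indexedDF.row_idx >= start) & (indexedDF.row_idx < end))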
Comments and suggestions appreciated
Andy
From: Gourav Sengupta <[email protected]>
Date: Monday, January 10, 2022 at 11:03 AM
To: Andrew Davidson <[email protected]>
Cc: "user @spark" <[email protected]>
Subject: Re: How to add a row number column with out reordering my data frame
Hi,
I am a bit confused here; it is not entirely clear to me why you are creating
the row numbers, and how creating the row numbers helps you with the joins.
Can you please explain with some sample data?
Regards,
Gourav
On Fri, Jan 7, 2022 at 1:14 AM Andrew Davidson <[email protected]>
wrote:
Hi
I am trying to work through an OOM error. I have 10411 files. I want to select a
single column from each file and then join them into a single table.
The files have a unique row id; however, it is a very long string. The data file
with just the name and the column of interest is about 470 MB. The column of
interest alone is 21 MB; it is a column of over 5 million real numbers.
So I thought I would save a lot of memory if I could join over row numbers instead.
# create a dummy column to order by  https://www.py4u.net/discuss/1840945
from pyspark.sql.functions import lit, row_number
from pyspark.sql.window import Window

w = Window().orderBy(lit('A'))
sampleDF = sampleDF.select(["NumReads"]) \
                   .withColumnRenamed("NumReads", sampleName) \
                   .withColumn("tid", row_number().over(w))
The window-based code seems pretty complicated to someone coming from pandas and R
data frames. My unit test works; however, it generates the following warning.
WARN WindowExec: No Partition Defined for Window operation! Moving all data to
a single partition, this can cause serious performance degradation.
Is there a better way to create a row number without reordering my data? The
order is important.
Kind regards
Andy