Hi Sean and Gourav

Thanks for the suggestions. I thought that both the SQL and DataFrame APIs are
wrappers around the same framework, i.e. Catalyst?

I tend to mix and match my code. Sometimes I find it easier to write using SQL,
sometimes DataFrames. What is considered best practice?
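
One way I can think of to check this is to compare the physical plans the two
APIs produce. A rough sketch (the view and column names are just for
illustration):

    # assume df is any DataFrame with a Name column
    df.createOrReplaceTempView( "t" )

    sqlVersion = spark.sql( "select Name from t where Name like 'txId%'" )
    dfVersion  = df.select( "Name" ).where( df["Name"].like( "txId%" ) )

    # if both APIs really compile down to Catalyst, the plans printed
    # here should be essentially identical
    sqlVersion.explain()
    dfVersion.explain()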

Here is an example

Case 1
    for i in range( 1, len( self.sampleNamesList ) ): # iterate 16000 times!
        sampleName = self.sampleNamesList[i]
        sampleSDF = quantSparkDFList[i]
        sampleSDF.createOrReplaceTempView( "sample" )

        # the sample name must be quoted, else column names with a '-'
        # like GTEX-1117F-0426-SM-5EGHI will generate an error:
        # spark thinks the '-' is an expression. '_' is also
        # a special char for the sql like operator
        # https://stackoverflow.com/a/63899306/4586180
        sqlStmt = '''select rc.*, `{}`
                     from
                         rawCounts as rc,
                         sample
                     where
                         rc.Name == sample.Name'''.format( sampleName )

        rawCountsSDF = self.spark.sql( sqlStmt )
        rawCountsSDF.createOrReplaceTempView( "rawCounts" )

Case 2
    for i in range( 1, len(dfList) ):
        df2 = dfList[i]
        retDF = retDF.join( df2.selectExpr("*"), on=["Name"] )


I think my out-of-memory exception may be because the query plan is huge. I
have not figured out how to confirm whether or not that is my bug. My untested
workaround is to organize the data so that each massive join is run on 1/5 of
the total data set, then union them all together. Each "part" will still need
to iterate 16,000 times.
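
Roughly what I have in mind (untested; it assumes quantSparkDFList holds the
16,000 single-sample frames and that rows can be bucketed by a hash of the
Name column):

    from pyspark.sql import functions as pyf

    numParts = 5
    partDFs = []
    for p in range( numParts ):
        # keep roughly 1/numParts of the rows of every frame in this part
        partList = [ df.where( pyf.abs( pyf.hash( "Name" ) ) % numParts == p )
                     for df in quantSparkDFList ]
        retDF = partList[0]
        for df2 in partList[1:]:   # the Case 2 style join, per part
            retDF = retDF.join( df2, on=["Name"] )
        partDFs.append( retDF )

    # stack the row chunks back together; all parts share the same columns
    allDF = partDFs[0]
    for part in partDFs[1:]:
        allDF = allDF.unionByName( part )

I suspect each part would need to be checkpointed or written out before the
union, otherwise the combined plan is just as big as before.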

In general I assume we want to avoid for loops, since I assume Spark is unable
to optimize across them. It would be nice if Spark provided some sort of
join-all function, even if it just used a for loop internally to hide this
from me.
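
Something like functools.reduce comes close. The for loop is just hidden
inside reduce, so Spark still sees 16,000 chained joins, but at least the
code is tidy (a sketch, untested):

    import functools

    def joinAll( dfList, on=["Name"] ):
        # fold the list of frames into one wide frame by repeated equi-joins
        return functools.reduce(
            lambda left, right: left.join( right, on=on ), dfList )

    wideDF = joinAll( quantSparkDFList )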

Happy holidays

Andy



From: Sean Owen <sro...@gmail.com>
Date: Friday, December 24, 2021 at 8:30 AM
To: Gourav Sengupta <gourav.sengu...@gmail.com>
Cc: Andrew Davidson <aedav...@ucsc.edu.invalid>, Nicholas Gustafson 
<njgustaf...@gmail.com>, User <user@spark.apache.org>
Subject: Re: AnalysisException: Trouble using select() to append multiple 
columns

(that's not the situation we are commenting on below)
On Fri, Dec 24, 2021, 9:28 AM Gourav Sengupta
<gourav.sengu...@gmail.com> wrote:
Hi,

try writing several withColumn calls on a dataframe and then look at the UI
for time differences. This should be done with large data sets of course, on
the order of around 200GB+.

With scenarios involving nested queries and joins the time differences shown
in the UI become a bit more visible.
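
For example, a small sketch of the kind of comparison I mean (df and the
column names are illustrative):

    from pyspark.sql import functions as F

    # chained withColumn: one projection added per call
    chained = df
    for c in [ "a", "b", "c" ]:
        chained = chained.withColumn( c + "_x2", F.col( c ) * 2 )

    # single select: all of the new columns in one projection
    selected = df.select( "*", *[ ( F.col( c ) * 2 ).alias( c + "_x2" )
                                  for c in [ "a", "b", "c" ] ] )

    # run an action on each and compare the timings in the Spark UI
    chained.count()
    selected.count()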

Regards,
Gourav Sengupta

On Fri, Dec 24, 2021 at 2:48 PM Sean Owen 
<sro...@gmail.com> wrote:
Nah, it's going to translate to the same plan as the equivalent SQL.

On Fri, Dec 24, 2021, 5:09 AM Gourav Sengupta 
<gourav.sengu...@gmail.com> wrote:
Hi,

please note that using SQL is much more performant, and easier for managing
these kinds of issues. You might want to look at the Spark UI to see the
advantage of using SQL over the DataFrame API.


Regards,
Gourav Sengupta

On Sat, Dec 18, 2021 at 5:40 PM Andrew Davidson <aedav...@ucsc.edu.invalid> 
wrote:
Thanks Nicholas

Andy

From: Nicholas Gustafson <njgustaf...@gmail.com>
Date: Friday, December 17, 2021 at 6:12 PM
To: Andrew Davidson <aedav...@ucsc.edu.invalid>
Cc: "user@spark.apache.org<mailto:user@spark.apache.org>" 
<user@spark.apache.org<mailto:user@spark.apache.org>>
Subject: Re: AnalysisException: Trouble using select() to append multiple 
columns

Since df1 and df2 are different DataFrames, you will need to use a join. For
example: df1.join(df2.selectExpr("Name", "NumReads as ctrl_2"), on=["Name"])
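
Applied to your loop over all of the files, it would look roughly like this (a
sketch; the ctrl_N column naming is just an assumption about what you want):

    retDF = dfList[0].selectExpr( "Name", "NumReads as ctrl_1" )
    for i in range( 1, len( dfList ) ):
        colName = "ctrl_{}".format( i + 1 )
        retDF = retDF.join(
            dfList[i].selectExpr( "Name", "NumReads as " + colName ),
            on=["Name"] )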

On Dec 17, 2021, at 16:25, Andrew Davidson <aedav...@ucsc.edu.invalid> wrote:

Hi, I am a newbie.

I have 16,000 data files; all files have the same number of rows and columns.
The row ids are identical and are in the same order. I want to create a new
data frame that contains the 3rd column from each data file.

I wrote a test program that uses a for loop and join. It works with my small
test set. I get an OOM when I try to run it using all the data files. I
realize that join (map reduce) is probably not a great solution for my problem.

Recently I found several articles that talk about the challenges of using
withColumn() and how to use select() to append columns instead:

https://mungingdata.com/pyspark/select-add-columns-withcolumn/
https://stackoverflow.com/questions/64627112/adding-multiple-columns-in-pyspark-dataframe-using-a-loop


I am using pyspark spark-3.1.2-bin-hadoop3.2

I wrote a little test program. I am able to append columns created using
pyspark.sql.functions.lit(). I am not able to append columns from other data
frames.


df1

DataFrame[Name: string, ctrl_1: double]

+-------+------+
|   Name|ctrl_1|
+-------+------+
| txId_1|   0.0|
| txId_2|  11.0|
| txId_3|  12.0|
| txId_4|  13.0|
| txId_5|  14.0|
| txId_6|  15.0|
| txId_7|  16.0|
| txId_8|  17.0|
| txId_9|  18.0|
|txId_10|  19.0|
+-------+------+

# use select to append multiple literals
from pyspark.sql import functions as pyf
allDF3 = df1.select( ["*", pyf.lit("abc").alias("x"),
                      pyf.lit("mn0").alias("y")] )

allDF3
DataFrame[Name: string, ctrl_1: double, x: string, y: string]
+-------+------+---+---+
|   Name|ctrl_1|  x|  y|
+-------+------+---+---+
| txId_1|   0.0|abc|mn0|
| txId_2|  11.0|abc|mn0|
| txId_3|  12.0|abc|mn0|
| txId_4|  13.0|abc|mn0|
| txId_5|  14.0|abc|mn0|
| txId_6|  15.0|abc|mn0|
| txId_7|  16.0|abc|mn0|
| txId_8|  17.0|abc|mn0|
| txId_9|  18.0|abc|mn0|
|txId_10|  19.0|abc|mn0|
+-------+------+---+---+


df2

DataFrame[Name: string, Length: int, EffectiveLength: double, TPM: double,
NumReads: double]

+-------+------+---------------+----+--------+
|   Name|Length|EffectiveLength| TPM|NumReads|
+-------+------+---------------+----+--------+
| txId_1|  1500|         1234.5|12.1|     0.1|
| txId_2|  1510|         1244.5|13.1|    11.1|
| txId_3|  1520|         1254.5|14.1|    12.1|
| txId_4|  1530|         1264.5|15.1|    13.1|
| txId_5|  1540|         1274.5|16.1|    14.1|
| txId_6|  1550|         1284.5|17.1|    15.1|
| txId_7|  1560|         1294.5|18.1|    16.1|
| txId_8|  1570|         1304.5|19.1|    17.1|
| txId_9|  1580|         1314.5|20.1|    18.1|
|txId_10|  1590|         1324.5|21.1|    19.1|
+-------+------+---------------+----+--------+



s2Col = df2["NumReads"].alias( 'ctrl_2' )
print( "type(s2Col) = {}".format( type(s2Col) ) )

type(s2Col) = <class 'pyspark.sql.column.Column'>

allDF4 = df1.select( ["*", s2Col] )

~/extraCellularRNA/sparkBin/spark-3.1.2-bin-hadoop3.2/python/pyspark/sql/dataframe.py in select(self, *cols)
   1667         [Row(name='Alice', age=12), Row(name='Bob', age=15)]
   1668         """
-> 1669         jdf = self._jdf.select(self._jcols(*cols))
   1670         return DataFrame(jdf, self.sql_ctx)
   1671

../../sparkBin/spark-3.1.2-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1303         answer = self.gateway_client.send_command(command)
   1304         return_value = get_return_value(
-> 1305             answer, self.gateway_client, self.target_id, self.name)
   1306
   1307         for temp_arg in temp_args:

~/extraCellularRNA/sparkBin/spark-3.1.2-bin-hadoop3.2/python/pyspark/sql/utils.py in deco(*a, **kw)
    115                 # Hide where the exception came from that shows a non-Pythonic
    116                 # JVM exception message.
--> 117                 raise converted from None
    118             else:
    119                 raise

AnalysisException: Resolved attribute(s) NumReads#14 missing from
Name#0,ctrl_1#2447 in operator !Project [Name#0, ctrl_1#2447, NumReads#14 AS
ctrl_2#2550].;

!Project [Name#0, ctrl_1#2447, NumReads#14 AS ctrl_2#2550]
+- Project [Name#0, NumReads#4 AS ctrl_1#2447]
   +- Project [Name#0, NumReads#4]
      +- Relation[Name#0,Length#1,EffectiveLength#2,TPM#3,NumReads#4] csv

Any idea what my bug is?

Kind regards

Andy
