Hi Sean and Gourav

Thanks for the suggestions. I thought that both the SQL and DataFrame APIs are wrappers around the same framework, i.e. Catalyst?
I tend to mix and match my code. Sometimes I find it easier to write using SQL, sometimes DataFrames. What is considered best practice? Here is an example.

Case 1

    for i in range( 1, len( self.sampleNamesList ) ): # iterate 16000 times!
        sampleName = self.sampleNamesList[i]
        sampleSDF = quantSparkDFList[i]
        sampleSDF.createOrReplaceTempView( "sample" )

        # the sample name must be quoted else column names with a '-'
        # like GTEX-1117F-0426-SM-5EGHI will generate an error
        # spark thinks the '-' is an expression. '_' is also
        # a special char for the sql like operator
        # https://stackoverflow.com/a/63899306/4586180
        sqlStmt = '\t\t\t\t\t\tselect rc.*, `{}` \n\
                        from \n\
                            rawCounts as rc, \n\
                            sample \n\
                        where \n\
                            rc.Name == sample.Name \n'.format( sampleName )

        rawCountsSDF = self.spark.sql( sqlStmt )
        rawCountsSDF.createOrReplaceTempView( "rawCounts" )

Case 2

    for i in range( 1, len(dfList) ):
        df2 = dfList[i]
        retDF = retDF.join( df2.selectExpr("*"), on=["Name"] )

I think my out-of-memory exception may be because the query plan is huge. I have not figured out how to tell whether that is my bug or not. My untested workaround is to organize the data so that each massive join is run on 1/5 of the total data set, then union them all together. Each “part” will still need to iterate 16000 times.

In general I assume we want to avoid for loops. I assume Spark is unable to optimize them.
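The loop in case 2 can be hidden behind a small helper. The following is only a sketch and not something Spark provides: join_all is a hypothetical name, and it assumes every DataFrame in the list has the key column "Name". It relies only on the join(other, on=[...]) call already used in case 2. It builds the same chain of joins as the explicit loop, so the query plan grows just as large; it hides the loop but does not shrink the plan.

```python
from functools import reduce


def join_all(frames, on="Name"):
    """Join a sequence of DataFrames left to right on a shared key column.

    Hypothetical helper, not part of the Spark API. `frames` is any
    non-empty sequence of objects with a PySpark-style
    `join(other, on=[...])` method; `on` is assumed to exist in all of them.
    """
    frames = list(frames)
    if not frames:
        raise ValueError("join_all needs at least one DataFrame")
    # reduce() applies the join pairwise: ((f0 join f1) join f2) join ...
    # This is the same chain of joins the for loop in case 2 produces.
    return reduce(lambda left, right: left.join(right, on=[on]), frames)
```

Usage would look like retDF = join_all(dfList), assuming retDF in case 2 started out as dfList[0].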
It would be nice if Spark provided some sort of join-all function, even if it used a for loop to hide this from me.

Happy holidays

Andy

From: Sean Owen <sro...@gmail.com>
Date: Friday, December 24, 2021 at 8:30 AM
To: Gourav Sengupta <gourav.sengu...@gmail.com>
Cc: Andrew Davidson <aedav...@ucsc.edu.invalid>, Nicholas Gustafson <njgustaf...@gmail.com>, User <user@spark.apache.org>
Subject: Re: AnalysisException: Trouble using select() to append multiple columns

(that's not the situation below we are commenting on)

On Fri, Dec 24, 2021, 9:28 AM Gourav Sengupta <gourav.sengu...@gmail.com> wrote:

Hi,

try to write several withColumns in a dataframe with functions and then see the UI for time differences. This should be done with large data sets of course, on the order of 200 GB or more.

With scenarios involving nested queries and joins, the time differences shown in the UI become a bit more visible.

Regards,
Gourav Sengupta

On Fri, Dec 24, 2021 at 2:48 PM Sean Owen <sro...@gmail.com> wrote:

Nah, it's going to translate to the same plan as the equivalent SQL.

On Fri, Dec 24, 2021, 5:09 AM Gourav Sengupta <gourav.sengu...@gmail.com> wrote:

Hi,

please note that using SQL is much more performant, and it is easier to manage these kinds of issues. You might want to look at the Spark UI to see the advantage of using SQL over the DataFrame API.

Regards,
Gourav Sengupta

On Sat, Dec 18, 2021 at 5:40 PM Andrew Davidson <aedav...@ucsc.edu.invalid> wrote:

Thanks Nicholas

Andy

From: Nicholas Gustafson <njgustaf...@gmail.com>
Date: Friday, December 17, 2021 at 6:12 PM
To: Andrew Davidson <aedav...@ucsc.edu.invalid>
Cc: "user@spark.apache.org" <user@spark.apache.org>
Subject: Re: AnalysisException: Trouble using select() to append multiple columns

Since df1 and df2 are different DataFrames, you will need to use a join.
For example:

    df1.join(df2.selectExpr("Name", "NumReads as ctrl_2"), on=["Name"])

On Dec 17, 2021, at 16:25, Andrew Davidson <aedav...@ucsc.edu.invalid> wrote:

Hi

I am a newbie.

I have 16,000 data files; all files have the same number of rows and columns. The row ids are identical and are in the same order. I want to create a new data frame that contains the 3rd column from each data file.

I wrote a test program that uses a for loop and join. It works with my small test set. I get an OOM when I try to run using all the data files. I realize that join (map reduce) is probably not a great solution for my problem.

Recently I found several articles that talk about the challenge with using withColumn() and about how to use select() to append columns:

https://mungingdata.com/pyspark/select-add-columns-withcolumn/
https://stackoverflow.com/questions/64627112/adding-multiple-columns-in-pyspark-dataframe-using-a-loop

I am using pyspark spark-3.1.2-bin-hadoop3.2.

I wrote a little test program. I am able to append columns created using pyspark.sql.functions.lit().
I am not able to append columns from other data frames.

df1
DataFrame[Name: string, ctrl_1: double]
+-------+------+
|   Name|ctrl_1|
+-------+------+
| txId_1|   0.0|
| txId_2|  11.0|
| txId_3|  12.0|
| txId_4|  13.0|
| txId_5|  14.0|
| txId_6|  15.0|
| txId_7|  16.0|
| txId_8|  17.0|
| txId_9|  18.0|
|txId_10|  19.0|
+-------+------+

# use select to append multiple literals
allDF3 = df1.select( ["*", pyf.lit("abc").alias("x"), pyf.lit("mn0").alias("y")] )

allDF3
DataFrame[Name: string, ctrl_1: double, x: string, y: string]
+-------+------+---+---+
|   Name|ctrl_1|  x|  y|
+-------+------+---+---+
| txId_1|   0.0|abc|mn0|
| txId_2|  11.0|abc|mn0|
| txId_3|  12.0|abc|mn0|
| txId_4|  13.0|abc|mn0|
| txId_5|  14.0|abc|mn0|
| txId_6|  15.0|abc|mn0|
| txId_7|  16.0|abc|mn0|
| txId_8|  17.0|abc|mn0|
| txId_9|  18.0|abc|mn0|
|txId_10|  19.0|abc|mn0|
+-------+------+---+---+

df2
DataFrame[Name: string, Length: int, EffectiveLength: double, TPM: double, NumReads: double]
+-------+------+---------------+----+--------+
|   Name|Length|EffectiveLength| TPM|NumReads|
+-------+------+---------------+----+--------+
| txId_1|  1500|         1234.5|12.1|     0.1|
| txId_2|  1510|         1244.5|13.1|    11.1|
| txId_3|  1520|         1254.5|14.1|    12.1|
| txId_4|  1530|         1264.5|15.1|    13.1|
| txId_5|  1540|         1274.5|16.1|    14.1|
| txId_6|  1550|         1284.5|17.1|    15.1|
| txId_7|  1560|         1294.5|18.1|    16.1|
| txId_8|  1570|         1304.5|19.1|    17.1|
| txId_9|  1580|         1314.5|20.1|    18.1|
|txId_10|  1590|         1324.5|21.1|    19.1|
+-------+------+---------------+----+--------+

s2Col = df2["NumReads"].alias( 'ctrl_2' )
print("type(s2Col) = {}".format(type(s2Col)) )

type(s2Col) = <class 'pyspark.sql.column.Column'>

allDF4 = df1.select( ["*", s2Col] )

~/extraCellularRNA/sparkBin/spark-3.1.2-bin-hadoop3.2/python/pyspark/sql/dataframe.py in select(self, *cols)
   1667         [Row(name='Alice', age=12), Row(name='Bob', age=15)]
   1668         """
-> 1669         jdf = self._jdf.select(self._jcols(*cols))
   1670         return DataFrame(jdf, self.sql_ctx)
   1671
../../sparkBin/spark-3.1.2-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1303         answer = self.gateway_client.send_command(command)
   1304         return_value = get_return_value(
-> 1305             answer, self.gateway_client, self.target_id, self.name)
   1306
   1307         for temp_arg in temp_args:

~/extraCellularRNA/sparkBin/spark-3.1.2-bin-hadoop3.2/python/pyspark/sql/utils.py in deco(*a, **kw)
    115                 # Hide where the exception came from that shows a non-Pythonic
    116                 # JVM exception message.
--> 117                 raise converted from None
    118             else:
    119                 raise

AnalysisException: Resolved attribute(s) NumReads#14 missing from Name#0,ctrl_1#2447 in operator !Project [Name#0, ctrl_1#2447, NumReads#14 AS ctrl_2#2550].;
!Project [Name#0, ctrl_1#2447, NumReads#14 AS ctrl_2#2550]
+- Project [Name#0, NumReads#4 AS ctrl_1#2447]
   +- Project [Name#0, NumReads#4]
      +- Relation[Name#0,Length#1,EffectiveLength#2,TPM#3,NumReads#4] csv

Any idea what my bug is?

Kind regards

Andy