Hi, yeah I think that in practice you will always find that dataframes can give issues regarding a lot of things, and then you can argue. In the SPARK conference, I think last year, it was shown that more than 92% or 95% use the SPARK SQL API, if I am not mistaken.
I think that you can do the entire processing at one single go. Can you please write down the end to end SQL and share without the 16000 iterations? Regards, Gourav Sengupta On Fri, Dec 24, 2021 at 5:16 PM Andrew Davidson <aedav...@ucsc.edu> wrote: > Hi Sean and Gourav > > > > Thanks for the suggestions. I thought that both the sql and dataframe apis > are wrappers around the same frame work? Ie. catalysts. > > > > I tend to mix and match my code. Sometimes I find it easier to write using > sql some times dataframes. What is considered best practices? > > > > Here is an example > > > > Case 1 > > for i in range( 1, len( self.sampleNamesList ) ): # iterate 16000 > times! > > sampleName = self.sampleNamesList[i] > > sampleDF= quantSparkDFList[i] > > sampleSDF.createOrReplaceTempView( "sample" ) > > > > # the sample name must be quoted else column names with a '-' > > # like GTEX-1117F-0426-SM-5EGHI will generate an error > > # spark thinks the '-' is an expression. '_' is also > > # a special char for the sql like operator > > # https://stackoverflow.com/a/63899306/4586180 > > sqlStmt = '\t\t\t\t\t\tselect rc.*, `{}` \n\ > > from \n\ > > rawCounts as rc, \n\ > > sample \n\ > > where \n\ > > rc.Name == sample.Name \n'.format( > sampleName ) > > > > rawCountsSDF = self.spark.sql( sqlStmt ) > > rawCountsSDF.createOrReplaceTempView( "rawCounts" > > > > case 2 > > for i in range( 1, len(dfList) ): > > df2 = dfList[i] > > retDF = retDF.join( df2.selectExpr("*"), > on=["Name"] ) > > > > > > I think my out of memory exception maybe because the query plan is huge. I > have not figure out how to figure out if that is my bug or not. My untested > work around is organize the data so that each massive join is run on 1/5 of > the total data set, then union them all together. Each “part” will still > need to iterate 16000 times > > > > In general I assume we want to avoid for loops. I assume Spark is unable > to optimize them. It would be nice if spark provide some sort of join all > function even if it used a for loop to hide this from me > > > > Happy holidays > > > > Andy > > > > > > > > *From: *Sean Owen <sro...@gmail.com> > *Date: *Friday, December 24, 2021 at 8:30 AM > *To: *Gourav Sengupta <gourav.sengu...@gmail.com> > *Cc: *Andrew Davidson <aedav...@ucsc.edu.invalid>, Nicholas Gustafson < > njgustaf...@gmail.com>, User <user@spark.apache.org> > *Subject: *Re: AnalysisException: Trouble using select() to append > multiple columns > > > > (that's not the situation below we are commenting on) > > On Fri, Dec 24, 2021, 9:28 AM Gourav Sengupta <gourav.sengu...@gmail.com> > wrote: > > Hi, > > > > try to write several withColumns in a dataframe with functions and then > see the UI for time differences. This should be done with large data sets > of course, in order of a around 200GB + > > > > With scenarios involving nested queries and joins the time differences > shown in the UI becomes a bit more visible. > > > > Regards, > > Gourav Sengupta > > > > On Fri, Dec 24, 2021 at 2:48 PM Sean Owen <sro...@gmail.com> wrote: > > Nah, it's going to translate to the same plan as the equivalent SQL. > > > > On Fri, Dec 24, 2021, 5:09 AM Gourav Sengupta <gourav.sengu...@gmail.com> > wrote: > > Hi, > > > > please note that using SQL is much more performant, and easier to manage > these kind of issues. You might want to look at the SPARK UI to see the > advantage of using SQL over dataframes API. > > > > > > Regards, > > Gourav Sengupta > > > > On Sat, Dec 18, 2021 at 5:40 PM Andrew Davidson <aedav...@ucsc.edu.invalid> > wrote: > > Thanks Nicholas > > > > Andy > > > > *From: *Nicholas Gustafson <njgustaf...@gmail.com> > *Date: *Friday, December 17, 2021 at 6:12 PM > *To: *Andrew Davidson <aedav...@ucsc.edu.invalid> > *Cc: *"user@spark.apache.org" <user@spark.apache.org> > *Subject: *Re: AnalysisException: Trouble using select() to append > multiple columns > > > > Since df1 and df2 are different DataFrames, you will need to use a join. > For example: df1.join(df2.selectExpr(“Name”, “NumReads as ctrl_2”), > on=[“Name”]) > > > > On Dec 17, 2021, at 16:25, Andrew Davidson <aedav...@ucsc.edu.invalid> > wrote: > > > > Hi I am a newbie > > > > I have 16,000 data files, all files have the same number of rows and > columns. The row ids are identical and are in the same order. I want to > create a new data frame that contains the 3rd column from each data file > > > > I wrote a test program that uses a for loop and Join. It works with my > small test set. I get an OOM when I try to run using the all the data > files. I realize that join ( map reduce) is probably not a great solution > for my problem > > > > Recently I found several articles that take about the challenge with using > withColumn() and talk about how to use select() to append columns > > > > https://mungingdata.com/pyspark/select-add-columns-withcolumn/ > > > https://stackoverflow.com/questions/64627112/adding-multiple-columns-in-pyspark-dataframe-using-a-loop > > > > I am using pyspark spark-3.1.2-bin-hadoop3.2 > > > > I wrote a little test program. It am able to append columns created using > pyspark.sql.function.lit(). I am not able to append columns from other data > frames > > > > df1 > > DataFrame[Name: string, ctrl_1: double] > > +-------+------+ > > | Name|ctrl_1| > > +-------+------+ > > | txId_1| 0.0| > > | txId_2| 11.0| > > | txId_3| 12.0| > > | txId_4| 13.0| > > | txId_5| 14.0| > > | txId_6| 15.0| > > | txId_7| 16.0| > > | txId_8| 17.0| > > | txId_9| 18.0| > > |txId_10| 19.0| > > +-------+------+ > > > > # use select to append multiple literals > > allDF3 = df1.select( ["*", pyf.lit("abc").alias("x"), > pyf.lit("mn0").alias("y")] ) > > > > allDF3 > > DataFrame[Name: string, ctrl_1: double, x: string, y: string] > > +-------+------+---+---+ > > | Name|ctrl_1| x| y| > > +-------+------+---+---+ > > | txId_1| 0.0|abc|mn0| > > | txId_2| 11.0|abc|mn0| > > | txId_3| 12.0|abc|mn0| > > | txId_4| 13.0|abc|mn0| > > | txId_5| 14.0|abc|mn0| > > | txId_6| 15.0|abc|mn0| > > | txId_7| 16.0|abc|mn0| > > | txId_8| 17.0|abc|mn0| > > | txId_9| 18.0|abc|mn0| > > |txId_10| 19.0|abc|mn0| > > +-------+------+---+---+ > > > > df2 > > DataFrame[Name: string, Length: int, EffectiveLength: double, TPM: double, > NumReads: double] > > +-------+------+---------------+----+--------+ > > | Name|Length|EffectiveLength| TPM|NumReads| > > +-------+------+---------------+----+--------+ > > | txId_1| 1500| 1234.5|12.1| 0.1| > > | txId_2| 1510| 1244.5|13.1| 11.1| > > | txId_3| 1520| 1254.5|14.1| 12.1| > > | txId_4| 1530| 1264.5|15.1| 13.1| > > | txId_5| 1540| 1274.5|16.1| 14.1| > > | txId_6| 1550| 1284.5|17.1| 15.1| > > | txId_7| 1560| 1294.5|18.1| 16.1| > > | txId_8| 1570| 1304.5|19.1| 17.1| > > | txId_9| 1580| 1314.5|20.1| 18.1| > > |txId_10| 1590| 1324.5|21.1| 19.1| > > +-------+------+---------------+----+--------+ > > > > s2Col = df2["NumReads"].alias( 'ctrl_2' ) > > print("type(s2Col) = {}".format(type(s2Col)) ) > > > > type(s2Col) = <class 'pyspark.sql.column.Column'> > > > > allDF4 = df1.select( ["*", s2Col] ) > > ~/extraCellularRNA/sparkBin/spark-3.1.2-bin-hadoop3.2/python/pyspark/sql/dataframe.py > in select(self, *cols) > > * 1667* [Row(name='Alice', age=12), Row(name='Bob', age=15)] > > * 1668* """ > > -> 1669 jdf = self._jdf.select(self._jcols(*cols)) > > * 1670* return DataFrame(jdf, self.sql_ctx) > > * 1671* > > > > ../../sparkBin/spark-3.1.2-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py > in __call__(self, *args) > > * 1303* answer = self.gateway_client.send_command(command) > > * 1304* return_value = get_return_value( > > -> 1305 answer, self.gateway_client, self.target_id, self.name) > > * 1306* > > * 1307* for temp_arg in temp_args: > > > > ~/extraCellularRNA/sparkBin/spark-3.1.2-bin-hadoop3.2/python/pyspark/sql/utils.py > in deco(*a, **kw) > > * 115* # Hide where the exception came from that shows a > non-Pythonic > > * 116* # JVM exception message. > > --> 117 raise converted from None > > * 118* else: > > * 119* raise > > > > AnalysisException: Resolved attribute(s) NumReads#14 missing from > Name#0,ctrl_1#2447 in operator !Project [Name#0, ctrl_1#2447, NumReads#14 AS > ctrl_2#2550].; > > !Project [Name#0, ctrl_1#2447, NumReads#14 AS ctrl_2#2550] > > +- Project [Name#0, NumReads#4 AS ctrl_1#2447] > > +- Project [Name#0, NumReads#4] > > +- Relation[Name#0,Length#1,EffectiveLength#2,TPM#3,NumReads#4] csv > > > > Any idea what my bug is? > > > > Kind regards > > > > Andy > >