(That's not the situation below that we are commenting on.)
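One quick way to check the claim discussed in the thread below, namely that a DataFrame expression and the equivalent SQL compile to the same plan, is to compare their explain() output. The following is only an illustrative sketch; the table and column names are made up and are not from this thread.

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# Made-up example data, purely for comparing the two front ends.
df = spark.createDataFrame([("a", 1.0), ("b", 2.0), ("a", 3.0)], ["key", "value"])
df.createOrReplaceTempView("t")

# DataFrame API form of the query.
api_df = df.groupBy("key").agg(F.sum("value").alias("total"))

# Equivalent SQL form of the same query.
sql_df = spark.sql("SELECT key, SUM(value) AS total FROM t GROUP BY key")

# Both should print essentially the same optimized/physical plan.
api_df.explain()
sql_df.explain()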
On Fri, Dec 24, 2021, 9:28 AM Gourav Sengupta <gourav.sengu...@gmail.com> wrote:

Hi,

Try writing several withColumn calls on a dataframe with functions and then look at the UI for the time differences. This should be done with large data sets of course, on the order of around 200 GB or more.

With scenarios involving nested queries and joins, the time differences shown in the UI become a bit more visible.

Regards,
Gourav Sengupta

On Fri, Dec 24, 2021 at 2:48 PM Sean Owen <sro...@gmail.com> wrote:

Nah, it's going to translate to the same plan as the equivalent SQL.

On Fri, Dec 24, 2021, 5:09 AM Gourav Sengupta <gourav.sengu...@gmail.com> wrote:

Hi,

Please note that using SQL is much more performant, and it is easier to manage these kinds of issues. You might want to look at the Spark UI to see the advantage of using SQL over the DataFrame API.

Regards,
Gourav Sengupta

On Sat, Dec 18, 2021 at 5:40 PM Andrew Davidson <aedav...@ucsc.edu.invalid> wrote:

Thanks Nicholas

Andy

From: Nicholas Gustafson <njgustaf...@gmail.com>
Date: Friday, December 17, 2021 at 6:12 PM
To: Andrew Davidson <aedav...@ucsc.edu.invalid>
Cc: "user@spark.apache.org" <user@spark.apache.org>
Subject: Re: AnalysisException: Trouble using select() to append multiple columns

Since df1 and df2 are different DataFrames, you will need to use a join. For example:

df1.join(df2.selectExpr("Name", "NumReads as ctrl_2"), on=["Name"])
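A minimal, runnable sketch of that suggestion, using small made-up stand-ins for the df1 and df2 shown in the quoted question below:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Small, made-up stand-ins for df1 and df2 from the question quoted below.
df1 = spark.createDataFrame([("txId_1", 0.0), ("txId_2", 11.0)], ["Name", "ctrl_1"])
df2 = spark.createDataFrame(
    [("txId_1", 1500, 1234.5, 12.1, 0.1), ("txId_2", 1510, 1244.5, 13.1, 11.1)],
    ["Name", "Length", "EffectiveLength", "TPM", "NumReads"],
)

# Project and rename the one column of interest in df2, then join on the shared key.
ctrl2 = df2.selectExpr("Name", "NumReads as ctrl_2")
result = df1.join(ctrl2, on=["Name"])
result.show()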
On Dec 17, 2021, at 16:25, Andrew Davidson <aedav...@ucsc.edu.invalid> wrote:

Hi, I am a newbie.

I have 16,000 data files; all files have the same number of rows and columns. The row ids are identical and are in the same order. I want to create a new data frame that contains the 3rd column from each data file.

I wrote a test program that uses a for loop and join. It works with my small test set. I get an OOM when I try to run using all the data files. I realize that join (map reduce) is probably not a great solution for my problem.

Recently I found several articles that talk about the challenges of using withColumn() and how to use select() to append columns:

https://mungingdata.com/pyspark/select-add-columns-withcolumn/
https://stackoverflow.com/questions/64627112/adding-multiple-columns-in-pyspark-dataframe-using-a-loop

I am using pyspark spark-3.1.2-bin-hadoop3.2.

I wrote a little test program. I am able to append columns created using pyspark.sql.functions.lit(). I am not able to append columns from other data frames.

df1
DataFrame[Name: string, ctrl_1: double]
+-------+------+
|   Name|ctrl_1|
+-------+------+
| txId_1|   0.0|
| txId_2|  11.0|
| txId_3|  12.0|
| txId_4|  13.0|
| txId_5|  14.0|
| txId_6|  15.0|
| txId_7|  16.0|
| txId_8|  17.0|
| txId_9|  18.0|
|txId_10|  19.0|
+-------+------+

# use select to append multiple literals
allDF3 = df1.select( ["*", pyf.lit("abc").alias("x"), pyf.lit("mn0").alias("y")] )

allDF3
DataFrame[Name: string, ctrl_1: double, x: string, y: string]
+-------+------+---+---+
|   Name|ctrl_1|  x|  y|
+-------+------+---+---+
| txId_1|   0.0|abc|mn0|
| txId_2|  11.0|abc|mn0|
| txId_3|  12.0|abc|mn0|
| txId_4|  13.0|abc|mn0|
| txId_5|  14.0|abc|mn0|
| txId_6|  15.0|abc|mn0|
| txId_7|  16.0|abc|mn0|
| txId_8|  17.0|abc|mn0|
| txId_9|  18.0|abc|mn0|
|txId_10|  19.0|abc|mn0|
+-------+------+---+---+

df2
DataFrame[Name: string, Length: int, EffectiveLength: double, TPM: double, NumReads: double]
+-------+------+---------------+----+--------+
|   Name|Length|EffectiveLength| TPM|NumReads|
+-------+------+---------------+----+--------+
| txId_1|  1500|         1234.5|12.1|     0.1|
| txId_2|  1510|         1244.5|13.1|    11.1|
| txId_3|  1520|         1254.5|14.1|    12.1|
| txId_4|  1530|         1264.5|15.1|    13.1|
| txId_5|  1540|         1274.5|16.1|    14.1|
| txId_6|  1550|         1284.5|17.1|    15.1|
| txId_7|  1560|         1294.5|18.1|    16.1|
| txId_8|  1570|         1304.5|19.1|    17.1|
| txId_9|  1580|         1314.5|20.1|    18.1|
|txId_10|  1590|         1324.5|21.1|    19.1|
+-------+------+---------------+----+--------+

s2Col = df2["NumReads"].alias( 'ctrl_2' )
print("type(s2Col) = {}".format(type(s2Col)) )

type(s2Col) = <class 'pyspark.sql.column.Column'>

allDF4 = df1.select( ["*", s2Col] )

~/extraCellularRNA/sparkBin/spark-3.1.2-bin-hadoop3.2/python/pyspark/sql/dataframe.py in select(self, *cols)
   1667         [Row(name='Alice', age=12), Row(name='Bob', age=15)]
   1668         """
-> 1669         jdf = self._jdf.select(self._jcols(*cols))
   1670         return DataFrame(jdf, self.sql_ctx)
   1671

../../sparkBin/spark-3.1.2-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1303         answer = self.gateway_client.send_command(command)
   1304         return_value = get_return_value(
-> 1305             answer, self.gateway_client, self.target_id, self.name)
   1306
   1307         for temp_arg in temp_args:

~/extraCellularRNA/sparkBin/spark-3.1.2-bin-hadoop3.2/python/pyspark/sql/utils.py in deco(*a, **kw)
    115                 # Hide where the exception came from that shows a non-Pythonic
    116                 # JVM exception message.
--> 117                 raise converted from None
    118             else:
    119                 raise

AnalysisException: Resolved attribute(s) NumReads#14 missing from Name#0,ctrl_1#2447 in operator !Project [Name#0, ctrl_1#2447, NumReads#14 AS ctrl_2#2550].;
!Project [Name#0, ctrl_1#2447, NumReads#14 AS ctrl_2#2550]
+- Project [Name#0, NumReads#4 AS ctrl_1#2447]
   +- Project [Name#0, NumReads#4]
      +- Relation[Name#0,Length#1,EffectiveLength#2,TPM#3,NumReads#4] csv

Any idea what my bug is?

Kind regards

Andy
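For what it is worth, the AnalysisException above is raised because select() can only resolve columns that belong to the DataFrame it is called on; s2Col is bound to df2, so the analyzer cannot find NumReads among df1's attributes. Appending a column from another DataFrame therefore needs a join on the shared key, as Nicholas suggested. A hedged sketch of that pattern applied to several files follows; the file paths and sample names are made up for illustration, and nothing here claims it will avoid the OOM seen with 16,000 inputs.

from functools import reduce
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical input paths and sample names; the real files are not shown in the thread.
paths = ["ctrl_1.csv", "ctrl_2.csv", "ctrl_3.csv"]

def read_counts(path, sample_name):
    # Keep only the join key and the one column of interest, renamed per sample.
    df = spark.read.csv(path, header=True, inferSchema=True)
    return df.selectExpr("Name", "NumReads as {}".format(sample_name))

counts = [read_counts(p, "ctrl_{}".format(i + 1)) for i, p in enumerate(paths)]

# Fold the per-file frames together by joining on the shared row id.
combined = reduce(lambda left, right: left.join(right, on=["Name"]), counts)
combined.show()

With thousands of inputs the chained-join lineage gets very deep, so truncating the lineage (for example by writing intermediate results out) may still be needed; that question is beyond what this thread settles.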