Something like this, using a list comprehension:

doc_types = ["AB", "AA", "AC"]
result = df.groupBy("code").agg( *[F.sum(F.when(F.col("doc_type") == dt, F.col("amount"))).alias(f"{dt}_amnt") for dt in doc_types], F.first("load_date").alias("load_date") ) and it dont use pivot for it. søn. 9. mars 2025 kl. 21:23 skrev Mich Talebzadeh <mich.talebza...@gmail.com >: > Well I tried using windowing functions with pivot() and it did not work. > From your reply, you are looking for a function that would ideally combine > the conciseness of pivot() with the flexibility of explicit aggregations. > While Spark provides powerful tools, there is not a single built-in > function that perfectly encapsulates this. > > The existing approach that I mentioned (i.e. grouping with conditional > aggregations) is the recommended way to achieve this because it prioritizes > things like clarity and control. > From my experience > > - Readability Matters: Explicit code is often better than overly > complex or obscure code. As the famous saying goes, there are two ways of > constructing a software design: One way is to make it so simple that there > are obviously no deficiencies and the other way is to make it so > complicated that there are no obvious deficiencies. > - You need control over how each column is aggregated. > - Spark's query optimizer is generally good at handling aggregations, > even with when() conditions. > > Therefore, while the verbosity of the current solution might seem less > than ideal, it is the most practical and reliable way to achieve your > specific requirements in Spark. > > HTH > > > <https://medium.com/@manutej/mastering-sql-window-functions-guide-e6dc17eb1995#:~:text=Window%20functions%20can%20perform%20a,related%20to%20the%20current%20row.> > Dr Mich Talebzadeh, > Architect | Data Science | Financial Crime | Forensic Analysis | GDPR > > view my Linkedin profile > <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> > > > > > > On Sun, 9 Mar 2025 at 18:25, Dhruv Singla <dvsingla...@gmail.com> wrote: > >> Hey, I already know this and have written the same in my question. I know >> formatting can make the code a lot simpler and easier to understand, but >> I'm looking if there is already a function or a spark built-in for this. >> Thanks for the help though. 
>>
>> On Sun, Mar 9, 2025 at 11:42 PM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>
>>> import pyspark
>>> from pyspark import SparkConf, SparkContext
>>> from pyspark.sql import SparkSession
>>> from pyspark.sql import SQLContext
>>> from pyspark.sql.functions import struct
>>> from pyspark.sql import functions as F
>>> from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType
>>>
>>> spark = SparkSession.builder.appName("testme").getOrCreate()
>>> sc = spark.sparkContext
>>> # Set the log level to ERROR to reduce verbosity
>>> sc.setLogLevel("ERROR")
>>>
>>> # Define the schema
>>> schema = StructType([
>>>     StructField("code", IntegerType(), True),
>>>     StructField("doc_type", StringType(), True),
>>>     StructField("amount", IntegerType(), True),
>>>     StructField("load_date", StringType(), True)
>>> ])
>>>
>>> # Create the DataFrame
>>> data = [
>>>     [1, 'AB', 12, '2022-01-01'],
>>>     [1, 'AA', 22, '2022-01-10'],
>>>     [1, 'AC', 11, '2022-01-11'],
>>>     [2, 'AB', 22, '2022-02-01'],
>>>     [2, 'AA', 28, '2022-02-10'],
>>>     [2, 'AC', 25, '2022-02-22']
>>> ]
>>>
>>> df = spark.createDataFrame(data, schema=schema)
>>>
>>> df = df.withColumn('load_date', F.to_date('load_date'))
>>>
>>> grouped_df = df.groupBy('code')
>>>
>>> pivot_aggs = [
>>>     F.sum(F.when(F.col('doc_type') == doc_type, F.col('amount'))).alias(f'{doc_type}_amnt')
>>>     for doc_type in ['AB', 'AA', 'AC']  # Dynamically define pivot columns
>>> ]
>>>
>>> non_pivot_aggs = [
>>>     F.first('load_date').alias('load_date')  # Or any other aggregation like min, max...
>>> ]
>>>
>>> all_aggs = pivot_aggs + non_pivot_aggs
>>>
>>> df = grouped_df.agg(*all_aggs)
>>>
>>> df.printSchema()
>>> df.show(20, False)
>>>
>>> Output
>>>
>>> root
>>>  |-- code: integer (nullable = true)
>>>  |-- AB_amnt: long (nullable = true)
>>>  |-- AA_amnt: long (nullable = true)
>>>  |-- AC_amnt: long (nullable = true)
>>>  |-- load_date: date (nullable = true)
>>>
>>> +----+-------+-------+-------+----------+
>>> |code|AB_amnt|AA_amnt|AC_amnt|load_date |
>>> +----+-------+-------+-------+----------+
>>> |1   |12     |22     |11     |2022-01-01|
>>> |2   |22     |28     |25     |2022-02-01|
>>> +----+-------+-------+-------+----------+
>>>
>>> HTH
>>>
>>> Dr Mich Talebzadeh,
>>> Architect | Data Science | Financial Crime | Forensic Analysis | GDPR
>>>
>>> view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>>
>>> On Sun, 9 Mar 2025 at 17:23, Dhruv Singla <dvsingla...@gmail.com> wrote:
>>>
>>>> Yes, this is it. I want to form this using a simple short command. The
>>>> way I mentioned is a lengthy one.
>>>>
>>>> On Sun, Mar 9, 2025 at 10:16 PM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>>>
>>>>> Is this what you are expecting?
>>>>>
>>>>> root
>>>>>  |-- code: integer (nullable = true)
>>>>>  |-- AB_amnt: long (nullable = true)
>>>>>  |-- AA_amnt: long (nullable = true)
>>>>>  |-- AC_amnt: long (nullable = true)
>>>>>  |-- load_date: date (nullable = true)
>>>>>
>>>>> +----+-------+-------+-------+----------+
>>>>> |code|AB_amnt|AA_amnt|AC_amnt|load_date |
>>>>> +----+-------+-------+-------+----------+
>>>>> |1   |12     |22     |11     |2022-01-01|
>>>>> |2   |22     |28     |25     |2022-02-01|
>>>>> +----+-------+-------+-------+----------+
>>>>>
>>>>> Dr Mich Talebzadeh,
>>>>> Architect | Data Science | Financial Crime | Forensic Analysis | GDPR
>>>>>
>>>>> view my Linkedin profile
>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>
>>>>>
>>>>> On Sun, 9 Mar 2025 at 14:12, Dhruv Singla <dvsingla...@gmail.com> wrote:
>>>>>
>>>>>> Hi Everyone
>>>>>>
>>>>>> Hope you are doing well.
>>>>>>
>>>>>> I have the following dataframe.
>>>>>>
>>>>>> df = spark.createDataFrame(
>>>>>>     [
>>>>>>         [1, 'AB', 12, '2022-01-01']
>>>>>>         , [1, 'AA', 22, '2022-01-10']
>>>>>>         , [1, 'AC', 11, '2022-01-11']
>>>>>>         , [2, 'AB', 22, '2022-02-01']
>>>>>>         , [2, 'AA', 28, '2022-02-10']
>>>>>>         , [2, 'AC', 25, '2022-02-22']
>>>>>>     ]
>>>>>>     , 'code: int, doc_type: string, amount: int, load_date: string'
>>>>>> )
>>>>>> df = df.withColumn('load_date', F.to_date('load_date'))
>>>>>>
>>>>>> I want to pivot the amount but just want the first value from the
>>>>>> date. This is what I tried, and it is not giving me the desired results.
>>>>>>
>>>>>> (
>>>>>>     df.groupBy('code')
>>>>>>     .pivot('doc_type', ['AB', 'AA', 'AC'])
>>>>>>     .agg(F.sum('amount').alias('amnt'), F.first('load_date').alias('ldt'))
>>>>>>     .show()
>>>>>> )
>>>>>>
>>>>>> +----+-------+----------+-------+----------+-------+----------+
>>>>>> |code|AB_amnt|    AB_ldt|AA_amnt|    AA_ldt|AC_amnt|    AC_ldt|
>>>>>> +----+-------+----------+-------+----------+-------+----------+
>>>>>> |   1|     12|2022-01-01|     22|2022-01-10|     11|2022-01-11|
>>>>>> |   2|     22|2022-02-01|     28|2022-02-10|     25|2022-02-22|
>>>>>> +----+-------+----------+-------+----------+-------+----------+
>>>>>>
>>>>>> This is what I want.
>>>>>>
>>>>>> (
>>>>>>     df.groupBy('code')
>>>>>>     .agg(
>>>>>>         F.sum(F.when(F.col('doc_type') == 'AB', F.col('amount'))).alias('AB_amnt')
>>>>>>         , F.sum(F.when(F.col('doc_type') == 'AA', F.col('amount'))).alias('AA_amnt')
>>>>>>         , F.sum(F.when(F.col('doc_type') == 'AC', F.col('amount'))).alias('AC_amnt')
>>>>>>         , F.first('load_date').alias('load_date')
>>>>>>     )
>>>>>>     .show()
>>>>>> )
>>>>>>
>>>>>> +----+-------+-------+-------+----------+
>>>>>> |code|AB_amnt|AA_amnt|AC_amnt| load_date|
>>>>>> +----+-------+-------+-------+----------+
>>>>>> |   1|     12|     22|     11|2022-01-01|
>>>>>> |   2|     22|     28|     25|2022-02-01|
>>>>>> +----+-------+-------+-------+----------+
>>>>>>
>>>>>> Is there any simpler way to do it? I have more than one column to put
>>>>>> into the pivot and also more than one non-pivot column to keep.
>>>>>>
>>>>>> I am using Databricks 14.3 LTS with Spark 3.5.0.
>>>>>>
>>>>>> Thanks & Regards
>>>>>> Dhruv
>>>>>>

--
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge
+47 480 94 297
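
Dhruv's follow-up asks about having more than one pivoted value column and more
than one non-pivot column. The following is a minimal sketch generalizing the
list-comprehension approach from the top of the thread to that case. It rebuilds
the sample df from the original message; the value_cols/passthrough lists and the
f"{dt}_{vc}" naming are illustrative choices (the thread uses the _amnt suffix),
and the "quantity" column mentioned in the comment is purely hypothetical.

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("pivot_sketch").getOrCreate()

# Sample data copied from Dhruv's original message.
df = spark.createDataFrame(
    [
        (1, 'AB', 12, '2022-01-01'), (1, 'AA', 22, '2022-01-10'), (1, 'AC', 11, '2022-01-11'),
        (2, 'AB', 22, '2022-02-01'), (2, 'AA', 28, '2022-02-10'), (2, 'AC', 25, '2022-02-22'),
    ],
    'code: int, doc_type: string, amount: int, load_date: string',
).withColumn('load_date', F.to_date('load_date'))

doc_types = ["AB", "AA", "AC"]
value_cols = ["amount"]        # e.g. ["amount", "quantity"] if a second value column existed
passthrough = ["load_date"]    # non-pivot columns, taken with first()

# One conditional sum per (doc_type, value column) pair.
pivot_aggs = [
    F.sum(F.when(F.col("doc_type") == dt, F.col(vc))).alias(f"{dt}_{vc}")
    for dt in doc_types
    for vc in value_cols
]
# One first() per passthrough column.
passthrough_aggs = [F.first(c).alias(c) for c in passthrough]

result = df.groupBy("code").agg(*pivot_aggs, *passthrough_aggs)
result.show()

Adding another value column or another passthrough column is then just a matter of
extending the two lists, without writing each aggregation out by hand.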
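
For completeness, and not something proposed in the thread: if the brevity of
pivot() is the main attraction, one possible alternative is to pivot only the
amounts and join the first load_date per code back on afterwards. This is a
sketch only (reusing df and F from the sketch above); it costs an extra join,
which the conditional-aggregation approach avoids.

# Pivot just the value column.
pivoted = (
    df.groupBy("code")
      .pivot("doc_type", ["AB", "AA", "AC"])
      .agg(F.sum("amount"))
)

# With a single aggregate, the pivoted columns are typically named after the pivot
# values themselves ("AB", "AA", "AC"); rename them to match the _amnt convention.
for dt in ["AB", "AA", "AC"]:
    pivoted = pivoted.withColumnRenamed(dt, f"{dt}_amnt")

# Aggregate the non-pivot column separately and join it back on the grouping key.
dates = df.groupBy("code").agg(F.first("load_date").alias("load_date"))

result = pivoted.join(dates, on="code", how="left")
result.show()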