Something like this, using a list comprehension:

doc_types = ["AB", "AA", "AC"]

result = df.groupBy("code").agg(
    *[F.sum(F.when(F.col("doc_type") == dt, F.col("amount"))).alias(f"{dt}_amnt")
      for dt in doc_types],
    F.first("load_date").alias("load_date")
)
and it does not use pivot() at all.
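
If you have several value columns to pivot and several non-pivot columns, the same comprehension extends with nested loops. A rough sketch, where "qty" is just a hypothetical second value column standing in for whatever your DataFrame actually has:

doc_types = ["AB", "AA", "AC"]
value_cols = ["amount", "qty"]   # columns to pivot per doc_type ("qty" is hypothetical)
passthrough = ["load_date"]      # non-pivot columns, taken with first() per group

result = df.groupBy("code").agg(
    *[F.sum(F.when(F.col("doc_type") == dt, F.col(vc))).alias(f"{dt}_{vc}")
      for dt in doc_types for vc in value_cols],
    *[F.first(c).alias(c) for c in passthrough]
)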


On Sun, 9 Mar 2025 at 21:23, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

> Well, I tried using window functions with pivot() and it did not work.
> From your reply, you are looking for a function that would ideally combine
> the conciseness of pivot() with the flexibility of explicit aggregations.
> While Spark provides powerful tools, there is not a single built-in
> function that perfectly encapsulates this.
>
> The existing approach that I mentioned (i.e. grouping with conditional
> aggregations) is the recommended way to achieve this because it prioritizes
> things like clarity and control.
> From my experience:
>
>    - Readability Matters: Explicit code is often better than overly
>    complex or obscure code. As the famous saying goes, there are two ways of
>    constructing a software design: One way is to make it so simple that there
>    are obviously no deficiencies, and the other way is to make it so
>    complicated that there are no obvious deficiencies.
>    - You need control over how each column is aggregated.
>    - Spark's query optimizer is generally good at handling aggregations,
>    even with when() conditions.
>
> Therefore, while the verbosity of the current solution might seem less
> than ideal, it is the most practical and reliable way to achieve your
> specific requirements in Spark.
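>
> If you still want pivot()'s conciseness for the value columns, one middle ground (not a built-in, just a sketch reusing the column names from this thread) is to pivot the amounts on their own, aggregate the non-pivot columns separately, and join the two results back on the grouping key:
>
> amounts = (
>     df.groupBy("code")
>     .pivot("doc_type", ["AB", "AA", "AC"])
>     .agg(F.sum("amount"))
> )
> # with a single aggregation, pivot() names the columns after the doc_type values,
> # so rename them to the <doc_type>_amnt convention
> for dt in ["AB", "AA", "AC"]:
>     amounts = amounts.withColumnRenamed(dt, f"{dt}_amnt")
>
> dates = df.groupBy("code").agg(F.first("load_date").alias("load_date"))
> result = amounts.join(dates, on="code")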
>
> HTH
>
>
> Dr Mich Talebzadeh,
> Architect | Data Science | Financial Crime | Forensic Analysis | GDPR
>
>    view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
>
>
> On Sun, 9 Mar 2025 at 18:25, Dhruv Singla <dvsingla...@gmail.com> wrote:
>
>> Hey, I already know this and have written the same in my question. I know
>> formatting can make the code a lot simpler and easier to understand, but
>> I'm looking to see whether there is already a function or a Spark built-in for this.
>> Thanks for the help though.
>>
>> On Sun, Mar 9, 2025 at 11:42 PM Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> import pyspark
>>> from pyspark import SparkConf, SparkContext
>>> from pyspark.sql import SparkSession
>>> from pyspark.sql import SQLContext
>>> from pyspark.sql.functions import struct
>>> from pyspark.sql import functions as F
>>> from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType
>>>
>>> spark = SparkSession.builder.appName("testme").getOrCreate()
>>> sc = spark.sparkContext
>>> # Set the log level to ERROR to reduce verbosity
>>> sc.setLogLevel("ERROR")
>>>
>>> # Define the schema
>>> schema = StructType([
>>>     StructField("code", IntegerType(), True),
>>>     StructField("doc_type", StringType(), True),
>>>     StructField("amount", IntegerType(), True),
>>>     StructField("load_date", StringType(), True)
>>> ])
>>>
>>> # Create the DataFrame
>>> data = [
>>>     [1, 'AB', 12, '2022-01-01'],
>>>     [1, 'AA', 22, '2022-01-10'],
>>>     [1, 'AC', 11, '2022-01-11'],
>>>     [2, 'AB', 22, '2022-02-01'],
>>>     [2, 'AA', 28, '2022-02-10'],
>>>     [2, 'AC', 25, '2022-02-22']
>>> ]
>>>
>>> df = spark.createDataFrame(data, schema=schema)
>>>
>>> df = df.withColumn('load_date', F.to_date('load_date'))
>>>
>>> grouped_df = df.groupBy('code')
>>>
>>> pivot_aggs = [
>>>     F.sum(F.when(F.col('doc_type') == doc_type, F.col('amount'))).alias(f'{doc_type}_amnt')
>>>     for doc_type in ['AB', 'AA', 'AC']  # dynamically define the pivot columns
>>> ]
>>>
>>> non_pivot_aggs = [
>>>     F.first('load_date').alias('load_date')  # or any other aggregation like min, max...
>>> ]
>>>
>>> all_aggs = pivot_aggs + non_pivot_aggs
>>>
>>> df = grouped_df.agg(*all_aggs)
>>>
>>> df.printSchema()
>>> df.show(20, False)
>>>
>>> Output
>>>
>>> root
>>>  |-- code: integer (nullable = true)
>>>  |-- AB_amnt: long (nullable = true)
>>>  |-- AA_amnt: long (nullable = true)
>>>  |-- AC_amnt: long (nullable = true)
>>>  |-- load_date: date (nullable = true)
>>>
>>> +----+-------+-------+-------+----------+
>>> |code|AB_amnt|AA_amnt|AC_amnt|load_date |
>>> +----+-------+-------+-------+----------+
>>> |1   |12     |22     |11     |2022-01-01|
>>> |2   |22     |28     |25     |2022-02-01|
>>> +----+-------+-------+-------+----------+
>>>
>>> HTH
>>>
>>> Dr Mich Talebzadeh,
>>> Architect | Data Science | Financial Crime | Forensic Analysis | GDPR
>>>
>>>    view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>>
>>>
>>>
>>>
>>> On Sun, 9 Mar 2025 at 17:23, Dhruv Singla <dvsingla...@gmail.com> wrote:
>>>
>>>> Yes, this is it. I want to do this with a simple, short command. The way
>>>> I mentioned is a lengthy one.
>>>>
>>>> On Sun, Mar 9, 2025 at 10:16 PM Mich Talebzadeh <
>>>> mich.talebza...@gmail.com> wrote:
>>>>
>>>>> Is this what you are expecting?
>>>>>
>>>>> root
>>>>>  |-- code: integer (nullable = true)
>>>>>  |-- AB_amnt: long (nullable = true)
>>>>>  |-- AA_amnt: long (nullable = true)
>>>>>  |-- AC_amnt: long (nullable = true)
>>>>>  |-- load_date: date (nullable = true)
>>>>>
>>>>> +----+-------+-------+-------+----------+
>>>>> |code|AB_amnt|AA_amnt|AC_amnt|load_date |
>>>>> +----+-------+-------+-------+----------+
>>>>> |1   |12     |22     |11     |2022-01-01|
>>>>> |2   |22     |28     |25     |2022-02-01|
>>>>> +----+-------+-------+-------+----------+
>>>>>
>>>>> Dr Mich Talebzadeh,
>>>>> Architect | Data Science | Financial Crime | Forensic Analysis | GDPR
>>>>>
>>>>>    view my Linkedin profile
>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Sun, 9 Mar 2025 at 14:12, Dhruv Singla <dvsingla...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Everyone
>>>>>>
>>>>>> Hope you are doing well
>>>>>>
>>>>>> I have the following dataframe.
>>>>>>
>>>>>> df = spark.createDataFrame(
>>>>>>     [
>>>>>>         [1, 'AB', 12, '2022-01-01']
>>>>>>         , [1, 'AA', 22, '2022-01-10']
>>>>>>         , [1, 'AC', 11, '2022-01-11']
>>>>>>         , [2, 'AB', 22, '2022-02-01']
>>>>>>         , [2, 'AA', 28, '2022-02-10']
>>>>>>         , [2, 'AC', 25, '2022-02-22']
>>>>>>     ]
>>>>>>     , 'code: int, doc_type: string, amount: int, load_date: string'
>>>>>> )
>>>>>> df = df.withColumn('load_date', F.to_date('load_date'))
>>>>>>
>>>>>> I want to pivot the amount but just want the first value from the
>>>>>> date. This is what I tried, and it is not giving me the desired results.
>>>>>>
>>>>>> (
>>>>>>     df.groupBy('code')
>>>>>>     .pivot('doc_type', ['AB', 'AA', 'AC'])
>>>>>>     .agg(F.sum('amount').alias('amnt'), F.first('load_date').alias('ldt'))
>>>>>>     .show()
>>>>>> )
>>>>>>
>>>>>> +----+-------+----------+-------+----------+-------+----------+
>>>>>> |code|AB_amnt|    AB_ldt|AA_amnt|    AA_ldt|AC_amnt|    AC_ldt|
>>>>>> +----+-------+----------+-------+----------+-------+----------+
>>>>>> |   1|     12|2022-01-01|     22|2022-01-10|     11|2022-01-11|
>>>>>> |   2|     22|2022-02-01|     28|2022-02-10|     25|2022-02-22|
>>>>>> +----+-------+----------+-------+----------+-------+----------+
>>>>>>
>>>>>> This is what I want.
>>>>>>
>>>>>> (
>>>>>>     df.groupBy('code')
>>>>>>     .agg(
>>>>>>         F.sum(F.when(F.col('doc_type') == 'AB', F.col('amount'))).alias('AB_amnt')
>>>>>>         , F.sum(F.when(F.col('doc_type') == 'AA', F.col('amount'))).alias('AA_amnt')
>>>>>>         , F.sum(F.when(F.col('doc_type') == 'AC', F.col('amount'))).alias('AC_amnt')
>>>>>>         , F.first('load_date').alias('load_date')
>>>>>>     )
>>>>>>     .show()
>>>>>> )
>>>>>>
>>>>>> +----+-------+-------+-------+----------+
>>>>>> |code|AB_amnt|AA_amnt|AC_amnt| load_date|
>>>>>> +----+-------+-------+-------+----------+
>>>>>> |   1|     12|     22|     11|2022-01-01|
>>>>>> |   2|     22|     28|     25|2022-02-01|
>>>>>> +----+-------+-------+-------+----------+
>>>>>>
>>>>>> Is there any simpler way to do it? I have more than one column to pivot
>>>>>> and also more than one non-pivot column.
>>>>>>
>>>>>> I am using Databricks 14.3 LTS with Spark 3.5.0
>>>>>>
>>>>>> Thanks & Regards
>>>>>> Dhruv
>>>>>>
>>>>>

-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297
