Something like this, using a list comprehension:
doc_types = ["AB", "AA", "AC"]

result = df.groupBy("code").agg(
    *[
        F.sum(F.when(F.col("doc_type") == dt, F.col("amount"))).alias(f"{dt}_amnt")
        for dt in doc_types
    ],
    F.first("load_date").alias("load_date")
)
and it does not use pivot at all.
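
If you need to pivot more than one value column and keep more than one non-pivoted column (as asked further down the thread), the same pattern generalizes with a nested comprehension. This is a rough sketch only; the "qty" column and the min() aggregation are illustrative assumptions, not part of the example data:

value_cols = ["amount", "qty"]  # "qty" is hypothetical; use whatever value columns your DataFrame has

result = df.groupBy("code").agg(
    *[
        F.sum(F.when(F.col("doc_type") == dt, F.col(vc))).alias(f"{dt}_{vc}")
        for dt in doc_types
        for vc in value_cols
    ],
    # non-pivoted columns, each with its own aggregation
    F.first("load_date").alias("load_date"),
    F.min("load_date").alias("earliest_load_date")
)
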
On Sun, 9 Mar 2025 at 21:23, Mich Talebzadeh <[email protected]> wrote:
> Well I tried using windowing functions with pivot() and it did not work.
> From your reply, you are looking for a function that would ideally combine
> the conciseness of pivot() with the flexibility of explicit aggregations.
> While Spark provides powerful tools, there is not a single built-in
> function that perfectly encapsulates this.
>
> The existing approach that I mentioned (i.e. grouping with conditional
> aggregations) is the recommended way to achieve this because it prioritizes
> clarity and control. From my experience:
>
>    - Readability matters: explicit code is often better than overly complex
>    or obscure code. As the famous saying goes, there are two ways of
>    constructing a software design: one way is to make it so simple that
>    there are obviously no deficiencies, and the other way is to make it so
>    complicated that there are no obvious deficiencies.
>    - You need control over how each column is aggregated.
>    - Spark's query optimizer is generally good at handling aggregations,
>    even with when() conditions.
>
> Therefore, while the verbosity of the current solution might seem less
> than ideal, it is the most practical and reliable way to achieve your
> specific requirements in Spark.
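>
> That said, if only the amount column needs pivoting, another workable
> pattern is to pivot that single column and join the non-pivoted columns
> back afterwards. A minimal sketch, not a built-in shortcut; the renaming
> loop is only there to match the *_amnt names used elsewhere in this thread:
>
> amounts = (
>     df.groupBy("code")
>     .pivot("doc_type", ["AB", "AA", "AC"])
>     .agg(F.sum("amount"))
> )
> # with a single aggregation the pivoted columns are named after the
> # pivot values, so rename them to the desired *_amnt scheme
> for dt in ["AB", "AA", "AC"]:
>     amounts = amounts.withColumnRenamed(dt, f"{dt}_amnt")
>
> dates = df.groupBy("code").agg(F.first("load_date").alias("load_date"))
> result = amounts.join(dates, "code")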
>
> HTH
>
>
> Dr Mich Talebzadeh,
> Architect | Data Science | Financial Crime | Forensic Analysis | GDPR
>
> view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
>
>
> On Sun, 9 Mar 2025 at 18:25, Dhruv Singla <[email protected]> wrote:
>
>> Hey, I already know this and have written the same in my question. I know
>> formatting can make the code a lot simpler and easier to understand, but
>> I'm asking whether there is already a function or a Spark built-in for this.
>> Thanks for the help though.
>>
>> On Sun, Mar 9, 2025 at 11:42 PM Mich Talebzadeh <
>> [email protected]> wrote:
>>
>>> import pyspark
>>> from pyspark import SparkConf, SparkContext
>>> from pyspark.sql import SparkSession
>>> from pyspark.sql import SQLContext
>>> from pyspark.sql.functions import struct
>>> from pyspark.sql import functions as F
>>> from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType
>>>
>>> spark = SparkSession.builder.appName("testme").getOrCreate()
>>> sc = spark.sparkContext
>>> # Set the log level to ERROR to reduce verbosity
>>> sc.setLogLevel("ERROR")
>>>
>>> # Define the schema
>>> schema = StructType([
>>>     StructField("code", IntegerType(), True),
>>>     StructField("doc_type", StringType(), True),
>>>     StructField("amount", IntegerType(), True),
>>>     StructField("load_date", StringType(), True)
>>> ])
>>>
>>> # Create the DataFrame
>>> data = [
>>>     [1, 'AB', 12, '2022-01-01'],
>>>     [1, 'AA', 22, '2022-01-10'],
>>>     [1, 'AC', 11, '2022-01-11'],
>>>     [2, 'AB', 22, '2022-02-01'],
>>>     [2, 'AA', 28, '2022-02-10'],
>>>     [2, 'AC', 25, '2022-02-22']
>>> ]
>>>
>>> df = spark.createDataFrame(data, schema=schema)
>>>
>>> df = df.withColumn('load_date', F.to_date('load_date'))
>>>
>>> grouped_df = df.groupBy('code')
>>>
>>> # Dynamically define the pivot columns
>>> pivot_aggs = [
>>>     F.sum(F.when(F.col('doc_type') == doc_type, F.col('amount'))).alias(f'{doc_type}_amnt')
>>>     for doc_type in ['AB', 'AA', 'AC']
>>> ]
>>>
>>> non_pivot_aggs = [
>>>     F.first('load_date').alias('load_date')  # or any other aggregation like min, max...
>>> ]
>>>
>>> all_aggs = pivot_aggs + non_pivot_aggs
>>>
>>> df = grouped_df.agg(*all_aggs)
>>>
>>> df.printSchema()
>>> df.show(20, False)
>>>
>>> Output
>>>
>>> root
>>> |-- code: integer (nullable = true)
>>> |-- AB_amnt: long (nullable = true)
>>> |-- AA_amnt: long (nullable = true)
>>> |-- AC_amnt: long (nullable = true)
>>> |-- load_date: date (nullable = true)
>>>
>>> +----+-------+-------+-------+----------+
>>> |code|AB_amnt|AA_amnt|AC_amnt|load_date |
>>> +----+-------+-------+-------+----------+
>>> |1 |12 |22 |11 |2022-01-01|
>>> |2 |22 |28 |25 |2022-02-01|
>>> +----+-------+-------+-------+----------+
>>>
>>> HTH
>>>
>>> Dr Mich Talebzadeh,
>>> Architect | Data Science | Financial Crime | Forensic Analysis | GDPR
>>>
>>> view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>>
>>>
>>>
>>>
>>> On Sun, 9 Mar 2025 at 17:23, Dhruv Singla <[email protected]> wrote:
>>>
>>>> Yes, this is it. I want to produce this with a simple, short command; the
>>>> way I mentioned is a lengthy one.
>>>>
>>>> On Sun, Mar 9, 2025 at 10:16 PM Mich Talebzadeh <
>>>> [email protected]> wrote:
>>>>
>>>>> Is this what you are expecting?
>>>>>
>>>>> root
>>>>> |-- code: integer (nullable = true)
>>>>> |-- AB_amnt: long (nullable = true)
>>>>> |-- AA_amnt: long (nullable = true)
>>>>> |-- AC_amnt: long (nullable = true)
>>>>> |-- load_date: date (nullable = true)
>>>>>
>>>>> +----+-------+-------+-------+----------+
>>>>> |code|AB_amnt|AA_amnt|AC_amnt|load_date |
>>>>> +----+-------+-------+-------+----------+
>>>>> |1 |12 |22 |11 |2022-01-01|
>>>>> |2 |22 |28 |25 |2022-02-01|
>>>>> +----+-------+-------+-------+----------+
>>>>>
>>>>> Dr Mich Talebzadeh,
>>>>> Architect | Data Science | Financial Crime | Forensic Analysis | GDPR
>>>>>
>>>>> view my Linkedin profile
>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Sun, 9 Mar 2025 at 14:12, Dhruv Singla <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hi Everyone
>>>>>>
>>>>>> Hope you are doing well
>>>>>>
>>>>>> I have the following dataframe.
>>>>>>
>>>>>> from pyspark.sql import functions as F
>>>>>>
>>>>>> df = spark.createDataFrame(
>>>>>>     [
>>>>>>         [1, 'AB', 12, '2022-01-01']
>>>>>>         , [1, 'AA', 22, '2022-01-10']
>>>>>>         , [1, 'AC', 11, '2022-01-11']
>>>>>>         , [2, 'AB', 22, '2022-02-01']
>>>>>>         , [2, 'AA', 28, '2022-02-10']
>>>>>>         , [2, 'AC', 25, '2022-02-22']
>>>>>>     ]
>>>>>>     , 'code: int, doc_type: string, amount: int, load_date: string'
>>>>>> )
>>>>>> df = df.withColumn('load_date', F.to_date('load_date'))
>>>>>>
>>>>>> I want to pivot the amount but keep just the first value of the date.
>>>>>> This is what I tried, and it is not giving me the desired results.
>>>>>>
>>>>>> (
>>>>>>     df.groupBy('code')
>>>>>>     .pivot('doc_type', ['AB', 'AA', 'AC'])
>>>>>>     .agg(F.sum('amount').alias('amnt'),
>>>>>>          F.first('load_date').alias('ldt'))
>>>>>>     .show()
>>>>>> )
>>>>>>
>>>>>> +----+-------+----------+-------+----------+-------+----------+
>>>>>> |code|AB_amnt| AB_ldt|AA_amnt| AA_ldt|AC_amnt| AC_ldt|
>>>>>> +----+-------+----------+-------+----------+-------+----------+
>>>>>> | 1| 12|2022-01-01| 22|2022-01-10| 11|2022-01-11|
>>>>>> | 2| 22|2022-02-01| 28|2022-02-10| 25|2022-02-22|
>>>>>> +----+-------+----------+-------+----------+-------+----------+
>>>>>>
>>>>>> This is what I want.
>>>>>>
>>>>>> (
>>>>>>     df.groupBy('code')
>>>>>>     .agg(
>>>>>>         F.sum(F.when(F.col('doc_type') == 'AB', F.col('amount'))).alias('AB_amnt')
>>>>>>         , F.sum(F.when(F.col('doc_type') == 'AA', F.col('amount'))).alias('AA_amnt')
>>>>>>         , F.sum(F.when(F.col('doc_type') == 'AC', F.col('amount'))).alias('AC_amnt')
>>>>>>         , F.first('load_date').alias('load_date')
>>>>>>     )
>>>>>>     .show()
>>>>>> )
>>>>>>
>>>>>> +----+-------+-------+-------+----------+
>>>>>> |code|AB_amnt|AA_amnt|AC_amnt| load_date|
>>>>>> +----+-------+-------+-------+----------+
>>>>>> | 1| 12| 22| 11|2022-01-01|
>>>>>> | 2| 22| 28| 25|2022-02-01|
>>>>>> +----+-------+-------+-------+----------+
>>>>>>
>>>>>> Is there any simpler way to do it? I have more than one column to pivot
>>>>>> and also more than one non-pivoted column to keep.
>>>>>>
>>>>>> I am using Databricks 14.3 LTS with Spark 3.5.0
>>>>>>
>>>>>> Thanks & Regards
>>>>>> Dhruv
>>>>>>
>>>>>
--
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge
+47 480 94 297