import pyspark
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.functions import struct
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, IntegerType,
StringType, DateType

spark = SparkSession.builder.appName("testme").getOrCreate()
sc = spark.sparkContext
# Set the log level to ERROR to reduce verbosity
sc.setLogLevel("ERROR")

# Define the schema
schema = StructType([
    StructField("code", IntegerType(), True),
    StructField("doc_type", StringType(), True),
    StructField("amount", IntegerType(), True),
    StructField("load_date", StringType(), True)
])

# Create the DataFrame
data = [
    [1, 'AB', 12, '2022-01-01'],
    [1, 'AA', 22, '2022-01-10'],
    [1, 'AC', 11, '2022-01-11'],
    [2, 'AB', 22, '2022-02-01'],
    [2, 'AA', 28, '2022-02-10'],
    [2, 'AC', 25, '2022-02-22']
]

df = spark.createDataFrame(data, schema=schema)

df = df.withColumn('load_date', F.to_date('load_date'))

grouped_df = df.groupBy('code')

pivot_aggs = [
    F.sum(F.when(F.col('doc_type') == doc_type,
F.col('amount'))).alias(f'{doc_type}_amnt')
    for doc_type in ['AB', 'AA', 'AC']  # Dynamically define pivot columns
]

non_pivot_aggs = [
    F.first('load_date').alias('load_date')  # Or any other aggregation
like min, max...
]

all_aggs = pivot_aggs + non_pivot_aggs

df = grouped_df.agg(*all_aggs)

df.printSchema()
df.show(20, False)

Output

root
 |-- code: integer (nullable = true)
 |-- AB_amnt: long (nullable = true)
 |-- AA_amnt: long (nullable = true)
 |-- AC_amnt: long (nullable = true)
 |-- load_date: date (nullable = true)

+----+-------+-------+-------+----------+
|code|AB_amnt|AA_amnt|AC_amnt|load_date |
+----+-------+-------+-------+----------+
|1   |12     |22     |11     |2022-01-01|
|2   |22     |28     |25     |2022-02-01|
+----+-------+-------+-------+----------+

HTH

Dr Mich Talebzadeh,
Architect | Data Science | Financial Crime | Forensic Analysis | GDPR

   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>





On Sun, 9 Mar 2025 at 17:23, Dhruv Singla <dvsingla...@gmail.com> wrote:

> Yes, this is it. I want to form this using a simple short command. The way
> I mentioned is a lengthy one.
>
> On Sun, Mar 9, 2025 at 10:16 PM Mich Talebzadeh <mich.talebza...@gmail.com>
> wrote:
>
>> Is this what you are expecting?
>>
>> root
>>  |-- code: integer (nullable = true)
>>  |-- AB_amnt: long (nullable = true)
>>  |-- AA_amnt: long (nullable = true)
>>  |-- AC_amnt: long (nullable = true)
>>  |-- load_date: date (nullable = true)
>>
>> +----+-------+-------+-------+----------+
>> |code|AB_amnt|AA_amnt|AC_amnt|load_date |
>> +----+-------+-------+-------+----------+
>> |1   |12     |22     |11     |2022-01-01|
>> |2   |22     |28     |25     |2022-02-01|
>> +----+-------+-------+-------+----------+
>>
>> Dr Mich Talebzadeh,
>> Architect | Data Science | Financial Crime | Forensic Analysis | GDPR
>>
>>    view my Linkedin profile
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>>
>>
>>
>>
>> On Sun, 9 Mar 2025 at 14:12, Dhruv Singla <dvsingla...@gmail.com> wrote:
>>
>>> Hi Everyone
>>>
>>> Hope you are doing well
>>>
>>> I have the following dataframe.
>>>
>>> df = spark.createDataFrame(
>>>     [
>>>         [1, 'AB', 12, '2022-01-01']
>>>         , [1, 'AA', 22, '2022-01-10']
>>>         , [1, 'AC', 11, '2022-01-11']
>>>         , [2, 'AB', 22, '2022-02-01']
>>>         , [2, 'AA', 28, '2022-02-10']
>>>         , [2, 'AC', 25, '2022-02-22']
>>>     ]
>>>     , 'code: int, doc_type: string, amount: int, load_date: string'
>>> )
>>> df = df.withColumn('load_date', F.to_date('load_date'))
>>>
>>> I want to pivot the amount but just want the first value from the date.
>>> This is what I tried and it is not giving me the desried results.
>>>
>>> (
>>>     df.groupBy('code')
>>>     .pivot('doc_type', ['AB', 'AA', 'AC'])
>>>     .agg(F.sum('amount').alias('amnt'),
>>> F.first('load_date').alias('ldt'))
>>>     .show()
>>> )
>>>
>>> +----+-------+----------+-------+----------+-------+----------+
>>> |code|AB_amnt|    AB_ldt|AA_amnt|    AA_ldt|AC_amnt|    AC_ldt|
>>> +----+-------+----------+-------+----------+-------+----------+
>>> |   1|     12|2022-01-01|     22|2022-01-10|     11|2022-01-11|
>>> |   2|     22|2022-02-01|     28|2022-02-10|     25|2022-02-22|
>>> +----+-------+----------+-------+----------+-------+----------+
>>>
>>> This is what I want.
>>>
>>> (
>>>     df.groupBy('code')
>>>     .agg(
>>>         F.sum(F.when(F.col('doc_type') == 'AB',
>>> F.col('amount'))).alias('AB_amnt')
>>>         , F.sum(F.when(F.col('doc_type') == 'AA',
>>> F.col('amount'))).alias('AA_amnt')
>>>         , F.sum(F.when(F.col('doc_type') == 'AC',
>>> F.col('amount'))).alias('AC_amnt')
>>>         , F.first('load_date').alias('load_date')
>>>     )
>>>     .show()
>>> )
>>>
>>> +----+-------+-------+-------+----------+
>>> |code|AB_amnt|AA_amnt|AC_amnt| load_date|
>>> +----+-------+-------+-------+----------+
>>> |   1|     12|     22|     11|2022-01-01|
>>> |   2|     22|     28|     25|2022-02-01|
>>> +----+-------+-------+-------+----------+
>>>
>>> Is there any simpler way to do it? I have more than one column to put
>>> into pivot and also to put into non pivot.
>>>
>>> I am using Databricks 14.3 LTS with Spark 3.5.0
>>>
>>> Thanks & Regards
>>> Dhruv
>>>
>>

Reply via email to