import pyspark
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.functions import struct
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType
spark = SparkSession.builder.appName("testme").getOrCreate()
sc = spark.sparkContext

# Set the log level to ERROR to reduce verbosity
sc.setLogLevel("ERROR")

# Define the schema
schema = StructType([
    StructField("code", IntegerType(), True),
    StructField("doc_type", StringType(), True),
    StructField("amount", IntegerType(), True),
    StructField("load_date", StringType(), True)
])

# Create the DataFrame
data = [
    [1, 'AB', 12, '2022-01-01'],
    [1, 'AA', 22, '2022-01-10'],
    [1, 'AC', 11, '2022-01-11'],
    [2, 'AB', 22, '2022-02-01'],
    [2, 'AA', 28, '2022-02-10'],
    [2, 'AC', 25, '2022-02-22']
]
df = spark.createDataFrame(data, schema=schema)
df = df.withColumn('load_date', F.to_date('load_date'))

grouped_df = df.groupBy('code')

pivot_aggs = [
    F.sum(F.when(F.col('doc_type') == doc_type, F.col('amount'))).alias(f'{doc_type}_amnt')
    for doc_type in ['AB', 'AA', 'AC']  # Dynamically define pivot columns
]

non_pivot_aggs = [
    F.first('load_date').alias('load_date')  # Or any other aggregation like min, max...
]

all_aggs = pivot_aggs + non_pivot_aggs

df = grouped_df.agg(*all_aggs)

df.printSchema()
df.show(20, False)

Output

root
 |-- code: integer (nullable = true)
 |-- AB_amnt: long (nullable = true)
 |-- AA_amnt: long (nullable = true)
 |-- AC_amnt: long (nullable = true)
 |-- load_date: date (nullable = true)

+----+-------+-------+-------+----------+
|code|AB_amnt|AA_amnt|AC_amnt|load_date |
+----+-------+-------+-------+----------+
|1   |12     |22     |11     |2022-01-01|
|2   |22     |28     |25     |2022-02-01|
+----+-------+-------+-------+----------+

HTH

Dr Mich Talebzadeh,
Architect | Data Science | Financial Crime | Forensic Analysis | GDPR

view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


On Sun, 9 Mar 2025 at 17:23, Dhruv Singla <dvsingla...@gmail.com> wrote:

> Yes, this is it. I want to form this using a simple short command. The way
> I mentioned is a lengthy one.
>
> On Sun, Mar 9, 2025 at 10:16 PM Mich Talebzadeh <mich.talebza...@gmail.com>
> wrote:
>
>> Is this what you are expecting?
>>
>> root
>>  |-- code: integer (nullable = true)
>>  |-- AB_amnt: long (nullable = true)
>>  |-- AA_amnt: long (nullable = true)
>>  |-- AC_amnt: long (nullable = true)
>>  |-- load_date: date (nullable = true)
>>
>> +----+-------+-------+-------+----------+
>> |code|AB_amnt|AA_amnt|AC_amnt|load_date |
>> +----+-------+-------+-------+----------+
>> |1   |12     |22     |11     |2022-01-01|
>> |2   |22     |28     |25     |2022-02-01|
>> +----+-------+-------+-------+----------+
>>
>> Dr Mich Talebzadeh,
>> Architect | Data Science | Financial Crime | Forensic Analysis | GDPR
>>
>> view my Linkedin profile
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>>
>> On Sun, 9 Mar 2025 at 14:12, Dhruv Singla <dvsingla...@gmail.com> wrote:
>>
>>> Hi Everyone
>>>
>>> Hope you are doing well.
>>>
>>> I have the following dataframe.
>>>
>>> df = spark.createDataFrame(
>>>     [
>>>         [1, 'AB', 12, '2022-01-01']
>>>         , [1, 'AA', 22, '2022-01-10']
>>>         , [1, 'AC', 11, '2022-01-11']
>>>         , [2, 'AB', 22, '2022-02-01']
>>>         , [2, 'AA', 28, '2022-02-10']
>>>         , [2, 'AC', 25, '2022-02-22']
>>>     ]
>>>     , 'code: int, doc_type: string, amount: int, load_date: string'
>>> )
>>> df = df.withColumn('load_date', F.to_date('load_date'))
>>>
>>> I want to pivot the amount but just want the first value from the date.
>>> This is what I tried and it is not giving me the desired results.
>>>
>>> (
>>>     df.groupBy('code')
>>>     .pivot('doc_type', ['AB', 'AA', 'AC'])
>>>     .agg(F.sum('amount').alias('amnt'),
>>>          F.first('load_date').alias('ldt'))
>>>     .show()
>>> )
>>>
>>> +----+-------+----------+-------+----------+-------+----------+
>>> |code|AB_amnt|    AB_ldt|AA_amnt|    AA_ldt|AC_amnt|    AC_ldt|
>>> +----+-------+----------+-------+----------+-------+----------+
>>> |   1|     12|2022-01-01|     22|2022-01-10|     11|2022-01-11|
>>> |   2|     22|2022-02-01|     28|2022-02-10|     25|2022-02-22|
>>> +----+-------+----------+-------+----------+-------+----------+
>>>
>>> This is what I want.
>>>
>>> (
>>>     df.groupBy('code')
>>>     .agg(
>>>         F.sum(F.when(F.col('doc_type') == 'AB', F.col('amount'))).alias('AB_amnt')
>>>         , F.sum(F.when(F.col('doc_type') == 'AA', F.col('amount'))).alias('AA_amnt')
>>>         , F.sum(F.when(F.col('doc_type') == 'AC', F.col('amount'))).alias('AC_amnt')
>>>         , F.first('load_date').alias('load_date')
>>>     )
>>>     .show()
>>> )
>>>
>>> +----+-------+-------+-------+----------+
>>> |code|AB_amnt|AA_amnt|AC_amnt| load_date|
>>> +----+-------+-------+-------+----------+
>>> |   1|     12|     22|     11|2022-01-01|
>>> |   2|     22|     28|     25|2022-02-01|
>>> +----+-------+-------+-------+----------+
>>>
>>> Is there any simpler way to do it? I have more than one column to put
>>> into pivot and also to put into non-pivot.
>>>
>>> I am using Databricks 14.3 LTS with Spark 3.5.0.
>>>
>>> Thanks & Regards
>>> Dhruv
>>>
>>
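
For the follow-up point about having more than one pivot and more than one non-pivot column, the same list-comprehension pattern scales by looping over column names as well as doc_type values. The sketch below is only an illustration of that pattern, assuming the DataFrame carries extra columns; the names 'qty' and 'ref_date' are hypothetical placeholders for whatever columns exist in the real data.

doc_types = ['AB', 'AA', 'AC']
# doc_types can also be derived from the data instead of hard-coded:
# doc_types = sorted(r[0] for r in df.select('doc_type').distinct().collect())

pivot_cols = ['amount', 'qty']              # columns spread per doc_type ('qty' is hypothetical)
non_pivot_cols = ['load_date', 'ref_date']  # columns kept once per group ('ref_date' is hypothetical)

pivot_aggs = [
    F.sum(F.when(F.col('doc_type') == dt, F.col(c))).alias(f'{dt}_{c}')
    for dt in doc_types
    for c in pivot_cols
]
non_pivot_aggs = [F.first(c).alias(c) for c in non_pivot_cols]

result = df.groupBy('code').agg(*(pivot_aggs + non_pivot_aggs))
result.show(20, False)

Keeping the aggregation expressions in plain Python lists means the column sets are defined in one place, and it avoids the doubled suffixes that pivot() generates on column names when more than one aggregate is supplied.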