Hi guys, I have the following situation:
1. I have a DataFrame with 22 columns.
2. I need to add new columns (feature engineering) derived from the existing ones; 12 columns are added for each column in a list.
3. I wrote a loop, but around the 5th item (column) of the loop it starts to run very slowly in the join part, and I can see that the execution plan keeps getting bigger.
4. I tried saving to Parquet on each iteration, but Parquet files are immutable, so I got an error.
5. I really appreciate any help.

Here is the code:

    from pyspark.sql import functions as F
    from pyspark.sql.functions import lag, explode, array
    from pyspark.sql.window import Window

    def create_lag_columns(df, months, columns_to_lag):
        columns_aggregate = []
        data_with_period = df
        w = Window.partitionBy("idpersona").orderBy("idpersona", "fecha")
        for column_lag in columns_to_lag:
            print("Calculating lag for column: " + column_lag)
            # Create one lag column per month
            for i in range(1, months + 1):
                column_name_lag = column_lag + "_t_" + str(i)
                data_with_period = data_with_period.withColumn(
                    column_name_lag, lag(column_lag, i).over(w))
                columns_aggregate.append(column_name_lag)
            # Reshape to long format; it is convenient for the aggregations
            df_long = data_with_period.select(
                "idpersona", "fecha",
                explode(array(*columns_aggregate)).alias("values"))
            # Aggregate the lagged values per person and date
            df_agg = (df_long.groupBy("idpersona", "fecha")
                      .agg(F.min("values").alias("min_" + column_lag),
                           F.sum("values").alias("sum_" + column_lag),
                           F.max("values").alias("max_" + column_lag),
                           F.avg("values").alias("avg_" + column_lag),
                           F.count("values").alias("count_" + column_lag),
                           F.stddev("values").alias("std_" + column_lag)))
            # Merge the aggregates back into the result
            data_with_period = data_with_period.join(df_agg, ["idpersona", "fecha"])
            # Reset the list for the next column
            columns_aggregate = []
        return data_with_period

-----------------------------------------

    columns_to_lag = ["indice_charlson", "pam", "framingham", "tfg",
                      "perimetro_abdominal", "presion_sistolica",
                      "presion_diastolica", "imc", "peso", "talla",
                      "frecuencia_cardiaca", "saturacion_oxigeno",
                      "porcentaje_grasa"]

------------------------------------------

    df = create_lag_columns(df, 6, columns_to_lag)

Thanks,
Javier Rey
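
P.S. Since the growing execution plan seems to be the problem, I was also looking at df.checkpoint() as a replacement for the per-iteration Parquet save from point 4. Here is a minimal sketch of what I mean (spark is my SparkSession, the checkpoint directory is just a placeholder path, and I have not verified that this actually fixes the slowdown):

    # Once, before calling create_lag_columns(): set where checkpoints are stored
    spark.sparkContext.setCheckpointDir("/tmp/spark_checkpoints")  # placeholder path

    # Inside the loop, right after the join: materialize the intermediate
    # result so the accumulated execution plan is truncated each iteration
    data_with_period = data_with_period.join(df_agg, ["idpersona", "fecha"])
    data_with_period = data_with_period.checkpoint()

Would this be the right way to stop the plan from growing on every iteration, or is there a better pattern?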