Hi guys,

I have this situation:

1. Data frame with 22 columns
2. I need to add some columns (feature engineering) using existing columns,
12 columns will be add by each column in list.
3. I created a loop, but in the 5 item(col) on the loop this starts to go
very slow in the join part, I can observe that the execution plan is
getting bigger.
4. I tried to save to parquet by each iteration, but parquet is immutable,
I got an error.
5. I really appreciate any help

Here the code:

def create_lag_columns(df, months, columns_to_lag):

    columns_aggregate = []
    data_with_period = df
    w = Window().partitionBy("idpersona").orderBy("idpersona", "fecha")
    for column_lag in columns_to_lag:

        print("Calculating lag for column: " + column_lag)

        # Create lag columns
        for i in range(1,months + 1):
            column_name_lag = column_lag + "_t_" + str(i)
            data_with_period = data_with_period.withColumn(column_name_lag,
lag(column_lag, i).over(w))
            columns_aggregate.append(column_name_lag)

        # Convert to long it's convenience to do aggregate operations
        df_long = data_with_period.select('idpersona', "fecha",

explode(array(columns_aggregate)).alias('values'))
        # Aggregate operations
        df_agg = (df_long.groupBy("idpersona", "fecha")
                         .agg(F.min("values").alias("min_" + column_lag),
                              F.sum("values").alias("sum_" + column_lag),
                              F.max("values").alias("max_" + column_lag),
                              F.avg("values").alias("avg_" + column_lag),
                              F.count("values").alias("count_" +
column_lag),
                              F.stddev("values").alias("std_" + column_lag))
                  )

        # Merge with result
        data_with_period = (data_with_period.join(df_agg, ['idpersona',
"fecha"]))

        # Set null for next loop
        columns_aggregate = []

    return data_with_period

-----------------------------------------

lag_columns = ["indice_charlson", "pam", "framingham", "tfg",
"perimetro_abdominal",
                   "presion_sistolica", "presion_diastolica", "imc",
"peso", "talla",
                   "frecuencia_cardiaca", "saturacion_oxigeno",
"porcentaje_grasa"]


------------------------------------------
df = create_lag_columns(df, 6, columns_to_lag)

Thanks,

Javier Rey

Reply via email to