Hi

I had a similar problem. For me, using the RDD StatCounter helped a lot.
Check out
http://stackoverflow.com/questions/41169873/spark-dynamic-dag-is-a-lot-slower-and-different-from-hard-coded-dag
and
http://stackoverflow.com/questions/41445571/spark-migrate-sql-window-function-to-rdd-for-better-performance

I did not calculate a lag, but a percentage. Having had a brief look at
your code, you seem to compute min/max/count over the lag columns, so my
links should help you make these calculations more efficient.
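
To illustrate the StatCounter idea: one RDD pass with aggregateByKey and a
per-key StatCounter returns count, mean, stddev, min and max from a single
scan, instead of a separate window or shuffle per aggregate. A minimal
sketch with made-up key/value columns (not your actual data):

from pyspark.sql import SparkSession
from pyspark.statcounter import StatCounter

spark = SparkSession.builder.getOrCreate()

# Hypothetical example data: (group key, numeric value)
df = spark.createDataFrame(
    [("a", 1.0), ("a", 4.0), ("b", 2.0), ("b", 6.0)], ["k", "v"])

# One scan computes all the basic statistics per key at once
stats_by_key = (df.rdd
    .map(lambda row: (row["k"], row["v"]))
    .aggregateByKey(StatCounter(),
                    lambda acc, v: acc.merge(v),    # fold one value in
                    lambda a, b: a.mergeStats(b)))  # combine partials

for key, s in stats_by_key.collect():
    print(key, s.count(), s.mean(), s.stdev(), s.min(), s.max())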

Regards, Georg
Javier Rey <jre...@gmail.com> wrote on Sun, Apr 16, 2017 at 05:34:

> Hi guys,
>
> I have this situation:
>
> 1. Data frame with 22 columns
> 2. I need to add some columns (feature engineering) using existing
> columns; 12 columns will be added for each column in a list.
> 3. I created a loop, but at the 5th item (column) the join part starts
> to go very slow, and I can observe that the execution plan is getting
> bigger (see the sketch after the code below).
> 4. I tried to save to Parquet on each iteration, but Parquet is
> immutable, so I got an error.
> 5. I really appreciate any help.
>
> Here the code:
>
> from pyspark.sql import functions as F
> from pyspark.sql.functions import array, explode, lag
> from pyspark.sql.window import Window
>
> def create_lag_columns(df, months, columns_to_lag):
>
>     columns_aggregate = []
>     data_with_period = df
>     w = Window.partitionBy("idpersona").orderBy("idpersona", "fecha")
>     for column_lag in columns_to_lag:
>
>         print("Calculating lag for column: " + column_lag)
>
>         # Create lag columns
>         for i in range(1, months + 1):
>             column_name_lag = column_lag + "_t_" + str(i)
>             data_with_period = data_with_period.withColumn(
>                 column_name_lag, lag(column_lag, i).over(w))
>             columns_aggregate.append(column_name_lag)
>
>         # Convert to long format; it is convenient for aggregations
>         df_long = data_with_period.select(
>             "idpersona", "fecha",
>             explode(array(columns_aggregate)).alias("values"))
>
>         # Aggregate operations
>         df_agg = (df_long.groupBy("idpersona", "fecha")
>                          .agg(F.min("values").alias("min_" + column_lag),
>                               F.sum("values").alias("sum_" + column_lag),
>                               F.max("values").alias("max_" + column_lag),
>                               F.avg("values").alias("avg_" + column_lag),
>                               F.count("values").alias("count_" + column_lag),
>                               F.stddev("values").alias("std_" + column_lag)))
>
>         # Merge with result
>         data_with_period = data_with_period.join(df_agg,
>                                                  ["idpersona", "fecha"])
>
>         # Reset the aggregate column list for the next loop iteration
>         columns_aggregate = []
>
>     return data_with_period
>
> -----------------------------------------
>
> lag_columns = ["indice_charlson", "pam", "framingham", "tfg",
>                "perimetro_abdominal", "presion_sistolica",
>                "presion_diastolica", "imc", "peso", "talla",
>                "frecuencia_cardiaca", "saturacion_oxigeno",
>                "porcentaje_grasa"]
>
>
> ------------------------------------------
> df = create_lag_columns(df, 6, lag_columns)
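
A common remedy for the growing execution plan in point 3 is to cut the
lineage after each iteration with DataFrame.checkpoint() (available since
Spark 2.1; it needs a checkpoint directory). A minimal self-contained
sketch of the pattern on toy data; applied here, it would mean
checkpointing data_with_period right after the join:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")

df = spark.range(100).withColumnRenamed("id", "v")  # stand-in data

for i in range(6):
    agg = df.groupBy().agg(F.max("v").alias("vmax_" + str(i)))
    df = df.crossJoin(agg)
    # Materialize and cut the lineage: the next iteration plans against
    # the checkpointed result instead of the whole join history
    df = df.checkpoint(eager=True)

df.explain()  # the plan stays small instead of growing with each join

The Parquet error in point 4 is likely the same need surfacing
differently: writing each iteration to a fresh path (not the one being
read) and loading it back cuts the lineage in the same way.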
>
> Thanks,
>
> Javier Rey
>