Hi, I had a similar problem. For me, using the RDD StatCounter helped a lot. Check out http://stackoverflow.com/questions/41169873/spark-dynamic-dag-is-a-lot-slower-and-different-from-hard-coded-dag and http://stackoverflow.com/questions/41445571/spark-migrate-sql-window-function-to-rdd-for-better-performance
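Roughly, the approach looks like this (a minimal sketch, my suggestion rather than code from either link; it assumes df is your DataFrame and uses "peso" from your column list as the example):

# Compute min/max/count/mean/stdev for one column in a single pass
# over the data using the RDD StatCounter.
values = (df.select("peso").rdd
          .map(lambda row: row[0])
          .filter(lambda v: v is not None))  # StatCounter expects plain numbers
stats = values.stats()  # one job, returns a StatCounter
print(stats.min(), stats.max(), stats.count(), stats.mean(), stats.stdev())

That gives you all of the statistics in one pass instead of a separate aggregation and join per column.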
I did not calculate a lag but a percentage. Having had a brief look at your code, you seem to compute min/max/count per column, so the links above should help you make those calculations more efficient.

Regards,
Georg

Javier Rey <jre...@gmail.com> wrote on Sun, 16 Apr 2017 at 05:34:

> Hi guys,
>
> I have this situation:
>
> 1. I have a data frame with 22 columns.
> 2. I need to add some columns (feature engineering) using the existing
>    columns; 12 columns will be added for each column in the list.
> 3. I created a loop, but at the 5th item (column) the loop starts to get
>    very slow in the join part, and I can see that the execution plan is
>    getting bigger.
> 4. I tried to save to Parquet on each iteration, but Parquet is
>    immutable, so I got an error.
> 5. I would really appreciate any help.
>
> Here is the code:
>
> from pyspark.sql import functions as F
> from pyspark.sql.functions import array, explode, lag
> from pyspark.sql.window import Window
>
> def create_lag_columns(df, months, columns_to_lag):
>     columns_aggregate = []
>     data_with_period = df
>     w = Window.partitionBy("idpersona").orderBy("idpersona", "fecha")
>
>     for column_lag in columns_to_lag:
>         print("Calculating lag for column: " + column_lag)
>
>         # Create lag columns
>         for i in range(1, months + 1):
>             column_name_lag = column_lag + "_t_" + str(i)
>             data_with_period = data_with_period.withColumn(
>                 column_name_lag, lag(column_lag, i).over(w))
>             columns_aggregate.append(column_name_lag)
>
>         # Convert to long format, which is convenient for aggregations
>         df_long = data_with_period.select(
>             "idpersona", "fecha",
>             explode(array(columns_aggregate)).alias("values"))
>
>         # Aggregate operations
>         df_agg = (df_long.groupBy("idpersona", "fecha")
>                   .agg(F.min("values").alias("min_" + column_lag),
>                        F.sum("values").alias("sum_" + column_lag),
>                        F.max("values").alias("max_" + column_lag),
>                        F.avg("values").alias("avg_" + column_lag),
>                        F.count("values").alias("count_" + column_lag),
>                        F.stddev("values").alias("std_" + column_lag)))
>
>         # Merge with result
>         data_with_period = data_with_period.join(df_agg,
>                                                  ["idpersona", "fecha"])
>
>         # Reset for the next loop iteration
>         columns_aggregate = []
>
>     return data_with_period
>
> -----------------------------------------
>
> columns_to_lag = ["indice_charlson", "pam", "framingham", "tfg",
>                   "perimetro_abdominal", "presion_sistolica",
>                   "presion_diastolica", "imc", "peso", "talla",
>                   "frecuencia_cardiaca", "saturacion_oxigeno",
>                   "porcentaje_grasa"]
>
> ------------------------------------------
>
> df = create_lag_columns(df, 6, columns_to_lag)
>
> Thanks,
>
> Javier Rey
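PS: since the plan grows with every join in your loop, another thing worth trying (my suggestion, untested on your data) is to truncate the lineage at the end of each iteration, e.g. with checkpoint() on Spark 2.1+. A rough self-contained sketch; the checkpoint directory and the toy data are only examples:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
# checkpoint() needs a directory on a reliable filesystem; this path is an example
spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")

# toy stand-in for the real 22-column frame
df = spark.createDataFrame(
    [(1, "2017-01-01", 70.0), (1, "2017-02-01", 71.5)],
    ["idpersona", "fecha", "peso"])

data_with_period = df
for i in range(5):
    # stand-in for the per-column aggregation + join in the quoted loop
    agg = df.groupBy("idpersona").agg(F.count("*").alias("cnt_" + str(i)))
    data_with_period = data_with_period.join(agg, "idpersona")
    # materialize and cut the lineage so the plan does not keep compounding
    data_with_period = data_with_period.checkpoint()

On versions without DataFrame.checkpoint(), the same effect can be had with spark.createDataFrame(data_with_period.rdd, data_with_period.schema).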