Hi, I am working on finding the root cause of a bug where rows in DataFrames seem to have misaligned data. My DataFrames have two types of columns: columns that come from the data and columns computed by UDFs. The problem is that, for a given row, the row's data doesn't match the data that was used to compute the UDF columns.
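One way to check a sample for this kind of misalignment is on the driver. You recompute each UDF's output from the same row's data columns and flag any row where the stored value disagrees. The sketch below is plain Python and assumes the rows have already been collected as dicts (for example with [r.asDict() for r in df.take(100)]). The names find_misaligned and udf_specs are hypothetical. The two functions re-implement the lambdas of the CTR example's UDFs. Run on the example row from the CTR case, it flags both UDF columns.

```python
# Sketch only: flags rows whose UDF output columns disagree with a
# recomputation from that same row's data columns. Assumes rows were
# collected to the driver as plain dicts; find_misaligned and the
# udf_specs format are hypothetical names, not a Spark API.
import math


def find_misaligned(rows, udf_specs, rel_tol=1e-9):
    """Return (row_index, column) pairs where a stored UDF value does not
    equal func(*inputs) recomputed from the same row.

    udf_specs maps an output column name to (func, [input column names]).
    """
    mismatches = []
    for i, row in enumerate(rows):
        for out_col, (func, in_cols) in udf_specs.items():
            expected = func(*(row[c] for c in in_cols))
            actual = row[out_col]
            if isinstance(expected, float) and isinstance(actual, (int, float)):
                # Compare doubles with a tolerance rather than exact equality.
                same = math.isclose(expected, actual, rel_tol=rel_tol)
            else:
                same = expected == actual
            if not same:
                mismatches.append((i, out_col))
    return mismatches


def ctr(x, y):
    # Same computation as ctr_udf.
    return 1.0 * len(x) / len(y)


def to_str(x, y):
    # Same computation as str_udf.
    return "{0} | {1}".format(str(x), str(y))


specs = {
    "ctr": (ctr, ["clicked_items", "sent_items"]),
    "str_items": (to_str, ["clicked_items", "sent_items"]),
}

# The misaligned example row from the CTR case.
rows = [{"clicked_items": [1, 2], "sent_items": [1, 2, 3],
         "ctr": 1.0, "str_items": "[5] | [5]"}]

print(find_misaligned(rows, specs))  # [(0, 'ctr'), (0, 'str_items')]
```

Because the check runs on the collected values, it does not depend on how Spark evaluated the UDFs. A non-empty result on rows read straight from the pipeline, together with an empty result after a round trip through storage, would be consistent with the lineage hypothesis.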
In my case, I am calculating click-through rates, so the columns of interest are the items sent to users and the items clicked by users. Below is an example of the problem I am encountering.

Column schema: clicked_items (array of long), sent_items (array of long), ctr (double), str_items (string)
Example row: [[1, 2], [1, 2, 3], 1, "[5] | [5]"]

from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import DoubleType, StringType

ctr_udf = UserDefinedFunction(lambda x, y: 1.0 * len(x) / len(y), DoubleType())
str_udf = UserDefinedFunction(lambda x, y: "{0} | {1}".format(str(x), str(y)), StringType())

My DataFrame is built roughly like this:

df.select(df.clicked_items, df.sent_items,
          ctr_udf(df.clicked_items, df.sent_items).alias('ctr'),
          str_udf(df.clicked_items, df.sent_items).alias('str_items'))

As you can see in the example row, the row data doesn't match what was passed to the UDFs. clicked_items has 2 elements and sent_items has 3, so ctr should be 2/3 (about 0.67) rather than 1, and str_items should read "[1, 2] | [1, 2, 3]" rather than "[5] | [5]".

To rule out a problem with the UDFs themselves, I tested the code below, and it works fine:

______________________________________________
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import (StructType, StructField, ArrayType,
                               IntegerType, StringType, DoubleType)

# Assumes sc is the SparkContext and sql is an SQLContext (e.g. from the PySpark shell).
test_data_schema = StructType([
    StructField('numerator', ArrayType(IntegerType())),
    StructField('denominator', ArrayType(IntegerType())),
    StructField('label', StringType())
])

test_data = sc.parallelize([
    (list(range(3)), list(range(4)), 'a'),
    ([], list(range(5)), 'b'),
    ([], list(range(3)), 'c'),
    (list(range(4)), list(range(5)), 'd'),
    (list(range(3)), list(range(4)), 'e'),
    (list(range(1)), list(range(3)), 'f')
], 3)  # spread over 3 partitions

td_df = sql.createDataFrame(test_data, test_data_schema)

def compute_func(num, den, label):
    # label is passed in but not used in the computation
    return 1.0 * len(num) / len(den)

compute_udf = UserDefinedFunction(compute_func, DoubleType())

td_df.select(td_df.numerator, td_df.denominator, td_df.label,
             compute_udf(td_df.numerator, td_df.denominator, td_df.label)).take(10)
_______________________________________________

Since the above works, I thought the problem might lie in the data rather than in the DataFrame or the UDF. To test this, I wrote the data to HDFS as Parquet and then read it back in. This fixed the issue, so I think it points to a problem with the lineage, the data pipeline, or HDFS. Any thoughts on this would be great.

My data roughly follows this flow:

1. Read raw logs from HDFS.
2. Transform the raw logs into a DataFrame and register it as a temp table.
3. Group the data by a unique identifier, re-flatten it with the identifier as a column plus additional computed values, and register it as a temp table.
4. Perform a broadcast join into a column of the table.
5. Regroup the table by the prior unique identifier plus a new one, then flatten once more (deduplicating records using the data from the broadcast join).
6. Do the UDF calculation.

My next step will be to extend the example above to mimic this data flow, in hopes of reproducing the bug outside our codebase. Any intuition on next steps would be great.

Thanks,
Pedro Rodriguez
Trulia
CU Boulder PhD Student

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Misaligned-Rows-with-UDF-tp23837.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.