Hi,

I am trying to find the root cause of a bug where rows in my dataframes
seem to have misaligned data. The dataframes have two types of columns:
columns from data and columns computed by UDFs. For a given row, the values
in the data columns don't match the values that were passed to the UDF.

In my case, I am calculating click through rates, so the columns of interest
are items sent to users and items clicked by users. Below is an example of
the problem that I am encountering.

Column Schema: clicked_items (Array of Long), sent_items (Array of Long),
               ctr (Double), str_items (String)
Data: [[1, 2], [1, 2, 3], 1, "[5] | [5]"]

ctr_udf = UserDefinedFunction(
    lambda x, y: 1.0 * len(x) / len(y), DoubleType())
str_udf = UserDefinedFunction(
    lambda x, y: "{0} | {1}".format(str(x), str(y)), StringType())

The dataframe is built with a select along these lines:

df.select(df.clicked_items, df.sent_items,
          ctr_udf(df.clicked_items, df.sent_items),
          str_udf(df.clicked_items, df.sent_items))

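In the Data row above, str_items shows that the UDF received [5] and [5],
while the row's own columns hold [1, 2] and [1, 2, 3]. The ctr of 1 is
consistent with the UDF input, not with the row data.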
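
One way to flag such rows after a collect() is to recompute both UDFs
locally from each row's own columns and compare. The following is only a
minimal sketch in plain Python: the helper name find_misaligned, the tuple
layout (clicked_items, sent_items, ctr, str_items), and the second sample
row are assumptions for illustration, not part of my codebase.

```python
def find_misaligned(rows, tol=1e-9):
    """Return rows whose ctr or str_items disagree with their own item columns.

    Each row is assumed to be a tuple:
    (clicked_items, sent_items, ctr, str_items), with sent_items non-empty.
    """
    bad = []
    for clicked, sent, ctr, str_items in rows:
        # Recompute what the two UDFs should have produced for this row.
        expected_ctr = 1.0 * len(clicked) / len(sent)
        expected_str = "{0} | {1}".format(str(clicked), str(sent))
        if abs(ctr - expected_ctr) > tol or str_items != expected_str:
            bad.append((clicked, sent, ctr, str_items))
    return bad


rows = [
    ([1, 2], [1, 2, 3], 1.0, "[5] | [5]"),  # the misaligned row shown above
    ([1], [1, 2], 0.5, "[1] | [1, 2]"),     # hypothetical consistent row
]
misaligned = find_misaligned(rows)
```

Any row returned by the helper was computed from inputs other than the
values stored in that row.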

To eliminate the possibility that there was an issue with UDFs, I tested the
code below and it works fine:
______________________________________________
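# Assumes an existing SparkContext `sc` and SQLContext `sql`.
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import (ArrayType, DoubleType, IntegerType,
                               StringType, StructField, StructType)

test_data_schema = StructType([
        StructField('numerator', ArrayType(IntegerType())),
        StructField('denominator', ArrayType(IntegerType())),
        StructField('label', StringType())
    ])
# list(range(n)) so the rows hold real lists on both Python 2 and 3.
test_data = sc.parallelize([
        (list(range(3)), list(range(4)), 'a'),
        ([], list(range(5)), 'b'),
        ([], list(range(3)), 'c'),
        (list(range(4)), list(range(5)), 'd'),
        (list(range(3)), list(range(4)), 'e'),
        (list(range(1)), list(range(3)), 'f')
    ], 3)  # 3 partitions
td_df = sql.createDataFrame(test_data, test_data_schema)

def compute_func(num, den, label):
    # label is unused; it is passed only to mirror the real column layout.
    return 1.0 * len(num) / len(den)

compute_udf = UserDefinedFunction(compute_func, DoubleType())
td_df.select(td_df.numerator, td_df.denominator, td_df.label,
             compute_udf(td_df.numerator, td_df.denominator,
                         td_df.label)).take(10)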
_______________________________________________

Since the above works, I suspected the data rather than the dataframe/UDF
code. To test this, I wrote the data to HDFS as Parquet and then read it
back in. That fixed the issue, which I think points to a problem with
lineage, the data pipeline, or HDFS. Any thoughts on this would be great.
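
For reference, the Parquet round trip can be sketched as below. This
assumes the Spark 1.4+ DataFrame reader/writer API (df.write.parquet and
sqlContext.read.parquet); the helper name roundtrip_through_parquet and the
path argument are placeholders for illustration.

```python
def roundtrip_through_parquet(df, sql_context, path):
    """Write df to Parquet at `path`, then read it back as a new dataframe.

    The returned dataframe is read from the files on disk, so it no longer
    carries the lineage of the dataframe that was written.
    Assumes `path` does not already exist.
    """
    df.write.parquet(path)
    return sql_context.read.parquet(path)
```

Usage would look like
clean_df = roundtrip_through_parquet(df, sql, "hdfs:///some/tmp/path"),
with the UDF select then run on clean_df instead of df.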

My data roughly follows this flow:
1. Read raw logs from HDFS
2. Transform raw logs into dataframe, register as temp table
3. Group data by unique identifier, reflatten the data with the identifier
as a column and additional computed values, register as temp table
4. Perform a broadcast join into a column of the table
5. Regroup the table by the prior unique identifier plus a new one, then
flatten once more (deduping records using the data from the broadcast join).
6. Do UDF calculation.

My next step will be to extend the example I wrote above to mimic this data
flow, in the hope of reproducing the bug outside our codebase. Any intuition
on next steps would be great.

Thanks,
Pedro Rodriguez
Trulia
CU Boulder PhD Student




